diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java index f272a575cb..e4676b79cf 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -21,6 +21,26 @@ public final class SentryKafkaConsumerBeanPostProcessor implements BeanPostProcessor, PriorityOrdered { + private static final @NotNull String RECORD_INTERCEPTOR_FIELD_NAME = "recordInterceptor"; + + private final @NotNull String recordInterceptorFieldName; + + public SentryKafkaConsumerBeanPostProcessor() { + this(RECORD_INTERCEPTOR_FIELD_NAME); + } + + SentryKafkaConsumerBeanPostProcessor(final @NotNull String recordInterceptorFieldName) { + this.recordInterceptorFieldName = recordInterceptorFieldName; + } + + private static final class InterceptorReadFailedException extends Exception { + private static final long serialVersionUID = 1L; + + InterceptorReadFailedException(final @NotNull Throwable cause) { + super(cause); + } + } + @Override @SuppressWarnings("unchecked") public @NotNull Object postProcessAfterInitialization( @@ -29,7 +49,23 @@ public final class SentryKafkaConsumerBeanPostProcessor final @NotNull AbstractKafkaListenerContainerFactory factory = (AbstractKafkaListenerContainerFactory) bean; - final @Nullable RecordInterceptor existing = getExistingInterceptor(factory); + final @Nullable RecordInterceptor existing; + try { + existing = getExistingInterceptor(factory); + } catch (InterceptorReadFailedException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.ERROR, + e, + "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " + + "existing recordInterceptor via reflection. Refusing to install Sentry's " + + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", + beanName); + return bean; + } + if (existing instanceof SentryKafkaRecordInterceptor) { return bean; } @@ -42,25 +78,16 @@ public final class SentryKafkaConsumerBeanPostProcessor return bean; } - @SuppressWarnings("unchecked") private @Nullable RecordInterceptor getExistingInterceptor( - final @NotNull AbstractKafkaListenerContainerFactory factory) { + final @NotNull AbstractKafkaListenerContainerFactory factory) + throws InterceptorReadFailedException { try { final @NotNull Field field = - AbstractKafkaListenerContainerFactory.class.getDeclaredField("recordInterceptor"); + AbstractKafkaListenerContainerFactory.class.getDeclaredField(recordInterceptorFieldName); field.setAccessible(true); return (RecordInterceptor) field.get(factory); - } catch (NoSuchFieldException | IllegalAccessException e) { - ScopesAdapter.getInstance() - .getOptions() - .getLogger() - .log( - SentryLevel.WARNING, - "Unable to read existing recordInterceptor from " - + "AbstractKafkaListenerContainerFactory via reflection. " - + "If you had a custom RecordInterceptor, it may not be chained with Sentry's interceptor.", - e); - return null; + } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) { + throw new InterceptorReadFailedException(e); } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt index 8595cb9ae7..0a642c0694 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -3,9 +3,12 @@ package io.sentry.spring.jakarta.kafka import kotlin.test.Test import kotlin.test.assertSame import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.mockito.kotlin.mock import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.listener.RecordInterceptor class SentryKafkaConsumerBeanPostProcessorTest { @@ -55,4 +58,67 @@ class SentryKafkaConsumerBeanPostProcessorTest { assertSame(someBean, result) } + + @Test + fun `chains existing customer RecordInterceptor as delegate`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val installed = field.get(factory) + assertTrue( + installed is SentryKafkaRecordInterceptor<*, *>, + "expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}", + ) + + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + assertSame( + customerInterceptor, + delegateField.get(installed), + "customer interceptor must be preserved as delegate", + ) + } + + @Test + fun `skips installation when reflection fails and preserves customer interceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + assertSame(customerInterceptor, field.get(factory)) + + val processor = SentryKafkaConsumerBeanPostProcessor("missingRecordInterceptor") + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + assertSame( + customerInterceptor, + field.get(factory), + "customer interceptor must remain installed when Sentry cannot read it", + ) + } }