-
-
Notifications
You must be signed in to change notification settings - Fork 468
feat(spring-jakarta): [Queue Instrumentation 3] Add Kafka producer instrumentation #5254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/queue-instrumentation-sample
Are you sure you want to change the base?
Changes from all commits
be5af44
5049ffc
915e42b
fdb3a03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| package io.sentry.spring.jakarta.kafka; | ||
|
|
||
| import io.sentry.ScopesAdapter; | ||
| import io.sentry.SentryLevel; | ||
| import java.lang.reflect.Field; | ||
| import org.apache.kafka.clients.producer.ProducerInterceptor; | ||
| import org.jetbrains.annotations.ApiStatus; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
| import org.springframework.beans.BeansException; | ||
| import org.springframework.beans.factory.config.BeanPostProcessor; | ||
| import org.springframework.core.Ordered; | ||
| import org.springframework.core.PriorityOrdered; | ||
| import org.springframework.kafka.core.KafkaTemplate; | ||
| import org.springframework.kafka.support.CompositeProducerInterceptor; | ||
|
|
||
| /** | ||
| * Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link | ||
| * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. | ||
| * | ||
| * <p>If the template already has a {@link ProducerInterceptor}, both are composed using {@link | ||
| * CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public | ||
| * getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry | ||
| * interceptor is set. | ||
| */ | ||
| @ApiStatus.Internal | ||
| public final class SentryKafkaProducerBeanPostProcessor | ||
| implements BeanPostProcessor, PriorityOrdered { | ||
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public @NotNull Object postProcessAfterInitialization( | ||
| final @NotNull Object bean, final @NotNull String beanName) throws BeansException { | ||
| if (bean instanceof KafkaTemplate) { | ||
| final @NotNull KafkaTemplate<?, ?> template = (KafkaTemplate<?, ?>) bean; | ||
| final @Nullable ProducerInterceptor<?, ?> existing = getExistingInterceptor(template); | ||
|
|
||
| if (existing instanceof SentryProducerInterceptor) { | ||
| return bean; | ||
| } | ||
|
|
||
| @SuppressWarnings("rawtypes") | ||
| final SentryProducerInterceptor sentryInterceptor = | ||
| new SentryProducerInterceptor<>(ScopesAdapter.getInstance()); | ||
|
|
||
| if (existing != null) { | ||
| @SuppressWarnings("rawtypes") | ||
| final CompositeProducerInterceptor composite = | ||
| new CompositeProducerInterceptor(sentryInterceptor, existing); | ||
| template.setProducerInterceptor(composite); | ||
| } else { | ||
| template.setProducerInterceptor(sentryInterceptor); | ||
| } | ||
| } | ||
| return bean; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private @Nullable ProducerInterceptor<?, ?> getExistingInterceptor( | ||
| final @NotNull KafkaTemplate<?, ?> template) { | ||
| try { | ||
| final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor"); | ||
| field.setAccessible(true); | ||
| return (ProducerInterceptor<?, ?>) field.get(template); | ||
| } catch (NoSuchFieldException | IllegalAccessException e) { | ||
| ScopesAdapter.getInstance() | ||
| .getOptions() | ||
| .getLogger() | ||
| .log( | ||
| SentryLevel.WARNING, | ||
| "Unable to read existing producerInterceptor from KafkaTemplate via reflection. " | ||
| + "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.", | ||
| e); | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public int getOrder() { | ||
| return Ordered.LOWEST_PRECEDENCE; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| package io.sentry.spring.jakarta.kafka; | ||
|
|
||
| import io.sentry.BaggageHeader; | ||
| import io.sentry.IScopes; | ||
| import io.sentry.ISpan; | ||
| import io.sentry.SentryTraceHeader; | ||
| import io.sentry.SpanDataConvention; | ||
| import io.sentry.SpanOptions; | ||
| import io.sentry.SpanStatus; | ||
| import io.sentry.util.TracingUtils; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Map; | ||
| import org.apache.kafka.clients.producer.ProducerInterceptor; | ||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||
| import org.apache.kafka.clients.producer.RecordMetadata; | ||
| import org.apache.kafka.common.header.Headers; | ||
| import org.jetbrains.annotations.ApiStatus; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.jetbrains.annotations.Nullable; | ||
|
|
||
| /** | ||
| * A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing | ||
| * headers into outgoing records. | ||
| * | ||
| * <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing | ||
| * "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link | ||
| * #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread. | ||
| * | ||
| * <p>If the customer already has a {@link ProducerInterceptor}, the {@link | ||
| * SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link | ||
| * org.springframework.kafka.support.CompositeProducerInterceptor}. | ||
| */ | ||
| @ApiStatus.Internal | ||
| public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> { | ||
|
|
||
| static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer"; | ||
| static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; | ||
|
|
||
| private final @NotNull IScopes scopes; | ||
|
|
||
| public SentryProducerInterceptor(final @NotNull IScopes scopes) { | ||
| this.scopes = scopes; | ||
| } | ||
|
|
||
| @Override | ||
| public @NotNull ProducerRecord<K, V> onSend(final @NotNull ProducerRecord<K, V> record) { | ||
| if (!scopes.getOptions().isEnableQueueTracing()) { | ||
| return record; | ||
| } | ||
|
|
||
| final @Nullable ISpan activeSpan = scopes.getSpan(); | ||
| if (activeSpan == null || activeSpan.isNoOp()) { | ||
| return record; | ||
| } | ||
|
|
||
| final @NotNull SpanOptions spanOptions = new SpanOptions(); | ||
| spanOptions.setOrigin(TRACE_ORIGIN); | ||
| final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); | ||
| if (span.isNoOp()) { | ||
| return record; | ||
| } | ||
|
|
||
| span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); | ||
| span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); | ||
|
|
||
| try { | ||
| injectHeaders(record.headers(), span); | ||
| } catch (Throwable ignored) { | ||
| // Header injection must not break the send | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to have any info on the fact, that header injection failed? A log for example? |
||
| } | ||
|
|
||
| span.setStatus(SpanStatus.OK); | ||
| span.finish(); | ||
|
|
||
| return record; | ||
| } | ||
|
|
||
| @Override | ||
| public void onAcknowledgement( | ||
| final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} | ||
|
|
||
| @Override | ||
| public void close() {} | ||
|
|
||
| @Override | ||
| public void configure(final @Nullable Map<String, ?> configs) {} | ||
|
|
||
| private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { | ||
| final @Nullable TracingUtils.TracingHeaders tracingHeaders = | ||
| TracingUtils.trace(scopes, null, span); | ||
| if (tracingHeaders != null) { | ||
| final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); | ||
| headers.remove(sentryTraceHeader.getName()); | ||
| headers.add( | ||
| sentryTraceHeader.getName(), | ||
| sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); | ||
| if (baggageHeader != null) { | ||
| headers.remove(baggageHeader.getName()); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to remove the full baggageHeader here? Would that not potentially remove baggage information from other instrumentation libraries? Ok with it if its intentional, just raising the point |
||
| headers.add( | ||
| baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); | ||
| } | ||
| } | ||
|
|
||
| headers.remove(SENTRY_ENQUEUED_TIME_HEADER); | ||
| headers.add( | ||
| SENTRY_ENQUEUED_TIME_HEADER, | ||
| String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| package io.sentry.spring.jakarta.kafka | ||
|
|
||
| import kotlin.test.Test | ||
| import kotlin.test.assertSame | ||
| import kotlin.test.assertTrue | ||
| import org.apache.kafka.clients.producer.ProducerInterceptor | ||
| import org.mockito.kotlin.mock | ||
| import org.springframework.kafka.core.KafkaTemplate | ||
| import org.springframework.kafka.core.ProducerFactory | ||
| import org.springframework.kafka.support.CompositeProducerInterceptor | ||
|
|
||
| class SentryKafkaProducerBeanPostProcessorTest { | ||
|
|
||
| private fun readInterceptor(template: KafkaTemplate<*, *>): Any? { | ||
| val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor") | ||
| field.isAccessible = true | ||
| return field.get(template) | ||
| } | ||
|
|
||
| @Test | ||
| fun `sets SentryProducerInterceptor on KafkaTemplate`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
|
|
||
| assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>) | ||
| } | ||
|
|
||
| @Test | ||
| fun `does not double-wrap when SentryProducerInterceptor already set`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
| val firstInterceptor = readInterceptor(template) | ||
|
|
||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
| val secondInterceptor = readInterceptor(template) | ||
|
|
||
| assertSame(firstInterceptor, secondInterceptor) | ||
| } | ||
|
|
||
| @Test | ||
| fun `does not modify non-KafkaTemplate beans`() { | ||
| val someBean = "not a kafka template" | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| val result = processor.postProcessAfterInitialization(someBean, "someBean") | ||
|
|
||
| assertSame(someBean, result) | ||
| } | ||
|
|
||
| @Test | ||
| fun `returns the same bean instance`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
|
|
||
| val result = processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
|
|
||
| assertSame(template, result, "BPP should return the same bean, not a replacement") | ||
| } | ||
|
|
||
| @Test | ||
| fun `composes with existing customer interceptor using CompositeProducerInterceptor`() { | ||
| val template = KafkaTemplate<String, String>(mock<ProducerFactory<String, String>>()) | ||
| val customerInterceptor = mock<ProducerInterceptor<String, String>>() | ||
| template.setProducerInterceptor(customerInterceptor) | ||
|
|
||
| val processor = SentryKafkaProducerBeanPostProcessor() | ||
| processor.postProcessAfterInitialization(template, "kafkaTemplate") | ||
|
|
||
| assertTrue( | ||
| readInterceptor(template) is CompositeProducerInterceptor<*, *>, | ||
| "Should use CompositeProducerInterceptor when existing interceptor is present", | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one possible edge case here: If for some reason a user already manually registered the
SentryInterceptorand another one in a composite we would wrap that again and haveSentryInterceptorregistered twice