From ca694476e9d27cd6d3454b2e278dc20aa8a607d9 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 12:38:39 +0200 Subject: [PATCH 1/2] ref(kafka): Extract sentry-kafka module from spring-jakarta Move Kafka producer interceptor to a new sentry-kafka module and rename to SentryKafkaProducerInterceptor. Add SentryKafkaConsumerInterceptor for vanilla kafka-clients users. Spring integration now depends on sentry-kafka and passes a Spring-specific trace origin. This allows non-Spring applications to use Kafka queue instrumentation directly via kafka-clients interceptor config. Co-Authored-By: Claude --- README.md | 1 + buildSrc/src/main/java/Config.kt | 1 + gradle/libs.versions.toml | 1 + sentry-kafka/README.md | 5 + sentry-kafka/api/sentry-kafka.api | 25 +++ sentry-kafka/build.gradle.kts | 83 ++++++++++ .../kafka/SentryKafkaConsumerInterceptor.java | 95 ++++++++++++ .../kafka/SentryKafkaProducerInterceptor.java | 33 ++-- .../SentryKafkaConsumerInterceptorTest.kt | 72 +++++++++ .../SentryKafkaProducerInterceptorTest.kt | 98 ++++++++++++ .../build.gradle.kts | 1 + .../build.gradle.kts | 1 + .../build.gradle.kts | 1 + sentry-spring-boot-jakarta/build.gradle.kts | 1 + .../api/sentry-spring-jakarta.api | 8 - sentry-spring-jakarta/build.gradle.kts | 2 + .../SentryKafkaProducerBeanPostProcessor.java | 10 +- .../kafka/SentryKafkaRecordInterceptor.java | 3 +- ...entryKafkaProducerBeanPostProcessorTest.kt | 7 +- .../kafka/SentryKafkaRecordInterceptorTest.kt | 3 +- .../kafka/SentryProducerInterceptorTest.kt | 142 ------------------ .../main/java/io/sentry/util/SpanUtils.java | 2 + settings.gradle.kts | 1 + 23 files changed, 418 insertions(+), 178 deletions(-) create mode 100644 sentry-kafka/README.md create mode 100644 sentry-kafka/api/sentry-kafka.api create mode 100644 sentry-kafka/build.gradle.kts create mode 100644 sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java rename sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java => sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java (74%) create mode 100644 sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt create mode 100644 sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt delete mode 100644 sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt diff --git a/README.md b/README.md index 25fedc8217..72737932c5 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Sentry SDK for Java and Android | sentry | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry?style=for-the-badge&logo=sentry&color=green) | 21 | | sentry-jul | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jul?style=for-the-badge&logo=sentry&color=green) | | sentry-jdbc | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jdbc?style=for-the-badge&logo=sentry&color=green) | +| sentry-kafka | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-kafka?style=for-the-badge&logo=sentry&color=green) | | sentry-apollo | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo?style=for-the-badge&logo=sentry&color=green) | 21 | | sentry-apollo-3 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-3?style=for-the-badge&logo=sentry&color=green) | 21 | | sentry-apollo-4 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-4?style=for-the-badge&logo=sentry&color=green) | 21 | diff --git a/buildSrc/src/main/java/Config.kt b/buildSrc/src/main/java/Config.kt index b5d1dafeb7..0e353f1c5e 100644 --- a/buildSrc/src/main/java/Config.kt +++ b/buildSrc/src/main/java/Config.kt @@ -80,6 +80,7 @@ object Config { val SENTRY_JCACHE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jcache" val SENTRY_QUARTZ_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.quartz" val SENTRY_JDBC_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jdbc" + val SENTRY_KAFKA_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.kafka" val SENTRY_OPENFEATURE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.openfeature" val SENTRY_LAUNCHDARKLY_SERVER_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.launchdarkly-server" val SENTRY_LAUNCHDARKLY_ANDROID_SDK_NAME = "$SENTRY_ANDROID_SDK_NAME.launchdarkly" diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index bede68144b..2238800c53 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -184,6 +184,7 @@ springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-star springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" } springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" } spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" } +kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" } springboot4-otel = { module = "io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter", version.ref = "otelInstrumentation" } springboot4-resttestclient = { module = "org.springframework.boot:spring-boot-resttestclient", version.ref = "springboot4" } springboot4-starter = { module = "org.springframework.boot:spring-boot-starter", version.ref = "springboot4" } diff --git a/sentry-kafka/README.md b/sentry-kafka/README.md new file mode 100644 index 0000000000..ef4b531985 --- /dev/null +++ b/sentry-kafka/README.md @@ -0,0 +1,5 @@ +# sentry-kafka + +This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly. + +Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks. diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api new file mode 100644 index 0000000000..30faaa1256 --- /dev/null +++ b/sentry-kafka/api/sentry-kafka.api @@ -0,0 +1,25 @@ +public final class io/sentry/kafka/BuildConfig { + public static final field SENTRY_KAFKA_SDK_NAME Ljava/lang/String; + public static final field VERSION_NAME Ljava/lang/String; +} + +public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor { + public static final field TRACE_ORIGIN Ljava/lang/String; + public fun (Lio/sentry/IScopes;)V + public fun close ()V + public fun configure (Ljava/util/Map;)V + public fun onCommit (Ljava/util/Map;)V + public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords; +} + +public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { + public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; + public static final field TRACE_ORIGIN Ljava/lang/String; + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Ljava/lang/String;)V + public fun close ()V + public fun configure (Ljava/util/Map;)V + public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V + public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; +} + diff --git a/sentry-kafka/build.gradle.kts b/sentry-kafka/build.gradle.kts new file mode 100644 index 0000000000..ee3ba0d4a6 --- /dev/null +++ b/sentry-kafka/build.gradle.kts @@ -0,0 +1,83 @@ +import net.ltgt.gradle.errorprone.errorprone +import org.jetbrains.kotlin.gradle.tasks.KotlinCompile + +plugins { + `java-library` + id("io.sentry.javadoc") + alias(libs.plugins.kotlin.jvm) + jacoco + alias(libs.plugins.errorprone) + alias(libs.plugins.gradle.versions) + alias(libs.plugins.buildconfig) +} + +tasks.withType().configureEach { + compilerOptions.jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_1_8 +} + +dependencies { + api(projects.sentry) + compileOnly(libs.kafka.clients) + compileOnly(libs.jetbrains.annotations) + compileOnly(libs.nopen.annotations) + + errorprone(libs.errorprone.core) + errorprone(libs.nopen.checker) + errorprone(libs.nullaway) + + // tests + testImplementation(projects.sentryTestSupport) + testImplementation(kotlin(Config.kotlinStdLib)) + testImplementation(libs.kotlin.test.junit) + testImplementation(libs.mockito.kotlin) + testImplementation(libs.mockito.inline) + testImplementation(libs.kafka.clients) +} + +configure { test { java.srcDir("src/test/java") } } + +jacoco { toolVersion = libs.versions.jacoco.get() } + +tasks.jacocoTestReport { + reports { + xml.required.set(true) + html.required.set(false) + } +} + +tasks { + jacocoTestCoverageVerification { + violationRules { rule { limit { minimum = Config.QualityPlugins.Jacoco.minimumCoverage } } } + } + check { + dependsOn(jacocoTestCoverageVerification) + dependsOn(jacocoTestReport) + } +} + +tasks.withType().configureEach { + options.errorprone { + check("NullAway", net.ltgt.gradle.errorprone.CheckSeverity.ERROR) + option("NullAway:AnnotatedPackages", "io.sentry") + } +} + +buildConfig { + useJavaOutput() + packageName("io.sentry.kafka") + buildConfigField("String", "SENTRY_KAFKA_SDK_NAME", "\"${Config.Sentry.SENTRY_KAFKA_SDK_NAME}\"") + buildConfigField("String", "VERSION_NAME", "\"${project.version}\"") +} + +tasks.jar { + manifest { + attributes( + "Sentry-Version-Name" to project.version, + "Sentry-SDK-Name" to Config.Sentry.SENTRY_KAFKA_SDK_NAME, + "Sentry-SDK-Package-Name" to "maven:io.sentry:sentry-kafka", + "Implementation-Vendor" to "Sentry", + "Implementation-Title" to project.name, + "Implementation-Version" to project.version, + ) + } +} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java new file mode 100644 index 0000000000..caa773352e --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java @@ -0,0 +1,95 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.IScopes; +import io.sentry.ITransaction; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +@ApiStatus.Internal +public final class SentryKafkaConsumerInterceptor implements ConsumerInterceptor { + + public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer"; + + private final @NotNull IScopes scopes; + + public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) { + this.scopes = scopes; + } + + @Override + public @NotNull ConsumerRecords onConsume(final @NotNull ConsumerRecords records) { + if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) { + return records; + } + + final @NotNull ConsumerRecord firstRecord = records.iterator().next(); + + try { + final @Nullable TransactionContext continued = continueTrace(firstRecord); + final @NotNull TransactionContext txContext = + continued != null ? continued : new TransactionContext("queue.receive", "queue.receive"); + txContext.setName("queue.receive"); + txContext.setOperation("queue.receive"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(false); + + final @NotNull ITransaction transaction = scopes.startTransaction(txContext, txOptions); + if (!transaction.isNoOp()) { + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, firstRecord.topic()); + transaction.setData("messaging.batch.message.count", records.count()); + transaction.setStatus(SpanStatus.OK); + transaction.finish(); + } + } catch (Throwable ignored) { + // Instrumentation must never break the customer's Kafka poll loop. + } + + return records; + } + + @Override + public void onCommit(final @NotNull Map offsets) {} + + @Override + public void close() {} + + @Override + public void configure(final @Nullable Map configs) {} + + private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? Collections.singletonList(baggage) : null; + return scopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } +} diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java similarity index 74% rename from sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java rename to sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index 7e589511c4..c6b3184b39 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -1,4 +1,4 @@ -package io.sentry.spring.jakarta.kafka; +package io.sentry.kafka; import io.sentry.BaggageHeader; import io.sentry.DateUtils; @@ -19,28 +19,23 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -/** - * A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing - * headers into outgoing records. - * - *

The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing - * "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link - * #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread. - * - *

If the customer already has a {@link ProducerInterceptor}, the {@link - * SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link - * org.springframework.kafka.support.CompositeProducerInterceptor}. - */ @ApiStatus.Internal -public final class SentryProducerInterceptor implements ProducerInterceptor { +public final class SentryKafkaProducerInterceptor implements ProducerInterceptor { - static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer"; - static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; + public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; + public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; private final @NotNull IScopes scopes; + private final @NotNull String traceOrigin; - public SentryProducerInterceptor(final @NotNull IScopes scopes) { + public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) { + this(scopes, TRACE_ORIGIN); + } + + public SentryKafkaProducerInterceptor( + final @NotNull IScopes scopes, final @NotNull String traceOrigin) { this.scopes = scopes; + this.traceOrigin = traceOrigin; } @Override @@ -56,7 +51,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) { try { final @NotNull SpanOptions spanOptions = new SpanOptions(); - spanOptions.setOrigin(TRACE_ORIGIN); + spanOptions.setOrigin(traceOrigin); final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); if (span.isNoOp()) { @@ -71,7 +66,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) { span.setStatus(SpanStatus.OK); span.finish(); } catch (Throwable ignored) { - // Instrumentation must never break the customer's Kafka send + // Instrumentation must never break the customer's Kafka send. } return record; diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt new file mode 100644 index 0000000000..daee640793 --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt @@ -0,0 +1,72 @@ +package io.sentry.kafka + +import io.sentry.IScopes +import io.sentry.ITransaction +import io.sentry.SentryOptions +import io.sentry.TransactionContext +import io.sentry.TransactionOptions +import kotlin.test.Test +import kotlin.test.assertSame +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class SentryKafkaConsumerInterceptorTest { + + @Test + fun `does nothing when queue tracing is disabled`() { + val scopes = mock() + val options = SentryOptions().apply { isEnableQueueTracing = false } + whenever(scopes.options).thenReturn(options) + + val interceptor = SentryKafkaConsumerInterceptor(scopes) + val records = singleRecordBatch() + + val result = interceptor.onConsume(records) + + assertSame(records, result) + verify(scopes, never()).startTransaction(any(), any()) + } + + @Test + fun `starts and finishes queue receive transaction for consumed batch`() { + val scopes = mock() + val options = SentryOptions().apply { isEnableQueueTracing = true } + val transaction = mock() + + whenever(scopes.options).thenReturn(options) + whenever(scopes.continueTrace(any(), any())).thenReturn(null) + whenever(scopes.startTransaction(any(), any())) + .thenReturn(transaction) + whenever(transaction.isNoOp).thenReturn(false) + + val interceptor = SentryKafkaConsumerInterceptor(scopes) + + interceptor.onConsume(singleRecordBatch()) + + verify(scopes).startTransaction(any(), any()) + verify(transaction).setData("messaging.system", "kafka") + verify(transaction).setData("messaging.destination.name", "my-topic") + verify(transaction).setData("messaging.batch.message.count", 1) + verify(transaction).finish() + } + + @Test + fun `commit callback is no-op`() { + val interceptor = SentryKafkaConsumerInterceptor(mock()) + + interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1))) + } + + private fun singleRecordBatch(): ConsumerRecords { + val partition = TopicPartition("my-topic", 0) + val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") + return ConsumerRecords(mapOf(partition to listOf(record))) + } +} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt new file mode 100644 index 0000000000..99b487c1c0 --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -0,0 +1,98 @@ +package io.sentry.kafka + +import io.sentry.IScopes +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.TransactionContext +import io.sentry.test.initForTest +import java.nio.charset.StandardCharsets +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.ProducerRecord +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever + +class SentryKafkaProducerInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + + @BeforeTest + fun setup() { + initForTest { it.dsn = "https://key@sentry.io/proj" } + scopes = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + } + whenever(scopes.options).thenReturn(options) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun createTransaction(): SentryTracer { + val tx = SentryTracer(TransactionContext("tx", "op"), scopes) + whenever(scopes.span).thenReturn(tx) + return tx + } + + @Test + fun `creates queue publish span and injects headers`() { + val tx = createTransaction() + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(1, tx.spans.size) + val span = tx.spans.first() + assertEquals("queue.publish", span.operation) + assertEquals("my-topic", span.description) + assertEquals("kafka", span.data["messaging.system"]) + assertEquals("my-topic", span.data["messaging.destination.name"]) + assertEquals(SentryKafkaProducerInterceptor.TRACE_ORIGIN, span.spanContext.origin) + assertTrue(span.isFinished) + + val sentryTraceHeader = record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) + assertNotNull(sentryTraceHeader) + + val enqueuedTimeHeader = + record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) + assertNotNull(enqueuedTimeHeader) + val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toDouble() + assertTrue(enqueuedTime > 0) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + val tx = createTransaction() + options.isEnableQueueTracing = false + val interceptor = SentryKafkaProducerInterceptor(scopes) + + interceptor.onSend(ProducerRecord("my-topic", "key", "value")) + + assertEquals(0, tx.spans.size) + } + + @Test + fun `returns original record when no active span`() { + whenever(scopes.span).thenReturn(null) + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + val result = interceptor.onSend(record) + + assertSame(record, result) + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts index 0156bec277..87909294cd 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/build.gradle.kts @@ -54,6 +54,7 @@ dependencies { // kafka implementation(libs.spring.kafka3) + implementation(projects.sentryKafka) // cache tracing implementation(libs.springboot3.starter.cache) diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts index 4bf7d5e5f6..0f20925f78 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/build.gradle.kts @@ -58,6 +58,7 @@ dependencies { // kafka implementation(libs.spring.kafka3) + implementation(projects.sentryKafka) // cache tracing implementation(libs.springboot3.starter.cache) diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts index e100f6a5ad..d58c3b53d7 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-jakarta/build.gradle.kts @@ -61,6 +61,7 @@ dependencies { // kafka implementation(libs.spring.kafka3) + implementation(projects.sentryKafka) // OpenFeature SDK implementation(libs.openfeature) diff --git a/sentry-spring-boot-jakarta/build.gradle.kts b/sentry-spring-boot-jakarta/build.gradle.kts index cd669b6f50..36b7dad3cc 100644 --- a/sentry-spring-boot-jakarta/build.gradle.kts +++ b/sentry-spring-boot-jakarta/build.gradle.kts @@ -71,6 +71,7 @@ dependencies { testImplementation(projects.sentryApacheHttpClient5) testImplementation(projects.sentryGraphql) testImplementation(projects.sentryGraphql22) + testImplementation(projects.sentryKafka) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryCore) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryAgent) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryAgentcustomization) diff --git a/sentry-spring-jakarta/api/sentry-spring-jakarta.api b/sentry-spring-jakarta/api/sentry-spring-jakarta.api index 0ba6c77725..edfa6399d7 100644 --- a/sentry-spring-jakarta/api/sentry-spring-jakarta.api +++ b/sentry-spring-jakarta/api/sentry-spring-jakarta.api @@ -266,14 +266,6 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor : public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V } -public final class io/sentry/spring/jakarta/kafka/SentryProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { - public fun (Lio/sentry/IScopes;)V - public fun close ()V - public fun configure (Ljava/util/Map;)V - public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V - public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; -} - public class io/sentry/spring/jakarta/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring-jakarta/build.gradle.kts b/sentry-spring-jakarta/build.gradle.kts index 93367d803f..cbf2e5346b 100644 --- a/sentry-spring-jakarta/build.gradle.kts +++ b/sentry-spring-jakarta/build.gradle.kts @@ -29,6 +29,7 @@ tasks.withType().configureEach { dependencies { api(projects.sentry) + compileOnly(projects.sentryKafka) compileOnly(platform(SpringBootPlugin.BOM_COORDINATES)) compileOnly(Config.Libs.springWeb) compileOnly(Config.Libs.springAop) @@ -59,6 +60,7 @@ dependencies { // tests testImplementation(projects.sentryTestSupport) testImplementation(projects.sentryGraphql) + testImplementation(projects.sentryKafka) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(libs.awaitility.kotlin) testImplementation(libs.context.propagation) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java index 6ede82add7..4ce6a7c5ed 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -2,6 +2,7 @@ import io.sentry.ScopesAdapter; import io.sentry.SentryLevel; +import io.sentry.kafka.SentryKafkaProducerInterceptor; import java.lang.reflect.Field; import org.apache.kafka.clients.producer.ProducerInterceptor; import org.jetbrains.annotations.ApiStatus; @@ -15,7 +16,7 @@ import org.springframework.kafka.support.CompositeProducerInterceptor; /** - * Sets a {@link SentryProducerInterceptor} on {@link KafkaTemplate} beans via {@link + * Sets a {@link SentryKafkaProducerInterceptor} on {@link KafkaTemplate} beans via {@link * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. * *

If the template already has a {@link ProducerInterceptor}, both are composed using {@link @@ -35,13 +36,14 @@ public final class SentryKafkaProducerBeanPostProcessor final @NotNull KafkaTemplate template = (KafkaTemplate) bean; final @Nullable ProducerInterceptor existing = getExistingInterceptor(template); - if (existing instanceof SentryProducerInterceptor) { + if (existing instanceof SentryKafkaProducerInterceptor) { return bean; } @SuppressWarnings("rawtypes") - final SentryProducerInterceptor sentryInterceptor = - new SentryProducerInterceptor<>(ScopesAdapter.getInstance()); + final SentryKafkaProducerInterceptor sentryInterceptor = + new SentryKafkaProducerInterceptor<>( + ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); if (existing != null) { @SuppressWarnings("rawtypes") diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index a48a3ab970..9cfda3c237 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -11,6 +11,7 @@ import io.sentry.SpanStatus; import io.sentry.TransactionContext; import io.sentry.TransactionOptions; +import io.sentry.kafka.SentryKafkaProducerInterceptor; import io.sentry.util.SpanUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -170,7 +171,7 @@ private boolean isIgnored() { } final @Nullable String enqueuedTimeStr = - headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { try { final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt index 25e1d3348e..f0247178f2 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -1,5 +1,6 @@ package io.sentry.spring.jakarta.kafka +import io.sentry.kafka.SentryKafkaProducerInterceptor import kotlin.test.Test import kotlin.test.assertSame import kotlin.test.assertTrue @@ -18,17 +19,17 @@ class SentryKafkaProducerBeanPostProcessorTest { } @Test - fun `sets SentryProducerInterceptor on KafkaTemplate`() { + fun `sets SentryKafkaProducerInterceptor on KafkaTemplate`() { val template = KafkaTemplate(mock>()) val processor = SentryKafkaProducerBeanPostProcessor() processor.postProcessAfterInitialization(template, "kafkaTemplate") - assertTrue(readInterceptor(template) is SentryProducerInterceptor<*, *>) + assertTrue(readInterceptor(template) is SentryKafkaProducerInterceptor<*, *>) } @Test - fun `does not double-wrap when SentryProducerInterceptor already set`() { + fun `does not double-wrap when SentryKafkaProducerInterceptor already set`() { val template = KafkaTemplate(mock>()) val processor = SentryKafkaProducerBeanPostProcessor() diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 15bbb6a293..1239b4007e 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -9,6 +9,7 @@ import io.sentry.SentryTraceHeader import io.sentry.SentryTracer import io.sentry.SpanDataConvention import io.sentry.TransactionContext +import io.sentry.kafka.SentryKafkaProducerInterceptor import io.sentry.test.initForTest import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @@ -99,7 +100,7 @@ class SentryKafkaRecordInterceptorTest { } enqueuedTime?.let { headers.add( - SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, it.toByteArray(StandardCharsets.UTF_8), ) } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt deleted file mode 100644 index f877b1e7d2..0000000000 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryProducerInterceptorTest.kt +++ /dev/null @@ -1,142 +0,0 @@ -package io.sentry.spring.jakarta.kafka - -import io.sentry.IScopes -import io.sentry.Sentry -import io.sentry.SentryOptions -import io.sentry.SentryTraceHeader -import io.sentry.SentryTracer -import io.sentry.TransactionContext -import io.sentry.test.initForTest -import java.nio.charset.StandardCharsets -import kotlin.test.AfterTest -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertNotNull -import kotlin.test.assertSame -import kotlin.test.assertTrue -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.clients.producer.RecordMetadata -import org.apache.kafka.common.TopicPartition -import org.mockito.kotlin.mock -import org.mockito.kotlin.whenever - -class SentryProducerInterceptorTest { - - private lateinit var scopes: IScopes - private lateinit var options: SentryOptions - - @BeforeTest - fun setup() { - initForTest { it.dsn = "https://key@sentry.io/proj" } - scopes = mock() - options = - SentryOptions().apply { - dsn = "https://key@sentry.io/proj" - isEnableQueueTracing = true - } - whenever(scopes.options).thenReturn(options) - } - - @AfterTest - fun teardown() { - Sentry.close() - } - - private fun createTransaction(): SentryTracer { - val tx = SentryTracer(TransactionContext("tx", "op"), scopes) - whenever(scopes.span).thenReturn(tx) - return tx - } - - @Test - fun `creates queue publish span with correct op and data`() { - val tx = createTransaction() - val interceptor = SentryProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(1, tx.spans.size) - val span = tx.spans.first() - assertEquals("queue.publish", span.operation) - assertEquals("my-topic", span.description) - assertEquals("kafka", span.data["messaging.system"]) - assertEquals("my-topic", span.data["messaging.destination.name"]) - assertTrue(span.isFinished) - } - - @Test - fun `does not create span when queue tracing is disabled`() { - val tx = createTransaction() - options.isEnableQueueTracing = false - val interceptor = SentryProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(0, tx.spans.size) - } - - @Test - fun `does not create span when no active span`() { - whenever(scopes.span).thenReturn(null) - val interceptor = SentryProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - val result = interceptor.onSend(record) - - assertSame(record, result) - } - - @Test - fun `injects sentry-trace, baggage, and enqueued-time headers`() { - createTransaction() - val interceptor = SentryProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - val result = interceptor.onSend(record) - - val resultHeaders = result.headers() - val sentryTraceHeader = resultHeaders.lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) - assertNotNull(sentryTraceHeader, "sentry-trace header should be injected") - - val enqueuedTimeHeader = - resultHeaders.lastHeader(SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) - assertNotNull(enqueuedTimeHeader, "sentry-task-enqueued-time header should be injected") - val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toDouble() - assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch seconds value") - } - - @Test - fun `span is finished synchronously in onSend`() { - val tx = createTransaction() - val interceptor = SentryProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(1, tx.spans.size) - assertTrue(tx.spans.first().isFinished, "span should be finished after onSend returns") - } - - @Test - fun `onAcknowledgement does not throw`() { - val interceptor = SentryProducerInterceptor(scopes) - val metadata = RecordMetadata(TopicPartition("my-topic", 0), 0, 0, 0, 0, 0) - - interceptor.onAcknowledgement(metadata, null) - } - - @Test - fun `close does not throw`() { - val interceptor = SentryProducerInterceptor(scopes) - - interceptor.close() - } - - @Test - fun `trace origin is set correctly`() { - assertEquals("auto.queue.spring_jakarta.kafka.producer", SentryProducerInterceptor.TRACE_ORIGIN) - } -} diff --git a/sentry/src/main/java/io/sentry/util/SpanUtils.java b/sentry/src/main/java/io/sentry/util/SpanUtils.java index 7f21422ba6..c324feed84 100644 --- a/sentry/src/main/java/io/sentry/util/SpanUtils.java +++ b/sentry/src/main/java/io/sentry/util/SpanUtils.java @@ -42,6 +42,8 @@ public final class SpanUtils { origins.add("auto.http.ktor-client"); origins.add("auto.queue.spring_jakarta.kafka.producer"); origins.add("auto.queue.spring_jakarta.kafka.consumer"); + origins.add("auto.queue.kafka.producer"); + origins.add("auto.queue.kafka.consumer"); } if (SentryOpenTelemetryMode.AGENT == mode) { diff --git a/settings.gradle.kts b/settings.gradle.kts index 8d431d5fbd..4b1c606bc6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -58,6 +58,7 @@ include( "sentry-graphql-22", "sentry-graphql-core", "sentry-jdbc", + "sentry-kafka", "sentry-opentelemetry:sentry-opentelemetry-bootstrap", "sentry-opentelemetry:sentry-opentelemetry-core", "sentry-opentelemetry:sentry-opentelemetry-agentcustomization", From 07349388fe218828775e5c4909e4fb06fead9817 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 13 Apr 2026 12:58:43 +0200 Subject: [PATCH 2/2] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f94db09aad..5ce9b04b72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- Add `sentry-kafka` module for Kafka queue instrumentation without Spring ([#5288](https://github.com/getsentry/sentry-java/pull/5288)) - Add Kafka queue tracing for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)), ([#5255](https://github.com/getsentry/sentry-java/pull/5255)), ([#5256](https://github.com/getsentry/sentry-java/pull/5256)) - Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250)) - Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136))