-
-
Notifications
You must be signed in to change notification settings - Fork 468
feat(samples): [Queue Instrumentation 15] Add opt-in Kafka console e2e coverage #5289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
adinauer
merged 12 commits into
feat/queue-instrumentation
from
feat/queue-instrumentation-kafka-console-sample
May 5, 2026
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
007d27f
feat(kafka): Add no-arg producer interceptor for Kafka config
adinauer 82cfc37
feat(kafka): Add consumer demo to console sample
adinauer cb4d2ac
ref(samples): Extract Kafka console showcase into dedicated class
adinauer 33c4c79
feat(samples): Add opt-in Kafka console e2e coverage
adinauer 58b67b2
ref(samples): Move KafkaShowcase to kafka subpackage
adinauer daeba53
Update KafkaShowcase.java
adinauer a222362
Update KafkaShowcase.java
adinauer 7661f6c
Update KafkaShowcase.java
adinauer e02a907
Format code
getsentry-bot efd8727
fix
adinauer 540ea07
ref(samples): Clarify Kafka setup in console showcase
adinauer 2844be7
Merge remote-tracking branch 'origin/feat/queue-instrumentation-kafka…
adinauer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
...s/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| package io.sentry.samples.console.kafka; | ||
|
|
||
| import io.sentry.ISentryLifecycleToken; | ||
| import io.sentry.ITransaction; | ||
| import io.sentry.Sentry; | ||
| import io.sentry.kafka.SentryKafkaConsumerInterceptor; | ||
| import io.sentry.kafka.SentryKafkaProducerInterceptor; | ||
| import java.time.Duration; | ||
| import java.util.Collections; | ||
| import java.util.Properties; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.apache.kafka.clients.consumer.ConsumerConfig; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
| import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
| import org.apache.kafka.clients.producer.KafkaProducer; | ||
| import org.apache.kafka.clients.producer.ProducerConfig; | ||
| import org.apache.kafka.clients.producer.ProducerRecord; | ||
| import org.apache.kafka.common.serialization.StringDeserializer; | ||
| import org.apache.kafka.common.serialization.StringSerializer; | ||
|
|
||
| public final class KafkaShowcase { | ||
|
|
||
| public static final String TOPIC = "sentry-topic-console-sample"; | ||
|
|
||
| private KafkaShowcase() {} | ||
|
|
||
| public static void runKafkaWithSentryInterceptors(final String bootstrapServers) { | ||
| final CountDownLatch consumedLatch = new CountDownLatch(1); | ||
| final Thread consumerThread = | ||
| startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch); | ||
| final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); | ||
|
|
||
| final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); | ||
| try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { | ||
| try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) { | ||
| Thread.sleep(500); | ||
| producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } catch (Exception ignoredException) { | ||
| // local broker may not be available when running the sample | ||
| } | ||
|
|
||
| try { | ||
| consumedLatch.await(5, TimeUnit.SECONDS); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } finally { | ||
| consumerThread.interrupt(); | ||
| try { | ||
| consumerThread.join(1000); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| transaction.finish(); | ||
| } | ||
| } | ||
|
|
||
| public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) { | ||
| final Properties producerProperties = new Properties(); | ||
| producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
| producerProperties.put( | ||
| ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
| producerProperties.put( | ||
| ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
|
|
||
| // Required for Sentry queue tracing in kafka-clients producer setup. | ||
| producerProperties.put( | ||
| ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); | ||
|
|
||
| // Optional tuning for sample stability in CI/local runs. | ||
| producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); | ||
| producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); | ||
| producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000); | ||
|
|
||
| return producerProperties; | ||
| } | ||
|
|
||
| public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) { | ||
| final Properties consumerProperties = new Properties(); | ||
| consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
| consumerProperties.put( | ||
| ConsumerConfig.GROUP_ID_CONFIG, "sentry-console-sample-" + UUID.randomUUID()); | ||
| consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
| consumerProperties.put( | ||
| ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
| consumerProperties.put( | ||
| ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
|
|
||
| // Required for Sentry queue tracing in kafka-clients consumer setup. | ||
| consumerProperties.put( | ||
| ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); | ||
|
|
||
| // Optional tuning for sample stability in CI/local runs. | ||
| consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); | ||
| consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); | ||
|
|
||
| return consumerProperties; | ||
| } | ||
|
|
||
| private static Thread startConsumerWithSentryInterceptor( | ||
| final String bootstrapServers, final CountDownLatch consumedLatch) { | ||
| final Thread consumerThread = | ||
| new Thread( | ||
| () -> { | ||
| final Properties consumerProperties = | ||
| createConsumerPropertiesWithSentry(bootstrapServers); | ||
|
|
||
| try (KafkaConsumer<String, String> consumer = | ||
| new KafkaConsumer<>(consumerProperties)) { | ||
| consumer.subscribe(Collections.singletonList(TOPIC)); | ||
|
|
||
| while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { | ||
| final ConsumerRecords<String, String> records = | ||
| consumer.poll(Duration.ofMillis(500)); | ||
| if (!records.isEmpty()) { | ||
| consumedLatch.countDown(); | ||
| break; | ||
| } | ||
| } | ||
| } catch (Exception ignored) { | ||
| // local broker may not be available when running the sample | ||
| } | ||
| }, | ||
| "sentry-kafka-sample-consumer"); | ||
| consumerThread.start(); | ||
| return consumerThread; | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.