Skip to content

feat(infrastructure): add OpenTelemetry tracing spans to infrastructure components#61

Merged
yilmaztayfun merged 1 commit into
masterfrom
60-enhancement-add-opentelemetry-tracing-spans-to-distributed-event-bus-pipeline
Apr 25, 2026
Merged

feat(infrastructure): add OpenTelemetry tracing spans to infrastructure components#61
yilmaztayfun merged 1 commit into
masterfrom
60-enhancement-add-opentelemetry-tracing-spans-to-distributed-event-bus-pipeline

Conversation

@yilmaztayfun
Copy link
Copy Markdown
Contributor

@yilmaztayfun yilmaztayfun commented Apr 25, 2026

Summary

  • Add a shared InfrastructureActivitySource (BBT.Aether.Infrastructure) and register it in the telemetry pipeline
  • Instrument distributed lock (Dapr & Redis) and distributed cache (Dapr & Redis) operations with Activity spans and semantic tags
  • Instrument background job scheduling, execution, and dispatch (Dapr Jobs) with Activity spans
  • Instrument distributed event bus pipeline -- publishing, outbox processing, inbox processing, and handler invocation -- with Activity spans

Changes

  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs — new shared ActivitySource
  • framework/src/BBT.Aether.AspNetCore/.../AetherTelemetryServiceCollectionExtensions.cs — register source
  • framework/src/BBT.Aether.Infrastructure/.../DistributedLock/Dapr/DaprDistributedLockService.cs — lock spans
  • framework/src/BBT.Aether.Infrastructure/.../DistributedLock/Redis/RedisDistributedLockService.cs — lock spans
  • framework/src/BBT.Aether.Infrastructure/.../DistributedLock/Redis/RedisLockHandle.cs — release span
  • framework/src/BBT.Aether.Infrastructure/.../DistributedCache/Dapr/DaprDistributedCacheService.cs — cache spans
  • framework/src/BBT.Aether.Infrastructure/.../DistributedCache/Redis/RedisDistributedCacheService.cs — cache spans
  • framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/BackgroundJobService.cs — job spans
  • framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/Dapr/DaprJobScheduler.cs — scheduler spans
  • framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/Dapr/DaprJobExecutionBridge.cs — execution span
  • framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/JobDispatcher.cs — dispatch span
  • framework/src/BBT.Aether.Infrastructure/.../Events/Distributed/DistributedEventBusBase.cs — publish spans
  • framework/src/BBT.Aether.Infrastructure/.../Events/Distributed/DaprEventBus.cs — broker spans
  • framework/src/BBT.Aether.Infrastructure/.../Events/Processing/OutboxProcessor.cs — outbox span
  • framework/src/BBT.Aether.Infrastructure/.../Events/Processing/InboxProcessor.cs — inbox span
  • framework/src/BBT.Aether.Infrastructure/.../Events/Distributed/DistributedEventInvoker.cs — invoker span
  • framework/docs/distributed-lock/README.md — tracing docs
  • framework/docs/background-job/README.md — tracing docs
  • framework/docs/distributed-events/README.md — tracing docs
  • framework/docs/telemetry/README.md — updated infrastructure spans list

Test Plan

  • Run dotnet build — solution compiles with zero errors
  • Deploy to dev environment and verify spans appear in Jaeger/Tempo
  • Publish an event directly and confirm EventBus.PublishEventBus.PublishToBroker hierarchy
  • Publish via outbox and confirm Outbox.ProcessEventBus.PublishEnvelopeEventBus.PublishToBroker hierarchy
  • Consume an event and confirm Inbox.ProcessInbox.Invoke hierarchy
  • Acquire/release a distributed lock and confirm DistributedLock.Acquire / DistributedLock.Release spans
  • Perform cache get/set/remove and confirm DistributedCache.* spans
  • Schedule and execute a background job and confirm BackgroundJob.* spans
  • Verify all spans carry expected semantic tags (event.name, lock.provider, cache.key, etc.)

Closes #60

Made with Cursor

Summary by Sourcery

Instrument infrastructure components with OpenTelemetry spans and document the new tracing behavior.

New Features:

  • Add a shared Infrastructure ActivitySource and register it in the telemetry pipeline for infrastructure tracing.
  • Emit OpenTelemetry spans for distributed event bus publishing, outbox processing, and inbox processing operations.
  • Emit OpenTelemetry spans for distributed lock acquisition, execution, and release for both Dapr and Redis providers.
  • Emit OpenTelemetry spans for distributed cache operations (get, set, remove, refresh) for both Dapr and Redis providers.
  • Emit OpenTelemetry spans for background job scheduling, execution, and dispatch including Dapr job scheduler integration.

Documentation:

  • Document tracing behavior, span names, and semantic tags for distributed events, background jobs, and distributed locks.
  • Update telemetry documentation to include infrastructure spans and automatic instrumentation details.

Summary by CodeRabbit

  • New Features

    • Added OpenTelemetry tracing for background jobs, distributed cache, distributed locks, and event bus operations.
    • Infrastructure operations now emit detailed spans with semantic tags for enhanced observability.
  • Documentation

    • Added comprehensive tracing guides documenting span names, tags, and example trace hierarchies for all instrumented features.

…re components

Instrument distributed lock, distributed cache, background job, and
event bus operations with Activity spans via a new shared
InfrastructureActivitySource. Each span carries semantic tags
(provider, resource, event name, topic, etc.) and records exceptions
following OpenTelemetry conventions.

Closes #60
@yilmaztayfun yilmaztayfun requested review from a team April 25, 2026 06:22
@yilmaztayfun yilmaztayfun linked an issue Apr 25, 2026 that may be closed by this pull request
10 tasks
@yilmaztayfun yilmaztayfun requested a review from a team April 25, 2026 06:22
@sourcery-ai
Copy link
Copy Markdown

sourcery-ai Bot commented Apr 25, 2026

Reviewer's Guide

Adds a shared InfrastructureActivitySource and wires it into Aether telemetry, then instruments distributed lock, distributed cache, background job, and distributed event bus pipelines (including outbox/inbox processing) with OpenTelemetry Activity spans and semantic tags, plus documentation updates describing the new tracing behavior.

Sequence diagram for EventBus publish and outbox tracing spans

sequenceDiagram
    actor Client
    participant Api as AspNetCore_API
    participant Bus as DistributedEventBusBase
    participant Outbox as OutboxProcessor
    participant DaprBus as DaprEventBus
    participant Dapr as Dapr_PubSub

    Client->>Api: HTTP request
    Api->>Bus: PublishAsync(event, useOutbox=false)
    activate Bus
    Note over Bus: Start Activity EventBus.Publish<br/>set event.name, event.topic, event.use_outbox
    Bus->>DaprBus: PublishToBrokerAsync<TEvent>(topic, serializedEnvelope)
    activate DaprBus
    Note over DaprBus: Start Activity EventBus.PublishToBroker<br/>set event.topic, event.pubsub_name, event.broker=dapr
    DaprBus->>Dapr: PublishEventAsync(pubSubName, topic, envelope)
    Dapr-->>DaprBus: 200 OK
    DaprBus-->Bus: complete
    deactivate DaprBus
    Note over Bus: Set ActivityStatusCode.Ok
    Bus-->Api: complete
    deactivate Bus

    Note over Client,Outbox: === Outbox path ===

    Client->>Api: HTTP request
    Api->>Bus: PublishAsync(event, useOutbox=true)
    activate Bus
    Note over Bus: Start Activity EventBus.Publish<br/>event.use_outbox=true
    Bus->>Outbox: StoreInOutboxAsync(envelope)
    Outbox-->Bus: persisted
    Note over Bus: Set ActivityStatusCode.Ok
    Bus-->Api: complete
    deactivate Bus

    loop background processing
        Outbox->>Outbox: ProcessOutboxMessagesAsync
        activate Outbox
        Note over Outbox: Start Activity Outbox.Process<br/>set event.name, event.topic, event.pubsub_name,<br/>outbox.message_id, outbox.retry_count
        Outbox->>Bus: PublishEnvelopeAsync(serializedEnvelope, topic, pubSubName)
        activate Bus
        Note over Bus: Start Activity EventBus.PublishEnvelope<br/>set event.topic, event.pubsub_name
        Bus->>DaprBus: PublishToBrokerAsync(topic, pubSubName, serializedEnvelope)
        activate DaprBus
        Note over DaprBus: Start Activity EventBus.PublishToBroker<br/>event.broker=dapr
        DaprBus->>Dapr: PublishEventAsync(pubSubName, topic, envelope)
        Dapr-->>DaprBus: 200 OK
        DaprBus-->Bus: complete
        deactivate DaprBus
        Note over Bus: Set ActivityStatusCode.Ok
        Bus-->Outbox: complete
        deactivate Bus
        Note over Outbox: Set ActivityStatusCode.Ok
        Outbox-->Outbox: Mark message as Processed
        deactivate Outbox
    end
Loading

Sequence diagram for background job scheduling and execution spans

sequenceDiagram
    actor Client
    participant Api as AspNetCore_API
    participant JobSvc as BackgroundJobService
    participant Scheduler as DaprJobScheduler
    participant DaprJobs as Dapr_Jobs
    participant Bridge as DaprJobExecutionBridge
    participant Dispatcher as JobDispatcher
    participant Handler as BackgroundJobHandler

    Client->>Api: HTTP request
    Api->>JobSvc: EnqueueAsync(handlerName, jobName, schedule, payload)
    activate JobSvc
    Note over JobSvc: Start Activity BackgroundJob.Enqueue<br/>set job.handler_name, job.name, job.schedule, job.id
    JobSvc->>Scheduler: ScheduleAsync(handlerName, jobName, schedule, payload)
    activate Scheduler
    Note over Scheduler: Start Activity BackgroundJob.Schedule<br/>set job.scheduler=dapr, job.handler_name, job.name, job.schedule
    Scheduler->>DaprJobs: ScheduleJobAsync(jobName, schedule, payload)
    DaprJobs-->>Scheduler: scheduled
    Note over Scheduler: Set ActivityStatusCode.Ok
    Scheduler-->JobSvc: complete
    deactivate Scheduler
    Note over JobSvc: Set ActivityStatusCode.Ok
    JobSvc-->Api: jobId
    deactivate JobSvc

    %% === Execution via Dapr callback ===

    DaprJobs->>Bridge: ExecuteAsync(jobName, payload)
    activate Bridge
    Note over Bridge: Start Activity BackgroundJob.Execute<br/>set job.scheduler=dapr, job.name
    Bridge->>Dispatcher: DispatchAsync(jobId, handlerName, dataPayload)
    activate Dispatcher
    Note over Dispatcher: Start Activity BackgroundJob.Dispatch<br/>set job.id, job.handler_name
    Dispatcher->>Handler: HandleAsync(argsPayload)
    activate Handler
    Handler-->>Dispatcher: completed
    deactivate Handler
    Note over Dispatcher: Set job.name, job.status=completed<br/>Set ActivityStatusCode.Ok
    Dispatcher-->Bridge: complete
    deactivate Dispatcher
    Note over Bridge: Set job.handler_name, job.id<br/>Set ActivityStatusCode.Ok
    Bridge-->>DaprJobs: completed
    deactivate Bridge
Loading

Class diagram for InfrastructureActivitySource and instrumented services

classDiagram
    class InfrastructureActivitySource {
        <<static>>
        +const string SourceName
        +const string Version
        +static ActivitySource Source
    }

    class DaprDistributedLockService {
        +Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
        +Task~(bool Acquired, T? Result)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
        -Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class RedisDistributedLockService {
        +Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
        +Task~(bool Acquired, T? Result)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
        -static Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class RedisLockHandle {
        +ValueTask DisposeAsync()
        -ValueTask ReleaseAsync()
    }

    class DaprDistributedCacheService {
        +Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
        +Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions? options, CancellationToken cancellationToken)
        +Task RemoveAsync(string key, CancellationToken cancellationToken)
        +Task RefreshAsync(string key, CancellationToken cancellationToken)
        -Activity? StartCacheActivity(string operationName, string key)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class RedisDistributedCacheService {
        +Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
        +Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions? options, CancellationToken cancellationToken)
        +Task RemoveAsync(string key, CancellationToken cancellationToken)
        +Task RefreshAsync(string key, CancellationToken cancellationToken)
        -static Activity? StartCacheActivity(string operationName, string key)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class BackgroundJobService {
        +Task~Guid~ EnqueueAsync~TPayload~(string handlerName, string jobName, string schedule, TPayload payload, BackgroundJobMetadata? metadata, CancellationToken cancellationToken)
        +Task UpdateAsync(Guid id, string newSchedule, CancellationToken cancellationToken)
        +Task~bool~ DeleteAsync(Guid id, CancellationToken cancellationToken)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class DaprJobScheduler {
        +Task ScheduleAsync(string handlerName, string jobName, string schedule, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
        +Task UpdateScheduleAsync(string handlerName, string jobName, string newSchedule, CancellationToken cancellationToken)
        +Task DeleteAsync(string handlerName, string jobName, CancellationToken cancellationToken)
        -static Activity? StartSchedulerActivity(string operationName, string handlerName, string jobName)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class DaprJobExecutionBridge {
        +Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }

    class JobDispatcher {
        +Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
    }

    class DistributedEventBusBase {
        +Task PublishAsync~TEvent~(TEvent @event, bool useOutbox, CancellationToken cancellationToken)
        +Task PublishAsync(object @event, CloudEventMetadata metadata, string? subject, bool useOutbox, CancellationToken cancellationToken)
        +Task PublishEnvelopeAsync(byte[] serializedEnvelope, string topicName, string pubSubName, CancellationToken cancellationToken)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class DaprEventBus {
        +Task PublishToBrokerAsync~TEvent~(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken)
        +Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class OutboxProcessor {
        +Task ProcessAsync(CancellationToken cancellationToken)
        -Task ProcessOutboxMessagesAsync(CancellationToken cancellationToken)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class InboxProcessor {
        +Task ProcessAsync(CancellationToken cancellationToken)
        -Task ProcessSingleEventAsync(InboxMessage inboxMessage, IServiceProvider scopedServiceProvider, CancellationToken cancellationToken)
        -static void RecordException(Activity? activity, Exception ex)
    }

    class DistributedEventInvoker~T~ {
        +Task InvokeAsync(IServiceProvider serviceProvider, ReadOnlyMemory~byte~ body, CancellationToken cancellationToken)
        -static void RecordException(Activity? activity, Exception ex)
    }

    InfrastructureActivitySource <.. DaprDistributedLockService : uses Source
    InfrastructureActivitySource <.. RedisDistributedLockService : uses Source
    InfrastructureActivitySource <.. RedisLockHandle : uses Source
    InfrastructureActivitySource <.. DaprDistributedCacheService : uses Source
    InfrastructureActivitySource <.. RedisDistributedCacheService : uses Source
    InfrastructureActivitySource <.. BackgroundJobService : uses Source
    InfrastructureActivitySource <.. DaprJobScheduler : uses Source
    InfrastructureActivitySource <.. DaprJobExecutionBridge : uses Source
    InfrastructureActivitySource <.. JobDispatcher : uses Source
    InfrastructureActivitySource <.. DistributedEventBusBase : uses Source
    InfrastructureActivitySource <.. DaprEventBus : uses Source
    InfrastructureActivitySource <.. OutboxProcessor : uses Source
    InfrastructureActivitySource <.. InboxProcessor : uses Source
    InfrastructureActivitySource <.. DistributedEventInvoker~T~ : uses Source
Loading

File-Level Changes

Change Details Files
Introduce shared infrastructure ActivitySource and register it with the telemetry pipeline so infrastructure spans are exported.
  • Add InfrastructureActivitySource static class exposing a named ActivitySource for infrastructure spans.
  • Register BBT.Aether.Infrastructure as a tracing source in AddAetherTelemetry so spans from infrastructure components are collected.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs
framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs
Instrument distributed lock implementations (Dapr and Redis) with lock-related spans and semantic tags.
  • Wrap lock acquisition, release, and ExecuteWithLock operations in DistributedLock.* activities using the shared ActivitySource.
  • Add tags such as lock.provider, lock.resource_id, lock.store_name, lock.expiry_seconds, lock.acquired, and lock.released.
  • Record exceptions on activities with status Error and exception events in both Dapr and Redis services and the Redis lock handle.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs
framework/docs/distributed-lock/README.md
Instrument distributed cache implementations (Dapr and Redis) with cache spans and tags.
  • Wrap Get/Set/Remove/Refresh operations in DistributedCache.* activities using the shared ActivitySource.
  • Add tags including cache.provider, cache.key, cache.store_name, cache.hit, and cache.ttl_seconds where applicable.
  • Record errors on activities and attach exception details when cache operations fail.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs
framework/docs/telemetry/README.md
Instrument background job lifecycle (scheduling, execution, dispatch) with spans and job metadata tags.
  • Add BackgroundJob.Enqueue, BackgroundJob.Update, and BackgroundJob.Delete activities in BackgroundJobService with job id, handler name, job name, and schedule tags plus exception recording.
  • Add scheduler-level activities BackgroundJob.Schedule, BackgroundJob.Schedule.Update, and BackgroundJob.Schedule.Delete in the Dapr scheduler with job.scheduler, handler, and job name tags.
  • Add consumer-side activities BackgroundJob.Execute in the Dapr execution bridge and BackgroundJob.Dispatch in the dispatcher with tags for job id, name, handler, scheduler, and status, and record exceptions where failures occur.
  • Document the new scheduling and execution spans and tags in the background job README and cross-link telemetry docs.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
framework/docs/background-job/README.md
framework/docs/telemetry/README.md
Instrument distributed event bus publishing, outbox processing, inbox processing, and handler invocation with tracing spans and event metadata tags.
  • Wrap publish operations in EventBus.Publish activities in DistributedEventBusBase, adding tags for event name, topic, pubsub name, and event.use_outbox, and mark success or record exceptions; introduce EventBus.PublishEnvelope for pre-serialized envelopes used by the outbox.
  • Wrap Dapr broker publish operations in EventBus.PublishToBroker activities in DaprEventBus, adding broker, topic, and pubsub tags and exception recording, and provide both generic and non-generic overrides.
  • Add Outbox.Process activities around per-message outbox publishing, tagging event name, topic, pubsub name, message id, and retry count and updating status based on outcome.
  • Add Inbox.Process around inbox message processing in InboxProcessor and Inbox.Invoke in DistributedEventInvoker, tagging event identity, version, handler name, and marking errors on deserialization/handler resolution/handler failures.
  • Extend distributed-events README and telemetry README with span names, tags, and example trace hierarchies covering direct publish, outbox publish, and inbox consumption.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
framework/docs/distributed-events/README.md
framework/docs/telemetry/README.md

Assessment against linked issues

Issue Objective Addressed Explanation
#60 Add OpenTelemetry Activity spans to the distributed event bus pipeline (DistributedEventBusBase.PublishAsync overloads, PublishEnvelopeAsync, DaprEventBus.PublishToBrokerAsync overloads, OutboxProcessor.ProcessOutboxMessagesAsync, InboxProcessor.ProcessSingleEventAsync, DistributedEventInvoker.InvokeAsync), including the specified span names, semantic tags, and error recording with ActivityStatusCode.Error and an exception event.
#60 Integrate the shared InfrastructureActivitySource (BBT.Aether.Infrastructure) into the telemetry setup and update documentation (distributed-events/README.md and telemetry/README.md) to describe the event bus tracing spans, tags, and example trace hierarchies.

Possibly linked issues


Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 25, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7317a146-2087-48bd-8fb9-f4e01b65dad6

📥 Commits

Reviewing files that changed from the base of the PR and between a237853 and 2e31af3.

📒 Files selected for processing (20)
  • framework/docs/background-job/README.md
  • framework/docs/distributed-events/README.md
  • framework/docs/distributed-lock/README.md
  • framework/docs/telemetry/README.md
  • framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs
  • framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs

📝 Walkthrough

Walkthrough

This PR adds comprehensive OpenTelemetry tracing instrumentation across distributed event bus, background job, distributed lock, and distributed cache infrastructure components using the BBT.Aether.Infrastructure ActivitySource. Changes include semantic tagging, exception recording, and updated documentation reflecting the new observability capabilities.

Changes

Cohort / File(s) Summary
Documentation: Telemetry Overview
framework/docs/telemetry/README.md
Extended feature list to claim automatic instrumentation for distributed lock, cache, and event bus. Added "Infrastructure Spans" subsection enumerating specific span names and semantic conventions for all infrastructure operations (locks, cache, background jobs, events).
Documentation: Feature-Specific Guides
framework/docs/background-job/README.md, framework/docs/distributed-events/README.md, framework/docs/distributed-lock/README.md
Added new "Tracing" sections documenting emitted span names, semantic tags, example trace hierarchies, and OpenTelemetry ActivitySource usage for each feature area.
Telemetry Infrastructure Setup
framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs
Introduced new InfrastructureActivitySource public class centralizing tracing configuration (source name, version, shared ActivitySource instance). Updated DI registration to add "BBT.Aether.Infrastructure" source to OpenTelemetry tracer.
Background Job Tracing
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
Added Activity instrumentation around job enqueue/update/delete (BackgroundJobService), scheduler registration/update/delete (DaprJobScheduler), execution dispatch (DaprJobExecutionBridge), and dispatch processing (JobDispatcher). Each records job identifiers, status codes, and exceptions with semantic tags.
Distributed Lock Tracing
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs
Added Activity instrumentation for acquire/release/execute operations across Dapr and Redis implementations. Each activity records lock acquisition/release outcomes, provider/resource identifiers, expiry, and exceptions using semantic tags.
Distributed Cache Tracing
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs
Added Activity instrumentation for get/set/remove/refresh operations across Dapr and Redis. Each activity records cache hits, TTL values, provider, key, and exceptions with semantic tags.
Distributed Event Bus Tracing
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs, framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs
Added Activity instrumentation across event publishing (PublishAsync, PublishEnvelopeAsync, PublishToBrokerAsync), outbox/inbox processing, and handler invocation. Activities record event identity, topic, pubsub name, outbox metadata, and exceptions. PublishEnvelopeAsync signature changed from expression-bodied to async method.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • darcoakk
  • mokoker

Poem

🐰 Hops with glee over shimmering traces,
ActivitySources light up the places,
From cache to lock to events that flow,
OpenTelemetry's observability glow!
Each span a story, each tag a clue—
Now Jaeger shows exactly what's true! 🌟

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch 60-enhancement-add-opentelemetry-tracing-spans-to-distributed-event-bus-pipeline

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@yilmaztayfun yilmaztayfun merged commit d9f3fc4 into master Apr 25, 2026
4 of 6 checks passed
Copy link
Copy Markdown

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue, and left some high level feedback:

  • There’s quite a bit of duplicated tracing logic (e.g., repeated RecordException helpers and very similar Start*Activity implementations across locks, cache, jobs, events); consider centralizing these as shared helpers or extension methods under BBT.Aether.Telemetry to reduce repetition and keep behavior consistent.
  • The custom tag keys (e.g., event.*, cache.*, lock.*, job.*) are clear but diverge from OpenTelemetry semantic conventions for messaging/cache/locks; it may be worth mapping these to (or augmenting them with) the standard messaging.*/db.*/exception.* attributes so downstream tooling can understand them better.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- There’s quite a bit of duplicated tracing logic (e.g., repeated `RecordException` helpers and very similar `Start*Activity` implementations across locks, cache, jobs, events); consider centralizing these as shared helpers or extension methods under `BBT.Aether.Telemetry` to reduce repetition and keep behavior consistent.
- The custom tag keys (e.g., `event.*`, `cache.*`, `lock.*`, `job.*`) are clear but diverge from OpenTelemetry semantic conventions for messaging/cache/locks; it may be worth mapping these to (or augmenting them with) the standard `messaging.*`/`db.*`/`exception.*` attributes so downstream tooling can understand them better.

## Individual Comments

### Comment 1
<location path="framework/docs/telemetry/README.md" line_range="11" />
<code_context>
 - **OpenTelemetry Standard** - Industry-standard observability
 - **Three Pillars** - Traces, Metrics, and Logs
-- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core
+- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
 - **OTLP Exporters** - Export to any OTLP-compatible backend
 - **Environment Variable Support** - Standard OTEL_* variables
</code_context>
<issue_to_address>
**nitpick (typo):** Consider using the canonical `ASP.NET Core` naming for consistency.

This list currently uses `AspNetCore`, while other docs (e.g., trace examples) and the official product name use `ASP.NET Core`. Updating it here would align with the official naming and keep the docs consistent.

```suggestion
- **Automatic Instrumentation** - ASP.NET Core, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

- **OpenTelemetry Standard** - Industry-standard observability
- **Three Pillars** - Traces, Metrics, and Logs
- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core
- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick (typo): Consider using the canonical ASP.NET Core naming for consistency.

This list currently uses AspNetCore, while other docs (e.g., trace examples) and the official product name use ASP.NET Core. Updating it here would align with the official naming and keep the docs consistent.

Suggested change
- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
- **Automatic Instrumentation** - ASP.NET Core, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces comprehensive OpenTelemetry instrumentation across the Aether infrastructure, including distributed locks, caches, background jobs, and the event bus. It adds a new InfrastructureActivitySource and updates documentation to reflect these changes. I have identified two significant improvements: the Dapr distributed lock implementation currently lacks instance-level uniqueness in its lock owner identifier, which could lead to race conditions, and the exception recording logic is currently duplicated across many files and should be centralized for better maintainability.

private string GetClientIdentifier()
{
return
($"{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.{applicationInfoAccessor.ApplicationName}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The lock owner identifier is missing the InstanceId, which is correctly included in the Redis implementation. In Dapr's distributed lock API, the owner string is used to authorize the Unlock operation. If multiple instances of the same service share the same owner string, one instance could inadvertently unlock a resource held by another instance, leading to potential race conditions. Including applicationInfoAccessor.InstanceId ensures that the lock owner is unique to the specific instance.

            ($"{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.{applicationInfoAccessor.ApplicationName}.{applicationInfoAccessor.InstanceId}")

Comment on lines +24 to +25
public static readonly ActivitySource Source = new(SourceName, Version);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The RecordException logic is duplicated across 11+ files in this pull request. Centralizing it here as a static helper method improves maintainability and ensures consistent exception recording (including stack traces) across all infrastructure components following OpenTelemetry semantic conventions.

    public static readonly ActivitySource Source = new(SourceName, Version);

    /// <summary>
    /// Records an exception to the activity following OpenTelemetry semantic conventions.
    /// </summary>
    public static void RecordException(Activity? activity, Exception ex)
    {
        if (activity == null) return;

        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
            { "exception.stacktrace", ex.ToString() }
        }));
    }
}

@sonarqubecloud
Copy link
Copy Markdown

@codacy-production
Copy link
Copy Markdown

Up to standards ✅

🟢 Issues 0 issues

Results:
0 new issues

View in Codacy

🟢 Metrics 90 duplication

Metric Results
Duplication 90

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[enhancement] Add OpenTelemetry tracing spans to distributed event bus pipeline

1 participant