feat(infrastructure): add OpenTelemetry tracing spans to infrastructure components #61
Instrument distributed lock, distributed cache, background job, and event bus operations with Activity spans via a new shared InfrastructureActivitySource. Each span carries semantic tags (provider, resource, event name, topic, etc.) and records exceptions following OpenTelemetry conventions.
Closes #60
Reviewer's Guide
Adds a shared InfrastructureActivitySource and tracing spans for the infrastructure components.
Sequence diagram for EventBus publish and outbox tracing spans
sequenceDiagram
actor Client
participant Api as AspNetCore_API
participant Bus as DistributedEventBusBase
participant Outbox as OutboxProcessor
participant DaprBus as DaprEventBus
participant Dapr as Dapr_PubSub
Client->>Api: HTTP request
Api->>Bus: PublishAsync(event, useOutbox=false)
activate Bus
Note over Bus: Start Activity EventBus.Publish<br/>set event.name, event.topic, event.use_outbox
Bus->>DaprBus: PublishToBrokerAsync<TEvent>(topic, serializedEnvelope)
activate DaprBus
Note over DaprBus: Start Activity EventBus.PublishToBroker<br/>set event.topic, event.pubsub_name, event.broker=dapr
DaprBus->>Dapr: PublishEventAsync(pubSubName, topic, envelope)
Dapr-->>DaprBus: 200 OK
DaprBus-->Bus: complete
deactivate DaprBus
Note over Bus: Set ActivityStatusCode.Ok
Bus-->Api: complete
deactivate Bus
Note over Client,Outbox: === Outbox path ===
Client->>Api: HTTP request
Api->>Bus: PublishAsync(event, useOutbox=true)
activate Bus
Note over Bus: Start Activity EventBus.Publish<br/>event.use_outbox=true
Bus->>Outbox: StoreInOutboxAsync(envelope)
Outbox-->Bus: persisted
Note over Bus: Set ActivityStatusCode.Ok
Bus-->Api: complete
deactivate Bus
loop background processing
Outbox->>Outbox: ProcessOutboxMessagesAsync
activate Outbox
Note over Outbox: Start Activity Outbox.Process<br/>set event.name, event.topic, event.pubsub_name,<br/>outbox.message_id, outbox.retry_count
Outbox->>Bus: PublishEnvelopeAsync(serializedEnvelope, topic, pubSubName)
activate Bus
Note over Bus: Start Activity EventBus.PublishEnvelope<br/>set event.topic, event.pubsub_name
Bus->>DaprBus: PublishToBrokerAsync(topic, pubSubName, serializedEnvelope)
activate DaprBus
Note over DaprBus: Start Activity EventBus.PublishToBroker<br/>event.broker=dapr
DaprBus->>Dapr: PublishEventAsync(pubSubName, topic, envelope)
Dapr-->>DaprBus: 200 OK
DaprBus-->Bus: complete
deactivate DaprBus
Note over Bus: Set ActivityStatusCode.Ok
Bus-->Outbox: complete
deactivate Bus
Note over Outbox: Set ActivityStatusCode.Ok
Outbox-->Outbox: Mark message as Processed
deactivate Outbox
end
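The direct-publish path in the diagram above amounts to two nested spans. The following is a minimal sketch rather than the PR's actual code: the source name `Demo.Infrastructure`, the `PublishDemo` class, and the hard-coded tag values are assumptions for illustration; only the span names and tag keys are taken from the diagram.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class PublishDemo
{
    // Hypothetical source name; the PR uses InfrastructureActivitySource.Source instead.
    private static readonly ActivitySource Source = new("Demo.Infrastructure", "1.0.0");

    // Runs the two nested spans and returns them in the order they stopped.
    public static List<Activity> Run()
    {
        var finished = new List<Activity>();

        // Without a listener, StartActivity returns null and nothing is recorded.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Demo.Infrastructure",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStopped = activity => finished.Add(activity)
        };
        ActivitySource.AddActivityListener(listener);

        using (var publish = Source.StartActivity("EventBus.Publish", ActivityKind.Producer))
        {
            publish?.SetTag("event.name", "OrderCreated");   // assumed sample value
            publish?.SetTag("event.topic", "orders");        // assumed sample value
            publish?.SetTag("event.use_outbox", false);

            // Started while EventBus.Publish is current, so it becomes its child.
            using (var broker = Source.StartActivity("EventBus.PublishToBroker", ActivityKind.Producer))
            {
                broker?.SetTag("event.topic", "orders");
                broker?.SetTag("event.broker", "dapr");
                broker?.SetStatus(ActivityStatusCode.Ok);
            }

            publish?.SetStatus(ActivityStatusCode.Ok);
        }

        return finished;
    }
}
```

Calling `PublishDemo.Run()` returns the inner `EventBus.PublishToBroker` span first, because it stops first, and its `ParentSpanId` matches the `SpanId` of `EventBus.Publish`. The same listener approach could be used to check the hierarchies listed in the test plan.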
Sequence diagram for background job scheduling and execution spans
sequenceDiagram
actor Client
participant Api as AspNetCore_API
participant JobSvc as BackgroundJobService
participant Scheduler as DaprJobScheduler
participant DaprJobs as Dapr_Jobs
participant Bridge as DaprJobExecutionBridge
participant Dispatcher as JobDispatcher
participant Handler as BackgroundJobHandler
Client->>Api: HTTP request
Api->>JobSvc: EnqueueAsync(handlerName, jobName, schedule, payload)
activate JobSvc
Note over JobSvc: Start Activity BackgroundJob.Enqueue<br/>set job.handler_name, job.name, job.schedule, job.id
JobSvc->>Scheduler: ScheduleAsync(handlerName, jobName, schedule, payload)
activate Scheduler
Note over Scheduler: Start Activity BackgroundJob.Schedule<br/>set job.scheduler=dapr, job.handler_name, job.name, job.schedule
Scheduler->>DaprJobs: ScheduleJobAsync(jobName, schedule, payload)
DaprJobs-->>Scheduler: scheduled
Note over Scheduler: Set ActivityStatusCode.Ok
Scheduler-->JobSvc: complete
deactivate Scheduler
Note over JobSvc: Set ActivityStatusCode.Ok
JobSvc-->Api: jobId
deactivate JobSvc
%% === Execution via Dapr callback ===
DaprJobs->>Bridge: ExecuteAsync(jobName, payload)
activate Bridge
Note over Bridge: Start Activity BackgroundJob.Execute<br/>set job.scheduler=dapr, job.name
Bridge->>Dispatcher: DispatchAsync(jobId, handlerName, dataPayload)
activate Dispatcher
Note over Dispatcher: Start Activity BackgroundJob.Dispatch<br/>set job.id, job.handler_name
Dispatcher->>Handler: HandleAsync(argsPayload)
activate Handler
Handler-->>Dispatcher: completed
deactivate Handler
Note over Dispatcher: Set job.name, job.status=completed<br/>Set ActivityStatusCode.Ok
Dispatcher-->Bridge: complete
deactivate Dispatcher
Note over Bridge: Set job.handler_name, job.id<br/>Set ActivityStatusCode.Ok
Bridge-->>DaprJobs: completed
deactivate Bridge
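The dispatch step in the diagram above can be sketched as an async wrapper around the handler. This is an illustration under assumptions, not the PR's `JobDispatcher`: the source name `Demo.Jobs`, the `Func<Task>` handler parameter, and the `"failed"` status value are hypothetical; the span name and the `job.id`, `job.handler_name`, and `job.status=completed` tags come from the diagram.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class JobDispatchDemo
{
    // Hypothetical source name for this sketch.
    public static readonly ActivitySource Source = new("Demo.Jobs");

    // Wraps a handler in a BackgroundJob.Dispatch span and tags the outcome.
    public static async Task DispatchAsync(Guid jobId, string handlerName, Func<Task> handler)
    {
        // The activity is null when no listener is registered, hence the ?. calls.
        using var activity = Source.StartActivity("BackgroundJob.Dispatch", ActivityKind.Internal);
        activity?.SetTag("job.id", jobId.ToString());
        activity?.SetTag("job.handler_name", handlerName);

        try
        {
            await handler();
            activity?.SetTag("job.status", "completed");
            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        catch (Exception ex)
        {
            activity?.SetTag("job.status", "failed"); // assumed value, not shown in the diagram
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw; // let the caller decide how to handle the failure
        }
    }
}
```

Because `Activity.Current` flows across `await`, any span the handler starts would be parented to `BackgroundJob.Dispatch` without extra plumbing.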
Class diagram for InfrastructureActivitySource and instrumented services
classDiagram
class InfrastructureActivitySource {
<<static>>
+const string SourceName
+const string Version
+static ActivitySource Source
}
class DaprDistributedLockService {
+Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
+Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
+Task~(bool Acquired, T? Result)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
+Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
-Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds)
-static void RecordException(Activity? activity, Exception ex)
}
class RedisDistributedLockService {
+Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
+Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
+Task~(bool Acquired, T? Result)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
+Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
-static Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds)
-static void RecordException(Activity? activity, Exception ex)
}
class RedisLockHandle {
+ValueTask DisposeAsync()
-ValueTask ReleaseAsync()
}
class DaprDistributedCacheService {
+Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
+Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions? options, CancellationToken cancellationToken)
+Task RemoveAsync(string key, CancellationToken cancellationToken)
+Task RefreshAsync(string key, CancellationToken cancellationToken)
-Activity? StartCacheActivity(string operationName, string key)
-static void RecordException(Activity? activity, Exception ex)
}
class RedisDistributedCacheService {
+Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
+Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions? options, CancellationToken cancellationToken)
+Task RemoveAsync(string key, CancellationToken cancellationToken)
+Task RefreshAsync(string key, CancellationToken cancellationToken)
-static Activity? StartCacheActivity(string operationName, string key)
-static void RecordException(Activity? activity, Exception ex)
}
class BackgroundJobService {
+Task~Guid~ EnqueueAsync~TPayload~(string handlerName, string jobName, string schedule, TPayload payload, BackgroundJobMetadata? metadata, CancellationToken cancellationToken)
+Task UpdateAsync(Guid id, string newSchedule, CancellationToken cancellationToken)
+Task~bool~ DeleteAsync(Guid id, CancellationToken cancellationToken)
-static void RecordException(Activity? activity, Exception ex)
}
class DaprJobScheduler {
+Task ScheduleAsync(string handlerName, string jobName, string schedule, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
+Task UpdateScheduleAsync(string handlerName, string jobName, string newSchedule, CancellationToken cancellationToken)
+Task DeleteAsync(string handlerName, string jobName, CancellationToken cancellationToken)
-static Activity? StartSchedulerActivity(string operationName, string handlerName, string jobName)
-static void RecordException(Activity? activity, Exception ex)
}
class DaprJobExecutionBridge {
+Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
}
class JobDispatcher {
+Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
}
class DistributedEventBusBase {
+Task PublishAsync~TEvent~(TEvent @event, bool useOutbox, CancellationToken cancellationToken)
+Task PublishAsync(object @event, CloudEventMetadata metadata, string? subject, bool useOutbox, CancellationToken cancellationToken)
+Task PublishEnvelopeAsync(byte[] serializedEnvelope, string topicName, string pubSubName, CancellationToken cancellationToken)
-static void RecordException(Activity? activity, Exception ex)
}
class DaprEventBus {
+Task PublishToBrokerAsync~TEvent~(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken)
+Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken)
-static void RecordException(Activity? activity, Exception ex)
}
class OutboxProcessor {
+Task ProcessAsync(CancellationToken cancellationToken)
-Task ProcessOutboxMessagesAsync(CancellationToken cancellationToken)
-static void RecordException(Activity? activity, Exception ex)
}
class InboxProcessor {
+Task ProcessAsync(CancellationToken cancellationToken)
-Task ProcessSingleEventAsync(InboxMessage inboxMessage, IServiceProvider scopedServiceProvider, CancellationToken cancellationToken)
-static void RecordException(Activity? activity, Exception ex)
}
class DistributedEventInvoker~T~ {
+Task InvokeAsync(IServiceProvider serviceProvider, ReadOnlyMemory~byte~ body, CancellationToken cancellationToken)
-static void RecordException(Activity? activity, Exception ex)
}
InfrastructureActivitySource <.. DaprDistributedLockService : uses Source
InfrastructureActivitySource <.. RedisDistributedLockService : uses Source
InfrastructureActivitySource <.. RedisLockHandle : uses Source
InfrastructureActivitySource <.. DaprDistributedCacheService : uses Source
InfrastructureActivitySource <.. RedisDistributedCacheService : uses Source
InfrastructureActivitySource <.. BackgroundJobService : uses Source
InfrastructureActivitySource <.. DaprJobScheduler : uses Source
InfrastructureActivitySource <.. DaprJobExecutionBridge : uses Source
InfrastructureActivitySource <.. JobDispatcher : uses Source
InfrastructureActivitySource <.. DistributedEventBusBase : uses Source
InfrastructureActivitySource <.. DaprEventBus : uses Source
InfrastructureActivitySource <.. OutboxProcessor : uses Source
InfrastructureActivitySource <.. InboxProcessor : uses Source
InfrastructureActivitySource <.. DistributedEventInvoker~T~ : uses Source
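To make the class diagram concrete, here is a minimal sketch of the shared source and one `Start*Activity` helper. The member names follow the diagram, but the constant values, the `LockTracing` class, and the `lock.resource_id` and `lock.expiry_seconds` tag keys are assumptions; only `lock.provider` appears elsewhere on this page.

```csharp
using System.Diagnostics;

public static class InfrastructureActivitySource
{
    // Assumed values; the PR's actual name and version are not shown on this page.
    public const string SourceName = "BBT.Aether.Infrastructure";
    public const string Version = "1.0.0";

    public static readonly ActivitySource Source = new(SourceName, Version);
}

public static class LockTracing
{
    // Mirrors the StartLockActivity(operationName, resourceId, expiryInSeconds) shape from the diagram.
    public static Activity? StartLockActivity(string operationName, string resourceId, int expiryInSeconds)
    {
        var activity = InfrastructureActivitySource.Source.StartActivity(operationName, ActivityKind.Client);
        activity?.SetTag("lock.provider", "redis");
        activity?.SetTag("lock.resource_id", resourceId);         // assumed key
        activity?.SetTag("lock.expiry_seconds", expiryInSeconds); // assumed key
        return activity;
    }
}
```

`StartActivity` returns null until something listens to the source, which is why the source also has to be registered in the telemetry pipeline and why every call on the result is null-conditional.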
Walkthrough
This PR adds comprehensive OpenTelemetry tracing instrumentation across the distributed event bus, background job, distributed lock, and distributed cache infrastructure components using the shared InfrastructureActivitySource.
Estimated code review effort: 3 (Moderate), ~30 minutes
Hey - I've found 1 issue, and left some high level feedback:
- There's quite a bit of duplicated tracing logic (e.g., repeated `RecordException` helpers and very similar `Start*Activity` implementations across locks, cache, jobs, events); consider centralizing these as shared helpers or extension methods under `BBT.Aether.Telemetry` to reduce repetition and keep behavior consistent.
- The custom tag keys (e.g., `event.*`, `cache.*`, `lock.*`, `job.*`) are clear but diverge from OpenTelemetry semantic conventions for messaging/cache/locks; it may be worth mapping these to (or augmenting them with) the standard `messaging.*`/`db.*`/`exception.*` attributes so downstream tooling can understand them better.
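One way to act on the second point is an extension method that writes the custom keys and their semantic-convention counterparts side by side. This is a hedged sketch: `MessagingTagExtensions` and `SetPublishTags` are hypothetical names, and the attribute names `messaging.destination.name` and `messaging.system` should be checked against the semantic-conventions version the project targets.

```csharp
using System.Diagnostics;

public static class MessagingTagExtensions
{
    // Hypothetical helper: keeps the PR's custom keys and adds the
    // OpenTelemetry messaging attributes that carry the same information.
    public static Activity? SetPublishTags(this Activity? activity, string topic, string broker)
    {
        if (activity is null) return null;

        // Custom keys used by the PR.
        activity.SetTag("event.topic", topic);
        activity.SetTag("event.broker", broker);

        // Semantic-convention counterparts (names assumed current; verify before adopting).
        activity.SetTag("messaging.destination.name", topic);
        activity.SetTag("messaging.system", broker);

        return activity;
    }
}
```

A call site would then read `activity.SetPublishTags(topic, "dapr")`, which keeps existing dashboards working while letting tooling that understands `messaging.*` pick up the spans.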
## Individual Comments
### Comment 1
<location path="framework/docs/telemetry/README.md" line_range="11" />
<code_context>
- **OpenTelemetry Standard** - Industry-standard observability
- **Three Pillars** - Traces, Metrics, and Logs
-- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core
+- **Automatic Instrumentation** - AspNetCore, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
- **OTLP Exporters** - Export to any OTLP-compatible backend
- **Environment Variable Support** - Standard OTEL_* variables
</code_context>
<issue_to_address>
**nitpick (typo):** Consider using the canonical `ASP.NET Core` naming for consistency.
This list currently uses `AspNetCore`, while other docs (e.g., trace examples) and the official product name use `ASP.NET Core`. Updating it here would align with the official naming and keep the docs consistent.
```suggestion
- **Automatic Instrumentation** - ASP.NET Core, HttpClient, EF Core, Distributed Lock, Distributed Cache, Event Bus
```
</issue_to_address>
Code Review
This pull request introduces comprehensive OpenTelemetry instrumentation across the Aether infrastructure, including distributed locks, caches, background jobs, and the event bus. It adds a new InfrastructureActivitySource and updates documentation to reflect these changes. I have identified two significant improvements: the Dapr distributed lock implementation currently lacks instance-level uniqueness in its lock owner identifier, which could lead to race conditions, and the exception recording logic is currently duplicated across many files and should be centralized for better maintainability.
private string GetClientIdentifier()
{
    return
        ($"{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.{applicationInfoAccessor.ApplicationName}")
The lock owner identifier is missing the InstanceId, which is correctly included in the Redis implementation. In Dapr's distributed lock API, the owner string is used to authorize the Unlock operation. If multiple instances of the same service share the same owner string, one instance could inadvertently unlock a resource held by another instance, leading to potential race conditions. Including applicationInfoAccessor.InstanceId ensures that the lock owner is unique to the specific instance.
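To illustrate the concern, the following toy model stands in for a lock store that authorizes unlock by comparing owner strings. It is not Dapr's API: `ToyLockStore`, `LockOwnerDemo`, and the sample owner strings are all hypothetical.

```csharp
using System.Collections.Generic;

// Toy in-memory stand-in for a lock store that authorizes unlock by owner string.
public sealed class ToyLockStore
{
    private readonly Dictionary<string, string> _owners = new();

    public bool TryLock(string resourceId, string owner)
    {
        if (_owners.ContainsKey(resourceId)) return false; // already held
        _owners[resourceId] = owner;
        return true;
    }

    public bool Unlock(string resourceId, string owner)
    {
        // Only the recorded owner string may release the lock.
        if (_owners.TryGetValue(resourceId, out var current) && current == owner)
        {
            _owners.Remove(resourceId);
            return true;
        }
        return false;
    }
}

public static class LockOwnerDemo
{
    // Returns whether instance B managed to release a lock taken by instance A.
    public static bool CanOtherInstanceUnlock(string ownerA, string ownerB)
    {
        var store = new ToyLockStore();
        store.TryLock("orders:42", ownerA);
        return store.Unlock("orders:42", ownerB);
    }
}
```

With a shared owner such as `"Production.orders"` for both instances, `CanOtherInstanceUnlock` returns true; once each owner carries an instance suffix (for example `"Production.orders.a"` and `"Production.orders.b"`), it returns false.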
($"{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.{applicationInfoAccessor.ApplicationName}.{applicationInfoAccessor.InstanceId}")

public static readonly ActivitySource Source = new(SourceName, Version);
}
The RecordException logic is duplicated across 11+ files in this pull request. Centralizing it here as a static helper method improves maintainability and ensures consistent exception recording (including stack traces) across all infrastructure components following OpenTelemetry semantic conventions.
    public static readonly ActivitySource Source = new(SourceName, Version);

    /// <summary>
    /// Records an exception to the activity following OpenTelemetry semantic conventions.
    /// </summary>
    public static void RecordException(Activity? activity, Exception ex)
    {
        if (activity == null) return;

        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
            { "exception.stacktrace", ex.ToString() }
        }));
    }
}
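A call site for the centralized helper might look like the sketch below. The helper body is copied from the suggestion so the example compiles on its own; the `RecordExceptionDemo` class, the `Demo.RecordException` source name, and the simulated failure are assumptions, and a real caller would normally rethrow instead of swallowing the exception.

```csharp
using System;
using System.Diagnostics;

public static class RecordExceptionDemo
{
    // Hypothetical source name for this sketch.
    private static readonly ActivitySource Source = new("Demo.RecordException");

    // Same body as the suggested helper, repeated here for self-containment.
    public static void RecordException(Activity? activity, Exception ex)
    {
        if (activity == null) return;
        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
            { "exception.stacktrace", ex.ToString() }
        }));
    }

    // Starts a span, simulates a failure, and returns the stopped span for inspection.
    public static Activity? RunFailingOperation()
    {
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Demo.RecordException",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData
        };
        ActivitySource.AddActivityListener(listener);

        var activity = Source.StartActivity("DistributedCache.Get");
        try
        {
            throw new InvalidOperationException("cache unavailable"); // simulated failure
        }
        catch (Exception ex)
        {
            RecordException(activity, ex); // production code would rethrow here
        }
        finally
        {
            activity?.Stop();
        }

        return activity;
    }
}
```

The returned span has status `Error` and a single `exception` event carrying the three `exception.*` tags.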
Up to standards ✅
| Metric | Results |
|---|---|
| Duplication | 90 |
Summary
- Add InfrastructureActivitySource (BBT.Aether.Infrastructure) and register it in the telemetry pipeline

Changes
- framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs - new shared ActivitySource
- framework/src/BBT.Aether.AspNetCore/.../AetherTelemetryServiceCollectionExtensions.cs - register source
- framework/src/BBT.Aether.Infrastructure/.../DistributedLock/Dapr/DaprDistributedLockService.cs - lock spans
- framework/src/BBT.Aether.Infrastructure/.../DistributedLock/Redis/RedisDistributedLockService.cs - lock spans
- framework/src/BBT.Aether.Infrastructure/.../DistributedLock/Redis/RedisLockHandle.cs - release span
- framework/src/BBT.Aether.Infrastructure/.../DistributedCache/Dapr/DaprDistributedCacheService.cs - cache spans
- framework/src/BBT.Aether.Infrastructure/.../DistributedCache/Redis/RedisDistributedCacheService.cs - cache spans
- framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/BackgroundJobService.cs - job spans
- framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/Dapr/DaprJobScheduler.cs - scheduler spans
- framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/Dapr/DaprJobExecutionBridge.cs - execution span
- framework/src/BBT.Aether.Infrastructure/.../BackgroundJob/JobDispatcher.cs - dispatch span
- framework/src/BBT.Aether.Infrastructure/.../Events/Distributed/DistributedEventBusBase.cs - publish spans
- framework/src/BBT.Aether.Infrastructure/.../Events/Distributed/DaprEventBus.cs - broker spans
- framework/src/BBT.Aether.Infrastructure/.../Events/Processing/OutboxProcessor.cs - outbox span
- framework/src/BBT.Aether.Infrastructure/.../Events/Processing/InboxProcessor.cs - inbox span
- framework/src/BBT.Aether.Infrastructure/.../Events/Distributed/DistributedEventInvoker.cs - invoker span
- framework/docs/distributed-lock/README.md - tracing docs
- framework/docs/background-job/README.md - tracing docs
- framework/docs/distributed-events/README.md - tracing docs
- framework/docs/telemetry/README.md - updated infrastructure spans list

Test Plan
- dotnet build - solution compiles with zero errors
- EventBus.Publish → EventBus.PublishToBroker hierarchy
- Outbox.Process → EventBus.PublishEnvelope → EventBus.PublishToBroker hierarchy
- Inbox.Process → Inbox.Invoke hierarchy
- DistributedLock.Acquire / DistributedLock.Release spans
- DistributedCache.* spans
- BackgroundJob.* spans
- Span tags (event.name, lock.provider, cache.key, etc.)

Closes #60
Summary by Sourcery
Instrument infrastructure components with OpenTelemetry spans and document the new tracing behavior.