feat(infrastructure): add OpenTelemetry tracing spans to infrastructure components

Instrument distributed lock, distributed cache, background job, and event bus operations with Activity spans via a new shared InfrastructureActivitySource. Each span carries semantic tags (provider, resource, event name, topic, etc.) and records exceptions following OpenTelemetry conventions. Closes #60
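The review summary below confirms the source name is `BBT.Aether.Infrastructure`. A minimal sketch of what such a shared holder could look like (the version constant and exact member layout are assumptions, not the actual source):

```csharp
using System.Diagnostics;

// Sketch only: a single shared ActivitySource for all infrastructure spans.
// SourceName matches the review summary; Version is an assumed placeholder.
internal static class InfrastructureActivitySource
{
    public const string SourceName = "BBT.Aether.Infrastructure";
    public const string Version = "1.0.0"; // placeholder, not verified against the PR

    public static readonly ActivitySource Source = new(SourceName, Version);
}
```

Consumers would then register the source on the tracer provider builder, e.g. `builder.AddSource(InfrastructureActivitySource.SourceName)`, so the spans are actually exported.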
## Reviewer's Guide

Introduce centralized OpenTelemetry instrumentation for Aether infrastructure components (event bus, inbox/outbox processing, distributed lock, distributed cache, background jobs, Dapr integrations) via a new `InfrastructureActivitySource` and corresponding documentation updates.

### Sequence diagram for event publishing with OpenTelemetry instrumentation

```mermaid
sequenceDiagram
    actor Client
    participant Api as AspNetCore_Controller
    participant EventBus as DistributedEventBusBase
    participant DaprBus as DaprEventBus
    participant Broker as Dapr_PubSub
    participant InfraSrc as InfrastructureActivitySource
    Client->>Api: POST /api/orders
    Api->>EventBus: PublishAsync(orderCreatedEvent, metadata)
    EventBus->>InfraSrc: StartActivity(EventBus.Publish)
    activate EventBus
    EventBus->>EventBus: CreateEnvelopeFromMetadata
    EventBus->>EventBus: Decide useOutbox
    alt useOutbox == true
        EventBus->>EventBus: StoreInOutboxAsync
        EventBus-->>InfraSrc: SetStatus(Ok)
    else useOutbox == false
        EventBus->>EventBus: EventSerializer.Serialize
        EventBus->>DaprBus: PublishToBrokerAsync(topic, payload)
        DaprBus->>InfraSrc: StartActivity(EventBus.PublishToBroker)
        activate DaprBus
        DaprBus->>DaprBus: EventSerializer.Deserialize<object>
        DaprBus->>Broker: PublishEventAsync(pubSubName, topic, envelope)
        Broker-->>DaprBus: Ack
        DaprBus-->>InfraSrc: SetStatus(Ok)
        deactivate DaprBus
        EventBus-->>InfraSrc: SetStatus(Ok)
    end
    deactivate EventBus
    Api-->>Client: 200 OK
```
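The publish flow in the diagram above can be sketched as a single instrumented method. Member names (`StoreInOutboxAsync`, `EventSerializer`, `PublishToBrokerAsync`) are taken from the diagram; the exact method shape and tag keys are assumptions, not the Aether API:

```csharp
protected async Task PublishCoreAsync<TEvent>(
    TEvent eventObj, string topic, bool useOutbox, CancellationToken cancellationToken)
{
    // One producer span covers both the outbox and the direct-broker paths.
    using var activity = InfrastructureActivitySource.Source.StartActivity(
        "EventBus.Publish", ActivityKind.Producer);
    activity?.SetTag("event.name", typeof(TEvent).Name);
    activity?.SetTag("messaging.destination.name", topic);

    try
    {
        if (useOutbox)
        {
            // Outbox branch: persist now, let the OutboxProcessor publish later.
            await StoreInOutboxAsync(eventObj, cancellationToken);
        }
        else
        {
            byte[] payload = EventSerializer.Serialize(eventObj);
            await PublishToBrokerAsync<TEvent>(topic, payload, cancellationToken);
        }
        activity?.SetStatus(ActivityStatusCode.Ok);
    }
    catch (Exception ex)
    {
        // Mark the span as failed before propagating the error to the caller.
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        throw;
    }
}
```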
### Sequence diagram for background job scheduling and execution with tracing

```mermaid
sequenceDiagram
    actor Client
    participant Api as AspNetCore_Controller
    participant JobSvc as BackgroundJobService
    participant Scheduler as DaprJobScheduler
    participant DaprJobs as Dapr_Jobs_Runtime
    participant Bridge as DaprJobExecutionBridge
    participant Dispatcher as JobDispatcher
    participant Handler as IBackgroundJobHandler
    participant InfraSrc as InfrastructureActivitySource
    Client->>Api: POST /api/orders
    Api->>JobSvc: EnqueueAsync(handlerName, jobName, schedule, payload)
    JobSvc->>InfraSrc: StartActivity(BackgroundJob.Enqueue)
    activate JobSvc
    JobSvc->>Scheduler: ScheduleAsync(handlerName, jobName, schedule, payload)
    Scheduler->>InfraSrc: StartActivity(BackgroundJob.Schedule)
    activate Scheduler
    Scheduler->>DaprJobs: ScheduleJobAsync(jobName, schedule, payload)
    DaprJobs-->>Scheduler: Ack
    Scheduler-->>InfraSrc: SetStatus(Ok)
    deactivate Scheduler
    JobSvc-->>InfraSrc: SetStatus(Ok)
    deactivate JobSvc
    Api-->>Client: 202 Accepted
    rect rgb(230,230,230)
        note over DaprJobs: At scheduled time
    end
    DaprJobs->>Bridge: ExecuteAsync(jobName, payload)
    Bridge->>InfraSrc: StartActivity(BackgroundJob.Execute)
    activate Bridge
    Bridge->>Dispatcher: DispatchAsync(jobId, handlerName, dataPayload)
    Dispatcher->>InfraSrc: StartActivity(BackgroundJob.Dispatch)
    activate Dispatcher
    Dispatcher->>Handler: HandleAsync(args)
    Handler-->>Dispatcher: Completed
    Dispatcher-->>InfraSrc: SetStatus(Ok)
    deactivate Dispatcher
    Bridge-->>InfraSrc: SetStatus(Ok)
    deactivate Bridge
```
### Class diagram for InfrastructureActivitySource usage across infrastructure components

```mermaid
classDiagram
    class InfrastructureActivitySource {
        +const string SourceName
        +const string Version
        +static ActivitySource Source
    }
    class DistributedEventBusBase {
        +Task PublishAsync~TEvent~(TEvent eventObj, bool useOutbox, CancellationToken cancellationToken)
        +Task PublishAsync(object eventObj, EventMetadata metadata, string subject, bool useOutbox, CancellationToken cancellationToken)
        +Task PublishEnvelopeAsync(byte[] serializedEnvelope, string topicName, string pubSubName, CancellationToken cancellationToken)
        #Task PublishToBrokerAsync~TEvent~(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken)
        #Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken)
    }
    class DaprEventBus {
        -DaprClient daprClient
        -AetherEventBusOptions options
        +Task PublishToBrokerAsync~TEvent~(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken)
        +Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken)
    }
    class RedisDistributedLockService {
        +Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
        +Task~(bool,T?)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
    }
    class DaprDistributedLockService {
        +Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
        +Task~(bool,T?)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
    }
    class RedisDistributedCacheService {
        +Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
        +Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions options, CancellationToken cancellationToken)
        +Task RemoveAsync(string key, CancellationToken cancellationToken)
        +Task RefreshAsync(string key, CancellationToken cancellationToken)
    }
    class DaprDistributedCacheService {
        +Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
        +Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions options, CancellationToken cancellationToken)
        +Task RemoveAsync(string key, CancellationToken cancellationToken)
        +Task RefreshAsync(string key, CancellationToken cancellationToken)
    }
    class BackgroundJobService {
        +Task~Guid~ EnqueueAsync~TPayload~(string handlerName, string jobName, string schedule, TPayload payload, object? metadata, CancellationToken cancellationToken)
        +Task UpdateAsync(Guid id, string newSchedule, CancellationToken cancellationToken)
        +Task~bool~ DeleteAsync(Guid id, CancellationToken cancellationToken)
    }
    class DaprJobScheduler {
        +Task ScheduleAsync(string handlerName, string jobName, string schedule, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
        +Task UpdateScheduleAsync(string handlerName, string jobName, string newSchedule, CancellationToken cancellationToken)
        +Task DeleteAsync(string handlerName, string jobName, CancellationToken cancellationToken)
    }
    class JobDispatcher {
        +Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
    }
    class DaprJobExecutionBridge {
        +Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }
    class OutboxProcessor {
        +Task ExecuteAsync(CancellationToken cancellationToken)
    }
    class InboxProcessor {
        +Task ExecuteAsync(CancellationToken cancellationToken)
    }
    class DistributedEventInvoker~T~ {
        +Task InvokeAsync(IServiceProvider serviceProvider, ReadOnlyMemory~byte~ body, CancellationToken cancellationToken)
    }
    class RedisLockHandle {
        +ValueTask DisposeAsync()
    }
    InfrastructureActivitySource <.. DistributedEventBusBase : uses
    InfrastructureActivitySource <.. DaprEventBus : uses
    InfrastructureActivitySource <.. RedisDistributedLockService : uses
    InfrastructureActivitySource <.. DaprDistributedLockService : uses
    InfrastructureActivitySource <.. RedisDistributedCacheService : uses
    InfrastructureActivitySource <.. DaprDistributedCacheService : uses
    InfrastructureActivitySource <.. BackgroundJobService : uses
    InfrastructureActivitySource <.. DaprJobScheduler : uses
    InfrastructureActivitySource <.. JobDispatcher : uses
    InfrastructureActivitySource <.. DaprJobExecutionBridge : uses
    InfrastructureActivitySource <.. OutboxProcessor : uses
    InfrastructureActivitySource <.. InboxProcessor : uses
    InfrastructureActivitySource <.. DistributedEventInvoker~T~ : uses
    InfrastructureActivitySource <.. RedisLockHandle : uses
```
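The per-component pattern implied by the diagram (start a span, tag it, mark success, record failures) can be illustrated with a cache read. The wrapper below is a sketch: the `_inner` delegate and the `cache.hit` tag are assumptions for illustration, not the Aether API, while the exception event shape follows the OpenTelemetry conventions cited in the PR description:

```csharp
public async Task<T?> GetAsync<T>(string key, CancellationToken cancellationToken = default)
{
    using var activity = InfrastructureActivitySource.Source.StartActivity("DistributedCache.Get");
    activity?.SetTag("cache.key", key);

    try
    {
        T? value = await _inner.GetAsync<T>(key, cancellationToken);
        activity?.SetTag("cache.hit", value is not null);
        activity?.SetStatus(ActivityStatusCode.Ok);
        return value;
    }
    catch (Exception ex)
    {
        // Record the failure as an "exception" event per OpenTelemetry conventions,
        // then rethrow so callers still observe the error.
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity?.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
        }));
        throw;
    }
}
```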
Hey - I've found 1 issue, and left some high-level feedback:

- There are many duplicated `RecordException` helpers across infrastructure classes; consider centralizing this in a shared telemetry utility (e.g., extension methods on `Activity`) to avoid drift and keep exception tagging consistent.
- Span names and tag keys (e.g., `event.*`, `job.*`, `lock.*`, `cache.*`) are currently hard-coded in multiple places; introducing constants or a small semantic-conventions helper would reduce typo risk and keep naming consistent over time.
- In `DaprDistributedCacheService.GetAsync` you now catch `Exception` and only record it on the activity without logging or rethrowing; consider at least logging the exception or narrowing the catch to avoid silently swallowing cache errors.
## Individual Comments
### Comment 1
<location path="framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs" line_range="77-83" />
<code_context>
}
- public override Task RemoveAsync(string key, CancellationToken cancellationToken = default)
+ public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
{
- return _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
+ using var activity = StartCacheActivity("DistributedCache.Remove", key);
+
+ await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
+ activity?.SetStatus(ActivityStatusCode.Ok);
}
</code_context>
<issue_to_address>
**suggestion:** `RemoveAsync` lacks error handling/exception tagging whereas other cache methods add tracing around failures.
Here we now start an activity but don’t mirror the error-handling pattern used in `GetAsync`/`SetAsync`. To keep tracing consistent (and align with the Redis implementation), wrap `DeleteStateAsync` in a try/catch and call `RecordException`/set error status on failure before rethrowing or handling the exception.
```suggestion
public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
{
using var activity = StartCacheActivity("DistributedCache.Remove", key);
try
{
await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
RecordException(activity, ex);
throw;
}
}
```
</issue_to_address>
Code Review

This pull request introduces comprehensive OpenTelemetry tracing across the infrastructure layer, including background jobs, distributed caching, distributed locking, and the event bus. It adds a new `BBT.Aether.Infrastructure` ActivitySource, instruments various service implementations with spans and semantic tags, and updates the documentation to reflect these observability features. Feedback focuses on reducing code duplication by centralizing the `RecordException` logic into a shared extension method within the `InfrastructureActivitySource` class, as the current implementation repeats this logic across multiple files.
The `RecordException` logic is duplicated across approximately 12 files in this pull request. To improve maintainability and ensure consistent telemetry across the infrastructure layer, it is recommended to define this logic once as an extension method on `Activity` here:

```csharp
public static readonly ActivitySource Source = new(SourceName, Version);

public static void RecordException(this Activity? activity, Exception ex)
{
    if (activity == null) return;

    activity.SetStatus(ActivityStatusCode.Error, ex.Message);
    activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
    {
        { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
        { "exception.message", ex.Message },
    }));
}
```
🟢 Issues: Up to standards ✅
| Metric | Results |
|---|---|
| Complexity | 224 |
| Duplication | 90 |
Summary by Sourcery
Add centralized OpenTelemetry-based tracing for Aether infrastructure components including distributed events, background jobs, distributed lock, and distributed cache.