Skip to content

v1.0.23#62

Merged
yilmaztayfun merged 2 commits into
release-v1.0from
master
Apr 25, 2026
Merged

v1.0.23#62
yilmaztayfun merged 2 commits into
release-v1.0from
master

Conversation

@yilmaztayfun
Copy link
Copy Markdown
Contributor

@yilmaztayfun yilmaztayfun commented Apr 25, 2026

Summary by Sourcery

Add centralized OpenTelemetry-based tracing for Aether infrastructure components including distributed events, background jobs, distributed lock, and distributed cache.

Enhancements:

  • Instrument event bus publishing, inbox/outbox processing, background job scheduling/execution, distributed locks, and distributed cache operations with Infrastructure ActivitySource spans and standardized tags/exception handling.
  • Introduce a shared InfrastructureActivitySource and wire it into the existing telemetry pipeline so infrastructure spans are automatically collected.
  • Enhance Dapr-based and Redis-based implementations to emit detailed spans for broker, cache, and lock interactions without changing public APIs.

Documentation:

  • Extend distributed events, background job, distributed lock, and telemetry documentation with tracing sections that describe emitted spans, semantic tags, and example trace hierarchies.

yilmaztayfun and others added 2 commits April 25, 2026 09:20
…re components

Instrument distributed lock, distributed cache, background job, and
event bus operations with Activity spans via a new shared
InfrastructureActivitySource. Each span carries semantic tags
(provider, resource, event name, topic, etc.) and records exceptions
following OpenTelemetry conventions.

Closes #60
…try-tracing-spans-to-distributed-event-bus-pipeline

feat(infrastructure): add OpenTelemetry tracing spans to infrastructure components
@yilmaztayfun yilmaztayfun self-assigned this Apr 25, 2026
@yilmaztayfun yilmaztayfun requested review from a team April 25, 2026 06:23
@sourcery-ai
Copy link
Copy Markdown

sourcery-ai Bot commented Apr 25, 2026

Reviewer's Guide

Introduce centralized OpenTelemetry instrumentation for Aether infrastructure components (event bus, inbox/outbox processing, distributed lock, distributed cache, background jobs, Dapr integrations) via a new Infrastructure ActivitySource and corresponding documentation updates.

Sequence diagram for event publishing with OpenTelemetry instrumentation

sequenceDiagram
    actor Client
    participant Api as AspNetCore_Controller
    participant EventBus as DistributedEventBusBase
    participant DaprBus as DaprEventBus
    participant Broker as Dapr_PubSub
    participant InfraSrc as InfrastructureActivitySource

    Client->>Api: POST /api/orders
    Api->>EventBus: PublishAsync(orderCreatedEvent, metadata)
    EventBus->>InfraSrc: StartActivity(EventBus.Publish)
    activate EventBus
    EventBus->>EventBus: CreateEnvelopeFromMetadata
    EventBus->>EventBus: Decide useOutbox
    alt useOutbox == true
        EventBus->>EventBus: StoreInOutboxAsync
        EventBus-->>InfraSrc: SetStatus(Ok)
    else useOutbox == false
        EventBus->>EventBus: EventSerializer.Serialize
        EventBus->>DaprBus: PublishToBrokerAsync(topic, payload)
        DaprBus->>InfraSrc: StartActivity(EventBus.PublishToBroker)
        activate DaprBus
        DaprBus->>DaprBus: EventSerializer.Deserialize<object>
        DaprBus->>Broker: PublishEventAsync(pubSubName, topic, envelope)
        Broker-->>DaprBus: Ack
        DaprBus-->>InfraSrc: SetStatus(Ok)
        deactivate DaprBus
        EventBus-->>InfraSrc: SetStatus(Ok)
    end
    deactivate EventBus
    Api-->>Client: 200 OK
Loading

Sequence diagram for background job scheduling and execution with tracing

sequenceDiagram
    actor Client
    participant Api as AspNetCore_Controller
    participant JobSvc as BackgroundJobService
    participant Scheduler as DaprJobScheduler
    participant DaprJobs as Dapr_Jobs_Runtime
    participant Bridge as DaprJobExecutionBridge
    participant Dispatcher as JobDispatcher
    participant Handler as IBackgroundJobHandler
    participant InfraSrc as InfrastructureActivitySource

    Client->>Api: POST /api/orders
    Api->>JobSvc: EnqueueAsync(handlerName, jobName, schedule, payload)
    JobSvc->>InfraSrc: StartActivity(BackgroundJob.Enqueue)
    activate JobSvc
    JobSvc->>Scheduler: ScheduleAsync(handlerName, jobName, schedule, payload)
    Scheduler->>InfraSrc: StartActivity(BackgroundJob.Schedule)
    activate Scheduler
    Scheduler->>DaprJobs: ScheduleJobAsync(jobName, schedule, payload)
    DaprJobs-->>Scheduler: Ack
    Scheduler-->>InfraSrc: SetStatus(Ok)
    deactivate Scheduler
    JobSvc-->>InfraSrc: SetStatus(Ok)
    deactivate JobSvc
    Api-->>Client: 202 Accepted

    rect rgb(230,230,230)
        note over DaprJobs: At scheduled time
    end

    DaprJobs->>Bridge: ExecuteAsync(jobName, payload)
    Bridge->>InfraSrc: StartActivity(BackgroundJob.Execute)
    activate Bridge
    Bridge->>Dispatcher: DispatchAsync(jobId, handlerName, dataPayload)
    Dispatcher->>InfraSrc: StartActivity(BackgroundJob.Dispatch)
    activate Dispatcher
    Dispatcher->>Handler: HandleAsync(args)
    Handler-->>Dispatcher: Completed
    Dispatcher-->>InfraSrc: SetStatus(Ok)
    deactivate Dispatcher
    Bridge-->>InfraSrc: SetStatus(Ok)
    deactivate Bridge
Loading

Class diagram for InfrastructureActivitySource usage across infrastructure components

classDiagram
    class InfrastructureActivitySource {
        +const string SourceName
        +const string Version
        +static ActivitySource Source
    }

    class DistributedEventBusBase {
        +Task PublishAsync~TEvent~(TEvent eventObj, bool useOutbox, CancellationToken cancellationToken)
        +Task PublishAsync(object eventObj, EventMetadata metadata, string subject, bool useOutbox, CancellationToken cancellationToken)
        +Task PublishEnvelopeAsync(byte[] serializedEnvelope, string topicName, string pubSubName, CancellationToken cancellationToken)
        #Task PublishToBrokerAsync~TEvent~(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken)
        #Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken)
    }

    class DaprEventBus {
        -DaprClient daprClient
        -AetherEventBusOptions options
        +Task PublishToBrokerAsync~TEvent~(string topic, byte[] serializedEnvelope, CancellationToken cancellationToken)
        +Task PublishToBrokerAsync(string topic, string pubSubName, byte[] serializedEnvelope, CancellationToken cancellationToken)
    }

    class RedisDistributedLockService {
        +Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
        +Task~(bool,T?)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
    }

    class DaprDistributedLockService {
        +Task~IAsyncDisposable?~ TryAcquireLockAsync(string resourceId, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ReleaseLockAsync(string resourceId, CancellationToken cancellationToken)
        +Task~(bool,T?)~ ExecuteWithLockAsync~T~(string resourceId, Func~Task~T~~ function, int expiryInSeconds, CancellationToken cancellationToken)
        +Task~bool~ ExecuteWithLockAsync(string resourceId, Func~Task~ action, int expiryInSeconds, CancellationToken cancellationToken)
    }

    class RedisDistributedCacheService {
        +Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
        +Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions options, CancellationToken cancellationToken)
        +Task RemoveAsync(string key, CancellationToken cancellationToken)
        +Task RefreshAsync(string key, CancellationToken cancellationToken)
    }

    class DaprDistributedCacheService {
        +Task~T?~ GetAsync~T~(string key, CancellationToken cancellationToken)
        +Task SetAsync~T~(string key, T value, DistributedCacheEntryOptions options, CancellationToken cancellationToken)
        +Task RemoveAsync(string key, CancellationToken cancellationToken)
        +Task RefreshAsync(string key, CancellationToken cancellationToken)
    }

    class BackgroundJobService {
        +Task~Guid~ EnqueueAsync~TPayload~(string handlerName, string jobName, string schedule, TPayload payload, object? metadata, CancellationToken cancellationToken)
        +Task UpdateAsync(Guid id, string newSchedule, CancellationToken cancellationToken)
        +Task~bool~ DeleteAsync(Guid id, CancellationToken cancellationToken)
    }

    class DaprJobScheduler {
        +Task ScheduleAsync(string handlerName, string jobName, string schedule, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
        +Task UpdateScheduleAsync(string handlerName, string jobName, string newSchedule, CancellationToken cancellationToken)
        +Task DeleteAsync(string handlerName, string jobName, CancellationToken cancellationToken)
    }

    class JobDispatcher {
        +Task DispatchAsync(Guid jobId, string handlerName, ReadOnlyMemory~byte~ jobPayload, CancellationToken cancellationToken)
    }

    class DaprJobExecutionBridge {
        +Task ExecuteAsync(string jobName, ReadOnlyMemory~byte~ payload, CancellationToken cancellationToken)
    }

    class OutboxProcessor {
        +Task ExecuteAsync(CancellationToken cancellationToken)
    }

    class InboxProcessor {
        +Task ExecuteAsync(CancellationToken cancellationToken)
    }

    class DistributedEventInvoker~T~ {
        +Task InvokeAsync(IServiceProvider serviceProvider, ReadOnlyMemory~byte~ body, CancellationToken cancellationToken)
    }

    class RedisLockHandle {
        +ValueTask DisposeAsync()
    }

    InfrastructureActivitySource <.. DistributedEventBusBase : uses
    InfrastructureActivitySource <.. DaprEventBus : uses
    InfrastructureActivitySource <.. RedisDistributedLockService : uses
    InfrastructureActivitySource <.. DaprDistributedLockService : uses
    InfrastructureActivitySource <.. RedisDistributedCacheService : uses
    InfrastructureActivitySource <.. DaprDistributedCacheService : uses
    InfrastructureActivitySource <.. BackgroundJobService : uses
    InfrastructureActivitySource <.. DaprJobScheduler : uses
    InfrastructureActivitySource <.. JobDispatcher : uses
    InfrastructureActivitySource <.. DaprJobExecutionBridge : uses
    InfrastructureActivitySource <.. OutboxProcessor : uses
    InfrastructureActivitySource <.. InboxProcessor : uses
    InfrastructureActivitySource <.. DistributedEventInvoker~T~ : uses
    InfrastructureActivitySource <.. RedisLockHandle : uses
Loading

File-Level Changes

Change Details Files
Add Infrastructure ActivitySource and wire it into telemetry configuration so infrastructure spans are emitted and collected.
  • Introduce InfrastructureActivitySource static class exposing ActivitySource "BBT.Aether.Infrastructure" with version metadata.
  • Register the new infrastructure ActivitySource in AddAetherTelemetry so traces from infrastructure components are included.
  • Document the new infrastructure-level spans and tags in telemetry README, including how they relate to context propagation.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Telemetry/InfrastructureActivitySource.cs
framework/src/BBT.Aether.AspNetCore/Microsoft/Extensions/DependencyInjection/AetherTelemetryServiceCollectionExtensions.cs
framework/docs/telemetry/README.md
Instrument distributed event bus publishing, Dapr broker integration, inbox/outbox processors, and event invocation with OpenTelemetry spans and exception recording.
  • Wrap DistributedEventBusBase.PublishAsync overloads and PublishEnvelopeAsync in producer activities with event metadata tags and unified exception handling via RecordException helper.
  • Instrument DaprEventBus.PublishToBrokerAsync overloads as producer spans tagged with topic, pubsub name, and broker, reusing a RecordException helper.
  • Add producer spans to OutboxProcessor message publication, tagging event/outbox metadata and recording retry status and errors.
  • Add Inbox.Process and Inbox.Invoke spans around inbox message deserialization and handler invocation, with event identity tags and failure tagging when handlers are missing or deserialization fails.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventBusBase.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DaprEventBus.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Distributed/DistributedEventInvoker.cs
framework/docs/distributed-events/README.md
Add tracing to distributed lock implementations (Redis and Dapr) and their disposal logic, including lock acquisition, execution, and release flows.
  • Create helper methods in RedisDistributedLockService and DaprDistributedLockService to start lock-related activities and consistently tag provider, resource ID, store name (Dapr), and expiry.
  • Wrap TryAcquireLockAsync, ExecuteWithLockAsync, and ReleaseLockAsync in activities that track lock.acquired/lock.released and mark status as OK or Error via shared RecordException helpers.
  • Instrument RedisLockHandle.DisposeAsync with a DistributedLock.Release span and lock outcome tags.
  • Document lock tracing behavior, span names, and tags in the distributed-lock README and cross-link with telemetry docs.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisDistributedLockService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Redis/RedisLockHandle.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedLock/Dapr/DaprDistributedLockService.cs
framework/docs/distributed-lock/README.md
Instrument distributed cache operations (Redis and Dapr implementations) with client spans and cache-specific tags.
  • Add StartCacheActivity and RecordException helpers to RedisDistributedCacheService and use them in GetAsync, SetAsync, RemoveAsync, and RefreshAsync, tagging key, provider, hit/miss, TTL, and marking activity status.
  • Wrap DaprDistributedCacheService Get/Set/Remove in activities, tagging provider, key, store name, TTL, cache.hit, and setting status or recording exceptions as needed.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Redis/RedisDistributedCacheService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs
framework/docs/distributed-events/README.md
framework/docs/distributed-lock/README.md
Enhance background job scheduling and execution pipeline (service, scheduler, dispatcher, Dapr bridge) with tracing spans and job metadata tags.
  • Wrap BackgroundJobService.EnqueueAsync, UpdateAsync, and DeleteAsync in producer spans tagged with job id, handler name, job name, and schedule; add a RecordException helper for consistent error reporting.
  • Instrument DaprJobScheduler.ScheduleAsync/UpdateScheduleAsync/DeleteAsync with client activities, tagging scheduler backend, handler, job name, and schedule, and centralizing span creation and error recording.
  • Add BackgroundJob.Dispatch internal span capturing job id, handler, job name, status transitions (completed, failed, cancelled), and exception events in JobDispatcher.
  • Wrap DaprJobExecutionBridge.ExecuteAsync in a consumer span that tags job scheduler/name/id/handler and records failures.
  • Extend background-job documentation with tracing sections describing span taxonomy, tags, and example trace hierarchies.
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobScheduler.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
framework/docs/background-job/README.md

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 25, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5a7e5113-b6c9-474d-856f-db0dac1b8ba0

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch master

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@yilmaztayfun yilmaztayfun merged commit 62ce2a6 into release-v1.0 Apr 25, 2026
4 of 6 checks passed
Copy link
Copy Markdown

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue, and left some high level feedback:

  • There are many duplicated RecordException helpers across infrastructure classes; consider centralizing this in a shared telemetry utility (e.g., extension methods on Activity) to avoid drift and keep exception tagging consistent.
  • Span names and tag keys (e.g., event.*, job.*, lock.*, cache.*) are currently hard-coded in multiple places; introducing constants or a small semantic-conventions helper would reduce typo risk and keep naming consistent over time.
  • In DaprDistributedCacheService.GetAsync you now catch Exception and only record it on the activity without logging or rethrowing; consider at least logging the exception or narrowing the catch to avoid silently swallowing cache errors.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- There are many duplicated `RecordException` helpers across infrastructure classes; consider centralizing this in a shared telemetry utility (e.g., extension methods on `Activity`) to avoid drift and keep exception tagging consistent.
- Span names and tag keys (e.g., `event.*`, `job.*`, `lock.*`, `cache.*`) are currently hard-coded in multiple places; introducing constants or a small semantic-conventions helper would reduce typo risk and keep naming consistent over time.
- In `DaprDistributedCacheService.GetAsync` you now catch `Exception` and only record it on the activity without logging or rethrowing; consider at least logging the exception or narrowing the catch to avoid silently swallowing cache errors.

## Individual Comments

### Comment 1
<location path="framework/src/BBT.Aether.Infrastructure/BBT/Aether/DistributedCache/Dapr/DaprDistributedCacheService.cs" line_range="77-83" />
<code_context>
     }

-    public override Task RemoveAsync(string key, CancellationToken cancellationToken = default)
+    public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
     {
-        return _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
+        using var activity = StartCacheActivity("DistributedCache.Remove", key);
+
+        await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
+        activity?.SetStatus(ActivityStatusCode.Ok);
     }
</code_context>
<issue_to_address>
**suggestion:** `RemoveAsync` lacks error handling/exception tagging whereas other cache methods add tracing around failures.

Here we now start an activity but don’t mirror the error-handling pattern used in `GetAsync`/`SetAsync`. To keep tracing consistent (and align with the Redis implementation), wrap `DeleteStateAsync` in a try/catch and call `RecordException`/set error status on failure before rethrowing or handling the exception.

```suggestion
    public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
    {
        using var activity = StartCacheActivity("DistributedCache.Remove", key);

        try
        {
            await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        catch (Exception ex)
        {
            RecordException(activity, ex);
            throw;
        }
    }
```
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +77 to 83
public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
{
return _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
using var activity = StartCacheActivity("DistributedCache.Remove", key);

await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
activity?.SetStatus(ActivityStatusCode.Ok);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: RemoveAsync lacks error handling/exception tagging whereas other cache methods add tracing around failures.

Here we now start an activity but don’t mirror the error-handling pattern used in GetAsync/SetAsync. To keep tracing consistent (and align with the Redis implementation), wrap DeleteStateAsync in a try/catch and call RecordException/set error status on failure before rethrowing or handling the exception.

Suggested change
public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
{
return _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
using var activity = StartCacheActivity("DistributedCache.Remove", key);
await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
activity?.SetStatus(ActivityStatusCode.Ok);
}
public override async Task RemoveAsync(string key, CancellationToken cancellationToken = default)
{
using var activity = StartCacheActivity("DistributedCache.Remove", key);
try
{
await _daprClient.DeleteStateAsync(storeName, key, cancellationToken: cancellationToken);
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
RecordException(activity, ex);
throw;
}
}

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces comprehensive OpenTelemetry tracing across the infrastructure layer, including background jobs, distributed caching, distributed locking, and the event bus. It adds a new BBT.Aether.Infrastructure ActivitySource, instruments various service implementations with spans and semantic tags, and updates the documentation to reflect these observability features. Feedback focuses on reducing code duplication by centralizing the RecordException logic into a shared extension method within the InfrastructureActivitySource class, as the current implementation repeats this logic across multiple files.

Comment on lines +24 to +25
public static readonly ActivitySource Source = new(SourceName, Version);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The RecordException logic is duplicated across approximately 12 files in this pull request. To improve maintainability and ensure consistent telemetry across the infrastructure layer, it is recommended to define this logic once as an extension method on Activity here.

    public static readonly ActivitySource Source = new(SourceName, Version);

    public static void RecordException(this Activity? activity, Exception ex)
    {
        if (activity == null) return;

        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
            { "exception.message", ex.Message },
        }));
    }
}

Comment on lines +271 to +281
private static void RecordException(Activity? activity, Exception ex)
{
if (activity == null) return;

activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
{
{ "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
{ "exception.message", ex.Message },
}));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This private method is duplicated in many files. It should be removed in favor of the shared RecordException extension method in InfrastructureActivitySource to reduce code duplication and improve maintainability.

Comment on lines +78 to +86
if (activity != null)
{
activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
{
{ "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
{ "exception.message", ex.Message },
}));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exception recording logic here is inlined, which is inconsistent with other parts of the PR that use a helper method. Please use the shared RecordException extension method from InfrastructureActivitySource instead.

            activity.RecordException(ex);

Comment on lines +138 to +146
if (activity != null)
{
activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
{
{ "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
{ "exception.message", ex.Message },
}));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exception recording logic here is inlined. For consistency and to reduce duplication, please use the shared RecordException extension method from InfrastructureActivitySource.

            activity.RecordException(ex);

Comment on lines +81 to +89
if (activity != null)
{
activity.SetStatus(ActivityStatusCode.Error, ex.Message);
activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
{
{ "exception.type", ex.GetType().FullName ?? ex.GetType().Name },
{ "exception.message", ex.Message },
}));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exception recording logic here is inlined. Please use the shared RecordException extension method from InfrastructureActivitySource for better consistency.

            activity.RecordException(ex);

@sonarqubecloud
Copy link
Copy Markdown

@codacy-production
Copy link
Copy Markdown

Up to standards ✅

🟢 Issues 0 issues

Results:
0 new issues

View in Codacy

🟢 Metrics 224 complexity · 90 duplication

Metric Results
Complexity 224
Duplication 90

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant