Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHRONICLE.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ cratis context set dev
| `namespaces` | list |
| `event-types` | list, show |
| `events` | get, count (tail), has |
| `observers` | list, show, replay, replay-partition, retry-partition |
| `observers` | list, show, replay, replay-partition, retry-partition, clear-quarantine |
| `failed-partitions` | list, show |
| `recommendations` | list, perform, ignore |
| `identities` | list |
Expand Down
3 changes: 1 addition & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
<!-- Transitive dependency overrides to address known vulnerabilities -->
<PackageVersion Include="Snappier" Version="1.3.1" />
<!-- Integration Testing -->
<PackageVersion Include="mongodb.driver" Version="3.8.0" />
<PackageVersion Include="Testcontainers" Version="4.11.0" />
<!-- Testing -->
<PackageVersion Include="Cratis.Specifications" Version="3.0.5" />
Expand All @@ -37,4 +36,4 @@
<PackageVersion Include="NSubstitute" Version="5.3.0" />
<PackageVersion Include="coverlet.collector" Version="10.0.0" />
</ItemGroup>
</Project>
</Project>
2 changes: 1 addition & 1 deletion Documentation/chronicle/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Commands that operate within a specific event store or namespace accept the foll
| `namespaces` | List namespaces within an event store. |
| `event-types` | List and inspect registered event type definitions. |
| `events` | Query events from an event sequence or retrieve the tail sequence number. |
| `observers` | List, inspect, replay, and retry observers. |
| `observers` | List, inspect, replay, retry, and clear quarantine for observers. |
| `subscriptions` | Manage cross-event-store subscriptions for event forwarding. |
| `failed-partitions` | List and inspect partitions where an observer has failed. |
| `projections` | List and inspect projection definitions. |
Expand Down
35 changes: 34 additions & 1 deletion Documentation/chronicle/observers.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Observers are the processing units that consume events from an event sequence an

## list

Lists all observers registered in the specified event store and namespace, including their current state and sequence position.
Lists all observers registered in the specified event store and namespace, including their current state, quarantine status, and sequence position.

```bash
cratis chronicle observers list
Expand Down Expand Up @@ -190,6 +190,39 @@ Retry a failed partition:
cratis chronicle observers retry-partition my-observer-id user-42
```

## clear-quarantine

Clears quarantine for a quarantined observer so it can resume processing.

```bash
cratis chronicle observers clear-quarantine <OBSERVER_ID>
```

The command prompts for confirmation before proceeding.

### Arguments

| Argument | Description |
|---|---|
| `OBSERVER_ID` | The observer whose quarantine should be cleared. |

### Options

| Flag | Description |
|---|---|
| `-e, --event-store <NAME>` | Event store. Defaults to `default`. |
| `-n, --namespace <NAME>` | Namespace. Defaults to `Default`. |
| `--sequence <NAME>` | Event sequence. Defaults to `event-log`. |
| `-y, --yes` | Skip confirmation prompt. |

### Examples

Clear quarantine for an observer:

```bash
cratis chronicle observers clear-quarantine my-observer-id
```

## Troubleshooting Workflow

Use this sequence when an observer appears stuck, behind, or failing:
Expand Down
2 changes: 1 addition & 1 deletion Integration/Chronicle/Chronicle.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<PackageReference Include="Snappier" />
<PackageReference Include="Cratis.Chronicle.Connections" />
<PackageReference Include="Cratis.Chronicle.XUnit.Integration" />
<PackageReference Include="mongodb.driver" />
<PackageReference Include="MongoDB.Driver" />
<PackageReference Include="Testcontainers" />
<PackageReference Include="xunit" />
<PackageReference Include="xunit.runner.visualstudio">
Expand Down
7 changes: 7 additions & 0 deletions Integration/Chronicle/for_Observers/when_listing_observers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,12 @@ public class context : given.a_connected_cli

[Fact] void should_have_output() => (Context.Result.StandardOutput.Length > 0).ShouldBeTrue();

[Fact]
void should_include_is_quarantined_field()
{
var first = JsonDocument.Parse(Context.Result.StandardOutput).RootElement.EnumerateArray().First();
first.TryGetProperty("isQuarantined", out _).ShouldBeTrue();
}

[Fact] void should_have_no_errors() => Context.Result.StandardError.ShouldEqual(string.Empty);
}
1 change: 1 addition & 0 deletions Source/Cli.Specs/Cli.Specs.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<TargetFramework>net10.0</TargetFramework>
<IsTestProject>true</IsTestProject>
<IsPackable>false</IsPackable>
<NoWarn>$(NoWarn);CS0436</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Cli.for_ClearObserverQuarantineCommand.when_invoking_clear_quarantine;

public class and_method_does_not_exist : Specification
{
bool _result;

async Task Because() => _result = await ClearObserverQuarantineInvoker.TryClear(new FakeObservers(), "event-store", "namespace", "observer-id", "event-log");

[Fact] void should_return_false() => _result.ShouldBeFalse();

class FakeObservers;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Cli.for_ClearObserverQuarantineCommand.when_invoking_clear_quarantine;

public class and_method_exists : Specification
{
FakeObservers _observers;
bool _result;

void Establish() => _observers = new();

async Task Because() => _result = await ClearObserverQuarantineInvoker.TryClear(_observers, "event-store", "namespace", "observer-id", "event-log");

[Fact] void should_invoke_clear_quarantine() => _observers.WasInvoked.ShouldBeTrue();
[Fact] void should_return_true() => _result.ShouldBeTrue();
[Fact] void should_set_event_store() => _observers.Command.EventStore.ShouldEqual("event-store");
[Fact] void should_set_namespace() => _observers.Command.Namespace.ShouldEqual("namespace");
[Fact] void should_set_observer_id() => _observers.Command.ObserverId.ShouldEqual("observer-id");
[Fact] void should_set_event_sequence_id() => _observers.Command.EventSequenceId.ShouldEqual("event-log");

class FakeObservers
{
public bool WasInvoked { get; private set; }

public FakeClearObserverQuarantine Command { get; private set; } = null!;

public Task ClearObserverQuarantine(FakeClearObserverQuarantine command, int ignored = 0)
{
WasInvoked = true;
Command = command;
return Task.CompletedTask;
}
}

class FakeClearObserverQuarantine
{
public string EventStore { get; set; } = string.Empty;

public string Namespace { get; set; } = string.Empty;

public string ObserverId { get; set; } = string.Empty;

public string EventSequenceId { get; set; } = string.Empty;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Observation;

namespace Cratis.Cli.for_ListObserversCommand.when_determining_if_observer_is_quarantined;

public class and_running_state_has_quarantined_value : Specification
{
bool _isQuarantined;

void Because() => _isQuarantined = ListObserversCommand.IsQuarantined(new ObserverInformation
{
RunningState = (ObserverRunningState)5
});

[Fact] void should_return_true() => _isQuarantined.ShouldBeTrue();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Cratis.Chronicle.Contracts.Observation;

namespace Cratis.Cli.for_ListObserversCommand.when_determining_if_observer_is_quarantined;

public class and_running_state_is_active : Specification
{
bool _isQuarantined;

void Because() => _isQuarantined = ListObserversCommand.IsQuarantined(new ObserverInformation
{
RunningState = ObserverRunningState.Active
});

[Fact] void should_return_false() => _isQuarantined.ShouldBeFalse();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) Cratis. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Cratis.Cli.Commands.Chronicle.Observers;

/// <summary>
/// Clears quarantine for an observer.
/// </summary>
[LlmDescription("Clears quarantine for a quarantined observer so it can resume processing. Prompts for confirmation unless --yes is specified.")]
[CliCommand("clear-quarantine", "Clear quarantine for an observer", Branch = typeof(ChronicleBranch.Observers), DynamicCompletion = "observers")]
[CliExample("chronicle", "observers", "clear-quarantine", "550e8400-e29b-41d4-a716-446655440000")]
[LlmOption("<OBSERVER_ID>", "string", "Observer identifier (from 'cratis observers list') (positional)")]
public class ClearObserverQuarantineCommand : ChronicleCommand<ObserverCommandSettings>
{
/// <inheritdoc/>
protected override async Task<int> ExecuteCommandAsync(IServices services, ObserverCommandSettings settings, string format)
{
if (!ConfirmationHelper.ShouldProceed(settings, $"Are you sure you want to clear quarantine for observer '{settings.ObserverId}'?"))
{
OutputFormatter.WriteMessage(format, "Aborted.");
return ExitCodes.Success;
}

var cleared = await ClearObserverQuarantineInvoker.TryClear(
services.Observers,
settings.ResolveEventStore(),
settings.ResolveNamespace(),
settings.ObserverId,
settings.EventSequenceId);

if (!cleared)
{
OutputFormatter.WriteError(
format,
"Connected Chronicle contracts do not support clearing observer quarantine.",
"Upgrade Cratis.Chronicle.Contracts and Cratis.Chronicle.Connections to a version that includes observer quarantine clear support.",
ExitCodes.ValidationErrorCode);
return ExitCodes.ValidationError;
}

OutputFormatter.WriteMessage(format, $"Quarantine cleared for observer '{settings.ObserverId}'.");
return ExitCodes.Success;
}
}

static class ClearObserverQuarantineInvoker
{
public static async Task<bool> TryClear(object observers, string eventStore, string @namespace, string observerId, string eventSequenceId)
{
var method = observers.GetType()
.GetMethods()
.FirstOrDefault(_ => _.Name == "ClearObserverQuarantine" && _.GetParameters().Length > 0);

if (method is null)
{
return false;
}

var parameters = method.GetParameters();
var command = Activator.CreateInstance(parameters[0].ParameterType);
if (command is null)
{
return false;
}

if (!SetPropertyIfPresent(command, "EventStore", eventStore) ||
!SetPropertyIfPresent(command, "Namespace", @namespace) ||
!SetPropertyIfPresent(command, "ObserverId", observerId))
{
return false;
}

SetPropertyIfPresent(command, "EventSequenceId", eventSequenceId);

var arguments = new object?[parameters.Length];
arguments[0] = command;
for (var i = 1; i < parameters.Length; i++)
{
if (parameters[i].HasDefaultValue)
{
arguments[i] = parameters[i].DefaultValue;
}
else if (parameters[i].ParameterType.IsValueType)
{
arguments[i] = Activator.CreateInstance(parameters[i].ParameterType);
}
}

if (method.Invoke(observers, arguments) is not Task task)
{
return false;
}

await task;
return true;
}

static bool SetPropertyIfPresent(object instance, string propertyName, string value)
{
var property = instance.GetType().GetProperty(propertyName);
if (property?.CanWrite != true)
{
return false;
}

property.SetValue(instance, value);
return true;
}
}
15 changes: 14 additions & 1 deletion Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Cratis.Cli.Commands.Chronicle.Observers;
[LlmOption("-t, --type", "string", "Filter by type: reactor, reducer, projection, or all. Invalid values return an error.")]
public class ListObserversCommand : ChronicleCommand<ListObserversSettings>
{
const int QuarantinedRunningStateValue = 5;

/// <summary>
/// Filters observers by type name, returning all observers when the type is "all".
/// </summary>
Expand Down Expand Up @@ -51,6 +53,15 @@ internal static bool IsValidType(string type, out string errorMessage)
return false;
}

/// <summary>
/// Determines whether an observer is quarantined.
/// </summary>
/// <param name="observer">The observer to inspect.</param>
/// <returns><see langword="true"/> if quarantined; otherwise <see langword="false"/>.</returns>
internal static bool IsQuarantined(ObserverInformation observer) =>
string.Equals(observer.RunningState.ToString(), "Quarantined", StringComparison.OrdinalIgnoreCase) ||
(int)observer.RunningState == QuarantinedRunningStateValue;

/// <inheritdoc/>
protected override async Task<int> ExecuteCommandAsync(IServices services, ListObserversSettings settings, string format)
{
Expand All @@ -75,6 +86,7 @@ protected override async Task<int> ExecuteCommandAsync(IServices services, ListO
id = obs.Id,
type = obs.Type.ToString(),
runningState = obs.RunningState.ToString(),
isQuarantined = IsQuarantined(obs),
nextEventSequenceNumber = obs.NextEventSequenceNumber == ulong.MaxValue ? null : (ulong?)obs.NextEventSequenceNumber,
lastHandledEventSequenceNumber = obs.LastHandledEventSequenceNumber == ulong.MaxValue ? null : (ulong?)obs.LastHandledEventSequenceNumber,
isSubscribed = obs.IsSubscribed
Expand All @@ -86,12 +98,13 @@ protected override async Task<int> ExecuteCommandAsync(IServices services, ListO
OutputFormatter.Write(
format,
filtered,
["Id", "Type", "State", "Next#", "LastHandled#", "Subscribed"],
["Id", "Type", "State", "Quarantined", "Next#", "LastHandled#", "Subscribed"],
obs =>
[
obs.Id,
obs.Type.ToString(),
obs.RunningState.ToString(),
IsQuarantined(obs).ToString(),
obs.NextEventSequenceNumber == ulong.MaxValue ? "(never)" : obs.NextEventSequenceNumber.ToString(),
obs.LastHandledEventSequenceNumber == ulong.MaxValue ? "(never)" : obs.LastHandledEventSequenceNumber.ToString(),
obs.IsSubscribed.ToString()
Expand Down
Loading
Loading