diff --git a/CHRONICLE.md b/CHRONICLE.md
index 2934b2c..7142ee6 100644
--- a/CHRONICLE.md
+++ b/CHRONICLE.md
@@ -30,7 +30,7 @@ cratis context set dev
| `namespaces` | list |
| `event-types` | list, show |
| `events` | get, count (tail), has |
-| `observers` | list, show, replay, replay-partition, retry-partition |
+| `observers` | list, show, replay, replay-partition, retry-partition, clear-quarantine |
| `failed-partitions` | list, show |
| `recommendations` | list, perform, ignore |
| `identities` | list |
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 5879055..30b9954 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -25,7 +25,6 @@
-
@@ -37,4 +36,4 @@
-
\ No newline at end of file
+
diff --git a/Documentation/chronicle/index.md b/Documentation/chronicle/index.md
index a4fb418..c7cbb90 100644
--- a/Documentation/chronicle/index.md
+++ b/Documentation/chronicle/index.md
@@ -30,7 +30,7 @@ Commands that operate within a specific event store or namespace accept the foll
| `namespaces` | List namespaces within an event store. |
| `event-types` | List and inspect registered event type definitions. |
| `events` | Query events from an event sequence or retrieve the tail sequence number. |
-| `observers` | List, inspect, replay, and retry observers. |
+| `observers` | List, inspect, replay, retry, and clear quarantine for observers. |
| `subscriptions` | Manage cross-event-store subscriptions for event forwarding. |
| `failed-partitions` | List and inspect partitions where an observer has failed. |
| `projections` | List and inspect projection definitions. |
diff --git a/Documentation/chronicle/observers.md b/Documentation/chronicle/observers.md
index a22764b..e9d32e1 100644
--- a/Documentation/chronicle/observers.md
+++ b/Documentation/chronicle/observers.md
@@ -4,7 +4,7 @@ Observers are the processing units that consume events from an event sequence an
## list
-Lists all observers registered in the specified event store and namespace, including their current state and sequence position.
+Lists all observers registered in the specified event store and namespace, including their current state, quarantine status, and sequence position.
```bash
cratis chronicle observers list
@@ -190,6 +190,39 @@ Retry a failed partition:
cratis chronicle observers retry-partition my-observer-id user-42
```
+## clear-quarantine
+
+Clears quarantine for a quarantined observer so it can resume processing.
+
+```bash
+cratis chronicle observers clear-quarantine
+```
+
+The command prompts for confirmation before proceeding.
+
+### Arguments
+
+| Argument | Description |
+|---|---|
+| `OBSERVER_ID` | The observer whose quarantine should be cleared. |
+
+### Options
+
+| Flag | Description |
+|---|---|
+| `-e, --event-store ` | Event store. Defaults to `default`. |
+| `-n, --namespace ` | Namespace. Defaults to `Default`. |
+| `--sequence ` | Event sequence. Defaults to `event-log`. |
+| `-y, --yes` | Skip confirmation prompt. |
+
+### Examples
+
+Clear quarantine for an observer:
+
+```bash
+cratis chronicle observers clear-quarantine my-observer-id
+```
+
## Troubleshooting Workflow
Use this sequence when an observer appears stuck, behind, or failing:
diff --git a/Integration/Chronicle/Chronicle.csproj b/Integration/Chronicle/Chronicle.csproj
index 15ef726..fd92bcd 100644
--- a/Integration/Chronicle/Chronicle.csproj
+++ b/Integration/Chronicle/Chronicle.csproj
@@ -21,7 +21,7 @@
-
+
diff --git a/Integration/Chronicle/for_Observers/when_listing_observers.cs b/Integration/Chronicle/for_Observers/when_listing_observers.cs
index 44921ca..2e7ca0d 100644
--- a/Integration/Chronicle/for_Observers/when_listing_observers.cs
+++ b/Integration/Chronicle/for_Observers/when_listing_observers.cs
@@ -19,5 +19,12 @@ public class context : given.a_connected_cli
[Fact] void should_have_output() => (Context.Result.StandardOutput.Length > 0).ShouldBeTrue();
+ [Fact]
+ void should_include_is_quarantined_field()
+ {
+ var first = JsonDocument.Parse(Context.Result.StandardOutput).RootElement.EnumerateArray().First();
+ first.TryGetProperty("isQuarantined", out _).ShouldBeTrue();
+ }
+
[Fact] void should_have_no_errors() => Context.Result.StandardError.ShouldEqual(string.Empty);
}
diff --git a/Source/Cli.Specs/Cli.Specs.csproj b/Source/Cli.Specs/Cli.Specs.csproj
index 6c03f7a..5940b37 100644
--- a/Source/Cli.Specs/Cli.Specs.csproj
+++ b/Source/Cli.Specs/Cli.Specs.csproj
@@ -5,6 +5,7 @@
net10.0
true
false
+ $(NoWarn);CS0436
diff --git a/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_does_not_exist.cs b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_does_not_exist.cs
new file mode 100644
index 0000000..1eefc0e
--- /dev/null
+++ b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_does_not_exist.cs
@@ -0,0 +1,15 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Cratis.Cli.for_ClearObserverQuarantineCommand.when_invoking_clear_quarantine;
+
+public class and_method_does_not_exist : Specification
+{
+ bool _result;
+
+ async Task Because() => _result = await ClearObserverQuarantineInvoker.TryClear(new FakeObservers(), "event-store", "namespace", "observer-id", "event-log");
+
+ [Fact] void should_return_false() => _result.ShouldBeFalse();
+
+ class FakeObservers;
+}
diff --git a/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_exists.cs b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_exists.cs
new file mode 100644
index 0000000..0515ebd
--- /dev/null
+++ b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_exists.cs
@@ -0,0 +1,46 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Cratis.Cli.for_ClearObserverQuarantineCommand.when_invoking_clear_quarantine;
+
+public class and_method_exists : Specification
+{
+ FakeObservers _observers;
+ bool _result;
+
+ void Establish() => _observers = new();
+
+ async Task Because() => _result = await ClearObserverQuarantineInvoker.TryClear(_observers, "event-store", "namespace", "observer-id", "event-log");
+
+ [Fact] void should_invoke_clear_quarantine() => _observers.WasInvoked.ShouldBeTrue();
+ [Fact] void should_return_true() => _result.ShouldBeTrue();
+ [Fact] void should_set_event_store() => _observers.Command.EventStore.ShouldEqual("event-store");
+ [Fact] void should_set_namespace() => _observers.Command.Namespace.ShouldEqual("namespace");
+ [Fact] void should_set_observer_id() => _observers.Command.ObserverId.ShouldEqual("observer-id");
+ [Fact] void should_set_event_sequence_id() => _observers.Command.EventSequenceId.ShouldEqual("event-log");
+
+ class FakeObservers
+ {
+ public bool WasInvoked { get; private set; }
+
+ public FakeClearObserverQuarantine Command { get; private set; } = null!;
+
+ public Task ClearObserverQuarantine(FakeClearObserverQuarantine command, int ignored = 0)
+ {
+ WasInvoked = true;
+ Command = command;
+ return Task.CompletedTask;
+ }
+ }
+
+ class FakeClearObserverQuarantine
+ {
+ public string EventStore { get; set; } = string.Empty;
+
+ public string Namespace { get; set; } = string.Empty;
+
+ public string ObserverId { get; set; } = string.Empty;
+
+ public string EventSequenceId { get; set; } = string.Empty;
+ }
+}
diff --git a/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_has_quarantined_value.cs b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_has_quarantined_value.cs
new file mode 100644
index 0000000..266fa00
--- /dev/null
+++ b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_has_quarantined_value.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using Cratis.Chronicle.Contracts.Observation;
+
+namespace Cratis.Cli.for_ListObserversCommand.when_determining_if_observer_is_quarantined;
+
+public class and_running_state_has_quarantined_value : Specification
+{
+ bool _isQuarantined;
+
+ void Because() => _isQuarantined = ListObserversCommand.IsQuarantined(new ObserverInformation
+ {
+ RunningState = (ObserverRunningState)5
+ });
+
+ [Fact] void should_return_true() => _isQuarantined.ShouldBeTrue();
+}
diff --git a/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_is_active.cs b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_is_active.cs
new file mode 100644
index 0000000..c0c9c09
--- /dev/null
+++ b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_is_active.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using Cratis.Chronicle.Contracts.Observation;
+
+namespace Cratis.Cli.for_ListObserversCommand.when_determining_if_observer_is_quarantined;
+
+public class and_running_state_is_active : Specification
+{
+ bool _isQuarantined;
+
+ void Because() => _isQuarantined = ListObserversCommand.IsQuarantined(new ObserverInformation
+ {
+ RunningState = ObserverRunningState.Active
+ });
+
+ [Fact] void should_return_false() => _isQuarantined.ShouldBeFalse();
+}
diff --git a/Source/Cli/Commands/Chronicle/Observers/ClearObserverQuarantineCommand.cs b/Source/Cli/Commands/Chronicle/Observers/ClearObserverQuarantineCommand.cs
new file mode 100644
index 0000000..53448c7
--- /dev/null
+++ b/Source/Cli/Commands/Chronicle/Observers/ClearObserverQuarantineCommand.cs
@@ -0,0 +1,109 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Cratis.Cli.Commands.Chronicle.Observers;
+
+///
+/// Clears quarantine for an observer.
+///
+[LlmDescription("Clears quarantine for a quarantined observer so it can resume processing. Prompts for confirmation unless --yes is specified.")]
+[CliCommand("clear-quarantine", "Clear quarantine for an observer", Branch = typeof(ChronicleBranch.Observers), DynamicCompletion = "observers")]
+[CliExample("chronicle", "observers", "clear-quarantine", "550e8400-e29b-41d4-a716-446655440000")]
+[LlmOption("", "string", "Observer identifier (from 'cratis observers list') (positional)")]
+public class ClearObserverQuarantineCommand : ChronicleCommand
+{
+ ///
+ protected override async Task ExecuteCommandAsync(IServices services, ObserverCommandSettings settings, string format)
+ {
+ if (!ConfirmationHelper.ShouldProceed(settings, $"Are you sure you want to clear quarantine for observer '{settings.ObserverId}'?"))
+ {
+ OutputFormatter.WriteMessage(format, "Aborted.");
+ return ExitCodes.Success;
+ }
+
+ var cleared = await ClearObserverQuarantineInvoker.TryClear(
+ services.Observers,
+ settings.ResolveEventStore(),
+ settings.ResolveNamespace(),
+ settings.ObserverId,
+ settings.EventSequenceId);
+
+ if (!cleared)
+ {
+ OutputFormatter.WriteError(
+ format,
+ "Connected Chronicle contracts do not support clearing observer quarantine.",
+ "Upgrade Cratis.Chronicle.Contracts and Cratis.Chronicle.Connections to a version that includes observer quarantine clear support.",
+ ExitCodes.ValidationErrorCode);
+ return ExitCodes.ValidationError;
+ }
+
+ OutputFormatter.WriteMessage(format, $"Quarantine cleared for observer '{settings.ObserverId}'.");
+ return ExitCodes.Success;
+ }
+}
+
+static class ClearObserverQuarantineInvoker
+{
+ public static async Task TryClear(object observers, string eventStore, string @namespace, string observerId, string eventSequenceId)
+ {
+ var method = observers.GetType()
+ .GetMethods()
+ .FirstOrDefault(_ => _.Name == "ClearObserverQuarantine" && _.GetParameters().Length > 0);
+
+ if (method is null)
+ {
+ return false;
+ }
+
+ var parameters = method.GetParameters();
+ var command = Activator.CreateInstance(parameters[0].ParameterType);
+ if (command is null)
+ {
+ return false;
+ }
+
+ if (!SetPropertyIfPresent(command, "EventStore", eventStore) ||
+ !SetPropertyIfPresent(command, "Namespace", @namespace) ||
+ !SetPropertyIfPresent(command, "ObserverId", observerId))
+ {
+ return false;
+ }
+
+ SetPropertyIfPresent(command, "EventSequenceId", eventSequenceId);
+
+ var arguments = new object?[parameters.Length];
+ arguments[0] = command;
+ for (var i = 1; i < parameters.Length; i++)
+ {
+ if (parameters[i].HasDefaultValue)
+ {
+ arguments[i] = parameters[i].DefaultValue;
+ }
+ else if (parameters[i].ParameterType.IsValueType)
+ {
+ arguments[i] = Activator.CreateInstance(parameters[i].ParameterType);
+ }
+ }
+
+ if (method.Invoke(observers, arguments) is not Task task)
+ {
+ return false;
+ }
+
+ await task;
+ return true;
+ }
+
+ static bool SetPropertyIfPresent(object instance, string propertyName, string value)
+ {
+ var property = instance.GetType().GetProperty(propertyName);
+ if (property?.CanWrite != true)
+ {
+ return false;
+ }
+
+ property.SetValue(instance, value);
+ return true;
+ }
+}
diff --git a/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs b/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
index 81519aa..53b6fd6 100644
--- a/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
+++ b/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
@@ -14,6 +14,8 @@ namespace Cratis.Cli.Commands.Chronicle.Observers;
[LlmOption("-t, --type", "string", "Filter by type: reactor, reducer, projection, or all. Invalid values return an error.")]
public class ListObserversCommand : ChronicleCommand
{
+ const int QuarantinedRunningStateValue = 5;
+
///
/// Filters observers by type name, returning all observers when the type is "all".
///
@@ -51,6 +53,15 @@ internal static bool IsValidType(string type, out string errorMessage)
return false;
}
+ ///
+ /// Determines whether an observer is quarantined.
+ ///
+ /// The observer to inspect.
+ /// if quarantined; otherwise .
+ internal static bool IsQuarantined(ObserverInformation observer) =>
+ string.Equals(observer.RunningState.ToString(), "Quarantined", StringComparison.OrdinalIgnoreCase) ||
+ (int)observer.RunningState == QuarantinedRunningStateValue;
+
///
protected override async Task ExecuteCommandAsync(IServices services, ListObserversSettings settings, string format)
{
@@ -75,6 +86,7 @@ protected override async Task ExecuteCommandAsync(IServices services, ListO
id = obs.Id,
type = obs.Type.ToString(),
runningState = obs.RunningState.ToString(),
+ isQuarantined = IsQuarantined(obs),
nextEventSequenceNumber = obs.NextEventSequenceNumber == ulong.MaxValue ? null : (ulong?)obs.NextEventSequenceNumber,
lastHandledEventSequenceNumber = obs.LastHandledEventSequenceNumber == ulong.MaxValue ? null : (ulong?)obs.LastHandledEventSequenceNumber,
isSubscribed = obs.IsSubscribed
@@ -86,12 +98,13 @@ protected override async Task ExecuteCommandAsync(IServices services, ListO
OutputFormatter.Write(
format,
filtered,
- ["Id", "Type", "State", "Next#", "LastHandled#", "Subscribed"],
+ ["Id", "Type", "State", "Quarantined", "Next#", "LastHandled#", "Subscribed"],
obs =>
[
obs.Id,
obs.Type.ToString(),
obs.RunningState.ToString(),
+ IsQuarantined(obs).ToString(),
obs.NextEventSequenceNumber == ulong.MaxValue ? "(never)" : obs.NextEventSequenceNumber.ToString(),
obs.LastHandledEventSequenceNumber == ulong.MaxValue ? "(never)" : obs.LastHandledEventSequenceNumber.ToString(),
obs.IsSubscribed.ToString()
diff --git a/Source/Cli/README.md b/Source/Cli/README.md
index f92c2e7..47f300f 100644
--- a/Source/Cli/README.md
+++ b/Source/Cli/README.md
@@ -189,6 +189,7 @@ Lists all observers registered in an event store/namespace.
|--------|-------------|---------|
| `-e, --event-store ` | Event store name | `default` |
| `-n, --namespace ` | Namespace | `default` |
+| *(output field)* `isQuarantined` | Indicates whether the observer is quarantined | |
#### `observers replay`
@@ -231,6 +232,21 @@ Retries a failed partition of an observer without full replay.
Same arguments and options as `replay-partition`.
+#### `observers clear-quarantine`
+
+```bash
+cratis observers clear-quarantine [options]
+```
+
+Clears quarantine for a quarantined observer.
+
+| Argument / Option | Description | Default |
+|-------------------|-------------|---------|
+| `` | The observer ID _(required)_ | |
+| `-e, --event-store ` | Event store name | `default` |
+| `-n, --namespace ` | Namespace | `default` |
+| `--sequence ` | Event sequence ID | `event-log` |
+
---
### `failed-partitions`