diff --git a/CHRONICLE.md b/CHRONICLE.md index 2934b2c..7142ee6 100644 --- a/CHRONICLE.md +++ b/CHRONICLE.md @@ -30,7 +30,7 @@ cratis context set dev | `namespaces` | list | | `event-types` | list, show | | `events` | get, count (tail), has | -| `observers` | list, show, replay, replay-partition, retry-partition | +| `observers` | list, show, replay, replay-partition, retry-partition, clear-quarantine | | `failed-partitions` | list, show | | `recommendations` | list, perform, ignore | | `identities` | list | diff --git a/Directory.Packages.props b/Directory.Packages.props index 5879055..30b9954 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -25,7 +25,6 @@ - @@ -37,4 +36,4 @@ - \ No newline at end of file + diff --git a/Documentation/chronicle/index.md b/Documentation/chronicle/index.md index a4fb418..c7cbb90 100644 --- a/Documentation/chronicle/index.md +++ b/Documentation/chronicle/index.md @@ -30,7 +30,7 @@ Commands that operate within a specific event store or namespace accept the foll | `namespaces` | List namespaces within an event store. | | `event-types` | List and inspect registered event type definitions. | | `events` | Query events from an event sequence or retrieve the tail sequence number. | -| `observers` | List, inspect, replay, and retry observers. | +| `observers` | List, inspect, replay, retry, and clear quarantine for observers. | | `subscriptions` | Manage cross-event-store subscriptions for event forwarding. | | `failed-partitions` | List and inspect partitions where an observer has failed. | | `projections` | List and inspect projection definitions. 
| diff --git a/Documentation/chronicle/observers.md b/Documentation/chronicle/observers.md index a22764b..e9d32e1 100644 --- a/Documentation/chronicle/observers.md +++ b/Documentation/chronicle/observers.md @@ -4,7 +4,7 @@ Observers are the processing units that consume events from an event sequence an ## list -Lists all observers registered in the specified event store and namespace, including their current state and sequence position. +Lists all observers registered in the specified event store and namespace, including their current state, quarantine status, and sequence position. ```bash cratis chronicle observers list @@ -190,6 +190,39 @@ Retry a failed partition: cratis chronicle observers retry-partition my-observer-id user-42 ``` +## clear-quarantine + +Clears quarantine for a quarantined observer so it can resume processing. + +```bash +cratis chronicle observers clear-quarantine +``` + +The command prompts for confirmation before proceeding. + +### Arguments + +| Argument | Description | +|---|---| +| `OBSERVER_ID` | The observer whose quarantine should be cleared. | + +### Options + +| Flag | Description | +|---|---| +| `-e, --event-store ` | Event store. Defaults to `default`. | +| `-n, --namespace ` | Namespace. Defaults to `Default`. | +| `--sequence ` | Event sequence. Defaults to `event-log`. | +| `-y, --yes` | Skip confirmation prompt. 
| + +### Examples + +Clear quarantine for an observer: + +```bash +cratis chronicle observers clear-quarantine my-observer-id +``` + ## Troubleshooting Workflow Use this sequence when an observer appears stuck, behind, or failing: diff --git a/Integration/Chronicle/Chronicle.csproj b/Integration/Chronicle/Chronicle.csproj index 15ef726..fd92bcd 100644 --- a/Integration/Chronicle/Chronicle.csproj +++ b/Integration/Chronicle/Chronicle.csproj @@ -21,7 +21,7 @@ - + diff --git a/Integration/Chronicle/for_Observers/when_listing_observers.cs b/Integration/Chronicle/for_Observers/when_listing_observers.cs index 44921ca..2e7ca0d 100644 --- a/Integration/Chronicle/for_Observers/when_listing_observers.cs +++ b/Integration/Chronicle/for_Observers/when_listing_observers.cs @@ -19,5 +19,12 @@ public class context : given.a_connected_cli [Fact] void should_have_output() => (Context.Result.StandardOutput.Length > 0).ShouldBeTrue(); + [Fact] + void should_include_is_quarantined_field() + { + var first = JsonDocument.Parse(Context.Result.StandardOutput).RootElement.EnumerateArray().First(); + first.TryGetProperty("isQuarantined", out _).ShouldBeTrue(); + } + [Fact] void should_have_no_errors() => Context.Result.StandardError.ShouldEqual(string.Empty); } diff --git a/Source/Cli.Specs/Cli.Specs.csproj b/Source/Cli.Specs/Cli.Specs.csproj index 6c03f7a..5940b37 100644 --- a/Source/Cli.Specs/Cli.Specs.csproj +++ b/Source/Cli.Specs/Cli.Specs.csproj @@ -5,6 +5,7 @@ net10.0 true false + $(NoWarn);CS0436 diff --git a/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_does_not_exist.cs b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_does_not_exist.cs new file mode 100644 index 0000000..1eefc0e --- /dev/null +++ b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_does_not_exist.cs @@ -0,0 +1,15 @@ +// Copyright (c) Cratis. All rights reserved. 
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Cratis.Cli.for_ClearObserverQuarantineCommand.when_invoking_clear_quarantine;
+
+// Specifies that TryClear reports false when the observers object has no ClearObserverQuarantine method.
+public class and_method_does_not_exist : Specification
+{
+    // Value returned by TryClear, captured for the assertion below.
+    bool _result;
+
+    async Task Because() => _result = await ClearObserverQuarantineInvoker.TryClear(new FakeObservers(), "event-store", "namespace", "observer-id", "event-log");
+
+    [Fact] void should_return_false() => _result.ShouldBeFalse();
+
+    // Deliberately member-less stand-in, so the by-name method lookup finds nothing to call.
+    class FakeObservers;
+}
diff --git a/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_exists.cs b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_exists.cs
new file mode 100644
index 0000000..0515ebd
--- /dev/null
+++ b/Source/Cli.Specs/for_ClearObserverQuarantineCommand/when_invoking_clear_quarantine/and_method_exists.cs
@@ -0,0 +1,46 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Cratis.Cli.for_ClearObserverQuarantineCommand.when_invoking_clear_quarantine;
+
+// Specifies that TryClear finds a ClearObserverQuarantine method by name, fills a command object
+// of the method's first parameter type, invokes the method and reports true.
+public class and_method_exists : Specification
+{
+    FakeObservers _observers;
+    bool _result;
+
+    void Establish() => _observers = new();
+
+    async Task Because() => _result = await ClearObserverQuarantineInvoker.TryClear(_observers, "event-store", "namespace", "observer-id", "event-log");
+
+    [Fact] void should_invoke_clear_quarantine() => _observers.WasInvoked.ShouldBeTrue();
+    [Fact] void should_return_true() => _result.ShouldBeTrue();
+    [Fact] void should_set_event_store() => _observers.Command.EventStore.ShouldEqual("event-store");
+    [Fact] void should_set_namespace() => _observers.Command.Namespace.ShouldEqual("namespace");
+    [Fact] void should_set_observer_id() => _observers.Command.ObserverId.ShouldEqual("observer-id");
+    [Fact] void should_set_event_sequence_id() => _observers.Command.EventSequenceId.ShouldEqual("event-log");
+
+    // Stand-in for the observers service; records whether it was called and with which command.
+    class FakeObservers
+    {
+        public bool WasInvoked { get; private set; }
+
+        public FakeClearObserverQuarantine Command { get; private set; } = null!;
+
+        // The defaulted second parameter makes TryClear supply arguments beyond the command itself.
+        public Task ClearObserverQuarantine(FakeClearObserverQuarantine command, int ignored = 0)
+        {
+            WasInvoked = true;
+            Command = command;
+            return Task.CompletedTask;
+        }
+    }
+
+    // Stand-in for the command type; its property names are the ones TryClear assigns by name.
+    class FakeClearObserverQuarantine
+    {
+        public string EventStore { get; set; } = string.Empty;
+
+        public string Namespace { get; set; } = string.Empty;
+
+        public string ObserverId { get; set; } = string.Empty;
+
+        public string EventSequenceId { get; set; } = string.Empty;
+    }
+}
diff --git a/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_has_quarantined_value.cs b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_has_quarantined_value.cs
new file mode 100644
index 0000000..266fa00
--- /dev/null
+++
b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_has_quarantined_value.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using Cratis.Chronicle.Contracts.Observation;
+
+namespace Cratis.Cli.for_ListObserversCommand.when_determining_if_observer_is_quarantined;
+
+// Specifies that IsQuarantined returns true when the running state has the numeric value 5.
+public class and_running_state_has_quarantined_value : Specification
+{
+    bool _isQuarantined;
+
+    void Because() => _isQuarantined = ListObserversCommand.IsQuarantined(new ObserverInformation
+    {
+        // Built from the raw number rather than a named member; presumably the referenced
+        // contracts enum may not declare a member for this value - TODO confirm.
+        RunningState = (ObserverRunningState)5
+    });
+
+    [Fact] void should_return_true() => _isQuarantined.ShouldBeTrue();
+}
diff --git a/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_is_active.cs b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_is_active.cs
new file mode 100644
index 0000000..c0c9c09
--- /dev/null
+++ b/Source/Cli.Specs/for_ListObserversCommand/when_determining_if_observer_is_quarantined/and_running_state_is_active.cs
@@ -0,0 +1,18 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using Cratis.Chronicle.Contracts.Observation;
+
+namespace Cratis.Cli.for_ListObserversCommand.when_determining_if_observer_is_quarantined;
+
+// Specifies that IsQuarantined returns false for an observer whose running state is Active.
+public class and_running_state_is_active : Specification
+{
+    bool _isQuarantined;
+
+    void Because() => _isQuarantined = ListObserversCommand.IsQuarantined(new ObserverInformation
+    {
+        RunningState = ObserverRunningState.Active
+    });
+
+    [Fact] void should_return_false() => _isQuarantined.ShouldBeFalse();
+}
diff --git a/Source/Cli/Commands/Chronicle/Observers/ClearObserverQuarantineCommand.cs b/Source/Cli/Commands/Chronicle/Observers/ClearObserverQuarantineCommand.cs
new file mode 100644
index 0000000..53448c7
--- /dev/null
+++ b/Source/Cli/Commands/Chronicle/Observers/ClearObserverQuarantineCommand.cs
@@ -0,0 +1,109 @@
+// Copyright (c) Cratis. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+namespace Cratis.Cli.Commands.Chronicle.Observers;
+
+/// <summary>
+/// Clears quarantine for an observer.
+/// </summary>
+[LlmDescription("Clears quarantine for a quarantined observer so it can resume processing. 
Prompts for confirmation unless --yes is specified.")]
+[CliCommand("clear-quarantine", "Clear quarantine for an observer", Branch = typeof(ChronicleBranch.Observers), DynamicCompletion = "observers")]
+[CliExample("chronicle", "observers", "clear-quarantine", "550e8400-e29b-41d4-a716-446655440000")]
+[LlmOption("", "string", "Observer identifier (from 'cratis observers list') (positional)")]
+public class ClearObserverQuarantineCommand : ChronicleCommand
+{
+    /// <inheritdoc/>
+    protected override async Task ExecuteCommandAsync(IServices services, ObserverCommandSettings settings, string format)
+    {
+        // Declining the prompt is not an error: nothing is sent and the command exits successfully.
+        if (!ConfirmationHelper.ShouldProceed(settings, $"Are you sure you want to clear quarantine for observer '{settings.ObserverId}'?"))
+        {
+            OutputFormatter.WriteMessage(format, "Aborted.");
+            return ExitCodes.Success;
+        }
+
+        // The observers service is handed over as a plain object; the invoker resolves the call by name at runtime.
+        var cleared = await ClearObserverQuarantineInvoker.TryClear(
+            services.Observers,
+            settings.ResolveEventStore(),
+            settings.ResolveNamespace(),
+            settings.ObserverId,
+            settings.EventSequenceId);
+
+        if (!cleared)
+        {
+            OutputFormatter.WriteError(
+                format,
+                "Connected Chronicle contracts do not support clearing observer quarantine.",
+                "Upgrade Cratis.Chronicle.Contracts and Cratis.Chronicle.Connections to a version that includes observer quarantine clear support.",
+                ExitCodes.ValidationErrorCode);
+            return ExitCodes.ValidationError;
+        }
+
+        OutputFormatter.WriteMessage(format, $"Quarantine cleared for observer '{settings.ObserverId}'.");
+        return ExitCodes.Success;
+    }
+}
+
+/// <summary>
+/// Clears observer quarantine through reflection, without binding to a contract member at compile time.
+/// </summary>
+static class ClearObserverQuarantineInvoker
+{
+    const string MethodName = "ClearObserverQuarantine";
+
+    /// <summary>
+    /// Looks up a <c>ClearObserverQuarantine</c> method on <paramref name="observers"/>, builds its command argument and awaits the call.
+    /// </summary>
+    /// <param name="observers">The observers service to invoke the method on.</param>
+    /// <param name="eventStore">Value for the command's <c>EventStore</c> property.</param>
+    /// <param name="namespace">Value for the command's <c>Namespace</c> property.</param>
+    /// <param name="observerId">Value for the command's <c>ObserverId</c> property.</param>
+    /// <param name="eventSequenceId">Value for the command's <c>EventSequenceId</c> property; skipped when the command has no such property.</param>
+    /// <returns>
+    /// <see langword="true"/> once the invoked task has completed; <see langword="false"/> when no task-returning method
+    /// of that name exists, the command object cannot be created, a required string property is missing, or the method
+    /// returned no task.
+    /// </returns>
+    /// <exception cref="ArgumentNullException"><paramref name="observers"/> is <see langword="null"/>.</exception>
+    public static async Task<bool> TryClear(object observers, string eventStore, string @namespace, string observerId, string eventSequenceId)
+    {
+        ArgumentNullException.ThrowIfNull(observers);
+
+        // Restricting the lookup to task-returning overloads rejects an incompatible method before
+        // it runs, instead of running it and then reporting the operation as unsupported.
+        var method = observers.GetType()
+            .GetMethods()
+            .FirstOrDefault(_ =>
+                _.Name == MethodName &&
+                _.GetParameters().Length > 0 &&
+                typeof(Task).IsAssignableFrom(_.ReturnType));
+
+        if (method is null)
+        {
+            return false;
+        }
+
+        var parameters = method.GetParameters();
+        var command = CreateCommand(parameters[0].ParameterType);
+        if (command is null)
+        {
+            return false;
+        }
+
+        if (!SetPropertyIfPresent(command, "EventStore", eventStore) ||
+            !SetPropertyIfPresent(command, "Namespace", @namespace) ||
+            !SetPropertyIfPresent(command, "ObserverId", observerId))
+        {
+            return false;
+        }
+
+        // Optional: the call still goes ahead when the command has no EventSequenceId property.
+        SetPropertyIfPresent(command, "EventSequenceId", eventSequenceId);
+
+        // Every parameter after the command gets its declared default, or the zero value of its
+        // value type; reference-type parameters without a default stay null.
+        var arguments = new object?[parameters.Length];
+        arguments[0] = command;
+        for (var i = 1; i < parameters.Length; i++)
+        {
+            if (parameters[i].HasDefaultValue)
+            {
+                arguments[i] = parameters[i].DefaultValue;
+            }
+            else if (parameters[i].ParameterType.IsValueType)
+            {
+                arguments[i] = Activator.CreateInstance(parameters[i].ParameterType);
+            }
+        }
+
+        // DoNotWrapExceptions lets a failure thrown synchronously by the target surface as itself
+        // rather than wrapped in TargetInvocationException.
+        var result = method.Invoke(
+            observers,
+            global::System.Reflection.BindingFlags.DoNotWrapExceptions,
+            null,
+            arguments,
+            null);
+
+        if (result is not Task task)
+        {
+            return false;
+        }
+
+        await task;
+        return true;
+    }
+
+    // Creates the command object, or returns null for a type Activator cannot instantiate
+    // (abstract, interface, open generic, or a class without a public parameterless constructor).
+    static object? CreateCommand(Type commandType)
+    {
+        if (commandType.IsAbstract || commandType.ContainsGenericParameters)
+        {
+            return null;
+        }
+
+        if (!commandType.IsValueType && commandType.GetConstructor(Type.EmptyTypes) is null)
+        {
+            return null;
+        }
+
+        return Activator.CreateInstance(commandType);
+    }
+
+    // Assigns a string to the named property; returns false when the property is missing,
+    // has no setter, or cannot hold a string.
+    static bool SetPropertyIfPresent(object instance, string propertyName, string value)
+    {
+        var property = instance.GetType().GetProperty(propertyName);
+        if (property is not { CanWrite: true } || !property.PropertyType.IsAssignableFrom(typeof(string)))
+        {
+            return false;
+        }
+
+        property.SetValue(instance, value);
+        return true;
+    }
+}
diff --git a/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs b/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
index 81519aa..53b6fd6 100644
--- a/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
+++ b/Source/Cli/Commands/Chronicle/Observers/ListObserversCommand.cs
@@ -14,6 +14,8 @@ namespace Cratis.Cli.Commands.Chronicle.Observers;
 [LlmOption("-t, --type", "string", "Filter by type: reactor, reducer, projection, or all. Invalid values return an error.")]
 public class ListObserversCommand : ChronicleCommand
 {
+    // Numeric running-state value that IsQuarantined treats as quarantined, whatever the member is named.
+    const int QuarantinedRunningStateValue = 5;
+
     /// <summary>
     /// Filters observers by type name, returning all observers when the type is "all".
/// </summary>
@@ -51,6 +53,15 @@ internal static bool IsValidType(string type, out string errorMessage)
         return false;
     }
 
+    /// <summary>
+    /// Determines whether an observer is quarantined.
+    /// </summary>
+    /// <remarks>
+    /// A running state counts as quarantined when its name is "Quarantined" (compared case-insensitively)
+    /// or when its numeric value equals <c>QuarantinedRunningStateValue</c>.
+    /// </remarks>
+    /// <param name="observer">The observer to inspect.</param>
+    /// <returns><see langword="true"/> if quarantined; otherwise <see langword="false"/>.</returns>
+    internal static bool IsQuarantined(ObserverInformation observer) =>
+        string.Equals(observer.RunningState.ToString(), "Quarantined", StringComparison.OrdinalIgnoreCase) ||
+        (int)observer.RunningState == QuarantinedRunningStateValue;
+
     /// <inheritdoc/>
     protected override async Task ExecuteCommandAsync(IServices services, ListObserversSettings settings, string format)
     {
@@ -75,6 +86,7 @@ protected override async Task ExecuteCommandAsync(IServices services, ListO
                 id = obs.Id,
                 type = obs.Type.ToString(),
                 runningState = obs.RunningState.ToString(),
+                isQuarantined = IsQuarantined(obs),
                 nextEventSequenceNumber = obs.NextEventSequenceNumber == ulong.MaxValue ? null : (ulong?)obs.NextEventSequenceNumber,
                 lastHandledEventSequenceNumber = obs.LastHandledEventSequenceNumber == ulong.MaxValue ? null : (ulong?)obs.LastHandledEventSequenceNumber,
                 isSubscribed = obs.IsSubscribed
@@ -86,12 +98,13 @@ protected override async Task ExecuteCommandAsync(IServices services, ListO
             OutputFormatter.Write(
                 format,
                 filtered,
-                ["Id", "Type", "State", "Next#", "LastHandled#", "Subscribed"],
+                ["Id", "Type", "State", "Quarantined", "Next#", "LastHandled#", "Subscribed"],
                 obs =>
                 [
                     obs.Id,
                     obs.Type.ToString(),
                     obs.RunningState.ToString(),
+                    IsQuarantined(obs).ToString(),
                     obs.NextEventSequenceNumber == ulong.MaxValue ? "(never)" : obs.NextEventSequenceNumber.ToString(),
                     obs.LastHandledEventSequenceNumber == ulong.MaxValue ? "(never)" : obs.LastHandledEventSequenceNumber.ToString(),
                     obs.IsSubscribed.ToString()
diff --git a/Source/Cli/README.md b/Source/Cli/README.md
index f92c2e7..47f300f 100644
--- a/Source/Cli/README.md
+++ b/Source/Cli/README.md
@@ -189,6 +189,7 @@ Lists all observers registered in an event store/namespace.
|--------|-------------|---------|
 | `-e, --event-store ` | Event store name | `default` |
 | `-n, --namespace ` | Namespace | `default` |
+| *(output field)* `isQuarantined` | Indicates whether the observer is quarantined | |
 
 #### `observers replay`
 
@@ -231,6 +232,21 @@ Retries a failed partition of an observer without full replay.
 
 Same arguments and options as `replay-partition`.
 
+#### `observers clear-quarantine`
+
+```bash
+cratis observers clear-quarantine [options]
+```
+
+Clears quarantine for a quarantined observer so it can resume processing. Prompts for confirmation unless `--yes` is specified.
+
+| Argument / Option | Description | Default |
+|-------------------|-------------|---------|
+| `` | The observer ID _(required)_ | |
+| `-e, --event-store ` | Event store name | `default` |
+| `-n, --namespace ` | Namespace | `default` |
+| `--sequence ` | Event sequence ID | `event-log` |
+| `-y, --yes` | Skip the confirmation prompt | |
+
 ---
 
 ### `failed-partitions`