25 changes: 25 additions & 0 deletions cluster/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,31 @@ All notable changes to `crucible/cluster` are documented here.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- **`Supervisor.Forget`.** Discards a supervisor's per-actor restart bookkeeping
(spent-restart counter and any scheduled backoff restart) for an actor id. A host
calls it when an actor is permanently stopped, so the restart map does not
accumulate one entry per distinct actor id for the process lifetime under churn.
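
The eviction described above can be sketched as a map delete plus an in-place filter of the pending slice. This is a minimal illustration, not the module's code: the `supervisor` and `pendingRestart` types, their fields, and the `forget` method are hypothetical stand-ins that only mirror the shape the entry describes.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingRestart is a hypothetical stand-in for a scheduled backoff restart.
type pendingRestart struct{ actorID string }

// supervisor is a hypothetical stand-in holding only the per-actor bookkeeping.
type supervisor struct {
	mu       sync.Mutex
	restarts map[string]int // spent-restart counter per actor id
	pending  []pendingRestart
}

// forget drops the counter and every pending restart for actorID.
// Forgetting an unknown id leaves both structures unchanged.
func (s *supervisor) forget(actorID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.restarts, actorID)
	// Filter in place: kept shares the backing array with s.pending.
	kept := s.pending[:0]
	for _, p := range s.pending {
		if p.actorID != actorID {
			kept = append(kept, p)
		}
	}
	s.pending = kept
}

func main() {
	s := &supervisor{
		restarts: map[string]int{"worker-1": 1, "worker-2": 2},
		pending:  []pendingRestart{{"worker-1"}, {"worker-2"}, {"worker-1"}},
	}
	s.forget("worker-1")
	fmt.Println(len(s.restarts), len(s.pending), s.pending[0].actorID)
}
```

Without such a call the map would keep one entry per actor id ever seen, which is the growth under churn that the entry is about.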

### Fixed

- **Backoff overflow.** `backoffDelay` clamps to `math.MaxInt64` before the
`time.Duration` conversion, so an uncapped schedule (no max set) with a high
restart count no longer overflows into a negative or wrapped delay.
- **Nil-respawner Tick.** `Supervisor.Tick` guards against a Respawner cleared
between scheduling a backoff and the due tick: instead of panicking, Tick restarts
nothing and leaves the pending restart in place for a later Tick.

### Documentation

- **Capture consistency boundary.** `Capture`'s godoc now states that the instance
snapshot and actor-tree snapshot are read as two separate operations, so the
caller must quiesce the instance (no concurrent Fire or delivery) for the pair to
be point-in-time consistent.
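
One way a host might quiesce an instance is to gate both the driver and the capture behind the same lock. The sketch below is an assumption about host code, not part of `crucible/cluster`: the `host` type, its two counters (standing in for the instance snapshot and the actor-tree snapshot), and the `fire` and `capture` methods are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// host is a hypothetical wrapper. The two counters stand in for the instance
// snapshot and the actor-tree snapshot, which are read as separate operations.
type host struct {
	mu       sync.Mutex
	instance int
	actors   int
}

// fire mutates both halves and is the only writer.
func (h *host) fire() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.instance++
	h.actors++
}

// capture holds the same lock across both reads, so no fire can land between
// them and the returned pair reflects a single instant.
func (h *host) capture() (instance, actors int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	instance = h.instance
	actors = h.actors
	return instance, actors
}

func main() {
	h := &host{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.fire()
		}()
	}
	consistent := true
	for i := 0; i < 100; i++ {
		if a, b := h.capture(); a != b {
			consistent = false
		}
	}
	wg.Wait()
	fmt.Println("every captured pair consistent:", consistent)
}
```

Stopping whatever drives the instance before calling `Capture` gives the same guarantee without a lock.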

## [0.1.0]

The first release of the host-side distribution runtime for the `crucible/state`
Expand Down
18 changes: 17 additions & 1 deletion cluster/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ func (s *Supervisor) scheduleBackoff(esc *state.ActorEscalation) bool {
}

// backoffDelay is initial*factor^n capped at max (and floored at initial when the
// factor would shrink it).
// factor would shrink it). When no max is set (max==0) the growth is unbounded, so
// the result is clamped to math.MaxInt64 before the time.Duration conversion: a
// large n with factor>1 can drive the float past the int64 range, and converting
// an out-of-range float64 to time.Duration yields an implementation-dependent
// value, possibly a negative (or wrapped) delay that would fire a backoff
// immediately or never.
func backoffDelay(pol backoffPolicy, n int) time.Duration {
	d := float64(pol.initial) * math.Pow(pol.factor, float64(n))
	if d < float64(pol.initial) {
Expand All @@ -79,6 +83,9 @@ func backoffDelay(pol backoffPolicy, n int) time.Duration {
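	if pol.max > 0 && d > float64(pol.max) {
		d = float64(pol.max)
	}
	// float64(math.MaxInt64) rounds up to 2^63, which is itself out of range for
	// int64, so compare with >= to clamp that boundary value too.
	if d >= float64(math.MaxInt64) {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}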

Expand All @@ -90,6 +97,15 @@ func (s *Supervisor) Tick(ctx context.Context) int {
	now := s.clock.Now()
	s.mu.Lock()
	respawner := s.respawner
	// A Backoff schedule is only created while a Respawner is wired
	// (scheduleBackoff returns false otherwise), but the Respawner can be cleared
	// between scheduling and Tick; guard so a due restart with no Respawner is a
	// no-op rather than a nil-pointer panic. The pending restarts are left in place
	// so a Respawner wired again before the next Tick still applies them.
	if respawner == nil {
		s.mu.Unlock()
		return 0
	}
	var due, keep []pendingRestart
	for _, p := range s.pending {
		if p.dueAt.After(now) {
Expand Down
43 changes: 43 additions & 0 deletions cluster/backoff_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package cluster

import (
	"math"
	"testing"
	"time"
)

// TestBackoffDelay_ClampsOnUncappedOverflow verifies that with no max set (max==0)
// a large restart count with factor>1 does not overflow the time.Duration
// conversion into a negative or wrapped delay: the result is clamped to
// math.MaxInt64. Without the clamp, float64(initial)*factor^n exceeds the int64
// range and the result of time.Duration(d) is implementation-dependent, which
// could fire a backoff immediately or never.
func TestBackoffDelay_ClampsOnUncappedOverflow(t *testing.T) {
	pol := backoffPolicy{maxRestarts: 1000, initial: time.Second, max: 0, factor: 2.0}
	// 2^60 seconds is far beyond math.MaxInt64 nanoseconds.
	got := backoffDelay(pol, 60)
	if got != time.Duration(math.MaxInt64) {
		t.Fatalf("uncapped overflow delay = %d, want clamped to MaxInt64 (%d)", got, int64(math.MaxInt64))
	}
	if got < 0 {
		t.Fatalf("delay must never be negative, got %d", got)
	}
}

// TestBackoffDelay_RespectsExplicitMax confirms a set max caps growth well below
// the overflow clamp, so the clamp only governs the uncapped case.
func TestBackoffDelay_RespectsExplicitMax(t *testing.T) {
	pol := backoffPolicy{maxRestarts: 1000, initial: time.Second, max: 5 * time.Second, factor: 2.0}
	if got := backoffDelay(pol, 60); got != 5*time.Second {
		t.Fatalf("capped delay = %v, want 5s", got)
	}
}

// TestBackoffDelay_FloorsAtInitial confirms a shrinking factor never produces a
// delay below initial.
func TestBackoffDelay_FloorsAtInitial(t *testing.T) {
	pol := backoffPolicy{maxRestarts: 10, initial: time.Second, max: 10 * time.Second, factor: 0.5}
	if got := backoffDelay(pol, 5); got != time.Second {
		t.Fatalf("floored delay = %v, want 1s", got)
	}
}
83 changes: 83 additions & 0 deletions cluster/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,86 @@ func TestSupervisor_DefaultClock(t *testing.T) {
		t.Fatalf("Tick with nothing pending = %d, want 0", n)
	}
}

// TestSupervisor_TickNilRespawnerNoPanic confirms a Tick whose Respawner was
// cleared after a backoff was scheduled does not panic: the due restart is a no-op
// and stays pending so a Respawner wired again later still applies it.
func TestSupervisor_TickNilRespawnerNoPanic(t *testing.T) {
	ctx := context.Background()
	clock := state.NewFakeClock(time.Unix(0, 0))
	sup := cluster.NewSupervisor(
		cluster.WithBackoff("child", 3, 100*time.Millisecond, time.Second, 2.0),
		cluster.WithClock(clock),
	)
	sys := backoffSystem(t, sup)

	sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	clock.Advance(100 * time.Millisecond)

	// Clear the respawner between scheduling and the due Tick.
	sup.SetRespawner(nil)
	if n := sup.Tick(ctx); n != 0 {
		t.Fatalf("Tick with a cleared respawner restarted %d, want 0", n)
	}

	// The pending restart was preserved: rewiring a respawner and ticking applies it.
	sup.SetRespawner(sys)
	if n := sup.Tick(ctx); n != 1 {
		t.Fatalf("Tick after rewiring the respawner restarted %d, want 1", n)
	}
}

// TestSupervisor_ForgetEvictsBookkeeping confirms Forget drops an actor's spent
// restart counter and any pending restart, bounding the restarts map under churn.
// After Forget a re-spawn of the same id earns a fresh restart budget.
func TestSupervisor_ForgetEvictsBookkeeping(t *testing.T) {
	ctx := context.Background()
	clock := state.NewFakeClock(time.Unix(0, 0))
	sup := cluster.NewSupervisor(
		cluster.WithRestart("child", 1),
		cluster.WithClock(clock),
	)
	sys := backoffSystem(t, sup)

	// Spend the single restart budget.
	sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	if sys.Running() != 1 {
		t.Fatalf("after first (restart) failure Running() = %d, want 1", sys.Running())
	}
	// Budget spent: a second failure escalates rather than restarts.
	sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	if sys.Running() != 0 {
		t.Fatalf("after budget-exhausting failure Running() = %d, want 0", sys.Running())
	}

	// Forget the actor (a genuine teardown), then re-spawn it: it earns a fresh
	// budget, so the next failure restarts again rather than escalating.
	sup.Forget("worker-1")
	if _, err := sys.Respawn(ctx, "child", "worker-1", nil); err != nil {
		t.Fatalf("Respawn after Forget: %v", err)
	}
	sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	if sys.Running() != 1 {
		t.Fatalf("after Forget+respawn, a failure should restart within the fresh budget; Running() = %d, want 1", sys.Running())
	}
}

// TestSupervisor_ForgetPendingBackoff confirms Forget removes a not-yet-due backoff
// restart, so a forgotten actor is not restarted by a later Tick.
func TestSupervisor_ForgetPendingBackoff(t *testing.T) {
	ctx := context.Background()
	clock := state.NewFakeClock(time.Unix(0, 0))
	sup := cluster.NewSupervisor(
		cluster.WithBackoff("child", 3, 100*time.Millisecond, time.Second, 2.0),
		cluster.WithClock(clock),
	)
	sys := backoffSystem(t, sup)

	sys.Local().SettleError(ctx, "worker-1", errors.New("boom"))
	sup.Forget("worker-1") // drop the scheduled restart before it is due

	clock.Advance(time.Second)
	if n := sup.Tick(ctx); n != 0 {
		t.Fatalf("Tick after forgetting the pending restart restarted %d, want 0", n)
	}
}
12 changes: 11 additions & 1 deletion cluster/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,17 @@ type Checkpoint struct {

// Capture snapshots a running instance, its actor tree, and its machine definition
// into a Checkpoint ready to ship to another node. It is a pure read: it neither
// fires the instance nor mutates any actor. Call it at a quiescent point.
// fires the instance nor mutates any actor.
//
// # Consistency boundary
//
// The instance snapshot and the actor-tree snapshot are read as two separate
// operations, not under one combined lock, so Capture does not by itself produce a
// point-in-time-consistent pair. The caller must ensure no Fire or actor delivery
// runs against the instance for the duration of the call — that is what "quiescent"
// means here. Captured under concurrent firing, the snapshot and the actor tree may
// reflect different instants and Restore could rebuild a tree that does not match
// the kernel configuration. Quiesce the instance (stop driving it) before Capture.
func Capture[S comparable, E comparable, C any](inst *state.Instance[S, E, C], sys *state.ActorSystem[S, E, C], machine *state.Machine[S, E, C]) (Checkpoint, error) {
	snap, err := state.MarshalSnapshot(inst.Snapshot())
	if err != nil {
Expand Down
62 changes: 62 additions & 0 deletions cluster/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,68 @@ func TestMigration_BreakingTargetRefused(t *testing.T) {
	}
}

// TestMigration_Restore_BadMachineIR reports an error when the captured machine IR
// cannot be decoded, rather than silently proceeding with no compatibility gate.
func TestMigration_Restore_BadMachineIR(t *testing.T) {
	ctx := context.Background()
	cp := capturedInB(t)
	cp.MachineIR = json.RawMessage(`{not valid ir`)

	_, _, err := cluster.Restore(ctx, cp, migSource())
	if err == nil {
		t.Fatal("Restore with a corrupt machine IR must error")
	}
	if errors.Is(err, cluster.ErrIncompatibleMigration) {
		t.Fatalf("a decode failure must not masquerade as an incompatible-migration refusal: %v", err)
	}
}

// TestMigration_Restore_BadSnapshot reports an error when the captured snapshot
// cannot be unmarshaled, after the compatibility gate has passed.
func TestMigration_Restore_BadSnapshot(t *testing.T) {
	ctx := context.Background()
	cp := capturedInB(t)
	cp.Snapshot = json.RawMessage(`{"current": 12345}`) // wrong shape for the snapshot

	_, _, err := cluster.Restore(ctx, cp, migSource())
	if err == nil {
		t.Fatal("Restore with a corrupt snapshot must error")
	}
}

// TestMigration_Restore_BadActors reports an error when a captured actor entry
// cannot be decoded, rather than restoring a partial actor tree.
func TestMigration_Restore_BadActors(t *testing.T) {
	ctx := context.Background()
	cp := capturedInB(t)
	cp.Actors = map[string]json.RawMessage{"w-bad": json.RawMessage(`{not an actor`)}

	_, _, err := cluster.Restore(ctx, cp, migSource(), cluster.WithActorBehaviors(map[string]state.ActorBehavior{
		"child": childBehavior(),
	}))
	if err == nil {
		t.Fatal("Restore with a corrupt actor entry must error")
	}
}

// TestMigration_Capture_Marshalable confirms Capture succeeds on a well-formed
// instance and that the captured snapshot, IR, and actor tree are all valid JSON
// (the marshal/serialize/snapshot success paths the error branches guard).
func TestMigration_Capture_Marshalable(t *testing.T) {
	cp := capturedInB(t)
	if !json.Valid(cp.Snapshot) {
		t.Error("captured Snapshot is not valid JSON")
	}
	if !json.Valid(cp.MachineIR) {
		t.Error("captured MachineIR is not valid JSON")
	}
	for id, raw := range cp.Actors {
		if !json.Valid(raw) {
			t.Errorf("captured actor %q is not valid JSON", id)
		}
	}
}

// TestMigration_ActorsMove confirms a migrated instance carries its running actors.
func TestMigration_ActorsMove(t *testing.T) {
	ctx := context.Background()
Expand Down
23 changes: 23 additions & 0 deletions cluster/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,29 @@ func (s *Supervisor) tryRestart(ctx context.Context, esc *state.ActorEscalation)
	return true
}

// Forget discards the supervisor's per-actor restart bookkeeping for actorID: its
// spent-restart counter and any not-yet-applied backoff restart scheduled for it.
// A host calls it when an actor is permanently stopped (not restarted), so the
// supervisor's restart map does not accumulate one entry per distinct actor id for
// the process lifetime under churn. Forgetting an unknown id is a no-op. After
// Forget a re-spawn of the same id starts a fresh restart budget, so call it only
// for a genuine teardown, never between a failure and its restart.
func (s *Supervisor) Forget(actorID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.restarts, actorID)
	if len(s.pending) == 0 {
		return
	}
	kept := s.pending[:0]
	for _, p := range s.pending {
		if p.actorID != actorID {
			kept = append(kept, p)
		}
	}
	s.pending = kept
}

// Handled returns a snapshot of the failures this supervisor has processed, in
// order. The returned slice is a copy and safe to retain.
func (s *Supervisor) Handled() []HandledEscalation {
Expand Down
31 changes: 29 additions & 2 deletions durable/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,33 @@ All notable changes to `crucible/durable` are documented here.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- **Recording-clock buffer race.** Closing a step's `Record` (draining the
recording clock) and the clock-read index reads used to mirror timer deadlines
now take the recording clock's mutex, so they are safe against a `Now()` the
scheduler reads from a host's timer goroutine — the concurrency the recording
clock's documentation promises.
- **FileStore directory durability.** Atomic checkpoint writes now fsync the
parent directory after the rename, so the rename itself survives a crash and the
documented crash-durability holds, not only the file's bytes. The directory sync
is split by build tag (`filestore_dirsync_other.go` for POSIX, a no-op
`filestore_dirsync_windows.go` for Windows) because Windows does not permit
opening a directory handle for sync — the file rename plus the file's own fsync
are sufficient for Windows crash durability.
- **Actor replay error.** A recorded actor done-data payload that fails to decode
is now surfaced as a replay error instead of being silently dropped, so a
corrupt journal fails loudly rather than re-firing the parent with nil data.

### Removed

- **`WithRetainTail`.** The per-checkpoint option was inert: time-travel retention
is a store-level capability (`NewMemStore` `WithHistory`, the `HistoryStore`
seam), so the option changed nothing. The `CheckpointOption` seam itself is
retained for additive per-checkpoint policy.

## [0.1.0]

The first release of the host-side durable-execution runtime for the
Expand Down Expand Up @@ -43,7 +70,7 @@ kernel, which stays pure and stdlib-only.
recorded step, read-only, running no service, actor, clock, or effect. Backed
by the `HistoryStore` seam, falling back to `Load` otherwise.
- **Tuning options.** `WithCheckpointEvery` to bound recovery replay,
`WithEventCodec` for event serialization, and per-call append/checkpoint
options.
`WithEventCodec` for event serialization, and a per-append idempotency key
(`WithIdempotencyKey`).

[0.1.0]: https://github.com/stablekernel/crucible/releases/tag/durable/v0.1.0