From 1b8728a39154da7f018ec3b8876df7a0e66cbe01 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:08:29 -0400 Subject: [PATCH 1/5] fix(durable): close clock-buffer race, fsync checkpoint dir, surface actor replay error, drop inert WithRetainTail Signed-off-by: Joshua Temple --- durable/CHANGELOG.md | 27 ++++++++++++++-- durable/actor.go | 21 ++++++++----- durable/actor_internal_test.go | 57 ++++++++++++++++++++++++++++++++++ durable/clock.go | 41 ++++++++++++++++++++++++ durable/clock_internal_test.go | 41 ++++++++++++++++++++++++ durable/filestore.go | 30 +++++++++++++++--- durable/filestore_test.go | 8 +++-- durable/memstore.go | 2 +- durable/memstore_test.go | 10 +++--- durable/options.go | 25 +++++---------- durable/runner.go | 46 ++++++++++++++------------- 11 files changed, 250 insertions(+), 58 deletions(-) create mode 100644 durable/actor_internal_test.go diff --git a/durable/CHANGELOG.md b/durable/CHANGELOG.md index 9715faf..3a82425 100644 --- a/durable/CHANGELOG.md +++ b/durable/CHANGELOG.md @@ -5,6 +5,29 @@ All notable changes to `crucible/durable` are documented here. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Fixed + +- **Recording-clock buffer race.** Closing a step's `Record` (draining the + recording clock) and the clock-read index reads used to mirror timer deadlines + now take the recording clock's mutex, so they are safe against a `Now()` the + scheduler reads from a host's timer goroutine — the concurrency the recording + clock's documentation promises. +- **FileStore directory durability.** Atomic checkpoint writes now fsync the + parent directory after the rename, so the rename itself survives a crash and the + documented crash-durability holds, not only the file's bytes. +- **Actor replay error.** A recorded actor done-data payload that fails to decode + is now surfaced as a replay error instead of being silently dropped, so a + corrupt journal fails loudly rather than re-firing the parent with nil data. + +### Removed + +- **`WithRetainTail`.** The per-checkpoint option was inert: time-travel retention + is a store-level capability (`NewMemStore` `WithHistory`, the `HistoryStore` + seam), so the option changed nothing. The `CheckpointOption` seam itself is + retained for additive per-checkpoint policy. + ## [0.1.0] The first release of the host-side durable-execution runtime for the @@ -43,7 +66,7 @@ kernel, which stays pure and stdlib-only. recorded step, read-only, running no service, actor, clock, or effect. Backed by the `HistoryStore` seam, falling back to `Load` otherwise. - **Tuning options.** `WithCheckpointEvery` to bound recovery replay, - `WithEventCodec` for event serialization, and per-call append/checkpoint - options. + `WithEventCodec` for event serialization, and a per-append idempotency key + (`WithIdempotencyKey`). [0.1.0]: https://github.com/stablekernel/crucible/releases/tag/durable/v0.1.0 diff --git a/durable/actor.go b/durable/actor.go index 30beb41..792ba00 100644 --- a/durable/actor.go +++ b/durable/actor.go @@ -242,7 +242,11 @@ func replayActor[S comparable, E comparable, C any](ctx context.Context, inst *s } var res state.FireResult[S] if mp.HasData { - res = inst.Fire(ctx, event, state.WithEventData(replayActorData(mp))) + data, derr := replayActorData(mp) + if derr != nil { + return fmt.Errorf("durable: decoding recorded actor %q done-data: %w", entry.CorrelationID, derr) + } + res = inst.Fire(ctx, event, state.WithEventData(data)) } else { res = inst.Fire(ctx, event) } @@ -255,18 +259,21 @@ func replayActor[S comparable, E comparable, C any](ctx context.Context, inst *s // replayActorData reconstructs the actor outcome a settling fire carried into the // parent transition's Assign: the reconstructed error for an onError fire, the // JSON-decoded done-data for an onDone fire, or nil when the actor completed with no -// output. -func replayActorData(mp actorMessagePayload) any { +// output. A done-data payload that fails to decode is surfaced as an error rather +// than silently dropped, so a corrupt journal fails replay loudly instead of +// re-firing the parent with nil data it never saw on the live path. +func replayActorData(mp actorMessagePayload) (any, error) { if mp.Error != nil { - return errors.New(*mp.Error) + return errors.New(*mp.Error), nil } if len(mp.Data) > 0 { var data any - if err := json.Unmarshal(mp.Data, &data); err == nil { - return data + if err := json.Unmarshal(mp.Data, &data); err != nil { + return nil, fmt.Errorf("unmarshaling actor done-data: %w", err) } + return data, nil } - return nil + return nil, nil } // actorEntries returns the recorded actor transitions carried by a single Record, diff --git a/durable/actor_internal_test.go b/durable/actor_internal_test.go new file mode 100644 index 0000000..6d81a00 --- /dev/null +++ b/durable/actor_internal_test.go @@ -0,0 +1,57 @@ +package durable + +import ( + "encoding/json" + "testing" +) + +// TestReplayActorData covers the three reconstruction branches and the +// surfaced-error path: an onError payload becomes an error, a valid onDone payload +// decodes to its value, an absent payload is nil, and a corrupt done-data payload +// is reported rather than silently dropped (so replay fails loudly instead of +// re-firing the parent with nil data it never saw live). +func TestReplayActorData(t *testing.T) { + errMsg := "boom" + + t.Run("error", func(t *testing.T) { + got, err := replayActorData(actorMessagePayload{Error: &errMsg}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + gotErr, ok := got.(error) + if !ok || gotErr.Error() != errMsg { + t.Fatalf("want reconstructed error %q, got %v", errMsg, got) + } + }) + + t.Run("valid data", func(t *testing.T) { + got, err := replayActorData(actorMessagePayload{Data: json.RawMessage(`{"k":1}`)}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + m, ok := got.(map[string]any) + if !ok || m["k"] != float64(1) { + t.Fatalf("want decoded map, got %#v", got) + } + }) + + t.Run("no data", func(t *testing.T) { + got, err := replayActorData(actorMessagePayload{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != nil { + t.Fatalf("want nil for absent payload, got %#v", got) + } + }) + + t.Run("corrupt data surfaces error", func(t *testing.T) { + got, err := replayActorData(actorMessagePayload{Data: json.RawMessage(`{not json`)}) + if err == nil { + t.Fatal("want an error for undecodable done-data, got nil") + } + if got != nil { + t.Fatalf("want nil value alongside the error, got %#v", got) + } + }) +} diff --git a/durable/clock.go b/durable/clock.go index 67d7697..ad5081f 100644 --- a/durable/clock.go +++ b/durable/clock.go @@ -69,6 +69,47 @@ func (c *recordingClock) Now() time.Time { // through Now + Tick, so After carries no recorded nondeterminism. func (c *recordingClock) After(d time.Duration) <-chan time.Time { return c.base.After(d) } +// drain returns and clears the readings accumulated since the last drain, taken +// under the same mutex Now() appends under. The durable Handle calls it to close +// a step's Record; holding the lock makes the drain safe against a Now() the +// scheduler's timer goroutine issues concurrently (the documented "Tick from its +// own timer loop" pattern), which would otherwise race the copy-and-truncate of +// the shared buffer. +func (c *recordingClock) drain() []state.JournalEntry { + c.mu.Lock() + defer c.mu.Unlock() + if len(*c.buf) == 0 { + return nil + } + out := make([]state.JournalEntry, len(*c.buf)) + copy(out, *c.buf) + *c.buf = (*c.buf)[:0] + return out +} + +// markBuf returns the current buffer length under the clock mutex, the index a +// subsequent Scheduler.Absorb's clock read will land at. armReadingAt reads that +// reading back under the same lock. The pair lets absorbTimers mirror the +// scheduler's deadline without a second clock read while staying safe against a +// concurrent Now() from the timer goroutine. +func (c *recordingClock) markBuf() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(*c.buf) +} + +// armReadingAt returns the clock reading recorded at index i, ok false when the +// buffer holds no entry there (no read was appended). Taken under the clock mutex +// so it does not race a concurrent Now(). +func (c *recordingClock) armReadingAt(i int) (time.Time, bool) { + c.mu.Lock() + defer c.mu.Unlock() + if i < 0 || i >= len(*c.buf) { + return time.Time{}, false + } + return time.Unix(0, (*c.buf)[i].ClockUnixNano).UTC(), true +} + // replayClock returns recorded clock readings in order instead of reading any // real clock, so a recovered instance's scheduler re-derives identical timer // deadlines without consulting the wall clock. It is the inverse of diff --git a/durable/clock_internal_test.go b/durable/clock_internal_test.go index 7a70a96..a0a0124 100644 --- a/durable/clock_internal_test.go +++ b/durable/clock_internal_test.go @@ -114,6 +114,47 @@ func TestClockReadings_ExtractsInOrder(t *testing.T) { } } +// TestRecordingClock_ConcurrentNowAndDrain exercises the documented timer-goroutine +// pattern: a host drives the scheduler's clock reads from its own timer loop while +// the durable Handle drains the recording buffer to close steps. The recording +// clock's godoc promises concurrency safety for exactly this, so Now() (the timer +// goroutine) and drain/markBuf/armReadingAt (the Handle) must not race the shared +// buffer. Run under -race to catch a regression; the count assertion proves no +// reading is lost or double-counted across the interleaving. +func TestRecordingClock_ConcurrentNowAndDrain(t *testing.T) { + var buf []state.JournalEntry + c := newRecordingClock(state.NewFakeClock(clockEpoch), &buf) + + const reads = 5000 + var drained int + done := make(chan struct{}) + + // Timer goroutine: the scheduler reading the clock as it arms and ticks. + go func() { + for range reads { + c.Now() + } + close(done) + }() + + // Handle goroutine (this one): close steps by draining, and exercise the + // mark/read-back pair absorbTimers uses, all under the same mutex Now() holds. + for { + before := c.markBuf() + _, _ = c.armReadingAt(before - 1) + drained += len(c.drain()) + select { + case <-done: + drained += len(c.drain()) // final flush of anything appended after the last drain + if drained != reads { + t.Fatalf("drained %d readings, want %d (a lost or double-counted reading signals a buffer race)", drained, reads) + } + return + default: + } + } +} + // TestHasTimerEffect covers the schedule/cancel detection that gates Absorb. func TestHasTimerEffect(t *testing.T) { if !hasTimerEffect([]state.Effect{state.ScheduleAfter{ID: "x"}}) { diff --git a/durable/filestore.go b/durable/filestore.go index 9489bf4..cabd731 100644 --- a/durable/filestore.go +++ b/durable/filestore.go @@ -394,7 +394,7 @@ func (s *FileStore) Load(_ context.Context, id InstanceID) ([]byte, []Record, er // (temp+rename) and compacts the on-disk journal to only the post-checkpoint // tail. throughStep must advance beyond the current checkpoint. func (s *FileStore) Checkpoint(_ context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error { - _ = resolveCheckpoint(opts...) // retainTail does not change Load's view; on-disk tail is always compacted + _ = resolveCheckpoint(opts...) // no per-checkpoint option defined; on-disk tail is always compacted inst, _, err := s.instance(id) if err != nil { @@ -544,8 +544,11 @@ func appendLine(path string, line []byte) (err error) { } // writeAtomic writes data to path atomically: a sibling temp file is written and -// fsynced, then renamed over path (atomic on POSIX), so a reader sees either the -// old contents or the fully written new contents, never a torn mix. +// fsynced, then renamed over path (atomic on POSIX), then the parent directory is +// fsynced so the rename itself survives a crash. Without the directory fsync the +// renamed entry may not be durable even though the file's bytes are, so a reader +// after a crash sees either the old contents or the fully written new contents, +// never a torn mix and never a lost rename. func writeAtomic(path string, data []byte) error { tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*") if err != nil { @@ -565,7 +568,26 @@ func writeAtomic(path string, data []byte) error { if err := tmp.Close(); err != nil { return err } - return os.Rename(tmpName, path) + if err := os.Rename(tmpName, path); err != nil { + return err + } + return syncDir(filepath.Dir(path)) +} + +// syncDir fsyncs a directory so a rename or create within it is durable across a +// crash. A failure to open or sync the directory is reported; a platform that +// does not support syncing a directory handle (a rare case) surfaces its error to +// the caller rather than being silently ignored. +func syncDir(dir string) error { + d, err := os.Open(dir) + if err != nil { + return fmt.Errorf("crucible/durable: opening dir %q to sync: %w", dir, err) + } + defer func() { _ = d.Close() }() + if err := d.Sync(); err != nil { + return fmt.Errorf("crucible/durable: syncing dir %q: %w", dir, err) + } + return nil } // encodeInstanceID maps an InstanceID to a filesystem-safe directory name. An id diff --git a/durable/filestore_test.go b/durable/filestore_test.go index ace205b..2012f49 100644 --- a/durable/filestore_test.go +++ b/durable/filestore_test.go @@ -288,7 +288,11 @@ func TestFileStore_JournalOrderingPreserved(t *testing.T) { } } -func TestFileStore_Checkpoint_RetainTail(t *testing.T) { +// TestFileStore_Checkpoint_CompactsTail verifies a Checkpoint compacts the +// on-disk journal through the checkpointed step, so Load returns only the +// post-checkpoint tail. Time-travel retention is a store-level capability +// (HistoryStore / WithHistory), not a per-checkpoint flag. +func TestFileStore_Checkpoint_CompactsTail(t *testing.T) { ctx := context.Background() st := newFileStore(t) const id = durable.InstanceID("inst") @@ -298,7 +302,7 @@ func TestFileStore_Checkpoint_RetainTail(t *testing.T) { t.Fatalf("Append %d: %v", i, err) } } - if err := st.Checkpoint(ctx, id, marshaledSnapshot(t, "cp"), 1, durable.WithRetainTail()); err != nil { + if err := st.Checkpoint(ctx, id, marshaledSnapshot(t, "cp"), 1); err != nil { t.Fatalf("Checkpoint: %v", err) } _, tail, err := st.Load(ctx, id) diff --git a/durable/memstore.go b/durable/memstore.go index b0307f1..cdc9b5a 100644 --- a/durable/memstore.go +++ b/durable/memstore.go @@ -198,7 +198,7 @@ func (s *MemStore) History(_ context.Context, id InstanceID) ([]byte, []Record, // (NewMemStore with WithHistory), so the per-checkpoint CheckpointOptions do not // change what Load returns here. func (s *MemStore) Checkpoint(_ context.Context, id InstanceID, snapshot []byte, throughStep int, opts ...CheckpointOption) error { - _ = resolveCheckpoint(opts...) // retainTail is superseded by store-level WithHistory + _ = resolveCheckpoint(opts...) // no per-checkpoint option defined; history is store-level (WithHistory) s.mu.Lock() defer s.mu.Unlock() diff --git a/durable/memstore_test.go b/durable/memstore_test.go index bbed96b..175b332 100644 --- a/durable/memstore_test.go +++ b/durable/memstore_test.go @@ -350,7 +350,11 @@ func TestMemStore_Load_IsolatedFromCallerMutation(t *testing.T) { } } -func TestMemStore_Checkpoint_RetainTail(t *testing.T) { +// TestMemStore_Checkpoint_CompactsTail verifies a Checkpoint compacts the journal +// through the checkpointed step, so Load returns only the post-checkpoint tail. +// History retention is a store-level capability (NewMemStore WithHistory), not a +// per-checkpoint flag. +func TestMemStore_Checkpoint_CompactsTail(t *testing.T) { ctx := context.Background() st := durable.NewMemStore() const id = durable.InstanceID("inst") @@ -360,9 +364,7 @@ func TestMemStore_Checkpoint_RetainTail(t *testing.T) { t.Fatalf("Append %d: %v", i, err) } } - // Retaining the tail must not change what Load returns (the post-checkpoint - // view is identical); it only preserves history internally for later reads. - if err := st.Checkpoint(ctx, id, marshaledSnapshot(t, "cp"), 1, durable.WithRetainTail()); err != nil { + if err := st.Checkpoint(ctx, id, marshaledSnapshot(t, "cp"), 1); err != nil { t.Fatalf("Checkpoint: %v", err) } _, tail, err := st.Load(ctx, id) diff --git a/durable/options.go b/durable/options.go index 59bc04c..112212f 100644 --- a/durable/options.go +++ b/durable/options.go @@ -40,25 +40,16 @@ func resolveAppend(opts ...AppendOption) appendConfig { } // CheckpointOption configures a single Store.Checkpoint call. It is the additive -// extension point for per-checkpoint policy a backend may layer on. +// extension point for per-checkpoint policy a backend may layer on. No option is +// defined yet: the seam reserves a stable signature so per-checkpoint policy can +// arrive additively without breaking the Store interface. Time-travel retention, +// which an earlier per-checkpoint flag covered, is now a store-level capability +// (the HistoryStore seam, MemStore's WithHistory). type CheckpointOption func(*checkpointConfig) -// checkpointConfig holds resolved CheckpointOption state for one Checkpoint. -type checkpointConfig struct { - // retainTail requested keeping the pre-checkpoint Records instead of compacting - // them away. Time-travel retention is now a store-level capability (the - // HistoryStore seam, MemStore's WithHistory), so this per-checkpoint flag no - // longer changes what Load returns; it is preserved for API compatibility. - retainTail bool -} - -// WithRetainTail requested that a Checkpoint keep the Records it would otherwise -// compact away. Time-travel retention is now a store-level capability — construct a -// MemStore WithHistory and read through StateAt — so this per-checkpoint option no -// longer changes Load's view and is retained only for API compatibility. -func WithRetainTail() CheckpointOption { - return func(c *checkpointConfig) { c.retainTail = true } -} +// checkpointConfig holds resolved CheckpointOption state for one Checkpoint. It is +// presently empty; the type reserves a stable seam for additive options. +type checkpointConfig struct{} func resolveCheckpoint(opts ...CheckpointOption) checkpointConfig { var c checkpointConfig diff --git a/durable/runner.go b/durable/runner.go index 4308c55..05769b8 100644 --- a/durable/runner.go +++ b/durable/runner.go @@ -82,7 +82,7 @@ type Handle[S comparable, E comparable, C any] struct { nextStep int sched *state.Scheduler[S, E, C] - clockBuf *[]state.JournalEntry // recording-clock accumulator, flushed per step + recClock *recordingClock // owns the clock-read accumulator and its mutex timers map[string]pendingTimer // armed `after` timers with absolute deadlines, persisted per checkpoint svc *state.ServiceRunner[S, E, C] // host driver for invoked services, nil when none wired @@ -94,14 +94,14 @@ type Handle[S comparable, E comparable, C any] struct { // drainClock returns and clears the clock readings accumulated since the last // drain, so each recorded step carries exactly the readings consumed during it. +// The drain is taken under the recording clock's mutex, so closing a step's +// Record is safe against a Now() the scheduler's timer goroutine issues +// concurrently (the documented "Tick from its own timer loop" pattern). func (h *Handle[S, E, C]) drainClock() []state.JournalEntry { - if h.clockBuf == nil || len(*h.clockBuf) == 0 { + if h.recClock == nil { return nil } - out := make([]state.JournalEntry, len(*h.clockBuf)) - copy(out, *h.clockBuf) - *h.clockBuf = (*h.clockBuf)[:0] - return out + return h.recClock.drain() } // absorbDrivers feeds a transition's effects into every wired host driver so the @@ -137,15 +137,17 @@ func (h *Handle[S, E, C]) absorbDrivers(ctx context.Context, effects []state.Eff // // The kernel Scheduler.Absorb reads the recording clock exactly once — that // reading is the `now` it computes every deadline against — and the reading lands -// in clockBuf. The Handle captures it by noting clockBuf's length before the -// Absorb and reading the entry the Absorb appended, so the durable table mirrors -// the scheduler's deadlines with no extra clock read (an extra read would corrupt -// the recorded sequence). When clockBuf is unavailable the Handle falls back to a -// direct clock read, keeping the table correct off the recording path. +// in the recording clock's buffer. The Handle captures it by noting the buffer's +// length (markBuf) before the Absorb and reading the entry the Absorb appended +// (armReadingAt), both under the clock mutex, so the durable table mirrors the +// scheduler's deadlines with no extra clock read (an extra read would corrupt the +// recorded sequence) and stays correct against a concurrent timer-goroutine +// Now(). When no recording clock is wired the Handle falls back to a direct clock +// read, keeping the table correct off the recording path. func (h *Handle[S, E, C]) absorbTimers(ctx context.Context, effects []state.Effect) { before := -1 - if h.clockBuf != nil { - before = len(*h.clockBuf) + if h.recClock != nil { + before = h.recClock.markBuf() } h.sched.Absorb(ctx, effects) now := h.armNow(before) @@ -163,13 +165,15 @@ func (h *Handle[S, E, C]) absorbTimers(ctx context.Context, effects []state.Effe } // armNow returns the clock instant the just-completed Scheduler.Absorb computed its -// deadlines against: the clock read the Absorb appended to clockBuf at index -// before, captured so the Handle's timer table mirrors the scheduler without a -// second clock read. It falls back to a fresh clock read only when no recording -// buffer is wired (off the recording path). +// deadlines against: the clock read the Absorb appended to the recording clock's +// buffer at index before, captured under the clock mutex so the Handle's timer +// table mirrors the scheduler without a second clock read. It falls back to a +// fresh clock read only when no recording clock is wired (off the recording path). func (h *Handle[S, E, C]) armNow(before int) time.Time { - if before >= 0 && h.clockBuf != nil && len(*h.clockBuf) > before { - return time.Unix(0, (*h.clockBuf)[before].ClockUnixNano).UTC() + if before >= 0 && h.recClock != nil { + if now, ok := h.recClock.armReadingAt(before); ok { + return now + } } return h.runner.cfg.clock.Now() } @@ -300,7 +304,7 @@ func (r *Runner[S, E, C]) Start(ctx context.Context, id InstanceID, input C, opt inst: inst, nextStep: 0, sched: state.NewScheduler(inst), - clockBuf: &buf, + recClock: recClock, timers: map[string]pendingTimer{}, svc: svc, svcBuf: &svcBuf, @@ -705,7 +709,7 @@ func (r *Runner[S, E, C]) recover(ctx context.Context, id InstanceID) (*Handle[S inst: inst, nextStep: next, sched: sched, - clockBuf: &buf, + recClock: recClock, timers: armed, svc: svc, svcBuf: &svcBuf, From 72f80fa94e3b0e6b1dbe51ddf2c72093c96ca2b5 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:12:59 -0400 Subject: [PATCH 2/5] fix(cluster): clamp backoff overflow, guard nil-respawner Tick, add Supervisor.Forget, cover migration error paths Signed-off-by: Joshua Temple --- cluster/CHANGELOG.md | 25 ++++++++++ cluster/backoff.go | 18 ++++++- cluster/backoff_internal_test.go | 43 +++++++++++++++++ cluster/backoff_test.go | 83 ++++++++++++++++++++++++++++++++ cluster/migration.go | 12 ++++- cluster/migration_test.go | 62 ++++++++++++++++++++++++ cluster/supervisor.go | 23 +++++++++ 7 files changed, 264 insertions(+), 2 deletions(-) create mode 100644 cluster/backoff_internal_test.go diff --git a/cluster/CHANGELOG.md b/cluster/CHANGELOG.md index 7a5d599..64d56e3 100644 --- a/cluster/CHANGELOG.md +++ b/cluster/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to `crucible/cluster` are documented here. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- **`Supervisor.Forget`.** Discards a supervisor's per-actor restart bookkeeping + (spent-restart counter and any scheduled backoff restart) for an actor id. A host + calls it when an actor is permanently stopped, so the restart map does not + accumulate one entry per distinct actor id for the process lifetime under churn. + +### Fixed + +- **Backoff overflow.** `backoffDelay` clamps to `math.MaxInt64` before the + `time.Duration` conversion, so an uncapped schedule (no max set) with a high + restart count no longer overflows into a negative or wrapped delay. +- **Nil-respawner Tick.** `Supervisor.Tick` guards against a Respawner cleared + between scheduling a backoff and the due tick: the due restart is a no-op and the + pending restart is preserved rather than panicking. + +### Documentation + +- **Capture consistency boundary.** `Capture`'s godoc now states that the instance + snapshot and actor-tree snapshot are read as two separate operations, so the + caller must quiesce the instance (no concurrent Fire or delivery) for the pair to + be point-in-time consistent. + ## [0.1.0] The first release of the host-side distribution runtime for the `crucible/state` diff --git a/cluster/backoff.go b/cluster/backoff.go index d5fe951..a44e24e 100644 --- a/cluster/backoff.go +++ b/cluster/backoff.go @@ -70,7 +70,11 @@ func (s *Supervisor) scheduleBackoff(esc *state.ActorEscalation) bool { } // backoffDelay is initial*factor^n capped at max (and floored at initial when the -// factor would shrink it). +// factor would shrink it). When no max is set (max==0) the growth is unbounded, so +// the result is clamped to math.MaxInt64 before the time.Duration conversion: a +// large n with factor>1 can drive the float past the int64 range, and converting +// an out-of-range float64 to time.Duration is undefined and can yield a negative +// (or wrapped) delay that would fire a backoff immediately or never. func backoffDelay(pol backoffPolicy, n int) time.Duration { d := float64(pol.initial) * math.Pow(pol.factor, float64(n)) if d < float64(pol.initial) { @@ -79,6 +83,9 @@ func backoffDelay(pol backoffPolicy, n int) time.Duration { if pol.max > 0 && d > float64(pol.max) { d = float64(pol.max) } + if d > float64(math.MaxInt64) { + return time.Duration(math.MaxInt64) + } return time.Duration(d) } @@ -90,6 +97,15 @@ func (s *Supervisor) Tick(ctx context.Context) int { now := s.clock.Now() s.mu.Lock() respawner := s.respawner + // A Backoff schedule is only created while a Respawner is wired + // (scheduleBackoff returns false otherwise), but the Respawner can be cleared + // between scheduling and Tick; guard so a due restart with no Respawner is a + // no-op rather than a nil-pointer panic. The pending restarts are left in place + // so a Respawner wired again before the next Tick still applies them. + if respawner == nil { + s.mu.Unlock() + return 0 + } var due, keep []pendingRestart for _, p := range s.pending { if p.dueAt.After(now) { diff --git a/cluster/backoff_internal_test.go b/cluster/backoff_internal_test.go new file mode 100644 index 0000000..16264c8 --- /dev/null +++ b/cluster/backoff_internal_test.go @@ -0,0 +1,43 @@ +package cluster + +import ( + "math" + "testing" + "time" +) + +// TestBackoffDelay_ClampsOnUncappedOverflow verifies that with no max set (max==0) +// a large restart count with factor>1 does not overflow the time.Duration +// conversion into a negative or wrapped delay: the result is clamped to +// math.MaxInt64. Without the clamp, float64(initial)*factor^n exceeds the int64 +// range and time.Duration(d) is undefined, which would fire a backoff immediately +// or never. +func TestBackoffDelay_ClampsOnUncappedOverflow(t *testing.T) { + pol := backoffPolicy{maxRestarts: 1000, initial: time.Second, max: 0, factor: 2.0} + // 2^60 seconds is far beyond math.MaxInt64 nanoseconds. + got := backoffDelay(pol, 60) + if got != time.Duration(math.MaxInt64) { + t.Fatalf("uncapped overflow delay = %d, want clamped to MaxInt64 (%d)", got, int64(math.MaxInt64)) + } + if got < 0 { + t.Fatalf("delay must never be negative, got %d", got) + } +} + +// TestBackoffDelay_RespectsExplicitMax confirms a set max caps growth well below +// the overflow clamp, so the clamp only governs the uncapped case. +func TestBackoffDelay_RespectsExplicitMax(t *testing.T) { + pol := backoffPolicy{maxRestarts: 1000, initial: time.Second, max: 5 * time.Second, factor: 2.0} + if got := backoffDelay(pol, 60); got != 5*time.Second { + t.Fatalf("capped delay = %v, want 5s", got) + } +} + +// TestBackoffDelay_FloorsAtInitial confirms a shrinking factor never produces a +// delay below initial. +func TestBackoffDelay_FloorsAtInitial(t *testing.T) { + pol := backoffPolicy{maxRestarts: 10, initial: time.Second, max: 10 * time.Second, factor: 0.5} + if got := backoffDelay(pol, 5); got != time.Second { + t.Fatalf("floored delay = %v, want 1s", got) + } +} diff --git a/cluster/backoff_test.go b/cluster/backoff_test.go index 52fab5b..6925569 100644 --- a/cluster/backoff_test.go +++ b/cluster/backoff_test.go @@ -132,3 +132,86 @@ func TestSupervisor_DefaultClock(t *testing.T) { t.Fatalf("Tick with nothing pending = %d, want 0", n) } } + +// TestSupervisor_TickNilRespawnerNoPanic confirms a Tick whose Respawner was +// cleared after a backoff was scheduled does not panic: the due restart is a no-op +// and stays pending so a Respawner wired again later still applies it. +func TestSupervisor_TickNilRespawnerNoPanic(t *testing.T) { + ctx := context.Background() + clock := state.NewFakeClock(time.Unix(0, 0)) + sup := cluster.NewSupervisor( + cluster.WithBackoff("child", 3, 100*time.Millisecond, time.Second, 2.0), + cluster.WithClock(clock), + ) + sys := backoffSystem(t, sup) + + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + clock.Advance(100 * time.Millisecond) + + // Clear the respawner between scheduling and the due Tick. + sup.SetRespawner(nil) + if n := sup.Tick(ctx); n != 0 { + t.Fatalf("Tick with a cleared respawner restarted %d, want 0", n) + } + + // The pending restart was preserved: rewiring a respawner and ticking applies it. + sup.SetRespawner(sys) + if n := sup.Tick(ctx); n != 1 { + t.Fatalf("Tick after rewiring the respawner restarted %d, want 1", n) + } +} + +// TestSupervisor_ForgetEvictsBookkeeping confirms Forget drops an actor's spent +// restart counter and any pending restart, bounding the restarts map under churn. +// After Forget a re-spawn of the same id earns a fresh restart budget. +func TestSupervisor_ForgetEvictsBookkeeping(t *testing.T) { + ctx := context.Background() + clock := state.NewFakeClock(time.Unix(0, 0)) + sup := cluster.NewSupervisor( + cluster.WithRestart("child", 1), + cluster.WithClock(clock), + ) + sys := backoffSystem(t, sup) + + // Spend the single restart budget. + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if sys.Running() != 1 { + t.Fatalf("after first (restart) failure Running() = %d, want 1", sys.Running()) + } + // Budget spent: a second failure escalates rather than restarts. + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if sys.Running() != 0 { + t.Fatalf("after budget-exhausting failure Running() = %d, want 0", sys.Running()) + } + + // Forget the actor (a genuine teardown), then re-spawn it: it earns a fresh + // budget, so the next failure restarts again rather than escalating. + sup.Forget("worker-1") + if _, err := sys.Respawn(ctx, "child", "worker-1", nil); err != nil { + t.Fatalf("Respawn after Forget: %v", err) + } + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + if sys.Running() != 1 { + t.Fatalf("after Forget+respawn, a failure should restart within the fresh budget; Running() = %d, want 1", sys.Running()) + } +} + +// TestSupervisor_ForgetPendingBackoff confirms Forget removes a not-yet-due backoff +// restart, so a forgotten actor is not restarted by a later Tick. +func TestSupervisor_ForgetPendingBackoff(t *testing.T) { + ctx := context.Background() + clock := state.NewFakeClock(time.Unix(0, 0)) + sup := cluster.NewSupervisor( + cluster.WithBackoff("child", 3, 100*time.Millisecond, time.Second, 2.0), + cluster.WithClock(clock), + ) + sys := backoffSystem(t, sup) + + sys.Local().SettleError(ctx, "worker-1", errors.New("boom")) + sup.Forget("worker-1") // drop the scheduled restart before it is due + + clock.Advance(time.Second) + if n := sup.Tick(ctx); n != 0 { + t.Fatalf("Tick after forgetting the pending restart restarted %d, want 0", n) + } +} diff --git a/cluster/migration.go b/cluster/migration.go index dedcf68..578148f 100644 --- a/cluster/migration.go +++ b/cluster/migration.go @@ -32,7 +32,17 @@ type Checkpoint struct { // Capture snapshots a running instance, its actor tree, and its machine definition // into a Checkpoint ready to ship to another node. It is a pure read: it neither -// fires the instance nor mutates any actor. Call it at a quiescent point. +// fires the instance nor mutates any actor. +// +// # Consistency boundary +// +// The instance snapshot and the actor-tree snapshot are read as two separate +// operations, not under one combined lock, so Capture does not by itself produce a +// point-in-time-consistent pair. The caller must ensure no Fire or actor delivery +// runs against the instance for the duration of the call — that is what "quiescent" +// means here. Captured under concurrent firing, the snapshot and the actor tree may +// reflect different instants and Restore could rebuild a tree that does not match +// the kernel configuration. Quiesce the instance (stop driving it) before Capture. func Capture[S comparable, E comparable, C any](inst *state.Instance[S, E, C], sys *state.ActorSystem[S, E, C], machine *state.Machine[S, E, C]) (Checkpoint, error) { snap, err := state.MarshalSnapshot(inst.Snapshot()) if err != nil { diff --git a/cluster/migration_test.go b/cluster/migration_test.go index 74ca13d..6af816e 100644 --- a/cluster/migration_test.go +++ b/cluster/migration_test.go @@ -121,6 +121,68 @@ func TestMigration_BreakingTargetRefused(t *testing.T) { } } +// TestMigration_Restore_BadMachineIR reports an error when the captured machine IR +// cannot be decoded, rather than silently proceeding with no compatibility gate. +func TestMigration_Restore_BadMachineIR(t *testing.T) { + ctx := context.Background() + cp := capturedInB(t) + cp.MachineIR = json.RawMessage(`{not valid ir`) + + _, _, err := cluster.Restore(ctx, cp, migSource()) + if err == nil { + t.Fatal("Restore with a corrupt machine IR must error") + } + if errors.Is(err, cluster.ErrIncompatibleMigration) { + t.Fatalf("a decode failure must not masquerade as an incompatible-migration refusal: %v", err) + } +} + +// TestMigration_Restore_BadSnapshot reports an error when the captured snapshot +// cannot be unmarshaled, after the compatibility gate has passed. +func TestMigration_Restore_BadSnapshot(t *testing.T) { + ctx := context.Background() + cp := capturedInB(t) + cp.Snapshot = json.RawMessage(`{"current": 12345}`) // wrong shape for the snapshot + + _, _, err := cluster.Restore(ctx, cp, migSource()) + if err == nil { + t.Fatal("Restore with a corrupt snapshot must error") + } +} + +// TestMigration_Restore_BadActors reports an error when a captured actor entry +// cannot be decoded, rather than restoring a partial actor tree. +func TestMigration_Restore_BadActors(t *testing.T) { + ctx := context.Background() + cp := capturedInB(t) + cp.Actors = map[string]json.RawMessage{"w-bad": json.RawMessage(`{not an actor`)} + + _, _, err := cluster.Restore(ctx, cp, migSource(), cluster.WithActorBehaviors(map[string]state.ActorBehavior{ + "child": childBehavior(), + })) + if err == nil { + t.Fatal("Restore with a corrupt actor entry must error") + } +} + +// TestMigration_Capture_Marshalable confirms Capture succeeds on a well-formed +// instance and that the captured snapshot, IR, and actor tree are all valid JSON +// (the marshal/serialize/snapshot success paths the error branches guard). +func TestMigration_Capture_Marshalable(t *testing.T) { + cp := capturedInB(t) + if !json.Valid(cp.Snapshot) { + t.Error("captured Snapshot is not valid JSON") + } + if !json.Valid(cp.MachineIR) { + t.Error("captured MachineIR is not valid JSON") + } + for id, raw := range cp.Actors { + if !json.Valid(raw) { + t.Errorf("captured actor %q is not valid JSON", id) + } + } +} + // TestMigration_ActorsMove confirms a migrated instance carries its running actors. func TestMigration_ActorsMove(t *testing.T) { ctx := context.Background() diff --git a/cluster/supervisor.go b/cluster/supervisor.go index 8f10da3..55bd7c5 100644 --- a/cluster/supervisor.go +++ b/cluster/supervisor.go @@ -213,6 +213,29 @@ func (s *Supervisor) tryRestart(ctx context.Context, esc *state.ActorEscalation) return true } +// Forget discards the supervisor's per-actor restart bookkeeping for actorID: its +// spent-restart counter and any not-yet-applied backoff restart scheduled for it. +// A host calls it when an actor is permanently stopped (not restarted), so the +// supervisor's restart map does not accumulate one entry per distinct actor id for +// the process lifetime under churn. Forgetting an unknown id is a no-op. After +// Forget a re-spawn of the same id starts a fresh restart budget, so call it only +// for a genuine teardown, never between a failure and its restart. +func (s *Supervisor) Forget(actorID string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.restarts, actorID) + if len(s.pending) == 0 { + return + } + kept := s.pending[:0] + for _, p := range s.pending { + if p.actorID != actorID { + kept = append(kept, p) + } + } + s.pending = kept +} + // Handled returns a snapshot of the failures this supervisor has processed, in // order. The returned slice is a copy and safe to retain. func (s *Supervisor) Handled() []HandledEscalation { From 60aef95bd3fdb4f88c7b484519b7d49170a5749a Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:14:50 -0400 Subject: [PATCH 3/5] fix(transport): drop redundant codec init, wrap StateAt wire error, add README and client error-path tests Signed-off-by: Joshua Temple --- transport/README.md | 68 +++++++++++++++++++++++++++++++++++++ transport/timetravel.go | 3 +- transport/transport.go | 10 +++--- transport/transport_test.go | 49 ++++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 5 deletions(-) create mode 100644 transport/README.md diff --git a/transport/README.md b/transport/README.md new file mode 100644 index 0000000..603c053 --- /dev/null +++ b/transport/README.md @@ -0,0 +1,68 @@ +# crucible/transport + +A **gRPC network transport** for the [Crucible](../README.md) +[`cluster`](../cluster) runtime: it carries actor deliver, spawn, and read-only +time-travel operations between nodes over real gRPC (HTTP/2). + +> **Status:** experimental, pre-1.0. The transport is exercised over an in-process +> gRPC connection (bufconn) and the API may still change before v1. + +Import path: `github.com/stablekernel/crucible/transport` + +## What it is + +`cluster` spreads a `state` machine and its child-machine actors across nodes over +a pluggable `Transport`; its `InMemoryTransport` connects node-scoped systems in +one process. This module is the over-the-network implementation: a `Transport` +satisfies `cluster.Transport` on the client side and a server serves a node's +`cluster.WireEndpoint`, so a parent on one host can spawn and drive an actor +running on another. + +It lives in its **own module** so the gRPC dependency stays out of the cluster +core, which remains stdlib-only: a deployment that uses only the in-memory +transport never compiles gRPC in. + +## How it works + +Payloads (events and spawn inputs) cross the wire as the JSON the `WireEndpoint` +seam already produces and consumes, through a JSON gRPC codec. There is **no +protobuf schema or codegen**: the service descriptor is hand-written and both sides +force the JSON codec explicitly (`grpc.ForceCodec` on the client, `ForceServerCodec` +on the server), so the codec is never resolved through the global gRPC encoding +registry and the import has no process-wide side effect. + +## Client + +```go +tr := transport.New() +tr.AddNode("node-b", conn) // conn is a *grpc.ClientConn the caller dials and owns + +nodeA := cluster.NewSystem("node-a", actorSysA, cluster.WithTransport(tr)) +ref, err := nodeA.Spawn(ctx, "node-b", "worker", "w-1", nil) // spawn on node-b over gRPC +delivered, err := nodeA.Deliver(ctx, ref, "finish") // route to its owning node +``` + +A `Transport` holds one client connection per reachable node, registered with +`AddNode`; the caller dials each node (`grpc.NewClient`) and owns the connection's +lifecycle. An operation addressed to a node that was never registered reports +`cluster.ErrNodeUnreachable`. + +## Server + +```go +gs := transport.NewServer(nodeBWireEndpoint) // gRPC server with the JSON codec + service registered +go gs.Serve(lis) +``` + +`NewServer` builds a gRPC server preconfigured with the JSON codec and the node's +transport service registered; the caller serves it on a listener and owns its +lifecycle. Pass extra `grpc.ServerOption`s (interceptors, credentials, keepalives) +as needed, or register the service onto an existing server with `RegisterServer`. + +## Time travel + +For durable instances, `NewDurableTimeTravel` adapts a `durable.Store` and machine +into a `TimeTravelEndpoint`; `NewTimeTravelServer` / `RegisterTimeTravel` serve it, +and the client `Transport.StateAt` reconstructs an instance's past state as of a +recorded step over the wire. It is **read-only**: it runs no service, re-instantiates +no actor, and dispatches no effect. diff --git a/transport/timetravel.go b/transport/timetravel.go index 7ac4063..ebab3c1 100644 --- a/transport/timetravel.go +++ b/transport/timetravel.go @@ -2,6 +2,7 @@ package transport import ( "context" + "fmt" "github.com/stablekernel/crucible/durable" "github.com/stablekernel/crucible/state" @@ -128,7 +129,7 @@ func (t *Transport) StateAt(ctx context.Context, node, id string, step int) ([]b } var resp StateAtResponse if err := conn.Invoke(ctx, methodStateAt, &StateAtRequest{ID: id, Step: step}, &resp, grpc.ForceCodec(jsonCodec{})); err != nil { - return nil, err + return nil, fmt.Errorf("transport: state-at on node %q: %w", node, err) } return resp.Snapshot, nil } diff --git a/transport/transport.go b/transport/transport.go index 36e4018..9b3c04f 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -19,7 +19,6 @@ import ( "github.com/stablekernel/crucible/cluster" "github.com/stablekernel/crucible/state" "google.golang.org/grpc" - "google.golang.org/grpc/encoding" ) const ( @@ -31,15 +30,18 @@ const ( // jsonCodec is a gRPC content codec that encodes messages as JSON, so the // transport carries the same JSON the cluster WireEndpoint produces without a -// protobuf schema. +// protobuf schema. Both sides force this codec explicitly — the client with +// grpc.ForceCodec on every Invoke and the server with grpc.ForceServerCodec in +// NewServer — so the codec is never resolved through the global encoding registry. +// The transport therefore does not RegisterCodec at import time: a process-wide +// registration would be a hidden import side effect that could shadow another +// JSON codec for unrelated gRPC clients in the same process. type jsonCodec struct{} func (jsonCodec) Marshal(v any) ([]byte, error) { return json.Marshal(v) } func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) } func (jsonCodec) Name() string { return jsonCodecName } -func init() { encoding.RegisterCodec(jsonCodec{}) } - // DeliverRequest carries a delivery to the actor named by Ref. Event is the // JSON-encoded event the owning node decodes into its event type. type DeliverRequest struct { diff --git a/transport/transport_test.go b/transport/transport_test.go index c3586ed..ad9f120 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -170,6 +170,55 @@ func TestTransport_RemoteSpawnError(t *testing.T) { } } +// TestTransport_DeliverMarshalError reports an error when the event cannot be +// JSON-encoded, before any RPC is attempted. A channel is not JSON-marshalable. +func TestTransport_DeliverMarshalError(t *testing.T) { + ctx := context.Background() + nodeB := newNodeSystem("node-b") + lis := bufconn.Listen(1 << 20) + gs := transport.NewServer(nodeB) + go func() { _ = gs.Serve(lis) }() + t.Cleanup(gs.Stop) + + tr := transport.New() + tr.AddNode("node-b", dialBuf(t, lis)) + + _, err := tr.Deliver(ctx, state.ActorRef{ID: "w-1", Node: "node-b"}, make(chan int)) + if err == nil { + t.Fatal("Deliver with an unmarshalable event = nil error, want a marshal error") + } +} + +// TestTransport_SpawnMarshalError reports an error when the spawn input cannot be +// JSON-encoded, before any RPC is attempted. +func TestTransport_SpawnMarshalError(t *testing.T) { + ctx := context.Background() + nodeB := newNodeSystem("node-b") + lis := bufconn.Listen(1 << 20) + gs := transport.NewServer(nodeB) + go func() { _ = gs.Serve(lis) }() + t.Cleanup(gs.Stop) + + tr := transport.New() + tr.AddNode("node-b", dialBuf(t, lis)) + + // A map value that cannot be marshaled (a channel) fails json.Marshal. + _, err := tr.Spawn(ctx, "node-b", "worker", "w-1", map[string]any{"bad": make(chan int)}) + if err == nil { + t.Fatal("Spawn with an unmarshalable input = nil error, want a marshal error") + } +} + +// TestTransport_StateAtUnknownNode reports an unreachable node for a StateAt to a +// node that was never registered. +func TestTransport_StateAtUnknownNode(t *testing.T) { + ctx := context.Background() + tr := transport.New() + if _, err := tr.StateAt(ctx, "ghost", "inst-1", 0); err == nil { + t.Fatal("StateAt on an unregistered node = nil error, want unreachable") + } +} + // TestTransport_SatisfiesClusterTransport is a compile-time check that *Transport // is a cluster.Transport. func TestTransport_SatisfiesClusterTransport(t *testing.T) { From 52a3edda14a09ca254bf6ac10584c578a3adda16 Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:18:31 -0400 Subject: [PATCH 4/5] fix(wasm): interrupt runaway guests, guard ABI result indexing, add Compile options, honest allocator docs Signed-off-by: Joshua Temple --- wasm/CHANGELOG.md | 25 +++++++++++++++ wasm/README.md | 17 +++++++++- wasm/runtime.go | 54 +++++++++++++++++++++++++++++-- wasm/runtime_test.go | 56 ++++++++++++++++++++++++++++++++- wasm/testdata/loopguest/main.go | 31 ++++++++++++++++++ 5 files changed, 179 insertions(+), 4 deletions(-) create mode 100644 wasm/testdata/loopguest/main.go diff --git a/wasm/CHANGELOG.md b/wasm/CHANGELOG.md index ee48d0d..02fa1e9 100644 --- a/wasm/CHANGELOG.md +++ b/wasm/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to `crucible/wasm` are documented here. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- **`CompileOption` / `WithRuntimeConfig`.** `Compile` takes a variadic + `...CompileOption` so configuration arrives additively without breaking the + signature. `WithRuntimeConfig` overrides the wazero RuntimeConfig the module is + built with. + +### Fixed + +- **Runaway-guest timeout.** The runtime is built with + `WithCloseOnContextDone(true)`, so a guest that never returns is interrupted when + the `Eval` context is canceled or hits its deadline, returning an error instead of + blocking the host. Pass a per-call timeout context for untrusted guests. +- **Defensive ABI result indexing.** `Eval` guards the `alloc` and `eval` result + slices before indexing, so a misbehaving guest that returns no result fails the + call with an error rather than panicking the host. + +### Documentation + +- **ABI / allocator note.** The README no longer implies a general-purpose + allocator: `alloc` returns a writable region of at least `size` bytes, which the + reference guest backs with a fixed buffer. Added a timeout/cancellation section. + ## [0.1.0] The first release of the WebAssembly behavior runtime for the `crucible/state` diff --git a/wasm/README.md b/wasm/README.md index 7f05db8..32fbdb0 100644 --- a/wasm/README.md +++ b/wasm/README.md @@ -54,17 +54,32 @@ A guest module exports two functions over its linear memory: | Export | Signature | Purpose | | --- | --- | --- | -| `alloc` | `(size u32) u32` | reserve `size` bytes, return the pointer the host writes the request into | +| `alloc` | `(size u32) u32` | return a pointer to a writable region of at least `size` bytes for the host to write the request into | | `eval` | `(ptr u32, size u32) u64` | read the JSON request at `[ptr, ptr+size)`, evaluate, write the JSON response, return packed `(outPtr<<32 \| outLen)` | For a guard the request is `{"context": }` and the response `{"ok": }`. Because the payloads are JSON, the same module works for any host language. A `Module` serializes concurrent `Eval` calls behind a mutex (one linear memory per instance). +`alloc` need not be a general-purpose allocator. The reference guest backs it with a +fixed input buffer at a stable address and simply returns that address, sizing the +buffer large enough for expected requests; a guest is free to implement a real +growing allocator instead. The host writes exactly `size` bytes there and reads the +response back from the pointer `eval` returns. The host bounds-checks the response +region against linear memory and rejects an out-of-range pointer/length rather than +reading out of bounds. + A guest can be written in any WASM-targeting language; the test suite compiles a tiny Go `//go:wasmexport` guest with the standard toolchain (`GOOS=wasip1 GOARCH=wasm`, `-buildmode=c-shared`), no TinyGo and no committed binary. +## Timeout and cancellation + +The runtime is built so a guest that runs away (an infinite loop, a pathological +input) does not block the host forever: pass a context with a deadline or cancel to +`Eval`, and the call is interrupted and returns an error when the context is done. +Bound each evaluation with a per-call timeout context for untrusted guests. + ## Performance Indicative per-call overhead (Apple Silicon dev machine, `go test -bench`); reproduce diff --git a/wasm/runtime.go b/wasm/runtime.go index dc09272..be5e425 100644 --- a/wasm/runtime.go +++ b/wasm/runtime.go @@ -39,11 +39,50 @@ type Module struct { mu sync.Mutex } +// CompileOption configures Compile. New capabilities arrive as additional options, +// so the signature never breaks. No option changes the default behavior; each is +// additive. +type CompileOption func(*compileConfig) + +// compileConfig holds resolved CompileOption state for one Compile. +type compileConfig struct { + runtimeConfig wazero.RuntimeConfig +} + +// WithRuntimeConfig overrides the wazero RuntimeConfig the module is built with, +// for hosts that need to tune compilation (interpreter vs compiler), memory +// limits, or other wazero knobs. The default already closes a running guest when +// the call context is done, so a caller that supplies its own config should retain +// WithCloseOnContextDone(true) to keep timeout/cancellation working. +func WithRuntimeConfig(cfg wazero.RuntimeConfig) CompileOption { + return func(c *compileConfig) { + if cfg != nil { + c.runtimeConfig = cfg + } + } +} + // Compile instantiates a behavior module from its WebAssembly bytes and resolves // its ABI exports. The bytes are a wasip1 module (a Go //go:wasmexport guest, or any // language's equivalent); WASI preview 1 is provided for the Go runtime's needs. -func Compile(ctx context.Context, wasmBytes []byte) (*Module, error) { - rt := wazero.NewRuntime(ctx) +// +// The runtime is built with WithCloseOnContextDone(true), so a guest that runs away +// (an infinite loop, a pathological input) is interrupted when the context passed to +// Eval / Compile is canceled or hits its deadline: the call returns an error rather +// than blocking the host indefinitely. Pass a per-call timeout context to bound a +// guest's execution. +func Compile(ctx context.Context, wasmBytes []byte, opts ...CompileOption) (*Module, error) { + cfg := compileConfig{ + // Interrupt a running guest when the call context is done, so a runaway guest + // cannot block the host forever; a caller bounds execution with a timeout or + // cancelable context. + runtimeConfig: wazero.NewRuntimeConfig().WithCloseOnContextDone(true), + } + for _, opt := range opts { + opt(&cfg) + } + + rt := wazero.NewRuntimeWithConfig(ctx, cfg.runtimeConfig) wasi_snapshot_preview1.MustInstantiate(ctx, rt) // Instantiate as a reactor: run the module's _initialize (if present) but not @@ -76,6 +115,12 @@ func (m *Module) Eval(ctx context.Context, request []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("wasm: alloc: %w", err) } + // A conforming alloc returns one result (the pointer). A misbehaving guest may + // return none; index defensively so a malformed guest fails the call rather than + // panicking the host. + if len(allocRes) == 0 { + return nil, fmt.Errorf("wasm: alloc returned no result") + } inPtr := uint32(allocRes[0]) if !m.mod.Memory().Write(inPtr, request) { return nil, fmt.Errorf("wasm: writing %d-byte request at %d is out of range", len(request), inPtr) @@ -85,6 +130,11 @@ func (m *Module) Eval(ctx context.Context, request []byte) ([]byte, error) { if err != nil { return nil, fmt.Errorf("wasm: eval: %w", err) } + // A conforming eval returns one packed result. A misbehaving guest may return + // none; guard so a malformed guest fails the call rather than panicking the host. + if len(evalRes) == 0 { + return nil, fmt.Errorf("wasm: eval returned no result") + } packed := evalRes[0] outPtr, outLen := uint32(packed>>32), uint32(packed) out, ok := m.mod.Memory().Read(outPtr, outLen) diff --git a/wasm/runtime_test.go b/wasm/runtime_test.go index 4726c69..42585c6 100644 --- a/wasm/runtime_test.go +++ b/wasm/runtime_test.go @@ -6,14 +6,17 @@ import ( "os/exec" "path/filepath" "testing" + "time" "github.com/stablekernel/crucible/wasm" + "github.com/tetratelabs/wazero" ) -// guardWASM and badWASM hold the compiled guests, built once in TestMain. +// guardWASM, badWASM, and loopWASM hold the compiled guests, built once in TestMain. var ( guardWASM []byte badWASM []byte + loopWASM []byte ) // TestMain compiles the testdata guests to wasip1/wasm with the standard Go @@ -27,6 +30,7 @@ func TestMain(m *testing.M) { guardWASM = buildGuest(tmp, "guard", "./testdata/guardguest") badWASM = buildGuest(tmp, "bad", "./testdata/badguest") + loopWASM = buildGuest(tmp, "loop", "./testdata/loopguest") os.Exit(m.Run()) } @@ -62,6 +66,56 @@ func TestModule_EvalRejectsOutOfRangeResponse(t *testing.T) { } } +// TestModule_EvalTimesOutRunawayGuest confirms a guest that never returns is +// interrupted when the call context's deadline elapses, so the host is not blocked +// forever. This exercises the runtime's WithCloseOnContextDone(true) wiring. +func TestModule_EvalTimesOutRunawayGuest(t *testing.T) { + mod, err := wasm.Compile(context.Background(), loopWASM) + if err != nil { + t.Fatalf("compile: %v", err) + } + t.Cleanup(func() { _ = mod.Close(context.Background()) }) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + done := make(chan error, 1) + go func() { + _, evalErr := mod.Eval(ctx, []byte(`{}`)) + done <- evalErr + }() + + select { + case evalErr := <-done: + if evalErr == nil { + t.Fatal("Eval of a runaway guest = nil error, want a context/closed error") + } + case <-time.After(5 * time.Second): + t.Fatal("Eval of a runaway guest did not return after the context deadline: the runtime did not interrupt the guest") + } +} + +// TestCompile_WithRuntimeConfig confirms Compile accepts a custom RuntimeConfig +// option and still produces a working module. The option is additive: omitting it +// uses the default (which closes a guest on context-done). +func TestCompile_WithRuntimeConfig(t *testing.T) { + ctx := context.Background() + mod, err := wasm.Compile(ctx, guardWASM, + wasm.WithRuntimeConfig(wazero.NewRuntimeConfig().WithCloseOnContextDone(true))) + if err != nil { + t.Fatalf("compile with runtime config: %v", err) + } + t.Cleanup(func() { _ = mod.Close(ctx) }) + + resp, err := mod.Eval(ctx, []byte(`{"context":{"status":"paid","total":50}}`)) + if err != nil { + t.Fatalf("eval: %v", err) + } + if string(resp) != `{"ok":true}` { + t.Fatalf("response = %s, want {\"ok\":true}", resp) + } +} + // TestModule_EvalRoundTrip is the ABI proof: a JSON request crosses into the guest, // the guest evaluates it, and the JSON response crosses back. func TestModule_EvalRoundTrip(t *testing.T) { diff --git a/wasm/testdata/loopguest/main.go b/wasm/testdata/loopguest/main.go new file mode 100644 index 0000000..61febca --- /dev/null +++ b/wasm/testdata/loopguest/main.go @@ -0,0 +1,31 @@ +//go:build wasip1 + +// Command loopguest is a runaway WebAssembly guest used to exercise the host's +// timeout/cancellation: its eval spins in an unbounded loop and never returns, so +// a host that built the runtime with WithCloseOnContextDone(true) interrupts it +// when the call context is canceled or hits its deadline, returning an error +// rather than blocking forever. +package main + +import "unsafe" + +func main() {} + +var inBuf [1 << 10]byte + +func ptrOf(p *byte) uint32 { return uint32(uintptr(unsafe.Pointer(p))) } + +//go:wasmexport alloc +func alloc(uint32) uint32 { return ptrOf(&inBuf[0]) } + +// eval never returns: it spins on a volatile-ish accumulator the compiler cannot +// fold away, so only the host's context-done interruption can stop it. +// +//go:wasmexport eval +func eval(uint32, uint32) uint64 { + var n uint64 + for { + n++ + inBuf[n%uint64(len(inBuf))] = byte(n) + } +} From 87132c128695903230c02cec068bbbb94006b80c Mon Sep 17 00:00:00 2001 From: Joshua Temple Date: Fri, 5 Jun 2026 10:33:58 -0400 Subject: [PATCH 5/5] fix: skip directory fsync on windows; cover durable gaps Signed-off-by: Joshua Temple --- durable/CHANGELOG.md | 6 +++++- durable/filestore.go | 16 -------------- durable/filestore_dirsync_other.go | 28 +++++++++++++++++++++++++ durable/filestore_dirsync_other_test.go | 16 ++++++++++++++ durable/filestore_dirsync_windows.go | 11 ++++++++++ 5 files changed, 60 insertions(+), 17 deletions(-) create mode 100644 durable/filestore_dirsync_other.go create mode 100644 durable/filestore_dirsync_other_test.go create mode 100644 durable/filestore_dirsync_windows.go diff --git a/durable/CHANGELOG.md b/durable/CHANGELOG.md index 3a82425..e3d4271 100644 --- a/durable/CHANGELOG.md +++ b/durable/CHANGELOG.md @@ -16,7 +16,11 @@ and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0. clock's documentation promises. - **FileStore directory durability.** Atomic checkpoint writes now fsync the parent directory after the rename, so the rename itself survives a crash and the - documented crash-durability holds, not only the file's bytes. + documented crash-durability holds, not only the file's bytes. The directory sync + is split by build tag (`filestore_dirsync_other.go` for POSIX, a no-op + `filestore_dirsync_windows.go` for Windows) because Windows does not permit + opening a directory handle for sync — the file rename plus the file's own fsync + are sufficient for Windows crash durability. - **Actor replay error.** A recorded actor done-data payload that fails to decode is now surfaced as a replay error instead of being silently dropped, so a corrupt journal fails loudly rather than re-firing the parent with nil data. diff --git a/durable/filestore.go b/durable/filestore.go index cabd731..aab6d5c 100644 --- a/durable/filestore.go +++ b/durable/filestore.go @@ -574,22 +574,6 @@ func writeAtomic(path string, data []byte) error { return syncDir(filepath.Dir(path)) } -// syncDir fsyncs a directory so a rename or create within it is durable across a -// crash. A failure to open or sync the directory is reported; a platform that -// does not support syncing a directory handle (a rare case) surfaces its error to -// the caller rather than being silently ignored. -func syncDir(dir string) error { - d, err := os.Open(dir) - if err != nil { - return fmt.Errorf("crucible/durable: opening dir %q to sync: %w", dir, err) - } - defer func() { _ = d.Close() }() - if err := d.Sync(); err != nil { - return fmt.Errorf("crucible/durable: syncing dir %q: %w", dir, err) - } - return nil -} - // encodeInstanceID maps an InstanceID to a filesystem-safe directory name. An id // composed only of safe characters is used verbatim for readability; any other // id is hex-escaped per byte, so arbitrary ids (including path separators) map to diff --git a/durable/filestore_dirsync_other.go b/durable/filestore_dirsync_other.go new file mode 100644 index 0000000..cd689b8 --- /dev/null +++ b/durable/filestore_dirsync_other.go @@ -0,0 +1,28 @@ +//go:build !windows + +package durable + +import ( + "fmt" + "os" +) + +// syncDir fsyncs a directory so a rename or create within it is durable across a +// crash. After an atomic rename the file's bytes are fsync'd (by writeAtomic), but +// the directory entry that records the rename also needs to be flushed to survive a +// crash on POSIX systems; opening the directory and calling Sync achieves this. +// Windows does not support opening a directory handle for sync (FlushFileBuffers on +// a directory-backed handle returns Access Denied), so the Windows build uses a +// no-op instead — the file rename plus the file's own fsync are sufficient for +// Windows crash durability. +func syncDir(dir string) error { + d, err := os.Open(dir) + if err != nil { + return fmt.Errorf("crucible/durable: opening dir %q to sync: %w", dir, err) + } + defer func() { _ = d.Close() }() + if err := d.Sync(); err != nil { + return fmt.Errorf("crucible/durable: syncing dir %q: %w", dir, err) + } + return nil +} diff --git a/durable/filestore_dirsync_other_test.go b/durable/filestore_dirsync_other_test.go new file mode 100644 index 0000000..f36d5f0 --- /dev/null +++ b/durable/filestore_dirsync_other_test.go @@ -0,0 +1,16 @@ +//go:build !windows + +package durable + +import ( + "testing" +) + +// TestSyncDir_NonexistentPathErrors confirms syncDir surfaces an error when the +// directory cannot be opened, rather than silently returning nil. This covers the +// os.Open error branch that a valid FileStore path never hits. +func TestSyncDir_NonexistentPathErrors(t *testing.T) { + if err := syncDir("/nonexistent/path/that/cannot/exist/for/this/test"); err == nil { + t.Fatal("syncDir on a nonexistent path returned nil, want an error") + } +} diff --git a/durable/filestore_dirsync_windows.go b/durable/filestore_dirsync_windows.go new file mode 100644 index 0000000..af3c418 --- /dev/null +++ b/durable/filestore_dirsync_windows.go @@ -0,0 +1,11 @@ +//go:build windows + +package durable + +// syncDir is a no-op on Windows. Opening a directory handle and calling +// FlushFileBuffers (Sync) on it returns "Access is denied" on Windows. The +// directory-entry durability POSIX needs from this call is covered on Windows by +// the file rename combined with the file's own fsync already performed in +// writeAtomic, so skipping the directory sync is correct for Windows crash +// durability and avoids the access-denied failure. +func syncDir(string) error { return nil }