From 0529bd895cd4fb6512fe7c3e409a78331fbca649 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 16:09:26 -0300 Subject: [PATCH 01/17] feat: add EventsStorage interface, writer, and S2 backend for browser events --- server/go.mod | 2 + server/go.sum | 2 + server/lib/events/eventsstorage.go | 52 +++++++ .../lib/events/eventsstorage_writer_test.go | 143 ++++++++++++++++++ server/lib/events/s2storage.go | 107 +++++++++++++ 5 files changed, 306 insertions(+) create mode 100644 server/lib/events/eventsstorage.go create mode 100644 server/lib/events/eventsstorage_writer_test.go create mode 100644 server/lib/events/s2storage.go diff --git a/server/go.mod b/server/go.mod index 5b2d7ed8..18907540 100644 --- a/server/go.mod +++ b/server/go.mod @@ -80,6 +80,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/s2-streamstore/s2-sdk-go v0.16.1 // indirect github.com/shirou/gopsutil/v4 v4.25.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/speakeasy-api/jsonpath v0.6.0 // indirect @@ -99,6 +100,7 @@ require ( go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/mod v0.28.0 // indirect + golang.org/x/net v0.45.0 // indirect golang.org/x/text v0.30.0 // indirect golang.org/x/tools v0.37.0 // indirect google.golang.org/protobuf v1.36.10 // indirect diff --git a/server/go.sum b/server/go.sum index 0f5296e8..b26c03c9 100644 --- a/server/go.sum +++ b/server/go.sum @@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/s2-streamstore/s2-sdk-go v0.16.1 h1:18Qht850wUhIb9JZkMwF5EJWfnmZnjdtW3z8xOuL7Ys= +github.com/s2-streamstore/s2-sdk-go v0.16.1/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo= github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go new file mode 100644 index 00000000..47f6c4fe --- /dev/null +++ b/server/lib/events/eventsstorage.go @@ -0,0 +1,52 @@ +package events + +import ( + "context" + "log/slog" +) + +// EventsStorage is the durable storage backend for browser events. +type EventsStorage interface { + Append(ctx context.Context, env Envelope) error + Close() error +} + +// EventsStorageWriter drains the ring buffer and forwards each envelope to +// the configured EventsStorage backend. It is designed to run as a single +// goroutine via Run. +type EventsStorageWriter struct { + reader *Reader + storage EventsStorage +} + +// NewEventsStorageWriter creates a writer that reads from es starting at seq 0. +func NewEventsStorageWriter(es *EventStream, storage EventsStorage) *EventsStorageWriter { + return &EventsStorageWriter{ + reader: es.NewReader(0), + storage: storage, + } +} + +// Run reads from the ring buffer and appends each envelope to storage until +// ctx is cancelled. It returns nil on clean shutdown. +func (w *EventsStorageWriter) Run(ctx context.Context) error { + for { + res, err := w.reader.Read(ctx) + if err != nil { + // ctx cancelled — clean shutdown + return nil + } + if res.Dropped > 0 { + slog.Warn("events storage writer: dropped events", "count", res.Dropped) + continue + } + if err := w.storage.Append(ctx, *res.Envelope); err != nil { + slog.Error("events storage writer: append failed", "seq", res.Envelope.Seq, "err", err) + } + } +} + +// Close drains in-flight writes and releases backend resources. +func (w *EventsStorageWriter) Close() error { + return w.storage.Close() +} diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go new file mode 100644 index 00000000..3e26a322 --- /dev/null +++ b/server/lib/events/eventsstorage_writer_test.go @@ -0,0 +1,143 @@ +package events + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockBackend struct { + mu sync.Mutex + appended []Envelope + err error +} + +func (m *mockBackend) Append(_ context.Context, env Envelope) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.err != nil { + return m.err + } + m.appended = append(m.appended, env) + return nil +} + +func (m *mockBackend) Close() error { return nil } + +func (m *mockBackend) envelopes() []Envelope { + m.mu.Lock() + defer m.mu.Unlock() + out := make([]Envelope, len(m.appended)) + copy(out, m.appended) + return out +} + +func newTestStream(t *testing.T, capacity int) *EventStream { + t.Helper() + es, err := NewEventStream(EventStreamConfig{RingCapacity: capacity}) + require.NoError(t, err) + return es +} + +func makeEvent(typ string) Event { + return Event{Type: typ, Category: CategorySystem} +} + +func TestEventsStorageWriter_NormalAppend(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{} + w := NewEventsStorageWriter(es, backend) + + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan struct{}) + go func() { + defer close(done) + w.Run(ctx) //nolint:errcheck + }() + + env1 := es.Publish(Envelope{Event: makeEvent("test.one")}) + env2 := es.Publish(Envelope{Event: makeEvent("test.two")}) + + // wait for both to land + require.Eventually(t, func() bool { + return len(backend.envelopes()) == 2 + }, time.Second, 5*time.Millisecond) + + cancel() + <-done + + got := backend.envelopes() + assert.Equal(t, env1.Seq, got[0].Seq) + assert.Equal(t, "test.one", got[0].Event.Type) + assert.Equal(t, env2.Seq, got[1].Seq) + assert.Equal(t, "test.two", got[1].Event.Type) +} + +func TestEventsStorageWriter_DroppedEvents(t *testing.T) { + // Use a tiny ring so overflow is easy to trigger + es := newTestStream(t, 4) + backend := &mockBackend{} + w := NewEventsStorageWriter(es, backend) + + // Publish 8 events without a reader running — fills and wraps the ring + for i := range 8 { + es.Publish(Envelope{Event: makeEvent("drop.test." + string(rune('a'+i)))}) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + w.Run(ctx) //nolint:errcheck + }() + + // Let the writer drain what's available + require.Eventually(t, func() bool { + return len(backend.envelopes()) > 0 + }, time.Second, 5*time.Millisecond) + + cancel() + <-done + // Writer should not have crashed; any envelopes it did receive are valid + for _, env := range backend.envelopes() { + assert.NotEmpty(t, env.Event.Type) + } +} + +func TestEventsStorageWriter_AppendError(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{err: errors.New("storage unavailable")} + w := NewEventsStorageWriter(es, backend) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + defer close(done) + w.Run(ctx) //nolint:errcheck + }() + + es.Publish(Envelope{Event: makeEvent("will.fail")}) + + // Give the writer a moment to process then cancel — it must not crash + time.Sleep(20 * time.Millisecond) + cancel() + <-done +} + +func TestEventsStorageWriter_ContextCancelled(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{} + w := NewEventsStorageWriter(es, backend) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + err := w.Run(ctx) + assert.NoError(t, err) +} diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go new file mode 100644 index 00000000..c0871d63 --- /dev/null +++ b/server/lib/events/s2storage.go @@ -0,0 +1,107 @@ +package events + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/s2-streamstore/s2-sdk-go/s2" +) + +// S2Config holds batcher tuning parameters for the S2. +type S2Config struct { + // BatcherLingerMs is how long the batcher waits before flushing (default: 100ms). + BatcherLingerMs int + // BatcherMaxRecords is the max records per batch (default: 50). + BatcherMaxRecords int +} + +// s2Producer bundles an S2 producer with a WaitGroup tracking in-flight ack goroutines. +type s2Producer struct { + p *s2.Producer + wg sync.WaitGroup +} + +func (sp *s2Producer) close() error { + sp.wg.Wait() + return sp.p.Close() +} + +// S2Storage is an EventsStorage backed by S2. All events are appended to a +// single fixed stream whose name is provided at construction time. +type S2Storage struct { + stream *s2.StreamClient + producer *s2Producer +} + +// NewS2Storage creates an S2Storage that appends to the given stream within basin. +// ctx is used only for AppendSession creation; it should be the process lifetime context. +func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config) (*S2Storage, error) { + if basin == "" || accessToken == "" || streamName == "" { + return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required") + } + + lingerMs := cfg.BatcherLingerMs + if lingerMs <= 0 { + lingerMs = 100 + } + maxRecs := cfg.BatcherMaxRecords + if maxRecs <= 0 { + maxRecs = 50 + } + + client := s2.New(accessToken, nil) + stream := client.Basin(basin).Stream(s2.StreamName(streamName)) + + session, err := stream.AppendSession(ctx, nil) + if err != nil { + return nil, fmt.Errorf("s2storage: open append session: %w", err) + } + + batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{ + Linger: time.Duration(lingerMs) * time.Millisecond, + MaxRecords: maxRecs, + }) + producer := s2.NewProducer(ctx, batcher, session) + + return &S2Storage{ + stream: stream, + producer: &s2Producer{ + p: producer, + }, + }, nil +} + +// Append marshals env to JSON and submits it to the S2 producer. +// The envelope is already size-bounded by EventStream.Publish (truncateIfNeeded). +func (s *S2Storage) Append(_ context.Context, env Envelope) error { + data, err := json.Marshal(env) + if err != nil { + return fmt.Errorf("s2storage: marshal envelope seq=%d: %w", env.Seq, err) + } + + future, err := s.producer.p.Submit(s2.AppendRecord{Body: data}) + if err != nil { + return fmt.Errorf("s2storage: submit seq=%d: %w", env.Seq, err) + } + + s.producer.wg.Add(1) + go func() { + defer s.producer.wg.Done() + ticket, err := future.Wait(context.Background()) + if err != nil || ticket == nil { + return + } + ticket.Ack(context.Background()) //nolint:errcheck + }() + + return nil +} + +// Close drains in-flight ack goroutines and closes the producer (which flushes +// the S2 batcher to the network). +func (s *S2Storage) Close() error { + return s.producer.close() +} From 6e50eb4772e70d87910256213771406e9797c2fb Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 16:11:52 -0300 Subject: [PATCH 02/17] feat: wire S2 storage writer into main with ordered shutdown --- server/cmd/api/main.go | 35 +++++++++++++++++++++++++++++++++++ server/cmd/config/config.go | 7 +++++++ 2 files changed, 42 insertions(+) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 48d17351..45b9c231 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -102,6 +102,21 @@ func main() { } captureSession := capturesession.NewCaptureSession(eventStream) + // Optional S2 durable storage sink. + var storageWriter *events.EventsStorageWriter + if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { + s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, + events.S2Config{ + BatcherLingerMs: config.S2BatcherLingerMs, + BatcherMaxRecords: config.S2BatcherMaxRecs, + }) + if err != nil { + slogger.Error("failed to create S2 storage", "err", err) + os.Exit(1) + } + storageWriter = events.NewEventsStorageWriter(eventStream, s2stor) + } + apiService, err := api.New( recorder.NewFFmpegManager(), recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz), @@ -244,10 +259,30 @@ func main() { } }() + // Start S2 storage writer goroutine. storageDone is closed when Run returns. + storageDone := make(chan struct{}) + if storageWriter != nil { + go func() { + defer close(storageDone) + storageWriter.Run(ctx) //nolint:errcheck + }() + } else { + close(storageDone) + } + // graceful shutdown <-ctx.Done() slogger.Info("shutdown signal received") + // Wait for the storage writer to drain from the ring, then flush S2 before + // shutting down the HTTP servers and closing the capture session. + <-storageDone + if storageWriter != nil { + if err := storageWriter.Close(); err != nil { + slogger.Error("storage writer close failed", "err", err) + } + } + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) defer shutdownCancel() g, _ := errgroup.WithContext(shutdownCtx) diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index 2fdd4bdb..68325531 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -35,6 +35,13 @@ type Config struct { // DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress. // If empty, it is derived from DevToolsProxyPort as 127.0.0.1:. DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""` + + // S2 durable event storage. All three fields must be set to enable the S2 sink. + S2Basin string `envconfig:"S2_BASIN" default:""` + S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""` + S2Stream string `envconfig:"S2_STREAM" default:""` + S2BatcherLingerMs int `envconfig:"S2_BATCHER_LINGER_MS" default:"100"` + S2BatcherMaxRecs int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"` } // Load loads configuration from environment variables From 786820569b54c3eda3004d839b1f2b0a5cffa1c2 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 16:13:28 -0300 Subject: [PATCH 03/17] feat: pass S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM through supervisord configs --- .../chromium-headful/supervisor/services/kernel-images-api.conf | 2 +- .../image/supervisor/services/kernel-images-api.conf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/images/chromium-headful/supervisor/services/kernel-images-api.conf b/images/chromium-headful/supervisor/services/kernel-images-api.conf index e57d30a8..f1195bb1 100644 --- a/images/chromium-headful/supervisor/services/kernel-images-api.conf +++ b/images/chromium-headful/supervisor/services/kernel-images-api.conf @@ -1,5 +1,5 @@ [program:kernel-images-api] -command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api' +command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api' autostart=false autorestart=true startsecs=2 diff --git a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf index e57d30a8..f1195bb1 100644 --- a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf +++ b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf @@ -1,5 +1,5 @@ [program:kernel-images-api] -command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api' +command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api' autostart=false autorestart=true startsecs=2 From bbf4a3568246bed0d717c93ef30a6ddc66e0f8c3 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 16:15:56 -0300 Subject: [PATCH 04/17] fix: update config tests to expect S2 batcher defaults --- server/cmd/config/config_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/cmd/config/config_test.go b/server/cmd/config/config_test.go index d2b50291..03b35a4f 100644 --- a/server/cmd/config/config_test.go +++ b/server/cmd/config/config_test.go @@ -29,6 +29,8 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 9224, ChromeDriverUpstreamAddr: "127.0.0.1:9225", DevToolsProxyAddr: "127.0.0.1:9222", + S2BatcherLingerMs: 100, + S2BatcherMaxRecs: 50, }, }, { @@ -57,6 +59,8 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 5432, ChromeDriverUpstreamAddr: "127.0.0.1:9999", DevToolsProxyAddr: "127.0.0.1:9876", + S2BatcherLingerMs: 100, + S2BatcherMaxRecs: 50, }, }, { @@ -77,6 +81,8 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 9224, ChromeDriverUpstreamAddr: "127.0.0.1:9225", DevToolsProxyAddr: "10.0.0.1:1234", + S2BatcherLingerMs: 100, + S2BatcherMaxRecs: 50, }, }, { From c9a644509046334aad9e11f02ba0f98aef13b1ed Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 16:18:05 -0300 Subject: [PATCH 05/17] chore: promote s2-sdk-go to direct dependency --- server/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/go.mod b/server/go.mod index 18907540..1bdae079 100644 --- a/server/go.mod +++ b/server/go.mod @@ -21,6 +21,7 @@ require ( github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866 github.com/nrednav/cuid2 v1.1.0 github.com/oapi-codegen/runtime v1.2.0 + github.com/s2-streamstore/s2-sdk-go v0.16.1 github.com/samber/lo v1.52.0 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.40.0 @@ -80,7 +81,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/s2-streamstore/s2-sdk-go v0.16.1 // indirect github.com/shirou/gopsutil/v4 v4.25.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/speakeasy-api/jsonpath v0.6.0 // indirect From f42f992f49f2a31dea0030ee8cd295654b5378ea Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 17:20:30 -0300 Subject: [PATCH 06/17] review: rename Storage/StorageWriter, use time.Duration for batcher linger, fix test race, pass S2 env vars through run-docker.sh --- images/chromium-headful/run-docker.sh | 11 ++ images/chromium-headless/run-docker.sh | 11 ++ server/cmd/api/main.go | 7 +- server/cmd/config/config.go | 10 +- server/cmd/config/config_test.go | 12 +- server/e2e/e2e_s2_storage_test.go | 104 ++++++++++++++++++ server/lib/events/eventsstorage.go | 29 +++-- .../lib/events/eventsstorage_writer_test.go | 63 +++++++---- server/lib/events/s2storage.go | 45 ++++---- 9 files changed, 224 insertions(+), 68 deletions(-) create mode 100644 server/e2e/e2e_s2_storage_test.go diff --git a/images/chromium-headful/run-docker.sh b/images/chromium-headful/run-docker.sh index aad41331..0c1255d7 100755 --- a/images/chromium-headful/run-docker.sh +++ b/images/chromium-headful/run-docker.sh @@ -71,6 +71,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" ) fi +# S2 durable event storage (all three must be set to enable the sink) +if [[ -n "${S2_BASIN:-}" ]]; then + RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" ) +fi +if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then + RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" ) +fi +if [[ -n "${S2_STREAM:-}" ]]; then + RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" ) +fi + # WebRTC port mapping if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then echo "Running container with WebRTC" diff --git a/images/chromium-headless/run-docker.sh b/images/chromium-headless/run-docker.sh index 56f582bf..4a670748 100755 --- a/images/chromium-headless/run-docker.sh +++ b/images/chromium-headless/run-docker.sh @@ -24,6 +24,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" ) fi +# S2 durable event storage (all three must be set to enable the sink) +if [[ -n "${S2_BASIN:-}" ]]; then + RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" ) +fi +if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then + RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" ) +fi +if [[ -n "${S2_STREAM:-}" ]]; then + RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" ) +fi + # If a positional argument is given, use it as the entrypoint ENTRYPOINT_ARG=() if [[ $# -ge 1 && -n "$1" ]]; then diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 45b9c231..97ebac1d 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -103,18 +103,19 @@ func main() { captureSession := capturesession.NewCaptureSession(eventStream) // Optional S2 durable storage sink. - var storageWriter *events.EventsStorageWriter + var storageWriter *events.StorageWriter if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { + slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream) s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{ - BatcherLingerMs: config.S2BatcherLingerMs, + BatcherLinger: config.S2BatcherLinger, BatcherMaxRecords: config.S2BatcherMaxRecs, }) if err != nil { slogger.Error("failed to create S2 storage", "err", err) os.Exit(1) } - storageWriter = events.NewEventsStorageWriter(eventStream, s2stor) + storageWriter = events.NewStorageWriter(eventStream, s2stor) } apiService, err := api.New( diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index 68325531..ab51625e 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -37,11 +37,11 @@ type Config struct { DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""` // S2 durable event storage. All three fields must be set to enable the S2 sink. - S2Basin string `envconfig:"S2_BASIN" default:""` - S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""` - S2Stream string `envconfig:"S2_STREAM" default:""` - S2BatcherLingerMs int `envconfig:"S2_BATCHER_LINGER_MS" default:"100"` - S2BatcherMaxRecs int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"` + S2Basin string `envconfig:"S2_BASIN" default:""` + S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""` + S2Stream string `envconfig:"S2_STREAM" default:""` + S2BatcherLinger time.Duration `envconfig:"S2_BATCHER_LINGER" default:"100ms"` + S2BatcherMaxRecs int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"` } // Load loads configuration from environment variables diff --git a/server/cmd/config/config_test.go b/server/cmd/config/config_test.go index 03b35a4f..d719ab21 100644 --- a/server/cmd/config/config_test.go +++ b/server/cmd/config/config_test.go @@ -29,8 +29,8 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 9224, ChromeDriverUpstreamAddr: "127.0.0.1:9225", DevToolsProxyAddr: "127.0.0.1:9222", - S2BatcherLingerMs: 100, - S2BatcherMaxRecs: 50, + S2BatcherLinger: 100 * time.Millisecond, + S2BatcherMaxRecs: 50, }, }, { @@ -59,8 +59,8 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 5432, ChromeDriverUpstreamAddr: "127.0.0.1:9999", DevToolsProxyAddr: "127.0.0.1:9876", - S2BatcherLingerMs: 100, - S2BatcherMaxRecs: 50, + S2BatcherLinger: 100 * time.Millisecond, + S2BatcherMaxRecs: 50, }, }, { @@ -81,8 +81,8 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 9224, ChromeDriverUpstreamAddr: "127.0.0.1:9225", DevToolsProxyAddr: "10.0.0.1:1234", - S2BatcherLingerMs: 100, - S2BatcherMaxRecs: 50, + S2BatcherLinger: 100 * time.Millisecond, + S2BatcherMaxRecs: 50, }, }, { diff --git a/server/e2e/e2e_s2_storage_test.go b/server/e2e/e2e_s2_storage_test.go new file mode 100644 index 00000000..af99f857 --- /dev/null +++ b/server/e2e/e2e_s2_storage_test.go @@ -0,0 +1,104 @@ +package e2e + +import ( + "context" + "net/http" + "os" + "os/exec" + "testing" + "time" + + "github.com/s2-streamstore/s2-sdk-go/s2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + instanceoapi "github.com/kernel/kernel-images/server/lib/oapi" +) + +// TestS2StorageWriter starts a headless container with S2 credentials, runs a +// capture session, and verifies that events land in the configured S2 stream. +// +// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset. +func TestS2StorageWriter(t *testing.T) { + basin := os.Getenv("S2_BASIN") + accessToken := os.Getenv("S2_ACCESS_TOKEN") + stream := os.Getenv("S2_STREAM") + if basin == "" || accessToken == "" || stream == "" { + t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test") + } + + if _, err := exec.LookPath("docker"); err != nil { + t.Skipf("docker not available: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + c := NewTestContainer(t, headlessImage) + require.NoError(t, c.Start(ctx, ContainerConfig{ + Env: map[string]string{ + "S2_BASIN": basin, + "S2_ACCESS_TOKEN": accessToken, + "S2_STREAM": stream, + }, + }), "failed to start container") + defer c.Stop(ctx) + + require.NoError(t, c.WaitReady(ctx), "api not ready") + + client, err := c.APIClient() + require.NoError(t, err) + + // Note the current S2 stream tail seq before we write anything so we only + // read records produced by this test run. + s2Client := s2.New(accessToken, nil) + streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream)) + + checkResp, err := streamClient.CheckTail(ctx) + require.NoError(t, err, "check tail before test") + startSeq := checkResp.SeqNum + + // Start a capture session. + startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{}) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body)) + require.NotNil(t, startResp.JSON201) + sessionID := startResp.JSON201.Id + t.Logf("capture session started: %s", sessionID) + + // Let the session run briefly so at least one event is published (the + // session_started system event is emitted on session start). + time.Sleep(500 * time.Millisecond) + + // Stop the capture session. + stopResp, err := client.StopCaptureSessionWithResponse(ctx) + require.NoError(t, err) + require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body)) + t.Log("capture session stopped") + + // Give the storage writer time to flush to S2 (batcher linger + network). + time.Sleep(2 * time.Second) + + // Read records written after the pre-test tail and verify at least one + // envelope is present. + readSession, err := streamClient.ReadSession(ctx, &s2.ReadOptions{ + SeqNum: s2.Uint64(startSeq), + }) + require.NoError(t, err, "open S2 read session") + defer readSession.Close() + + readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second) + defer readCancel() + + var count int + for readSession.Next(readCtx) { + count++ + } + // EOF is expected once we reach the tail — not an error. + if err := readSession.Err(); err != nil && readCtx.Err() == nil { + t.Fatalf("S2 read session error: %v", err) + } + + assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream) + t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq) +} diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 47f6c4fe..30bbd7b3 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -5,23 +5,23 @@ import ( "log/slog" ) -// EventsStorage is the durable storage backend for browser events. -type EventsStorage interface { +// Storage is the durable storage backend for browser events. +type Storage interface { Append(ctx context.Context, env Envelope) error Close() error } -// EventsStorageWriter drains the ring buffer and forwards each envelope to -// the configured EventsStorage backend. It is designed to run as a single -// goroutine via Run. -type EventsStorageWriter struct { +// StorageWriter drains the ring buffer and forwards each envelope to the +// configured Storage backend. Single-use: call Run once; it blocks until ctx +// is cancelled. Call Close after Run returns to flush in-flight writes. +type StorageWriter struct { reader *Reader - storage EventsStorage + storage Storage } -// NewEventsStorageWriter creates a writer that reads from es starting at seq 0. -func NewEventsStorageWriter(es *EventStream, storage EventsStorage) *EventsStorageWriter { - return &EventsStorageWriter{ +// NewStorageWriter creates a writer that reads from es starting at seq 0. +func NewStorageWriter(es *EventStream, storage Storage) *StorageWriter { + return &StorageWriter{ reader: es.NewReader(0), storage: storage, } @@ -29,24 +29,23 @@ func NewEventsStorageWriter(es *EventStream, storage EventsStorage) *EventsStora // Run reads from the ring buffer and appends each envelope to storage until // ctx is cancelled. It returns nil on clean shutdown. -func (w *EventsStorageWriter) Run(ctx context.Context) error { +func (w *StorageWriter) Run(ctx context.Context) error { for { res, err := w.reader.Read(ctx) if err != nil { - // ctx cancelled — clean shutdown return nil } if res.Dropped > 0 { - slog.Warn("events storage writer: dropped events", "count", res.Dropped) + slog.Warn("storage writer: dropped events", "count", res.Dropped) continue } if err := w.storage.Append(ctx, *res.Envelope); err != nil { - slog.Error("events storage writer: append failed", "seq", res.Envelope.Seq, "err", err) + slog.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err) } } } // Close drains in-flight writes and releases backend resources. -func (w *EventsStorageWriter) Close() error { +func (w *StorageWriter) Close() error { return w.storage.Close() } diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index 3e26a322..203c4593 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -12,15 +12,17 @@ import ( ) type mockBackend struct { - mu sync.Mutex - appended []Envelope - err error + mu sync.Mutex + appended []Envelope + err error + errCount int // total calls that returned an error } func (m *mockBackend) Append(_ context.Context, env Envelope) error { m.mu.Lock() defer m.mu.Unlock() if m.err != nil { + m.errCount++ return m.err } m.appended = append(m.appended, env) @@ -37,6 +39,18 @@ func (m *mockBackend) envelopes() []Envelope { return out } +func (m *mockBackend) errors() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.errCount +} + +func (m *mockBackend) clearErr() { + m.mu.Lock() + defer m.mu.Unlock() + m.err = nil +} + func newTestStream(t *testing.T, capacity int) *EventStream { t.Helper() es, err := NewEventStream(EventStreamConfig{RingCapacity: capacity}) @@ -48,10 +62,10 @@ func makeEvent(typ string) Event { return Event{Type: typ, Category: CategorySystem} } -func TestEventsStorageWriter_NormalAppend(t *testing.T) { +func TestStorageWriter_NormalAppend(t *testing.T) { es := newTestStream(t, 64) backend := &mockBackend{} - w := NewEventsStorageWriter(es, backend) + w := NewStorageWriter(es, backend) ctx, cancel := context.WithCancel(context.Background()) @@ -64,7 +78,6 @@ func TestEventsStorageWriter_NormalAppend(t *testing.T) { env1 := es.Publish(Envelope{Event: makeEvent("test.one")}) env2 := es.Publish(Envelope{Event: makeEvent("test.two")}) - // wait for both to land require.Eventually(t, func() bool { return len(backend.envelopes()) == 2 }, time.Second, 5*time.Millisecond) @@ -79,13 +92,12 @@ func TestEventsStorageWriter_NormalAppend(t *testing.T) { assert.Equal(t, "test.two", got[1].Event.Type) } -func TestEventsStorageWriter_DroppedEvents(t *testing.T) { - // Use a tiny ring so overflow is easy to trigger +func TestStorageWriter_DroppedEvents(t *testing.T) { es := newTestStream(t, 4) backend := &mockBackend{} - w := NewEventsStorageWriter(es, backend) + w := NewStorageWriter(es, backend) - // Publish 8 events without a reader running — fills and wraps the ring + // Publish 8 events before the writer starts — fills and wraps the ring. for i := range 8 { es.Publish(Envelope{Event: makeEvent("drop.test." + string(rune('a'+i)))}) } @@ -97,23 +109,22 @@ func TestEventsStorageWriter_DroppedEvents(t *testing.T) { w.Run(ctx) //nolint:errcheck }() - // Let the writer drain what's available require.Eventually(t, func() bool { return len(backend.envelopes()) > 0 }, time.Second, 5*time.Millisecond) cancel() <-done - // Writer should not have crashed; any envelopes it did receive are valid + for _, env := range backend.envelopes() { assert.NotEmpty(t, env.Event.Type) } } -func TestEventsStorageWriter_AppendError(t *testing.T) { +func TestStorageWriter_AppendError(t *testing.T) { es := newTestStream(t, 64) backend := &mockBackend{err: errors.New("storage unavailable")} - w := NewEventsStorageWriter(es, backend) + w := NewStorageWriter(es, backend) ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) @@ -122,21 +133,35 @@ func TestEventsStorageWriter_AppendError(t *testing.T) { w.Run(ctx) //nolint:errcheck }() + // Publish an event that will fail. Wait until the writer has attempted it + // (errCount > 0), then clear the error and publish a second event. The + // writer must continue past the error and deliver the second event. es.Publish(Envelope{Event: makeEvent("will.fail")}) + require.Eventually(t, func() bool { + return backend.errors() > 0 + }, time.Second, 5*time.Millisecond) + + backend.clearErr() + es.Publish(Envelope{Event: makeEvent("will.succeed")}) + require.Eventually(t, func() bool { + return len(backend.envelopes()) == 1 + }, time.Second, 5*time.Millisecond) - // Give the writer a moment to process then cancel — it must not crash - time.Sleep(20 * time.Millisecond) cancel() <-done + + got := backend.envelopes() + require.Len(t, got, 1) + assert.Equal(t, "will.succeed", got[0].Event.Type) } -func TestEventsStorageWriter_ContextCancelled(t *testing.T) { +func TestStorageWriter_ContextCancelled(t *testing.T) { es := newTestStream(t, 64) backend := &mockBackend{} - w := NewEventsStorageWriter(es, backend) + w := NewStorageWriter(es, backend) ctx, cancel := context.WithCancel(context.Background()) - cancel() // cancel immediately + cancel() err := w.Run(ctx) assert.NoError(t, err) diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index c0871d63..822288bf 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -4,21 +4,21 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "sync" "time" "github.com/s2-streamstore/s2-sdk-go/s2" ) -// S2Config holds batcher tuning parameters for the S2. +// S2Config holds batcher tuning parameters for the S2 backend. type S2Config struct { - // BatcherLingerMs is how long the batcher waits before flushing (default: 100ms). - BatcherLingerMs int + // BatcherLinger is how long the batcher waits before flushing (default: 100ms). + BatcherLinger time.Duration // BatcherMaxRecords is the max records per batch (default: 50). BatcherMaxRecords int } -// s2Producer bundles an S2 producer with a WaitGroup tracking in-flight ack goroutines. type s2Producer struct { p *s2.Producer wg sync.WaitGroup @@ -29,23 +29,23 @@ func (sp *s2Producer) close() error { return sp.p.Close() } -// S2Storage is an EventsStorage backed by S2. All events are appended to a -// single fixed stream whose name is provided at construction time. +// S2Storage is a Storage backed by S2. All events are appended to a single +// fixed stream whose name is provided at construction time. type S2Storage struct { stream *s2.StreamClient producer *s2Producer } // NewS2Storage creates an S2Storage that appends to the given stream within basin. -// ctx is used only for AppendSession creation; it should be the process lifetime context. +// ctx is used for AppendSession creation and should be the process lifetime context. func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config) (*S2Storage, error) { if basin == "" || accessToken == "" || streamName == "" { return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required") } - lingerMs := cfg.BatcherLingerMs - if lingerMs <= 0 { - lingerMs = 100 + linger := cfg.BatcherLinger + if linger <= 0 { + linger = 100 * time.Millisecond } maxRecs := cfg.BatcherMaxRecords if maxRecs <= 0 { @@ -61,21 +61,18 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf } batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{ - Linger: time.Duration(lingerMs) * time.Millisecond, + Linger: linger, MaxRecords: maxRecs, }) producer := s2.NewProducer(ctx, batcher, session) return &S2Storage{ - stream: stream, - producer: &s2Producer{ - p: producer, - }, + stream: stream, + producer: &s2Producer{p: producer}, }, nil } // Append marshals env to JSON and submits it to the S2 producer. -// The envelope is already size-bounded by EventStream.Publish (truncateIfNeeded). func (s *S2Storage) Append(_ context.Context, env Envelope) error { data, err := json.Marshal(env) if err != nil { @@ -90,18 +87,26 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error { s.producer.wg.Add(1) go func() { defer s.producer.wg.Done() + // Use a fresh background context so the ack isn't tied to any request + // context, but the wg ensures Close() waits before the process exits. ticket, err := future.Wait(context.Background()) - if err != nil || ticket == nil { + if err != nil { + slog.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err) return } - ticket.Ack(context.Background()) //nolint:errcheck + if ticket == nil { + return + } + if _, err := ticket.Ack(context.Background()); err != nil { + slog.Error("s2storage: ack failed", "seq", env.Seq, "err", err) + } }() return nil } -// Close drains in-flight ack goroutines and closes the producer (which flushes -// the S2 batcher to the network). +// Close drains in-flight ack goroutines then closes the producer, which flushes +// the S2 batcher to the network. func (s *S2Storage) Close() error { return s.producer.close() } From 92a96b6bb04cfd623b004ec4a26d878280f85adf Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 17:20:30 -0300 Subject: [PATCH 07/17] fix: remove dead defaults, bound ack goroutine contexts, embed s2Producer, improve contracts --- server/cmd/api/main.go | 6 +- server/cmd/config/config.go | 8 +-- server/cmd/config/config_test.go | 6 -- server/lib/events/eventsstorage.go | 6 +- .../lib/events/eventsstorage_writer_test.go | 10 +++- server/lib/events/s2storage.go | 57 +++++++++++-------- 6 files changed, 48 insertions(+), 45 deletions(-) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 97ebac1d..d6806de6 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -106,11 +106,7 @@ func main() { var storageWriter *events.StorageWriter if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream) - s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, - events.S2Config{ - BatcherLinger: config.S2BatcherLinger, - BatcherMaxRecords: config.S2BatcherMaxRecs, - }) + s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}) if err != nil { slogger.Error("failed to create S2 storage", "err", err) os.Exit(1) diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index ab51625e..3a18b082 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -37,11 +37,9 @@ type Config struct { DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""` // S2 durable event storage. All three fields must be set to enable the S2 sink. - S2Basin string `envconfig:"S2_BASIN" default:""` - S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""` - S2Stream string `envconfig:"S2_STREAM" default:""` - S2BatcherLinger time.Duration `envconfig:"S2_BATCHER_LINGER" default:"100ms"` - S2BatcherMaxRecs int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"` + S2Basin string `envconfig:"S2_BASIN" default:""` + S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""` + S2Stream string `envconfig:"S2_STREAM" default:""` } // Load loads configuration from environment variables diff --git a/server/cmd/config/config_test.go b/server/cmd/config/config_test.go index d719ab21..d2b50291 100644 --- a/server/cmd/config/config_test.go +++ b/server/cmd/config/config_test.go @@ -29,8 +29,6 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 9224, ChromeDriverUpstreamAddr: "127.0.0.1:9225", DevToolsProxyAddr: "127.0.0.1:9222", - S2BatcherLinger: 100 * time.Millisecond, - S2BatcherMaxRecs: 50, }, }, { @@ -59,8 +57,6 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 5432, ChromeDriverUpstreamAddr: "127.0.0.1:9999", DevToolsProxyAddr: "127.0.0.1:9876", - S2BatcherLinger: 100 * time.Millisecond, - S2BatcherMaxRecs: 50, }, }, { @@ -81,8 +77,6 @@ func TestLoad(t *testing.T) { ChromeDriverProxyPort: 9224, ChromeDriverUpstreamAddr: "127.0.0.1:9225", DevToolsProxyAddr: "10.0.0.1:1234", - S2BatcherLinger: 100 * time.Millisecond, - S2BatcherMaxRecs: 50, }, }, { diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 30bbd7b3..02ecb735 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -6,6 +6,7 @@ import ( ) // Storage is the durable storage backend for browser events. +// Append is called serially from StorageWriter.Run and need not be thread-safe. type Storage interface { Append(ctx context.Context, env Envelope) error Close() error @@ -14,6 +15,7 @@ type Storage interface { // StorageWriter drains the ring buffer and forwards each envelope to the // configured Storage backend. Single-use: call Run once; it blocks until ctx // is cancelled. Call Close after Run returns to flush in-flight writes. +// Starts from the oldest available event in the ring, not the current tail. type StorageWriter struct { reader *Reader storage Storage @@ -28,12 +30,12 @@ func NewStorageWriter(es *EventStream, storage Storage) *StorageWriter { } // Run reads from the ring buffer and appends each envelope to storage until -// ctx is cancelled. It returns nil on clean shutdown. +// ctx is cancelled. Returns ctx.Err() on clean shutdown. func (w *StorageWriter) Run(ctx context.Context) error { for { res, err := w.reader.Read(ctx) if err != nil { - return nil + return ctx.Err() } if res.Dropped > 0 { slog.Warn("storage writer: dropped events", "count", res.Dropped) diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index 203c4593..f6b5200b 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -15,7 +15,7 @@ type mockBackend struct { mu sync.Mutex appended []Envelope err error - errCount int // total calls that returned an error + errCount int } func (m *mockBackend) Append(_ context.Context, env Envelope) error { @@ -116,7 +116,11 @@ func TestStorageWriter_DroppedEvents(t *testing.T) { cancel() <-done - for _, env := range backend.envelopes() { + // With ring capacity 4 and 8 publishes, the writer must have skipped at + // least 4 events via a drop gap — so fewer than 8 envelopes landed. + got := backend.envelopes() + assert.Less(t, len(got), 8, "expected fewer than 8 envelopes due to ring overflow") + for _, env := range got { assert.NotEmpty(t, env.Event.Type) } } @@ -164,5 +168,5 @@ func TestStorageWriter_ContextCancelled(t *testing.T) { cancel() err := w.Run(ctx) - assert.NoError(t, err) + assert.ErrorIs(t, err, context.Canceled) } diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index 822288bf..bcef98b5 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -24,16 +24,25 @@ type s2Producer struct { wg sync.WaitGroup } -func (sp *s2Producer) close() error { - sp.wg.Wait() +func (sp *s2Producer) close(ctx context.Context) error { + done := make(chan struct{}) + go func() { + sp.wg.Wait() + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } return sp.p.Close() } // S2Storage is a Storage backed by S2. All events are appended to a single // fixed stream whose name is provided at construction time. type S2Storage struct { - stream *s2.StreamClient - producer *s2Producer + producer s2Producer + shutdownCh chan struct{} // closed when Close is called, bounds ack goroutine contexts } // NewS2Storage creates an S2Storage that appends to the given stream within basin. @@ -43,15 +52,6 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required") } - linger := cfg.BatcherLinger - if linger <= 0 { - linger = 100 * time.Millisecond - } - maxRecs := cfg.BatcherMaxRecords - if maxRecs <= 0 { - maxRecs = 50 - } - client := s2.New(accessToken, nil) stream := client.Basin(basin).Stream(s2.StreamName(streamName)) @@ -61,14 +61,14 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf } batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{ - Linger: linger, - MaxRecords: maxRecs, + Linger: cfg.BatcherLinger, + MaxRecords: cfg.BatcherMaxRecords, }) producer := s2.NewProducer(ctx, batcher, session) return &S2Storage{ - stream: stream, - producer: &s2Producer{p: producer}, + producer: s2Producer{p: producer}, + shutdownCh: make(chan struct{}), }, nil } @@ -87,9 +87,17 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error { s.producer.wg.Add(1) go func() { defer s.producer.wg.Done() - // Use a fresh background context so the ack isn't tied to any request - // context, but the wg ensures Close() waits before the process exits. - ticket, err := future.Wait(context.Background()) + ackCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + select { + case <-s.shutdownCh: + cancel() + case <-ackCtx.Done(): + } + }() + + ticket, err := future.Wait(ackCtx) if err != nil { slog.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err) return @@ -97,7 +105,7 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error { if ticket == nil { return } - if _, err := ticket.Ack(context.Background()); err != nil { + if _, err := ticket.Ack(ackCtx); err != nil { slog.Error("s2storage: ack failed", "seq", env.Seq, "err", err) } }() @@ -105,8 +113,9 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error { return nil } -// Close drains in-flight ack goroutines then closes the producer, which flushes -// the S2 batcher to the network. +// Close cancels in-flight ack goroutines, waits for them to drain, then closes +// the producer (which flushes the S2 batcher to the network). func (s *S2Storage) Close() error { - return s.producer.close() + close(s.shutdownCh) + return s.producer.close(context.Background()) } From 64e42f0a33a34a6b1a14d347ee409b35950d505d Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 17:38:20 -0300 Subject: [PATCH 08/17] fix: return err directly from Run, inject logger, enforce single-use with sync.Once - StorageWriter.Run now returns err instead of ctx.Err(), preventing silent nil exit if Read ever returns a non-context error - Inject *slog.Logger into StorageWriter and S2Storage to match the injected-logger convention used throughout server/lib/ - Add sync.Once to StorageWriter to panic on a second Run call, making misuse a detectable failure instead of a silent reader state race --- server/cmd/api/main.go | 4 ++-- server/lib/events/eventsstorage.go | 21 ++++++++++++++----- .../lib/events/eventsstorage_writer_test.go | 9 ++++---- server/lib/events/s2storage.go | 8 ++++--- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index d6806de6..fbb23724 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -106,12 +106,12 @@ func main() { var storageWriter *events.StorageWriter if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream) - s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}) + s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger) if err != nil { slogger.Error("failed to create S2 storage", "err", err) os.Exit(1) } - storageWriter = events.NewStorageWriter(eventStream, s2stor) + storageWriter = events.NewStorageWriter(eventStream, s2stor, slogger) } apiService, err := api.New( diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 02ecb735..2e72491f 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -3,6 +3,7 @@ package events import ( "context" "log/slog" + "sync" ) // Storage is the durable storage backend for browser events. @@ -19,30 +20,40 @@ type Storage interface { type StorageWriter struct { reader *Reader storage Storage + log *slog.Logger + once sync.Once } // NewStorageWriter creates a writer that reads from es starting at seq 0. -func NewStorageWriter(es *EventStream, storage Storage) *StorageWriter { +func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *StorageWriter { return &StorageWriter{ reader: es.NewReader(0), storage: storage, + log: log, } } // Run reads from the ring buffer and appends each envelope to storage until -// ctx is cancelled. Returns ctx.Err() on clean shutdown. +// ctx is cancelled. Returns ctx.Err() on clean shutdown. Must be called at +// most once; panics on a second call. func (w *StorageWriter) Run(ctx context.Context) error { + firstCall := false + w.once.Do(func() { firstCall = true }) + if !firstCall { + panic("events: StorageWriter.Run called more than once") + } + for { res, err := w.reader.Read(ctx) if err != nil { - return ctx.Err() + return err } if res.Dropped > 0 { - slog.Warn("storage writer: dropped events", "count", res.Dropped) + w.log.Warn("storage writer: dropped events", "count", res.Dropped) continue } if err := w.storage.Append(ctx, *res.Envelope); err != nil { - slog.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err) + w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err) } } } diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index f6b5200b..641a135a 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -3,6 +3,7 @@ package events import ( "context" "errors" + "log/slog" "sync" "testing" "time" @@ -65,7 +66,7 @@ func makeEvent(typ string) Event { func TestStorageWriter_NormalAppend(t *testing.T) { es := newTestStream(t, 64) backend := &mockBackend{} - w := NewStorageWriter(es, backend) + w := NewStorageWriter(es, backend, slog.Default()) ctx, cancel := context.WithCancel(context.Background()) @@ -95,7 +96,7 @@ func TestStorageWriter_NormalAppend(t *testing.T) { func TestStorageWriter_DroppedEvents(t *testing.T) { es := newTestStream(t, 4) backend := &mockBackend{} - w := NewStorageWriter(es, backend) + w := NewStorageWriter(es, backend, slog.Default()) // Publish 8 events before the writer starts — fills and wraps the ring. for i := range 8 { @@ -128,7 +129,7 @@ func TestStorageWriter_DroppedEvents(t *testing.T) { func TestStorageWriter_AppendError(t *testing.T) { es := newTestStream(t, 64) backend := &mockBackend{err: errors.New("storage unavailable")} - w := NewStorageWriter(es, backend) + w := NewStorageWriter(es, backend, slog.Default()) ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) @@ -162,7 +163,7 @@ func TestStorageWriter_AppendError(t *testing.T) { func TestStorageWriter_ContextCancelled(t *testing.T) { es := newTestStream(t, 64) backend := &mockBackend{} - w := NewStorageWriter(es, backend) + w := NewStorageWriter(es, backend, slog.Default()) ctx, cancel := context.WithCancel(context.Background()) cancel() diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index bcef98b5..56b759e0 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -43,11 +43,12 @@ func (sp *s2Producer) close(ctx context.Context) error { type S2Storage struct { producer s2Producer shutdownCh chan struct{} // closed when Close is called, bounds ack goroutine contexts + log *slog.Logger } // NewS2Storage creates an S2Storage that appends to the given stream within basin. // ctx is used for AppendSession creation and should be the process lifetime context. -func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config) (*S2Storage, error) { +func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) (*S2Storage, error) { if basin == "" || accessToken == "" || streamName == "" { return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required") } @@ -69,6 +70,7 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf return &S2Storage{ producer: s2Producer{p: producer}, shutdownCh: make(chan struct{}), + log: log, }, nil } @@ -99,14 +101,14 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error { ticket, err := future.Wait(ackCtx) if err != nil { - slog.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err) + s.log.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err) return } if ticket == nil { return } if _, err := ticket.Ack(ackCtx); err != nil { - slog.Error("s2storage: ack failed", "seq", env.Seq, "err", err) + s.log.Error("s2storage: ack failed", "seq", env.Seq, "err", err) } }() From 4d6390945634c4344a2defa3b5d47718a56e58a5 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 17:38:49 -0300 Subject: [PATCH 09/17] fix: include dropped seq range in storage writer log ReadResult now carries DroppedFrom/DroppedTo so the exact gap is visible in logs, making silent data holes detectable without cross-referencing seq numbers elsewhere. --- server/lib/events/eventsstorage.go | 2 +- server/lib/events/ringbuffer.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 2e72491f..c2141eab 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -49,7 +49,7 @@ func (w *StorageWriter) Run(ctx context.Context) error { return err } if res.Dropped > 0 { - w.log.Warn("storage writer: dropped events", "count", res.Dropped) + w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo) continue } if err := w.storage.Append(ctx, *res.Envelope); err != nil { diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index e9733309..75533155 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -68,10 +68,13 @@ func (rb *ringBuffer) newReader(afterSeq uint64) *Reader { // ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is // set: Envelope is non-nil for a normal read, Dropped is non-zero when the -// reader fell behind and events were lost. +// reader fell behind and events were lost. When Dropped > 0, DroppedFrom and +// DroppedTo are the inclusive seq range of the dropped events. type ReadResult struct { - Envelope *Envelope - Dropped uint64 + Envelope *Envelope + Dropped uint64 + DroppedFrom uint64 // first seq of the dropped range (only valid when Dropped > 0) + DroppedTo uint64 // last seq of the dropped range (only valid when Dropped > 0) } // Reader tracks an independent read position in a ringBuffer. @@ -102,10 +105,11 @@ func (r *Reader) Read(ctx context.Context) (ReadResult, error) { } if r.nextSeq < oldest { + from := r.nextSeq dropped := oldest - r.nextSeq r.nextSeq = oldest r.rb.mu.RUnlock() - return ReadResult{Dropped: dropped}, nil + return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, nil } if r.nextSeq <= latest { From 9df3e6c9c61e6256a0bb9e93adba94839bd033c9 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 17:39:09 -0300 Subject: [PATCH 10/17] feat: count append errors on StorageWriter for ops visibility Each Append failure increments an atomic counter and logs total_append_errors alongside the per-event error, making a sustained S2 outage detectable in structured logs without manual counting. AppendErrors() exposes the counter for future metrics integration. Retry and DLQ are intentionally out of scope. --- server/lib/events/eventsstorage.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index c2141eab..57997a91 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "sync" + "sync/atomic" ) // Storage is the durable storage backend for browser events. @@ -18,10 +19,11 @@ type Storage interface { // is cancelled. Call Close after Run returns to flush in-flight writes. // Starts from the oldest available event in the ring, not the current tail. type StorageWriter struct { - reader *Reader - storage Storage - log *slog.Logger - once sync.Once + reader *Reader + storage Storage + log *slog.Logger + once sync.Once + appendErrors atomic.Uint64 // total append failures; best-effort, not retried } // NewStorageWriter creates a writer that reads from es starting at seq 0. @@ -53,11 +55,17 @@ func (w *StorageWriter) Run(ctx context.Context) error { continue } if err := w.storage.Append(ctx, *res.Envelope); err != nil { - w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err) + total := w.appendErrors.Add(1) + w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total) } } } +// AppendErrors returns the total number of Append failures since Run started. +func (w *StorageWriter) AppendErrors() uint64 { + return w.appendErrors.Load() +} + // Close drains in-flight writes and releases backend resources. func (w *StorageWriter) Close() error { return w.storage.Close() From cc1d744f385a40ce7e4f0195f24f6eae2b520c41 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 11 May 2026 17:40:54 -0300 Subject: [PATCH 11/17] fix: drain ring after HTTP shutdown to prevent data loss on exit Two bugs combined caused silent event loss on shutdown: 1. StorageWriter.Run exited immediately on ctx cancellation with no drain. 2. HTTP servers were shut down after waiting on storageDone, so new events could arrive in the ring after Run had already exited. Fix: - Add Reader.TryRead for non-blocking ring reads. - Add StorageWriter.Drain(ctx) which flushes remaining ring contents after all publishers have stopped, with a bounded 5s deadline. - Reorder main.go shutdown: HTTP servers (and all publishers) stop first, then Run exits, then Drain flushes the remaining tail, then Close flushes the S2 batcher to the network. - Document at-least-once delivery and dedupe-by-seq contract on StorageWriter. --- server/cmd/api/main.go | 26 +++++---- server/lib/events/eventsstorage.go | 53 +++++++++++++++---- .../lib/events/eventsstorage_writer_test.go | 36 +++++++++++++ server/lib/events/ringbuffer.go | 26 +++++++++ 4 files changed, 123 insertions(+), 18 deletions(-) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index fbb23724..46b26345 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -271,15 +271,9 @@ func main() { <-ctx.Done() slogger.Info("shutdown signal received") - // Wait for the storage writer to drain from the ring, then flush S2 before - // shutting down the HTTP servers and closing the capture session. - <-storageDone - if storageWriter != nil { - if err := storageWriter.Close(); err != nil { - slogger.Error("storage writer close failed", "err", err) - } - } - + // Step 1: shut down all HTTP servers and stop all event publishers (cdpmonitor, + // captureSession) before draining the ring. This bounds the set of events that + // can arrive after Run exits so Drain sees a stable tail. shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) defer shutdownCancel() g, _ := errgroup.WithContext(shutdownCtx) @@ -301,6 +295,20 @@ func main() { if err := g.Wait(); err != nil { slogger.Error("server failed to shutdown", "err", err) } + + // Step 2: wait for Run to return (it exits on ctx cancellation), then drain any + // events that arrived between the last Read and HTTP shutdown, then flush S2. + <-storageDone + if storageWriter != nil { + drainCtx, drainCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer drainCancel() + if err := storageWriter.Drain(drainCtx); err != nil { + slogger.Warn("storage writer drain incomplete", "err", err) + } + if err := storageWriter.Close(); err != nil { + slogger.Error("storage writer close failed", "err", err) + } + } } func mustFFmpeg() { diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 57997a91..afda8a35 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -16,7 +16,13 @@ type Storage interface { // StorageWriter drains the ring buffer and forwards each envelope to the // configured Storage backend. Single-use: call Run once; it blocks until ctx -// is cancelled. Call Close after Run returns to flush in-flight writes. +// is cancelled. After ctx is cancelled, call Drain to flush events that +// arrived before all publishers stopped, then call Close to flush the backend. +// +// Delivery is at-least-once: on process restart the ring is empty so no +// cross-restart duplicates occur, but consumers should dedupe by env.Seq in +// case StorageWriter is ever restarted within a process lifetime. +// // Starts from the oldest available event in the ring, not the current tail. type StorageWriter struct { reader *Reader @@ -36,8 +42,8 @@ func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *Stora } // Run reads from the ring buffer and appends each envelope to storage until -// ctx is cancelled. Returns ctx.Err() on clean shutdown. Must be called at -// most once; panics on a second call. +// ctx is cancelled. Returns the context error on clean shutdown. Must be +// called at most once; panics on a second call. func (w *StorageWriter) Run(ctx context.Context) error { firstCall := false w.once.Do(func() { firstCall = true }) @@ -50,15 +56,44 @@ func (w *StorageWriter) Run(ctx context.Context) error { if err != nil { return err } - if res.Dropped > 0 { - w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo) - continue + if err := w.processResult(ctx, res); err != nil { + return err } - if err := w.storage.Append(ctx, *res.Envelope); err != nil { - total := w.appendErrors.Add(1) - w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total) + } +} + +// Drain reads any events still in the ring non-blockingly until caught up or +// ctx expires. Call after all publishers have stopped and Run has returned to +// ensure no events are silently skipped on shutdown. +func (w *StorageWriter) Drain(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + w.log.Warn("storage writer: drain deadline exceeded, ring may have unread events") + return ctx.Err() + default: + } + + res, ok := w.reader.TryRead() + if !ok { + return nil } + if err := w.processResult(ctx, res); err != nil { + return err + } + } +} + +func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error { + if res.Dropped > 0 { + w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo) + return nil + } + if err := w.storage.Append(ctx, *res.Envelope); err != nil { + total := w.appendErrors.Add(1) + w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total) } + return nil } // AppendErrors returns the total number of Append failures since Run started. diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index 641a135a..8deccdf8 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -171,3 +171,39 @@ func TestStorageWriter_ContextCancelled(t *testing.T) { err := w.Run(ctx) assert.ErrorIs(t, err, context.Canceled) } + +// TestStorageWriter_DrainFlushesRingAfterRunExits verifies that events +// published before Drain is called are not lost even after Run has returned. +func TestStorageWriter_DrainFlushesRingAfterRunExits(t *testing.T) { + es := newTestStream(t, 64) + backend := &mockBackend{} + w := NewStorageWriter(es, backend, slog.Default()) + + // Publish events before Run starts so the ring is non-empty. + for range 5 { + es.Publish(Envelope{Event: Event{Type: "pre"}}) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() + + // Let Run consume some events, then cancel. + time.Sleep(20 * time.Millisecond) + + // Publish more events that may arrive while Run is winding down. + for range 3 { + es.Publish(Envelope{Event: Event{Type: "post"}}) + } + cancel() + require.ErrorIs(t, <-done, context.Canceled) + + // Drain must flush whatever is left in the ring. + drainCtx, drainCancel := context.WithTimeout(context.Background(), time.Second) + defer drainCancel() + require.NoError(t, w.Drain(drainCtx)) + + got := backend.envelopes() + // All 8 published events must have been appended across Run + Drain. + assert.Len(t, got, 8) +} diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index 75533155..b62cbdf8 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -83,6 +83,32 @@ type Reader struct { nextSeq uint64 } +// TryRead returns the next available result without blocking. Returns +// (result, true) if data is available, (ReadResult{}, false) if the reader +// has caught up to the latest published seq. +func (r *Reader) TryRead() (ReadResult, bool) { + r.rb.mu.RLock() + defer r.rb.mu.RUnlock() + + latest := r.rb.latestSeq + oldest := r.rb.oldestSeq() + + if latest == 0 || r.nextSeq > latest { + return ReadResult{}, false + } + + if r.nextSeq < oldest { + from := r.nextSeq + dropped := oldest - r.nextSeq + r.nextSeq = oldest + return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, true + } + + env := r.rb.buf[r.nextSeq%r.rb.cap] + r.nextSeq++ + return ReadResult{Envelope: &env}, true +} + // Read blocks until the next envelope is available or ctx is cancelled. func (r *Reader) Read(ctx context.Context) (ReadResult, error) { for { From fc0c7db9201b3926b65e8671197d233335ba4545 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 12 May 2026 08:16:41 -0300 Subject: [PATCH 12/17] review: redact S2AccessToken in config logs, bound Close with context, return error from StorageWriter.Run double-call --- server/cmd/api/main.go | 14 +++++----- server/cmd/config/config.go | 26 +++++++++++++++++++ server/lib/events/eventsstorage.go | 16 +++++------- .../lib/events/eventsstorage_writer_test.go | 2 +- server/lib/events/s2storage.go | 4 +-- 5 files changed, 41 insertions(+), 21 deletions(-) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 46b26345..4f825edd 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -102,7 +102,7 @@ func main() { } captureSession := capturesession.NewCaptureSession(eventStream) - // Optional S2 durable storage sink. + // Optional S2 storage sink. var storageWriter *events.StorageWriter if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream) @@ -261,7 +261,9 @@ func main() { if storageWriter != nil { go func() { defer close(storageDone) - storageWriter.Run(ctx) //nolint:errcheck + if err := storageWriter.Run(ctx); err != nil && ctx.Err() == nil { + slogger.Error("storage writer failed", "err", err) + } }() } else { close(storageDone) @@ -271,9 +273,6 @@ func main() { <-ctx.Done() slogger.Info("shutdown signal received") - // Step 1: shut down all HTTP servers and stop all event publishers (cdpmonitor, - // captureSession) before draining the ring. This bounds the set of events that - // can arrive after Run exits so Drain sees a stable tail. shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) defer shutdownCancel() g, _ := errgroup.WithContext(shutdownCtx) @@ -296,8 +295,7 @@ func main() { slogger.Error("server failed to shutdown", "err", err) } - // Step 2: wait for Run to return (it exits on ctx cancellation), then drain any - // events that arrived between the last Read and HTTP shutdown, then flush S2. + // wait for Run to return, then drain any events that arrived and flush S2. <-storageDone if storageWriter != nil { drainCtx, drainCancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -305,7 +303,7 @@ func main() { if err := storageWriter.Drain(drainCtx); err != nil { slogger.Warn("storage writer drain incomplete", "err", err) } - if err := storageWriter.Close(); err != nil { + if err := storageWriter.Close(drainCtx); err != nil { slogger.Error("storage writer close failed", "err", err) } } diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index 3a18b082..c2dddced 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "log/slog" "time" "github.com/kelseyhightower/envconfig" @@ -42,6 +43,31 @@ type Config struct { S2Stream string `envconfig:"S2_STREAM" default:""` } +// LogValue implements slog.LogValuer, redacting secret fields. +func (c *Config) LogValue() slog.Value { + s2AccessToken := "" + if c.S2AccessToken != "" { + s2AccessToken = "[redacted]" + } + return slog.GroupValue( + slog.Int("port", c.Port), + slog.Int("frame_rate", c.FrameRate), + slog.Int("display_num", c.DisplayNum), + slog.Int("max_size_mb", c.MaxSizeInMB), + slog.String("output_dir", c.OutputDir), + slog.String("ffmpeg_path", c.PathToFFmpeg), + slog.Int("devtools_proxy_port", c.DevToolsProxyPort), + slog.Bool("log_cdp_messages", c.LogCDPMessages), + slog.Duration("scale_to_zero_cooldown", c.ScaleToZeroCooldown), + slog.Int("chromedriver_proxy_port", c.ChromeDriverProxyPort), + slog.String("chromedriver_upstream_addr", c.ChromeDriverUpstreamAddr), + slog.String("devtools_proxy_addr", c.DevToolsProxyAddr), + slog.String("s2_basin", c.S2Basin), + slog.String("s2_access_token", s2AccessToken), + slog.String("s2_stream", c.S2Stream), + ) +} + // Load loads configuration from environment variables func Load() (*Config, error) { var config Config diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index afda8a35..4f904048 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -2,6 +2,7 @@ package events import ( "context" + "fmt" "log/slog" "sync" "sync/atomic" @@ -11,7 +12,7 @@ import ( // Append is called serially from StorageWriter.Run and need not be thread-safe. type Storage interface { Append(ctx context.Context, env Envelope) error - Close() error + Close(ctx context.Context) error } // StorageWriter drains the ring buffer and forwards each envelope to the @@ -43,12 +44,12 @@ func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *Stora // Run reads from the ring buffer and appends each envelope to storage until // ctx is cancelled. Returns the context error on clean shutdown. Must be -// called at most once; panics on a second call. +// called at most once; returns an error on a second call. func (w *StorageWriter) Run(ctx context.Context) error { firstCall := false w.once.Do(func() { firstCall = true }) if !firstCall { - panic("events: StorageWriter.Run called more than once") + return fmt.Errorf("events: StorageWriter.Run called more than once") } for { @@ -96,12 +97,7 @@ func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error return nil } -// AppendErrors returns the total number of Append failures since Run started. -func (w *StorageWriter) AppendErrors() uint64 { - return w.appendErrors.Load() -} - // Close drains in-flight writes and releases backend resources. -func (w *StorageWriter) Close() error { - return w.storage.Close() +func (w *StorageWriter) Close(ctx context.Context) error { + return w.storage.Close(ctx) } diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index 8deccdf8..3d343e88 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -30,7 +30,7 @@ func (m *mockBackend) Append(_ context.Context, env Envelope) error { return nil } -func (m *mockBackend) Close() error { return nil } +func (m *mockBackend) Close(_ context.Context) error { return nil } func (m *mockBackend) envelopes() []Envelope { m.mu.Lock() diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index 56b759e0..3c126a10 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -117,7 +117,7 @@ func (s *S2Storage) Append(_ context.Context, env Envelope) error { // Close cancels in-flight ack goroutines, waits for them to drain, then closes // the producer (which flushes the S2 batcher to the network). -func (s *S2Storage) Close() error { +func (s *S2Storage) Close(ctx context.Context) error { close(s.shutdownCh) - return s.producer.close(context.Background()) + return s.producer.close(ctx) } From 040f3616ce4ae4fc60c2c67f33c88f15d6bfda93 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 12 May 2026 08:44:41 -0300 Subject: [PATCH 13/17] review: update eventsstorage comments and test error handling --- server/e2e/e2e_s2_storage_test.go | 12 ++++---- server/lib/events/eventsstorage.go | 17 ++++------- .../lib/events/eventsstorage_writer_test.go | 30 +++++++------------ 3 files changed, 22 insertions(+), 37 deletions(-) diff --git a/server/e2e/e2e_s2_storage_test.go b/server/e2e/e2e_s2_storage_test.go index af99f857..64d8c9a7 100644 --- a/server/e2e/e2e_s2_storage_test.go +++ b/server/e2e/e2e_s2_storage_test.go @@ -56,7 +56,7 @@ func TestS2StorageWriter(t *testing.T) { checkResp, err := streamClient.CheckTail(ctx) require.NoError(t, err, "check tail before test") - startSeq := checkResp.SeqNum + startSeq := checkResp.Tail.SeqNum // Start a capture session. startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{}) @@ -81,17 +81,17 @@ func TestS2StorageWriter(t *testing.T) { // Read records written after the pre-test tail and verify at least one // envelope is present. - readSession, err := streamClient.ReadSession(ctx, &s2.ReadOptions{ + readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second) + defer readCancel() + + readSession, err := streamClient.ReadSession(readCtx, &s2.ReadOptions{ SeqNum: s2.Uint64(startSeq), }) require.NoError(t, err, "open S2 read session") defer readSession.Close() - readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second) - defer readCancel() - var count int - for readSession.Next(readCtx) { + for readSession.Next() { count++ } // EOF is expected once we reach the tail — not an error. diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 4f904048..2ff1e85c 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -8,23 +8,16 @@ import ( "sync/atomic" ) -// Storage is the durable storage backend for browser events. -// Append is called serially from StorageWriter.Run and need not be thread-safe. type Storage interface { Append(ctx context.Context, env Envelope) error Close(ctx context.Context) error } -// StorageWriter drains the ring buffer and forwards each envelope to the -// configured Storage backend. Single-use: call Run once; it blocks until ctx -// is cancelled. After ctx is cancelled, call Drain to flush events that -// arrived before all publishers stopped, then call Close to flush the backend. -// -// Delivery is at-least-once: on process restart the ring is empty so no -// cross-restart duplicates occur, but consumers should dedupe by env.Seq in -// case StorageWriter is ever restarted within a process lifetime. -// -// Starts from the oldest available event in the ring, not the current tail. +// StorageWriter reads from the ring buffer and forwards each envelope to +// Storage. Single-use and not thread-safe: call Run once, then after +// it returns call Drain followed by Close. Reads start from the oldest +// available event in the ring, not the current tail. Delivery is +// at-least-once; consumers should dedupe by env.Seq. type StorageWriter struct { reader *Reader storage Storage diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go index 3d343e88..ec9631ad 100644 --- a/server/lib/events/eventsstorage_writer_test.go +++ b/server/lib/events/eventsstorage_writer_test.go @@ -70,11 +70,8 @@ func TestStorageWriter_NormalAppend(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - defer close(done) - w.Run(ctx) //nolint:errcheck - }() + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() env1 := es.Publish(Envelope{Event: makeEvent("test.one")}) env2 := es.Publish(Envelope{Event: makeEvent("test.two")}) @@ -84,7 +81,7 @@ func TestStorageWriter_NormalAppend(t *testing.T) { }, time.Second, 5*time.Millisecond) cancel() - <-done + require.ErrorIs(t, <-done, context.Canceled) got := backend.envelopes() assert.Equal(t, env1.Seq, got[0].Seq) @@ -104,18 +101,15 @@ func TestStorageWriter_DroppedEvents(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - defer close(done) - w.Run(ctx) //nolint:errcheck - }() + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() require.Eventually(t, func() bool { return len(backend.envelopes()) > 0 }, time.Second, 5*time.Millisecond) cancel() - <-done + require.ErrorIs(t, <-done, context.Canceled) // With ring capacity 4 and 8 publishes, the writer must have skipped at // least 4 events via a drop gap — so fewer than 8 envelopes landed. @@ -132,11 +126,8 @@ func TestStorageWriter_AppendError(t *testing.T) { w := NewStorageWriter(es, backend, slog.Default()) ctx, cancel := context.WithCancel(context.Background()) - done := make(chan struct{}) - go func() { - defer close(done) - w.Run(ctx) //nolint:errcheck - }() + done := make(chan error, 1) + go func() { done <- w.Run(ctx) }() // Publish an event that will fail. Wait until the writer has attempted it // (errCount > 0), then clear the error and publish a second event. The @@ -153,7 +144,7 @@ func TestStorageWriter_AppendError(t *testing.T) { }, time.Second, 5*time.Millisecond) cancel() - <-done + require.ErrorIs(t, <-done, context.Canceled) got := backend.envelopes() require.Len(t, got, 1) @@ -198,10 +189,11 @@ func TestStorageWriter_DrainFlushesRingAfterRunExits(t *testing.T) { cancel() require.ErrorIs(t, <-done, context.Canceled) - // Drain must flush whatever is left in the ring. + // Drain then Close mirrors the real shutdown sequence. drainCtx, drainCancel := context.WithTimeout(context.Background(), time.Second) defer drainCancel() require.NoError(t, w.Drain(drainCtx)) + require.NoError(t, w.Close(drainCtx)) got := backend.envelopes() // All 8 published events must have been appended across Run + Drain. From c6467b356416252e3a98c78160fec5483ca03e58 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 12 May 2026 08:49:35 -0300 Subject: [PATCH 14/17] review: remove DroppedFrom/DroppedTo fields and trim redundant comments --- server/lib/events/eventsstorage.go | 2 +- server/lib/events/ringbuffer.go | 15 +++++---------- server/lib/events/s2storage.go | 8 ++------ 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go index 2ff1e85c..73065eec 100644 --- a/server/lib/events/eventsstorage.go +++ b/server/lib/events/eventsstorage.go @@ -80,7 +80,7 @@ func (w *StorageWriter) Drain(ctx context.Context) error { func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error { if res.Dropped > 0 { - w.log.Warn("storage writer: dropped events", "count", res.Dropped, "from_seq", res.DroppedFrom, "to_seq", res.DroppedTo) + w.log.Warn("storage writer: dropped events", "count", res.Dropped) return nil } if err := w.storage.Append(ctx, *res.Envelope); err != nil { diff --git a/server/lib/events/ringbuffer.go b/server/lib/events/ringbuffer.go index b62cbdf8..ee874c3f 100644 --- a/server/lib/events/ringbuffer.go +++ b/server/lib/events/ringbuffer.go @@ -68,13 +68,10 @@ func (rb *ringBuffer) newReader(afterSeq uint64) *Reader { // ReadResult is returned by Reader.Read. Exactly one of Envelope or Dropped is // set: Envelope is non-nil for a normal read, Dropped is non-zero when the -// reader fell behind and events were lost. When Dropped > 0, DroppedFrom and -// DroppedTo are the inclusive seq range of the dropped events. +// reader fell behind and events were lost. type ReadResult struct { - Envelope *Envelope - Dropped uint64 - DroppedFrom uint64 // first seq of the dropped range (only valid when Dropped > 0) - DroppedTo uint64 // last seq of the dropped range (only valid when Dropped > 0) + Envelope *Envelope + Dropped uint64 } // Reader tracks an independent read position in a ringBuffer. @@ -98,10 +95,9 @@ func (r *Reader) TryRead() (ReadResult, bool) { } if r.nextSeq < oldest { - from := r.nextSeq dropped := oldest - r.nextSeq r.nextSeq = oldest - return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, true + return ReadResult{Dropped: dropped}, true } env := r.rb.buf[r.nextSeq%r.rb.cap] @@ -131,11 +127,10 @@ func (r *Reader) Read(ctx context.Context) (ReadResult, error) { } if r.nextSeq < oldest { - from := r.nextSeq dropped := oldest - r.nextSeq r.nextSeq = oldest r.rb.mu.RUnlock() - return ReadResult{Dropped: dropped, DroppedFrom: from, DroppedTo: oldest - 1}, nil + return ReadResult{Dropped: dropped}, nil } if r.nextSeq <= latest { diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index 3c126a10..c9dc0e98 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -11,7 +11,6 @@ import ( "github.com/s2-streamstore/s2-sdk-go/s2" ) -// S2Config holds batcher tuning parameters for the S2 backend. type S2Config struct { // BatcherLinger is how long the batcher waits before flushing (default: 100ms). BatcherLinger time.Duration @@ -38,16 +37,14 @@ func (sp *s2Producer) close(ctx context.Context) error { return sp.p.Close() } -// S2Storage is a Storage backed by S2. All events are appended to a single -// fixed stream whose name is provided at construction time. +// S2Storage appends all events to a single fixed stream set at construction time. type S2Storage struct { producer s2Producer shutdownCh chan struct{} // closed when Close is called, bounds ack goroutine contexts log *slog.Logger } -// NewS2Storage creates an S2Storage that appends to the given stream within basin. -// ctx is used for AppendSession creation and should be the process lifetime context. +// ctx is used for AppendSession creation and must be the process lifetime context. func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config, log *slog.Logger) (*S2Storage, error) { if basin == "" || accessToken == "" || streamName == "" { return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required") @@ -74,7 +71,6 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf }, nil } -// Append marshals env to JSON and submits it to the S2 producer. func (s *S2Storage) Append(_ context.Context, env Envelope) error { data, err := json.Marshal(env) if err != nil { From ab5d8ba542ff14d6c74dc78f12eaae22065459c9 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 12 May 2026 09:22:11 -0300 Subject: [PATCH 15/17] fix: pass context.Background() to NewS2Storage so S2 pipeline outlives signal cancellation --- server/cmd/api/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index 4f825edd..a2e1674b 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -106,7 +106,7 @@ func main() { var storageWriter *events.StorageWriter if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" { slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream) - s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger) + s2stor, err := events.NewS2Storage(context.Background(), config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger) if err != nil { slogger.Error("failed to create S2 storage", "err", err) os.Exit(1) From 42f1c47aefea1e7c42b2eddcd30c39214ebd57c6 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 12 May 2026 09:45:21 -0300 Subject: [PATCH 16/17] fix: apply S2Config documented defaults when zero values are passed --- server/lib/events/s2storage.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index c9dc0e98..3d75efd5 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -58,6 +58,12 @@ func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cf return nil, fmt.Errorf("s2storage: open append session: %w", err) } + if cfg.BatcherLinger == 0 { + cfg.BatcherLinger = 100 * time.Millisecond + } + if cfg.BatcherMaxRecords == 0 { + cfg.BatcherMaxRecords = 50 + } batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{ Linger: cfg.BatcherLinger, MaxRecords: cfg.BatcherMaxRecords, From 3f9550faef39305e392168e01ac102fe12f774f6 Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Tue, 12 May 2026 10:17:03 -0300 Subject: [PATCH 17/17] fix: always call p.Close() in s2Producer.close even when ack drain times out --- server/lib/events/s2storage.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go index 3d75efd5..28a3c0d7 100644 --- a/server/lib/events/s2storage.go +++ b/server/lib/events/s2storage.go @@ -3,6 +3,7 @@ package events import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "sync" @@ -29,12 +30,13 @@ func (sp *s2Producer) close(ctx context.Context) error { sp.wg.Wait() close(done) }() + var drainErr error select { case <-done: case <-ctx.Done(): - return ctx.Err() + drainErr = ctx.Err() } - return sp.p.Close() + return errors.Join(drainErr, sp.p.Close()) } // S2Storage appends all events to a single fixed stream set at construction time.