diff --git a/images/chromium-headful/run-docker.sh b/images/chromium-headful/run-docker.sh
index aad41331..0c1255d7 100755
--- a/images/chromium-headful/run-docker.sh
+++ b/images/chromium-headful/run-docker.sh
@@ -71,6 +71,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
   RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
 fi
 
+# S2 durable event storage (all three must be set to enable the sink)
+if [[ -n "${S2_BASIN:-}" ]]; then
+  RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
+fi
+if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
+  RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
+fi
+if [[ -n "${S2_STREAM:-}" ]]; then
+  RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" )
+fi
+
 # WebRTC port mapping
 if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then
   echo "Running container with WebRTC"
diff --git a/images/chromium-headful/supervisor/services/kernel-images-api.conf b/images/chromium-headful/supervisor/services/kernel-images-api.conf
index e57d30a8..f1195bb1 100644
--- a/images/chromium-headful/supervisor/services/kernel-images-api.conf
+++ b/images/chromium-headful/supervisor/services/kernel-images-api.conf
@@ -1,5 +1,5 @@
 [program:kernel-images-api]
-command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
+command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
 autostart=false
 autorestart=true
 startsecs=2
diff --git a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf
index e57d30a8..f1195bb1 100644
--- a/images/chromium-headless/image/supervisor/services/kernel-images-api.conf
+++ b/images/chromium-headless/image/supervisor/services/kernel-images-api.conf
@@ -1,5 +1,5 @@
 [program:kernel-images-api]
-command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
+command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
 autostart=false
 autorestart=true
 startsecs=2
diff --git a/images/chromium-headless/run-docker.sh b/images/chromium-headless/run-docker.sh
index 56f582bf..4a670748 100755
--- a/images/chromium-headless/run-docker.sh
+++ b/images/chromium-headless/run-docker.sh
@@ -24,6 +24,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
   RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
 fi
 
+# S2 durable event storage (all three must be set to enable the sink)
+if [[ -n "${S2_BASIN:-}" ]]; then
+  RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
+fi
+if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
+  RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
+fi
+if [[ -n "${S2_STREAM:-}" ]]; then
+  RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" )
+fi
+
 # If a positional argument is given, use it as the entrypoint
 ENTRYPOINT_ARG=()
 if [[ $# -ge 1 && -n "$1" ]]; then
diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go
index 48d17351..d6806de6 100644
--- a/server/cmd/api/main.go
+++ b/server/cmd/api/main.go
@@ -102,6 +102,18 @@ func main() {
 	}
 	captureSession := capturesession.NewCaptureSession(eventStream)
 
+	// Optional S2 durable storage sink.
+	var storageWriter *events.StorageWriter
+	if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" {
+		slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream)
+		s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{})
+		if err != nil {
+			slogger.Error("failed to create S2 storage", "err", err)
+			os.Exit(1)
+		}
+		storageWriter = events.NewStorageWriter(eventStream, s2stor)
+	}
+
 	apiService, err := api.New(
 		recorder.NewFFmpegManager(),
 		recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
@@ -244,10 +256,30 @@
 		}
 	}()
 
+	// Start S2 storage writer goroutine. storageDone is closed when Run returns.
+	storageDone := make(chan struct{})
+	if storageWriter != nil {
+		go func() {
+			defer close(storageDone)
+			storageWriter.Run(ctx) //nolint:errcheck
+		}()
+	} else {
+		close(storageDone)
+	}
+
 	// graceful shutdown
 	<-ctx.Done()
 	slogger.Info("shutdown signal received")
 
+	// Wait for the storage writer to drain from the ring, then flush S2 before
+	// shutting down the HTTP servers and closing the capture session.
+	<-storageDone
+	if storageWriter != nil {
+		if err := storageWriter.Close(); err != nil {
+			slogger.Error("storage writer close failed", "err", err)
+		}
+	}
+
 	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer shutdownCancel()
 	g, _ := errgroup.WithContext(shutdownCtx)
diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go
index 2fdd4bdb..3a18b082 100644
--- a/server/cmd/config/config.go
+++ b/server/cmd/config/config.go
@@ -35,6 +35,11 @@ type Config struct {
 	// DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress.
 	// If empty, it is derived from DevToolsProxyPort as 127.0.0.1:.
 	DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""`
+
+	// S2 durable event storage. All three fields must be set to enable the S2 sink.
+	S2Basin       string `envconfig:"S2_BASIN" default:""`
+	S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""`
+	S2Stream      string `envconfig:"S2_STREAM" default:""`
 }
 
 // Load loads configuration from environment variables
diff --git a/server/e2e/e2e_s2_storage_test.go b/server/e2e/e2e_s2_storage_test.go
new file mode 100644
index 00000000..af99f857
--- /dev/null
+++ b/server/e2e/e2e_s2_storage_test.go
@@ -0,0 +1,104 @@
+package e2e
+
+import (
+	"context"
+	"net/http"
+	"os"
+	"os/exec"
+	"testing"
+	"time"
+
+	"github.com/s2-streamstore/s2-sdk-go/s2"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	instanceoapi "github.com/kernel/kernel-images/server/lib/oapi"
+)
+
+// TestS2StorageWriter starts a headless container with S2 credentials, runs a
+// capture session, and verifies that events land in the configured S2 stream.
+//
+// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset.
+func TestS2StorageWriter(t *testing.T) {
+	basin := os.Getenv("S2_BASIN")
+	accessToken := os.Getenv("S2_ACCESS_TOKEN")
+	stream := os.Getenv("S2_STREAM")
+	if basin == "" || accessToken == "" || stream == "" {
+		t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test")
+	}
+
+	if _, err := exec.LookPath("docker"); err != nil {
+		t.Skipf("docker not available: %v", err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
+
+	c := NewTestContainer(t, headlessImage)
+	require.NoError(t, c.Start(ctx, ContainerConfig{
+		Env: map[string]string{
+			"S2_BASIN":        basin,
+			"S2_ACCESS_TOKEN": accessToken,
+			"S2_STREAM":       stream,
+		},
+	}), "failed to start container")
+	defer c.Stop(ctx)
+
+	require.NoError(t, c.WaitReady(ctx), "api not ready")
+
+	client, err := c.APIClient()
+	require.NoError(t, err)
+
+	// Note the current S2 stream tail seq before we write anything so we only
+	// read records produced by this test run.
+	s2Client := s2.New(accessToken, nil)
+	streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream))
+
+	checkResp, err := streamClient.CheckTail(ctx)
+	require.NoError(t, err, "check tail before test")
+	startSeq := checkResp.SeqNum
+
+	// Start a capture session.
+	startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{})
+	require.NoError(t, err)
+	require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body))
+	require.NotNil(t, startResp.JSON201)
+	sessionID := startResp.JSON201.Id
+	t.Logf("capture session started: %s", sessionID)
+
+	// Let the session run briefly so at least one event is published (the
+	// session_started system event is emitted on session start).
+	time.Sleep(500 * time.Millisecond)
+
+	// Stop the capture session.
+	stopResp, err := client.StopCaptureSessionWithResponse(ctx)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body))
+	t.Log("capture session stopped")
+
+	// Give the storage writer time to flush to S2 (batcher linger + network).
+	time.Sleep(2 * time.Second)
+
+	// Read records written after the pre-test tail and verify at least one
+	// envelope is present.
+	readSession, err := streamClient.ReadSession(ctx, &s2.ReadOptions{
+		SeqNum: s2.Uint64(startSeq),
+	})
+	require.NoError(t, err, "open S2 read session")
+	defer readSession.Close()
+
+	readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second)
+	defer readCancel()
+
+	var count int
+	for readSession.Next(readCtx) {
+		count++
+	}
+	// EOF is expected once we reach the tail — not an error.
+	if err := readSession.Err(); err != nil && readCtx.Err() == nil {
+		t.Fatalf("S2 read session error: %v", err)
+	}
+
+	assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream)
+	t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq)
+}
diff --git a/server/go.mod b/server/go.mod
index 5b2d7ed8..1bdae079 100644
--- a/server/go.mod
+++ b/server/go.mod
@@ -21,6 +21,7 @@ require (
 	github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866
 	github.com/nrednav/cuid2 v1.1.0
 	github.com/oapi-codegen/runtime v1.2.0
+	github.com/s2-streamstore/s2-sdk-go v0.16.1
 	github.com/samber/lo v1.52.0
 	github.com/stretchr/testify v1.11.1
 	github.com/testcontainers/testcontainers-go v0.40.0
@@ -99,6 +100,7 @@
 	go.opentelemetry.io/proto/otlp v1.9.0 // indirect
 	golang.org/x/crypto v0.43.0 // indirect
 	golang.org/x/mod v0.28.0 // indirect
+	golang.org/x/net v0.45.0 // indirect
 	golang.org/x/text v0.30.0 // indirect
 	golang.org/x/tools v0.37.0 // indirect
 	google.golang.org/protobuf v1.36.10 // indirect
diff --git a/server/go.sum b/server/go.sum
index 0f5296e8..b26c03c9 100644
--- a/server/go.sum
+++ b/server/go.sum
@@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
 github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
+github.com/s2-streamstore/s2-sdk-go v0.16.1 h1:18Qht850wUhIb9JZkMwF5EJWfnmZnjdtW3z8xOuL7Ys=
+github.com/s2-streamstore/s2-sdk-go v0.16.1/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo=
 github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
 github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
 github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
diff --git a/server/lib/events/eventsstorage.go b/server/lib/events/eventsstorage.go
new file mode 100644
index 00000000..02ecb735
--- /dev/null
+++ b/server/lib/events/eventsstorage.go
@@ -0,0 +1,53 @@
+package events
+
+import (
+	"context"
+	"log/slog"
+)
+
+// Storage is the durable storage backend for browser events.
+// Append is called serially from StorageWriter.Run and need not be thread-safe.
+type Storage interface {
+	Append(ctx context.Context, env Envelope) error
+	Close() error
+}
+
+// StorageWriter drains the ring buffer and forwards each envelope to the
+// configured Storage backend. Single-use: call Run once; it blocks until ctx
+// is cancelled. Call Close after Run returns to flush in-flight writes.
+// Starts from the oldest available event in the ring, not the current tail.
+type StorageWriter struct {
+	reader  *Reader
+	storage Storage
+}
+
+// NewStorageWriter creates a writer that reads from es starting at seq 0.
+func NewStorageWriter(es *EventStream, storage Storage) *StorageWriter {
+	return &StorageWriter{
+		reader:  es.NewReader(0),
+		storage: storage,
+	}
+}
+
+// Run reads from the ring buffer and appends each envelope to storage until
+// ctx is cancelled. Returns ctx.Err() on clean shutdown.
+func (w *StorageWriter) Run(ctx context.Context) error {
+	for {
+		res, err := w.reader.Read(ctx)
+		if err != nil {
+			return ctx.Err()
+		}
+		if res.Dropped > 0 {
+			slog.Warn("storage writer: dropped events", "count", res.Dropped)
+			continue
+		}
+		if err := w.storage.Append(ctx, *res.Envelope); err != nil {
+			slog.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err)
+		}
+	}
+}
+
+// Close drains in-flight writes and releases backend resources.
+func (w *StorageWriter) Close() error {
+	return w.storage.Close()
+}
diff --git a/server/lib/events/eventsstorage_writer_test.go b/server/lib/events/eventsstorage_writer_test.go
new file mode 100644
index 00000000..f6b5200b
--- /dev/null
+++ b/server/lib/events/eventsstorage_writer_test.go
@@ -0,0 +1,172 @@
+package events
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+type mockBackend struct {
+	mu       sync.Mutex
+	appended []Envelope
+	err      error
+	errCount int
+}
+
+func (m *mockBackend) Append(_ context.Context, env Envelope) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.err != nil {
+		m.errCount++
+		return m.err
+	}
+	m.appended = append(m.appended, env)
+	return nil
+}
+
+func (m *mockBackend) Close() error { return nil }
+
+func (m *mockBackend) envelopes() []Envelope {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	out := make([]Envelope, len(m.appended))
+	copy(out, m.appended)
+	return out
+}
+
+func (m *mockBackend) errors() int {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.errCount
+}
+
+func (m *mockBackend) clearErr() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.err = nil
+}
+
+func newTestStream(t *testing.T, capacity int) *EventStream {
+	t.Helper()
+	es, err := NewEventStream(EventStreamConfig{RingCapacity: capacity})
+	require.NoError(t, err)
+	return es
+}
+
+func makeEvent(typ string) Event {
+	return Event{Type: typ, Category: CategorySystem}
+}
+
+func TestStorageWriter_NormalAppend(t *testing.T) {
+	es := newTestStream(t, 64)
+	backend := &mockBackend{}
+	w := NewStorageWriter(es, backend)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		w.Run(ctx) //nolint:errcheck
+	}()
+
+	env1 := es.Publish(Envelope{Event: makeEvent("test.one")})
+	env2 := es.Publish(Envelope{Event: makeEvent("test.two")})
+
+	require.Eventually(t, func() bool {
+		return len(backend.envelopes()) == 2
+	}, time.Second, 5*time.Millisecond)
+
+	cancel()
+	<-done
+
+	got := backend.envelopes()
+	assert.Equal(t, env1.Seq, got[0].Seq)
+	assert.Equal(t, "test.one", got[0].Event.Type)
+	assert.Equal(t, env2.Seq, got[1].Seq)
+	assert.Equal(t, "test.two", got[1].Event.Type)
+}
+
+func TestStorageWriter_DroppedEvents(t *testing.T) {
+	es := newTestStream(t, 4)
+	backend := &mockBackend{}
+	w := NewStorageWriter(es, backend)
+
+	// Publish 8 events before the writer starts — fills and wraps the ring.
+	for i := range 8 {
+		es.Publish(Envelope{Event: makeEvent("drop.test." + string(rune('a'+i)))})
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		w.Run(ctx) //nolint:errcheck
+	}()
+
+	require.Eventually(t, func() bool {
+		return len(backend.envelopes()) > 0
+	}, time.Second, 5*time.Millisecond)
+
+	cancel()
+	<-done
+
+	// With ring capacity 4 and 8 publishes, the writer must have skipped at
+	// least 4 events via a drop gap — so fewer than 8 envelopes landed.
+	got := backend.envelopes()
+	assert.Less(t, len(got), 8, "expected fewer than 8 envelopes due to ring overflow")
+	for _, env := range got {
+		assert.NotEmpty(t, env.Event.Type)
+	}
+}
+
+func TestStorageWriter_AppendError(t *testing.T) {
+	es := newTestStream(t, 64)
+	backend := &mockBackend{err: errors.New("storage unavailable")}
+	w := NewStorageWriter(es, backend)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		w.Run(ctx) //nolint:errcheck
+	}()
+
+	// Publish an event that will fail. Wait until the writer has attempted it
+	// (errCount > 0), then clear the error and publish a second event. The
+	// writer must continue past the error and deliver the second event.
+	es.Publish(Envelope{Event: makeEvent("will.fail")})
+	require.Eventually(t, func() bool {
+		return backend.errors() > 0
+	}, time.Second, 5*time.Millisecond)
+
+	backend.clearErr()
+	es.Publish(Envelope{Event: makeEvent("will.succeed")})
+	require.Eventually(t, func() bool {
+		return len(backend.envelopes()) == 1
+	}, time.Second, 5*time.Millisecond)
+
+	cancel()
+	<-done
+
+	got := backend.envelopes()
+	require.Len(t, got, 1)
+	assert.Equal(t, "will.succeed", got[0].Event.Type)
+}
+
+func TestStorageWriter_ContextCancelled(t *testing.T) {
+	es := newTestStream(t, 64)
+	backend := &mockBackend{}
+	w := NewStorageWriter(es, backend)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	err := w.Run(ctx)
+	assert.ErrorIs(t, err, context.Canceled)
+}
diff --git a/server/lib/events/s2storage.go b/server/lib/events/s2storage.go
new file mode 100644
index 00000000..bcef98b5
--- /dev/null
+++ b/server/lib/events/s2storage.go
@@ -0,0 +1,121 @@
+package events
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/s2-streamstore/s2-sdk-go/s2"
+)
+
+// S2Config holds batcher tuning parameters for the S2 backend.
+type S2Config struct {
+	// BatcherLinger is how long the batcher waits before flushing (default: 100ms).
+	BatcherLinger time.Duration
+	// BatcherMaxRecords is the max records per batch (default: 50).
+	BatcherMaxRecords int
+}
+
+type s2Producer struct {
+	p  *s2.Producer
+	wg sync.WaitGroup
+}
+
+func (sp *s2Producer) close(ctx context.Context) error {
+	done := make(chan struct{})
+	go func() {
+		sp.wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	return sp.p.Close()
+}
+
+// S2Storage is a Storage backed by S2. All events are appended to a single
+// fixed stream whose name is provided at construction time.
+type S2Storage struct {
+	producer   s2Producer
+	shutdownCh chan struct{} // closed when Close is called, bounds ack goroutine contexts
+}
+
+// NewS2Storage creates an S2Storage that appends to the given stream within basin.
+// ctx is used for AppendSession creation and should be the process lifetime context.
+func NewS2Storage(ctx context.Context, basin, accessToken, streamName string, cfg S2Config) (*S2Storage, error) {
+	if basin == "" || accessToken == "" || streamName == "" {
+		return nil, fmt.Errorf("s2storage: basin, accessToken, and streamName are required")
+	}
+
+	client := s2.New(accessToken, nil)
+	stream := client.Basin(basin).Stream(s2.StreamName(streamName))
+
+	session, err := stream.AppendSession(ctx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("s2storage: open append session: %w", err)
+	}
+
+	batcher := s2.NewBatcher(ctx, &s2.BatchingOptions{
+		Linger:     cfg.BatcherLinger,
+		MaxRecords: cfg.BatcherMaxRecords,
+	})
+	producer := s2.NewProducer(ctx, batcher, session)
+
+	return &S2Storage{
+		producer:   s2Producer{p: producer},
+		shutdownCh: make(chan struct{}),
+	}, nil
+}
+
+// Append marshals env to JSON and submits it to the S2 producer.
+func (s *S2Storage) Append(_ context.Context, env Envelope) error {
+	data, err := json.Marshal(env)
+	if err != nil {
+		return fmt.Errorf("s2storage: marshal envelope seq=%d: %w", env.Seq, err)
+	}
+
+	future, err := s.producer.p.Submit(s2.AppendRecord{Body: data})
+	if err != nil {
+		return fmt.Errorf("s2storage: submit seq=%d: %w", env.Seq, err)
+	}
+
+	s.producer.wg.Add(1)
+	go func() {
+		defer s.producer.wg.Done()
+		ackCtx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		go func() {
+			select {
+			case <-s.shutdownCh:
+				cancel()
+			case <-ackCtx.Done():
+			}
+		}()
+
+		ticket, err := future.Wait(ackCtx)
+		if err != nil {
+			slog.Error("s2storage: wait for submit failed", "seq", env.Seq, "err", err)
+			return
+		}
+		if ticket == nil {
+			return
+		}
+		if _, err := ticket.Ack(ackCtx); err != nil {
+			slog.Error("s2storage: ack failed", "seq", env.Seq, "err", err)
+		}
+	}()
+
+	return nil
+}
+
+// Close cancels in-flight ack goroutines, waits for them to drain, then closes
+// the producer (which flushes the S2 batcher to the network).
+func (s *S2Storage) Close() error {
+	close(s.shutdownCh)
+	return s.producer.close(context.Background())
+}