Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions images/chromium-headful/run-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
fi

# S2 durable event storage (all three must be set to enable the sink).
# Forward each S2_* variable into the container only when it is set, using
# indirect expansion to avoid repeating the same if-block three times.
for s2_var in S2_BASIN S2_ACCESS_TOKEN S2_STREAM; do
  if [[ -n "${!s2_var:-}" ]]; then
    RUN_ARGS+=( -e "${s2_var}=${!s2_var}" )
  fi
done

# WebRTC port mapping
if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then
echo "Running container with WebRTC"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=2
Expand Down
11 changes: 11 additions & 0 deletions images/chromium-headless/run-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
fi

# S2 durable event storage (all three must be set to enable the sink).
# Pass each S2_* variable through to the container only when non-empty;
# indirect expansion (${!name}) collapses the three identical checks.
for s2_var in S2_BASIN S2_ACCESS_TOKEN S2_STREAM; do
  if [[ -n "${!s2_var:-}" ]]; then
    RUN_ARGS+=( -e "${s2_var}=${!s2_var}" )
  fi
done

# If a positional argument is given, use it as the entrypoint
ENTRYPOINT_ARG=()
if [[ $# -ge 1 && -n "$1" ]]; then
Expand Down
32 changes: 32 additions & 0 deletions server/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ func main() {
}
captureSession := capturesession.NewCaptureSession(eventStream)

// Optional S2 durable storage sink.
var storageWriter *events.StorageWriter
if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" {
slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream)
s2stor, err := events.NewS2Storage(ctx, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{})
if err != nil {
slogger.Error("failed to create S2 storage", "err", err)
os.Exit(1)
}
storageWriter = events.NewStorageWriter(eventStream, s2stor)
}

apiService, err := api.New(
recorder.NewFFmpegManager(),
recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
Expand Down Expand Up @@ -244,10 +256,30 @@ func main() {
}
}()

// Start S2 storage writer goroutine. storageDone is closed when Run returns.
storageDone := make(chan struct{})
if storageWriter != nil {
go func() {
defer close(storageDone)
storageWriter.Run(ctx) //nolint:errcheck
}()
} else {
close(storageDone)
}

// graceful shutdown
<-ctx.Done()
slogger.Info("shutdown signal received")

// Wait for the storage writer to drain from the ring, then flush S2 before
// shutting down the HTTP servers and closing the capture session.
<-storageDone
if storageWriter != nil {
if err := storageWriter.Close(); err != nil {
slogger.Error("storage writer close failed", "err", err)
}
}

shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
g, _ := errgroup.WithContext(shutdownCtx)
Expand Down
5 changes: 5 additions & 0 deletions server/cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type Config struct {
// DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress.
// If empty, it is derived from DevToolsProxyPort as 127.0.0.1:<port>.
DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""`

// S2 durable event storage. All three fields must be set to enable the S2 sink.
S2Basin string `envconfig:"S2_BASIN" default:""`
S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""`
S2Stream string `envconfig:"S2_STREAM" default:""`
}

// Load loads configuration from environment variables
Expand Down
104 changes: 104 additions & 0 deletions server/e2e/e2e_s2_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package e2e

import (
"context"
"net/http"
"os"
"os/exec"
"testing"
"time"

"github.com/s2-streamstore/s2-sdk-go/s2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

instanceoapi "github.com/kernel/kernel-images/server/lib/oapi"
)

// TestS2StorageWriter starts a headless container with S2 credentials, runs a
// capture session, and verifies that events land in the configured S2 stream.
//
// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset.
// TestS2StorageWriter starts a headless container with S2 credentials, runs a
// capture session, and verifies that events land in the configured S2 stream.
//
// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset,
// or when docker is not on PATH.
func TestS2StorageWriter(t *testing.T) {
	basin := os.Getenv("S2_BASIN")
	accessToken := os.Getenv("S2_ACCESS_TOKEN")
	stream := os.Getenv("S2_STREAM")
	if basin == "" || accessToken == "" || stream == "" {
		t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test")
	}

	if _, err := exec.LookPath("docker"); err != nil {
		t.Skipf("docker not available: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	c := NewTestContainer(t, headlessImage)
	require.NoError(t, c.Start(ctx, ContainerConfig{
		Env: map[string]string{
			"S2_BASIN":        basin,
			"S2_ACCESS_TOKEN": accessToken,
			"S2_STREAM":       stream,
		},
	}), "failed to start container")
	// Stop the container with a fresh context: the test ctx above may already
	// be cancelled (deadline exceeded) by the time this deferred cleanup runs,
	// which would make the stop a no-op and leak the container.
	defer func() {
		stopCtx, stopCancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer stopCancel()
		c.Stop(stopCtx)
	}()

	require.NoError(t, c.WaitReady(ctx), "api not ready")

	client, err := c.APIClient()
	require.NoError(t, err)

	// Note the current S2 stream tail seq before we write anything so we only
	// read records produced by this test run.
	s2Client := s2.New(accessToken, nil)
	streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream))

	checkResp, err := streamClient.CheckTail(ctx)
	require.NoError(t, err, "check tail before test")
	startSeq := checkResp.SeqNum

	// Start a capture session.
	startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{})
	require.NoError(t, err)
	require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body))
	require.NotNil(t, startResp.JSON201)
	sessionID := startResp.JSON201.Id
	t.Logf("capture session started: %s", sessionID)

	// Let the session run briefly so at least one event is published (the
	// session_started system event is emitted on session start).
	time.Sleep(500 * time.Millisecond)

	// Stop the capture session.
	stopResp, err := client.StopCaptureSessionWithResponse(ctx)
	require.NoError(t, err)
	require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body))
	t.Log("capture session stopped")

	// Give the storage writer time to flush to S2 (batcher linger + network).
	time.Sleep(2 * time.Second)

	// Read records written after the pre-test tail and verify at least one
	// envelope is present.
	readSession, err := streamClient.ReadSession(ctx, &s2.ReadOptions{
		SeqNum: s2.Uint64(startSeq),
	})
	require.NoError(t, err, "open S2 read session")
	defer readSession.Close()

	readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second)
	defer readCancel()

	var count int
	for readSession.Next(readCtx) {
		count++
	}
	// EOF is expected once we reach the tail — not an error. A readCtx timeout
	// is likewise how we detect "caught up to the tail", so it is tolerated.
	if err := readSession.Err(); err != nil && readCtx.Err() == nil {
		t.Fatalf("S2 read session error: %v", err)
	}

	assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream)
	t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq)
}
2 changes: 2 additions & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866
github.com/nrednav/cuid2 v1.1.0
github.com/oapi-codegen/runtime v1.2.0
github.com/s2-streamstore/s2-sdk-go v0.16.1
github.com/samber/lo v1.52.0
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
Expand Down Expand Up @@ -99,6 +100,7 @@ require (
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/tools v0.37.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/s2-streamstore/s2-sdk-go v0.16.1 h1:18Qht850wUhIb9JZkMwF5EJWfnmZnjdtW3z8xOuL7Ys=
github.com/s2-streamstore/s2-sdk-go v0.16.1/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo=
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
Expand Down
53 changes: 53 additions & 0 deletions server/lib/events/eventsstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package events

import (
"context"
"log/slog"
)

// Storage is the durable storage backend for browser events.
//
// Append persists a single event envelope. It is called serially from
// StorageWriter.Run and need not be thread-safe. Close flushes any
// buffered writes and releases backend resources; it is called once,
// after the writer has stopped reading.
type Storage interface {
	Append(ctx context.Context, env Envelope) error
	Close() error
}

// StorageWriter drains the ring buffer and forwards each envelope to the
// configured Storage backend. Single-use: call Run once; it blocks until ctx
// is cancelled. Call Close after Run returns to flush in-flight writes.
// Starts from the oldest available event in the ring, not the current tail.
type StorageWriter struct {
	reader  *Reader // ring-buffer reader, positioned at seq 0 by NewStorageWriter
	storage Storage // durable backend that receives every envelope read
}

// NewStorageWriter creates a writer that reads from es starting at seq 0,
// so events already buffered in the ring are persisted as well.
func NewStorageWriter(es *EventStream, storage Storage) *StorageWriter {
	w := &StorageWriter{storage: storage}
	w.reader = es.NewReader(0)
	return w
}

// Run reads from the ring buffer and appends each envelope to storage until
// ctx is cancelled. Returns ctx.Err() on clean shutdown; if the reader fails
// for a non-cancellation reason, that error is returned instead.
func (w *StorageWriter) Run(ctx context.Context) error {
	for {
		res, err := w.reader.Read(ctx)
		if err != nil {
			// Prefer the context error so cancellation still reports
			// ctx.Err(), but surface a genuine read failure rather than
			// masking it (the previous code returned ctx.Err()
			// unconditionally, which is nil when Read fails for any
			// other reason).
			if cerr := ctx.Err(); cerr != nil {
				return cerr
			}
			return err
		}
		if res.Dropped > 0 {
			slog.Warn("storage writer: dropped events", "count", res.Dropped)
		}
		// Guard on a nil envelope (e.g. a drop-only read result) instead of
		// skipping unconditionally after a drop: if the reader ever delivers
		// an envelope alongside a drop count, it is no longer lost.
		if res.Envelope == nil {
			continue
		}
		if err := w.storage.Append(ctx, *res.Envelope); err != nil {
			slog.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err)
		}
	}
}

// Close drains in-flight writes and releases backend resources by delegating
// to the underlying Storage. Intended to be called once, after Run has
// returned (see the StorageWriter doc comment).
func (w *StorageWriter) Close() error {
	return w.storage.Close()
}
Loading
Loading