Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 103 additions & 17 deletions components/backend/websocket/agui_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,11 @@ const (
)

// loadEvents reads AG-UI events for a session from the JSONL log.
// For files larger than replayMaxTailBytes, only the tail is read to
// keep reconnect latency bounded (129ms at 1M events vs 9.7s full scan).
// For files larger than replayMaxTailBytes, a head+tail strategy is used:
// the head is scanned for snapshot/lifecycle events (MESSAGES_SNAPSHOT,
// STATE_SNAPSHOT, RUN_STARTED, RUN_FINISHED, RUN_ERROR) while the tail
// provides recent streaming events. This keeps reconnect latency bounded
// while ensuring the frontend always has complete conversation history.
// Automatically triggers legacy migration if the log doesn't exist but
// a pre-AG-UI messages.jsonl file does.
func loadEvents(sessionID string) []map[string]interface{} {
Expand Down Expand Up @@ -265,30 +268,26 @@ func loadEvents(sessionID string) []map[string]interface{} {
return scanJSONL(f)
}

// Large file — seek to tail to bound reconnect latency.
log.Printf("AGUI Store: large event log for %s (%.1f MB), reading tail only", sessionID, float64(fileSize)/(1024*1024))
offset := fileSize - replayMaxTailBytes
if _, err := f.Seek(offset, 0); err != nil {
log.Printf("AGUI Store: large event log for %s (%.1f MB), using head+tail read", sessionID, float64(fileSize)/(1024*1024))

headEvents := scanHeadSnapshotEvents(f)

tailOffset := fileSize - replayMaxTailBytes
if _, err := f.Seek(tailOffset, 0); err != nil {
log.Printf("AGUI Store: seek failed for %s: %v, falling back to full read", sessionID, err)
events, _ := readJSONLFile(path)
return events
}

// Read a single byte at the seek position to check if we landed on a
// record boundary ('\n' or start-of-file). If so, the next scanner
// line is a complete record and should not be skipped.
var boundary [1]byte
onBoundary := false
if offset == 0 {
if tailOffset == 0 {
onBoundary = true
} else if n, err := f.Read(boundary[:]); err == nil && n == 1 && boundary[0] == '\n' {
onBoundary = true
}
// If we read one byte that wasn't '\n', we're mid-record — the
// scanner will pick up from this position and the first line will
// be partial (skip it below).

var events []map[string]interface{}
var tailEvents []map[string]interface{}
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize)
skipFirst := !onBoundary
Expand All @@ -297,7 +296,6 @@ func loadEvents(sessionID string) []map[string]interface{} {
if len(line) == 0 {
continue
}
// Skip the first line only if the seek landed mid-record
if skipFirst {
skipFirst = false
continue
Expand All @@ -307,12 +305,38 @@ func loadEvents(sessionID string) []map[string]interface{} {
log.Printf("AGUI Store: skipping malformed JSON line in tail scan: %v", err)
continue
}
events = append(events, evt)
tailEvents = append(tailEvents, evt)
}
if err := scanner.Err(); err != nil {
log.Printf("AGUI Store: tail scan error for %s: %v", sessionID, err)
}
return events

if len(headEvents) == 0 {
return tailEvents
}

headTimestamps := make(map[string]bool, len(headEvents))
for _, evt := range headEvents {
if ts, ok := evt["timestamp"].(string); ok {
if evtType, _ := evt["type"].(string); evtType != "" {
headTimestamps[evtType+"|"+ts] = true
}
}
}

merged := make([]map[string]interface{}, 0, len(headEvents)+len(tailEvents))
merged = append(merged, headEvents...)
for _, evt := range tailEvents {
ts, _ := evt["timestamp"].(string)
evtType, _ := evt["type"].(string)
if ts != "" && evtType != "" && headTimestamps[evtType+"|"+ts] {
continue
}
merged = append(merged, evt)
}

log.Printf("AGUI Store: head+tail merge for %s: %d head + %d tail = %d total events", sessionID, len(headEvents), len(tailEvents), len(merged))
return merged
}

// scanJSONL reads all JSONL events from an already-open file handle.
Expand All @@ -338,6 +362,68 @@ func scanJSONL(f *os.File) []map[string]interface{} {
return events
}

// headScanEventTypes is the set of event types that scanHeadSnapshotEvents
// keeps while scanning the head of a large event log: the two snapshot
// types plus the run lifecycle types (started, finished, error). Lines whose
// type is not a key of this map are skipped by the head scan.
var headScanEventTypes = map[string]bool{
types.EventTypeMessagesSnapshot: true,
types.EventTypeStateSnapshot: true,
types.EventTypeRunStarted: true,
types.EventTypeRunFinished: true,
types.EventTypeRunError: true,
}

// scanHeadSnapshotEvents rewinds f to its start and scans the head of the
// file for events whose type is listed in headScanEventTypes.
//
// Lines are read until at least replayMaxTailBytes bytes have been consumed,
// the end of the file is reached, or a read error occurs. Each non-empty
// line is first pre-filtered with fastExtractType so that only candidate
// lines pay for a full JSON decode. Because that pre-filter works on raw
// bytes, the decoded event's "type" field is checked again before the event
// is kept. Candidate lines that fail to decode are logged and skipped.
//
// It returns the matching events in file order, or nil when the seek fails
// or nothing matched. The file offset is left wherever the buffered reader
// stopped, so callers must seek before reading from f again.
func scanHeadSnapshotEvents(f *os.File) []map[string]interface{} {
	if _, err := f.Seek(0, 0); err != nil {
		log.Printf("AGUI Store: head scan seek failed: %v", err)
		return nil
	}

	reader := bufio.NewReaderSize(f, scannerInitialBufferSize)
	var result []map[string]interface{}
	var bytesRead int64

	for bytesRead < replayMaxTailBytes {
		// ReadBytes can return data together with a non-nil error (a final
		// line without a trailing newline), so the line is processed before
		// the error is acted on.
		line, err := reader.ReadBytes('\n')
		bytesRead += int64(len(line))

		line = bytes.TrimSpace(line)
		if len(line) > 0 && headScanEventTypes[fastExtractType(line)] {
			var evt map[string]interface{}
			if jsonErr := json.Unmarshal(line, &evt); jsonErr != nil {
				log.Printf("AGUI Store: skipping malformed JSON line in head scan: %v", jsonErr)
			} else if evtType, _ := evt["type"].(string); headScanEventTypes[evtType] {
				// Re-check the decoded type: the byte-level pre-filter may
				// have matched something other than the top-level field.
				result = append(result, evt)
			}
		}

		if err != nil {
			break
		}
	}
	return result
}

// fastExtractType returns the string value of the top-level "type" field of
// a single JSON object line without fully decoding it.
//
// The line is walked once, skipping over string contents (honouring
// backslash escapes) and tracking object/array nesting, so that only a
// "type" key at depth 1 that is followed by a colon and a string value is
// accepted. Nested "type" keys (for example inside a messages array), string
// values that merely equal "type", and non-string type values are ignored.
//
// It returns "" when no such field exists or the line is truncated inside a
// string. The value is returned as the raw bytes between the quotes; escape
// sequences inside it are not decoded.
func fastExtractType(line []byte) string {
	// Cheap rejection for the common case of lines with no "type" key at all.
	if !bytes.Contains(line, []byte(`"type"`)) {
		return ""
	}

	// stringEnd returns the index of the closing quote of a JSON string
	// whose first content byte is at from, or -1 if the string is unterminated.
	stringEnd := func(from int) int {
		for j := from; j < len(line); j++ {
			switch line[j] {
			case '\\':
				j++ // skip the escaped byte
			case '"':
				return j
			}
		}
		return -1
	}
	// skipSpace returns the index of the first non-whitespace byte at or after from.
	skipSpace := func(from int) int {
		for from < len(line) {
			switch line[from] {
			case ' ', '\t', '\r', '\n':
				from++
			default:
				return from
			}
		}
		return from
	}

	depth := 0
	for i := 0; i < len(line); i++ {
		switch line[i] {
		case '{', '[':
			depth++
		case '}', ']':
			depth--
		case '"':
			end := stringEnd(i + 1)
			if end < 0 {
				return ""
			}
			if depth == 1 && string(line[i+1:end]) == "type" {
				// Only a key (string followed by ':') with a string value counts.
				if v := skipSpace(end + 1); v < len(line) && line[v] == ':' {
					if v = skipSpace(v + 1); v < len(line) && line[v] == '"' {
						valueEnd := stringEnd(v + 1)
						if valueEnd < 0 {
							return ""
						}
						return string(line[v+1 : valueEnd])
					}
				}
			}
			// Resume after the string just skipped.
			i = end
		}
	}
	return ""
}

// DeriveAgentStatus reads a session's event log and returns the agent
// status derived from the last significant events.
//
Expand Down
214 changes: 214 additions & 0 deletions components/backend/websocket/agui_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package websocket
import (
"ambient-code-backend/types"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
)

Expand Down Expand Up @@ -449,3 +451,215 @@ func TestLoadEventsForReplay(t *testing.T) {
}
})
}

// TestFastExtractType runs fastExtractType over a table of raw JSON lines
// and checks the extracted type string for each, including inputs that
// have no type field at all.
func TestFastExtractType(t *testing.T) {
	type extractCase struct {
		name, input, expected string
	}

	cases := []extractCase{
		{name: "standard event", input: `{"type":"RUN_STARTED","runId":"r1"}`, expected: "RUN_STARTED"},
		{name: "type not first field", input: `{"runId":"r1","type":"RUN_FINISHED","ts":123}`, expected: "RUN_FINISHED"},
		{name: "messages snapshot", input: `{"type":"MESSAGES_SNAPSHOT","messages":[]}`, expected: "MESSAGES_SNAPSHOT"},
		{name: "no type field", input: `{"runId":"r1","data":"hello"}`, expected: ""},
		{name: "empty object", input: `{}`, expected: ""},
		{name: "empty string", input: ``, expected: ""},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			if got := fastExtractType([]byte(tc.input)); got != tc.expected {
				t.Errorf("fastExtractType(%q) = %q, want %q", tc.input, got, tc.expected)
			}
		})
	}
}

// writeLargeEventFile creates (or truncates) the JSONL file at path and
// writes, one JSON object per line and in this order: every event in
// headEvents, then paddingCount synthetic TEXT_MESSAGE_CONTENT events (each
// carrying a 200-character delta), then every event in tailEvents.
//
// Any create, marshal or write failure aborts the calling test via
// t.Fatalf; a failure to close the file is reported with t.Errorf so that a
// write error surfacing at close time is not lost.
func writeLargeEventFile(t *testing.T, path string, headEvents []map[string]interface{}, paddingCount int, tailEvents []map[string]interface{}) {
	t.Helper()
	f, err := os.Create(path)
	if err != nil {
		t.Fatalf("Failed to create events file: %v", err)
	}
	defer func() {
		if closeErr := f.Close(); closeErr != nil {
			t.Errorf("Failed to close events file: %v", closeErr)
		}
	}()

	// writeEvent appends evt as a single JSONL record; kind only labels the
	// failure message (head, padding or tail).
	writeEvent := func(kind string, evt map[string]interface{}) {
		t.Helper()
		data, err := json.Marshal(evt)
		if err != nil {
			t.Fatalf("Failed to marshal %s event: %v", kind, err)
		}
		if _, err := f.Write(append(data, '\n')); err != nil {
			t.Fatalf("Failed to write %s event: %v", kind, err)
		}
	}

	for _, evt := range headEvents {
		writeEvent("head", evt)
	}

	paddingContent := strings.Repeat("x", 200)
	for i := 0; i < paddingCount; i++ {
		writeEvent("padding", map[string]interface{}{
			"type":      types.EventTypeTextMessageContent,
			"messageId": fmt.Sprintf("msg-pad-%d", i),
			"delta":     paddingContent,
			"timestamp": fmt.Sprintf("2025-01-01T00:01:%02dZ", i%60),
		})
	}

	for _, evt := range tailEvents {
		writeEvent("tail", evt)
	}
}

// TestLoadEventsHeadTailMerge exercises loadEvents on event logs that are
// larger than replayMaxTailBytes. StateBaseDir is pointed at a per-test
// temporary directory for the duration of the test and restored afterwards.
func TestLoadEventsHeadTailMerge(t *testing.T) {
	tmpDir := t.TempDir()

	origStateBaseDir := StateBaseDir
	StateBaseDir = tmpDir
	defer func() { StateBaseDir = origStateBaseDir }()

	// loadLarge writes an event log for sessionID made of head, 15000
	// padding events and tail, fails the test if the file is not larger
	// than replayMaxTailBytes, and returns what loadEvents yields for it.
	loadLarge := func(t *testing.T, sessionID string, head, tail []map[string]interface{}) []map[string]interface{} {
		t.Helper()
		sessionsDir := filepath.Join(tmpDir, "sessions", sessionID)
		if err := os.MkdirAll(sessionsDir, 0755); err != nil {
			t.Fatalf("Failed to create sessions dir: %v", err)
		}

		eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl")
		writeLargeEventFile(t, eventsFile, head, 15000, tail)

		stat, err := os.Stat(eventsFile)
		if err != nil {
			t.Fatalf("Failed to stat events file: %v", err)
		}
		if stat.Size() <= replayMaxTailBytes {
			t.Fatalf("Test file too small (%d bytes), need > %d to trigger head+tail path", stat.Size(), replayMaxTailBytes)
		}
		return loadEvents(sessionID)
	}

	t.Run("large file preserves head snapshot events", func(t *testing.T) {
		result := loadLarge(t, "test-large-headtail",
			[]map[string]interface{}{
				{"type": types.EventTypeRunStarted, "runId": "r1", "timestamp": "2025-01-01T00:00:00Z"},
				{"type": types.EventTypeMessagesSnapshot, "messages": []interface{}{
					map[string]interface{}{"id": "msg1", "role": "user", "content": "Hello"},
				}, "timestamp": "2025-01-01T00:00:01Z"},
			},
			[]map[string]interface{}{
				{"type": types.EventTypeTextMessageContent, "messageId": "msg-tail", "delta": "tail event", "timestamp": "2025-01-01T00:02:00Z"},
			},
		)
		if len(result) == 0 {
			t.Fatal("Expected events from loadEvents, got none")
		}

		hasRunStarted := false
		hasMessagesSnapshot := false
		for _, evt := range result {
			switch evtType, _ := evt["type"].(string); evtType {
			case types.EventTypeRunStarted:
				hasRunStarted = true
			case types.EventTypeMessagesSnapshot:
				hasMessagesSnapshot = true
			}
		}

		if !hasRunStarted {
			t.Error("Expected RUN_STARTED from head scan to be present in merged result")
		}
		if !hasMessagesSnapshot {
			t.Error("Expected MESSAGES_SNAPSHOT from head scan to be present in merged result")
		}

		// Guard the positional checks below so a short result fails the
		// test instead of panicking on an out-of-range index.
		if len(result) < 2 {
			t.Fatalf("Expected at least 2 events in merged result, got %d", len(result))
		}
		if result[0]["type"] != types.EventTypeRunStarted {
			t.Errorf("Expected first event to be RUN_STARTED, got %v", result[0]["type"])
		}
		if result[1]["type"] != types.EventTypeMessagesSnapshot {
			t.Errorf("Expected second event to be MESSAGES_SNAPSHOT, got %v", result[1]["type"])
		}
	})

	t.Run("large file deduplicates overlapping events", func(t *testing.T) {
		result := loadLarge(t, "test-large-dedup",
			[]map[string]interface{}{
				{"type": types.EventTypeRunStarted, "runId": "r1", "timestamp": "2025-01-01T00:00:00Z"},
			},
			[]map[string]interface{}{
				{"type": types.EventTypeRunFinished, "runId": "r1", "timestamp": "2025-01-01T00:03:00Z"},
			},
		)

		runStartedCount := 0
		for _, evt := range result {
			if evt["type"] == types.EventTypeRunStarted {
				runStartedCount++
			}
		}
		if runStartedCount != 1 {
			t.Errorf("Expected exactly 1 RUN_STARTED (no duplicates), got %d", runStartedCount)
		}
	})

	t.Run("large file with no head snapshots returns tail only", func(t *testing.T) {
		result := loadLarge(t, "test-large-no-head-snapshot", nil, nil)
		if len(result) == 0 {
			t.Fatal("Expected tail events, got none")
		}

		for _, evt := range result {
			if evt["type"] != types.EventTypeTextMessageContent {
				t.Errorf("Expected only TEXT_MESSAGE_CONTENT events, got %v", evt["type"])
			}
		}
	})
}