feat: add SSE streaming support with billing extraction #7
- Add streaming SSE parser for OpenAI and Anthropic responses
- Implement StreamingResponseExtractor interface for all providers
- Use http.ResponseController for efficient flushing
- Add simdjson-go for fast JSON parsing of usage data
- Auto-inject stream_options.include_usage for billing
- Extract token usage from streaming responses for cost calculation
- Add comprehensive unit tests for streaming functionality
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in your review settings.
📝 Walkthrough

Adds end-to-end SSE streaming support: parser/formatters, streaming-capable response extractors and handlers, per-provider streaming extractor implementations, streaming-aware AutoRouter forwarding with optional BillingCalculator and usage injection, SIMD-accelerated usage extraction, tests, examples, and documentation updates.
🚥 Pre-merge checks: ✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 14
🧹 Nitpick comments (3)
go.mod (1)
9-13: Run `go mod tidy` here.

`github.com/minio/simdjson-go` is imported directly by `internal/fastjson/extractor.go`, so keeping line 11 as `// indirect` will just cause avoidable `go.mod` churn on the next tidy.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@go.mod` around lines 9-13: Run `go mod tidy` to ensure github.com/minio/simdjson-go is recorded as a direct dependency (it's directly imported by internal/fastjson/extractor.go); this will update go.mod to remove the // indirect marker for github.com/minio/simdjson-go and prevent churn on future tidy runs.

autorouter_test.go (1)
78-82: Consider removing unused embedding.

The `*mockExtractor` embedding in `mockStreamingExtractor` appears unused since streaming tests only exercise `ExtractStreamingWithController`, and the `Extract` method fallback isn't tested. This could be simplified, though it's not blocking.

♻️ Simplify mock structure

```diff
 type mockStreamingExtractor struct {
-	*mockExtractor
 	isStreaming        bool
 	extractStreamingFn func(resp *http.Response, w http.ResponseWriter, rc *http.ResponseController) (ResponseMetadata, error)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@autorouter_test.go` around lines 78-82: The mockStreamingExtractor struct currently embeds *mockExtractor which is unused by the streaming tests; remove the embedding to simplify the mock type (delete the *mockExtractor field from mockStreamingExtractor) and ensure tests and any usages reference mockStreamingExtractor.ExtractStreamingWithController directly; if other tests expect mockExtractor methods, either implement necessary methods on mockStreamingExtractor or keep a named field (not embedded) to avoid accidental method promotion from mockExtractor while preserving required behavior for Extract and ExtractStreamingWithController.

internal/fastjson/extractor_test.go (1)
56-62: Consider validating cache is nil when not expected.

When `expectedCached == 0`, the test doesn't verify that `cacheUsage` is actually `nil`. This could mask unexpected cache values being returned.

♻️ Add nil check for non-cache cases

```diff
 if tt.expectedCached > 0 {
 	if cacheUsage == nil {
 		t.Error("expected cache usage, got nil")
 	} else if cacheUsage.CachedTokens != tt.expectedCached {
 		t.Errorf("expected cached tokens %d, got %d", tt.expectedCached, cacheUsage.CachedTokens)
 	}
+} else if cacheUsage != nil {
+	t.Errorf("expected nil cache usage, got %+v", cacheUsage)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/fastjson/extractor_test.go` around lines 56 - 62, The test currently only checks cacheUsage when tt.expectedCached > 0; add an explicit assertion for the opposite case so that when tt.expectedCached == 0 the test verifies cacheUsage is nil. Update the test in extractor_test.go around the cacheUsage checks (where tt.expectedCached and cacheUsage are used) to branch: if tt.expectedCached > 0 keep the existing non-nil and CachedTokens checks, otherwise assert cacheUsage == nil to surface unexpected cache values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@autorouter.go`:
- Around line 287-293: ForwardStreaming currently calls a.client.Do directly and
bypasses the interceptor chain; change it to invoke a.interceptors the same way
Forward does by wrapping the actual upstream call in a handler passed to
a.interceptors. Specifically, in AutoRouter.ForwardStreaming(), set the
MetaContextValue on upstreamReq (using MetaContextKey{} and
upstreamReq.WithContext), then call a.interceptors(upstreamReq, handler) where
handler performs the a.client.Do(upstreamReq) and returns the response/error so
auth/logging/retry/policy interceptors run for streaming requests as well;
ensure the handler adapts to the same return types used by Forward (upstreamResp
and err) and preserves streaming semantics.
- Around line 296-325: The response is being committed too early by calling
w.WriteHeader(upstreamResp.StatusCode) before running
provider.ResponseExtractor/StreamingResponseExtractor
(ExtractStreamingWithController) and before billingCalculator.Calculate, which
prevents SSE headers and X-Gateway-* billing headers from being sent; remove or
move the WriteHeader call so the extractor or streamResponseWithFlush can
set/commit headers when ready (or use HTTP trailers for end-of-stream billing
values), and ensure implementations of ExtractStreamingWithController and
streamResponseWithFlush call w.WriteHeader/upstream status at the appropriate
time; update code around ResponseExtractor(), StreamingResponseExtractor,
ExtractStreamingWithController, and streamResponseWithFlush so headers/trailers
are emitted after streaming/billing decisions.
- Around line 241-244: The code currently overwrites any caller-provided
stream_options when billing is enabled; update the block in autorouter where
a.billingCalculator is checked so that if raw["stream"] is true you merge
include_usage: true into the existing raw["stream_options"] map (if present and
a map[string]any / map[string]interface{}) and only create a new map when
stream_options is missing or not a map, preserving any existing keys; target the
raw map manipulation around the "stream" / "stream_options" usage to implement
the merge.
- Around line 455-462: The headerSent function incorrectly assumes
ResponseWriter has WroteHeader; replace this with a ResponseWriter wrapper that
tracks whether headers have been sent by intercepting WriteHeader and Write
(e.g., create a type like headerTrackingResponseWriter that embeds
http.ResponseWriter and sets a sent flag in WriteHeader and on first Write if
WriteHeader was not called), use that wrapper where ForwardStreaming and related
handlers accept w so you can reliably check the sent flag, and change headerSent
to check that wrapper's flag (or, if the writer is not the tracked type,
conservatively assume headers were sent and bail out of error handling instead
of calling http.Error). Ensure you update any call sites that pass w (e.g.,
ForwardStreaming) to wrap the original http.ResponseWriter with
headerTrackingResponseWriter so headerSent can detect stream start.
In `@billing_calculator.go`:
- Around line 8-53: In Calculate, guard against nil c.lookup and nil respMeta
before any dereference and return nil for bad billing setup: check if c.lookup
== nil and return nil early; check if respMeta == nil and return nil early (so
subsequent accesses to respMeta.Custom and respMeta.Usage are safe). Also guard
the cacheUsage extraction by only reading respMeta.Custom when respMeta != nil,
and ensure you handle missing respMeta.Usage (or zero values) when calling
CalculateCost; keep existing onResult handling. Update the Calculate method (and
any calls to c.lookup(provider, ...)) to skip lookup invocation when c.lookup is
nil and to return nil instead of panicking.
In `@internal/fastjson/extractor.go`:
- Around line 167-267: The fastjson extractors (extractAnthropicSimd and
extractAnthropicStd) are not populating CacheUsage.Ephemeral5mInputTokens and
CacheUsage.Ephemeral1hInputTokens; update both extractAnthropicSimd and
extractAnthropicStd to read the same JSON keys used by the Anthropic provider
extractor and set CacheUsage.Ephemeral5mInputTokens and
CacheUsage.Ephemeral1hInputTokens accordingly (for simdjson, add cases in the
usage object loop to parse the ephemeral fields into cacheUsage; for the std
json path, add the fields to the resp.Usage struct and populate the CacheUsage
struct), matching the exact key names and types used in the existing Anthropic
extractor.
- Around line 144-157: extractOpenAIStd() currently leaves usage.TotalTokens at
0 when total_tokens is missing, causing inconsistent metadata vs
extractOpenAISimd(); update the logic in extractOpenAIStd() (same place where
usage is built) to mirror extractOpenAISimd() by setting usage.TotalTokens =
resp.Usage.TotalTokens if >0 else resp.Usage.PromptTokens +
resp.Usage.CompletionTokens, and ensure the same check for nil
PromptTokensDetails -> CacheUsage is applied so both SIMD and stdlib paths
produce identical Usage and CacheUsage values.
In `@providers/anthropic/streaming_extractor.go`:
- Around line 107-114: The code currently drops SSE "data:" chunks when
ParseAnthropicSSEEvent returns an error or nil; change the handling in
streaming_extractor.go so that if llmproxy.ParseAnthropicSSEEvent(data) returns
an error or nil you still forward the original data chunk downstream (i.e.,
write the raw data as you would for other events) and only skip metadata
extraction, rather than continue/return early; specifically update the block
around ParseAnthropicSSEEvent, the err check and the event==nil check to fall
through to the existing write/send logic when parsing fails, while preserving
the successful-path metadata handling when event != nil.
In `@providers/bedrock/streaming_extractor.go`:
- Around line 27-72: The current extractNonStreamingWithController calls
e.Extractor.Extract with a TeeReader that forces Extract to consume the entire
resp.Body before any bytes are written to the client; change this to stream
concurrently by wiring resp.Body into an io.Pipe and using an io.MultiWriter to
forward bytes to both the pipe (for e.Extractor.Extract) and the HTTP client
(w), flushing via rc.Flush after writes. Concretely: in
extractNonStreamingWithController create pr, pw := io.Pipe(), start a goroutine
that io.Copy(io.MultiWriter(pw, &clientWriter{w, rc}), resp.Body) and closes pw
on completion/err; call e.Extractor.Extract with a synthetic *http.Response
whose Body is io.NopCloser(pr) so Extract can parse incrementally; propagate
errors from the copy goroutine and ensure pw/resp.Body are closed and rc.Flush
is called after each client write. Use the symbols
extractNonStreamingWithController, e.Extractor.Extract, resp.Body, rc.Flush and
ResponseWriter w to locate and implement the change.
In `@providers/googleai/streaming_extractor.go`:
- Around line 27-72: The current ExtractStreamingWithController routes all
responses into extractNonStreamingWithController which drains resp.Body into a
bytes.Buffer via io.TeeReader and calls Extractor.Extract on a fully buffered
body, preventing SSE/incremental events from being forwarded; change the
implementation to stream: read resp.Body in small chunks, immediately write each
chunk to w and call rc.Flush(), and simultaneously feed those chunks into the
extraction path incrementally (e.g., replace the full-body Extractor.Extract
call with an incremental extraction that consumes the same streaming bytes—use
an io.Pipe or a custom io.Writer that passes chunks to Extractor while you
Write/Flush to w) so Extractor.Extract (or its streaming variant) processes
bytes as they arrive rather than after the entire body is buffered.
In `@providers/openai_compatible/streaming_extractor.go`:
- Around line 116-123: The code currently skips forwarding the raw SSE line
whenever llmproxy.ParseOpenAISSEEvent(data) returns an error or chunk==nil;
change this so parsing is best-effort but the raw event is always forwarded.
Specifically, in the block around ParseOpenAISSEEvent(data) and the variables
chunk/err, remove the early continue on err and on chunk==nil and instead call
the existing upstream write method (e.g., upstream.Write/WriteLine/WriteString
used elsewhere in this file) to forward the original data payload before
continuing; keep any logging of the parse error but do not drop the raw event.
- Around line 91-92: Replace the bufio.Scanner-based loop in
streaming_extractor.go (where scanner := bufio.NewScanner(resp.Body) and
scanner.Buffer(...) are used) with the existing SSEParser from streaming.go:
construct an SSEParser over resp.Body and call SSEParser.Next() to iterate SSE
events, handling each event's data field instead of scanning lines; update any
logic that expects scanner.Text() to use the event payload returned by
SSEParser.Next() and remove the scanner.Buffer code to avoid bufio.ErrTooLong
for large single-line JSON payloads (keep existing resp.Body closing and error
handling behavior).
In `@streaming.go`:
- Around line 280-290: FormatSSEEvent currently writes the whole data blob as a
single "data: " line which breaks SSE when data contains newlines; modify
FormatSSEEvent to split the data (byte slice) on '\n' and write a separate
"data: " line for each segment (including empty segments for
trailing/newline-only lines) to the bytes.Buffer, preserving the existing
"event: " header when event is non-empty and ending the frame with the required
blank line; use the FormatSSEEvent function name and the existing bytes.Buffer
to locate and update the logic.
- Around line 27-30: NewSSEParser currently constructs a bufio.Scanner with the
default 64 KiB limit which can cause bufio.ErrTooLong for large LLM SSE data
lines; update NewSSEParser to call scanner.Buffer(make([]byte, 64*1024),
1024*1024) on the created scanner (use the existing scanner variable) so the
maximum token size is increased to 1 MiB, keeping the rest of the SSEParser
construction the same.
---
Nitpick comments:
In `@autorouter_test.go`:
- Around line 78-82: The mockStreamingExtractor struct currently embeds
*mockExtractor which is unused by the streaming tests; remove the embedding to
simplify the mock type (delete the *mockExtractor field from
mockStreamingExtractor) and ensure tests and any usages reference
mockStreamingExtractor.ExtractStreamingWithController directly; if other tests
expect mockExtractor methods, either implement necessary methods on
mockStreamingExtractor or keep a named field (not embedded) to avoid accidental
method promotion from mockExtractor while preserving required behavior for
Extract and ExtractStreamingWithController.
In `@go.mod`:
- Around line 9-13: Run `go mod tidy` to ensure github.com/minio/simdjson-go is
recorded as a direct dependency (it’s directly imported by
internal/fastjson/extractor.go); this will update go.mod to remove the //
indirect marker for github.com/minio/simdjson-go and prevent churn on future
tidy runs.
In `@internal/fastjson/extractor_test.go`:
- Around line 56-62: The test currently only checks cacheUsage when
tt.expectedCached > 0; add an explicit assertion for the opposite case so that
when tt.expectedCached == 0 the test verifies cacheUsage is nil. Update the test
in extractor_test.go around the cacheUsage checks (where tt.expectedCached and
cacheUsage are used) to branch: if tt.expectedCached > 0 keep the existing
non-nil and CachedTokens checks, otherwise assert cacheUsage == nil to surface
unexpected cache values.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8f349475-c48e-47d1-809c-a9ab7689d910
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (22)
- autorouter.go
- autorouter_test.go
- billing_calculator.go
- examples/basic/main.go
- extractor.go
- go.mod
- internal/fastjson/extractor.go
- internal/fastjson/extractor_test.go
- providers/anthropic/provider.go
- providers/anthropic/streaming_extractor.go
- providers/azure/provider.go
- providers/bedrock/provider.go
- providers/bedrock/streaming_extractor.go
- providers/googleai/provider.go
- providers/googleai/streaming_extractor.go
- providers/openai_compatible/multiapi.go
- providers/openai_compatible/provider.go
- providers/openai_compatible/streaming_extractor.go
- providers/openai_compatible/streaming_extractor_test.go
- providers/perplexity/provider.go
- streaming.go
- streaming_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (17)
providers/azure/provider.go (1)
69-69: LGTM! The Azure provider is correctly updated to use the new `NewStreamingExtractor()` for streaming-capable response extraction, consistent with the PR's streaming support additions. Both constructor functions (`New` and `NewWithDynamicDeployment`) are updated uniformly. Also applies to: 95-95
providers/anthropic/provider.go (1)
21-21: LGTM! The Anthropic provider is correctly updated to use `NewStreamingExtractor()` for streaming-aware response extraction, consistent with other providers in this PR. Also applies to: 37-37
examples/basic/main.go (2)
221-226: LGTM! Good refactoring to extract the billing callback into a reusable variable. The dual registration pattern (interceptor for non-streaming + billing calculator for streaming) correctly handles both response paths with consistent billing logic.
273-289: LGTM! Helpful streaming examples added to the startup logs, demonstrating the new SSE streaming capabilities with proper `stream_options.include_usage` usage.
420-487: LGTM! Good test coverage for the `stream_options.include_usage` auto-injection behavior when billing calculator is configured. The test correctly verifies that streaming requests get the option injected into the upstream request body.
489-556: LGTM! Important test case verifying that user-specified `stream_options.include_usage=false` is correctly overridden to `true` when billing is enabled, ensuring billing data is always available for cost tracking.
558-668: LGTM! Good negative test cases ensuring `stream_options` is only injected when appropriate:
- Not injected without billing calculator (lines 558-613)
- Not injected for non-streaming requests (lines 615-668)
internal/fastjson/extractor_test.go (1)
145-197: LGTM! Good integration test for `BillingCalculator` covering token extraction, cost calculation formula, callback invocation, and metadata storage.
14-61: LGTM! Comprehensive test for SSE stream extraction covering multi-chunk parsing, metadata extraction (ID, Model), usage/token counting, and verifying the SSE framing is preserved in the output.
89-109: LGTM! Good fallback test ensuring non-streaming JSON responses are handled gracefully even when using the streaming extractor.
111-147: LGTM! Important test validating cache token extraction from streaming responses and proper storage in the `Custom["cache_usage"]` map.
79-100: LGTM! Clean implementation of `StreamingMultiAPIExtractor` with proper routing between SSE streaming and non-streaming paths based on content-type detection.
102-144: LGTM! The `extractNonStreamingWithController` correctly uses `TeeReader` to capture the response body during metadata extraction, then writes it to the client in 512KB chunks with flushes for responsive delivery. Error handling covers EOF, context cancellation, and write failures appropriately.
9-84: LGTM! Comprehensive SSE parser tests covering single events, event types, multiple events, multiline data aggregation, and OpenAI-style streaming format with `[DONE]` terminator.
145-233: LGTM! Thorough tests for `ExtractUsageFromOpenAIChunk` with good null-safety coverage (nil chunk, missing usage) and validation of cache token extraction via `PromptTokensDetails`.
257-286: LGTM! Good tests for `FormatSSEEvent` validating correct SSE wire format with and without event type prefix.
90-91: The Anthropic stream parser has a 1 MiB buffer ceiling that affects both the `anthropic` and `openai_compatible` extractors. The `scanner.Buffer()` configuration at line 91 limits SSE lines to 1 MiB. Any line exceeding this will cause `bufio.ErrTooLong` and terminate the stream. This constraint applies identically in `providers/openai_compatible/streaming_extractor.go` at line 92.
```go
func headerSent(w http.ResponseWriter) bool {
	type headerChecker interface {
		WroteHeader() bool
	}
	if hc, ok := w.(headerChecker); ok {
		return hc.WroteHeader()
	}
	return false
}
```
🧩 Analysis chain

🏁 Script executed:

```shell
# First, let's find and examine the headerSent function in context
cd /repo && cat -n autorouter.go | sed -n '450,470p'
```

🏁 Script executed:

```shell
# Search for uses of headerSent to understand the impact
cd /repo && rg "headerSent" -B 3 -A 3
```

🏁 Script executed:

```shell
# Check if there's any import or usage of http.Error
cd /repo && rg "http\.Error" -B 2 -A 2
```

🏁 Script executed:

```shell
# Let's verify the Go http.ResponseWriter interface definition
python3 << 'EOF'
import subprocess
import re
# The Go http.ResponseWriter interface is part of the standard library
# Let's check what methods it actually exposes
result = subprocess.run(['go', 'doc', 'net/http.ResponseWriter'],
                        capture_output=True, text=True)
print("=== net/http.ResponseWriter interface ===")
print(result.stdout)
print(result.stderr)
EOF
```

🏁 Script executed:

```shell
# Also check if http.response (the concrete type) has WroteHeader
python3 << 'EOF'
import subprocess
# Try to get info on http.response internal type
result = subprocess.run(['go', 'doc', 'net/http.response'],
                        capture_output=True, text=True)
print("=== net/http.response ===")
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)
# Let's try another approach - check Go source for WroteHeader
result2 = subprocess.run(['grep', '-r', 'WroteHeader', '/usr/lib/go'],
                         capture_output=True, text=True)
print("\n=== WroteHeader in Go stdlib ===")
print(result2.stdout[:2000] if result2.stdout else "Not found in /usr/lib/go")
EOF
```

🏁 Script executed:

```shell
git ls-files | head -20
```

🏁 Script executed:

```shell
# Find autorouter.go
find . -name "autorouter.go" -o -name "*.go" | head -20
```

🏁 Script executed:

```shell
# Search for WroteHeader usage in the codebase
rg "WroteHeader" -B 3 -A 3
```

🏁 Script executed:

```shell
# Search for headerSent function and its usage
rg "headerSent" -B 5 -A 5
```

🏁 Script executed:

```shell
# Search for http.Error usage to understand the context of the risk
rg "http\.Error" -B 3 -A 3
```

🏁 Script executed:

```shell
# Let's check the Go http package source to see if http.response has WroteHeader
python3 << 'EOF'
import subprocess
import os
# Find the Go stdlib location
result = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True)
goroot = result.stdout.strip()
print(f"GOROOT: {goroot}")
# Check the net/http/server.go file for the response type implementation
server_path = os.path.join(goroot, 'src', 'net', 'http', 'server.go')
if os.path.exists(server_path):
    result = subprocess.run(['grep', '-n', 'WroteHeader', server_path],
                            capture_output=True, text=True)
    if result.stdout:
        print("Found WroteHeader in server.go:")
        print(result.stdout)
    else:
        print("WroteHeader not found in server.go")
    # Also search for type response and its methods
    result2 = subprocess.run(['grep', '-n', 'func (w \*response)', server_path],
                             capture_output=True, text=True, timeout=10)
    lines = result2.stdout.split('\n')[:30]
    print("\nMethods on *response type (first 30):")
    print('\n'.join(lines))
else:
    print(f"server.go not found at {server_path}")
EOF
```

🏁 Script executed:

```shell
# Let's examine the autorouter.go file more closely to understand the full context
cat -n autorouter.go | sed -n '440,475p'
```

🏁 Script executed:

```shell
# Let's also check if there are any tests that might show how headerSent is expected to work
cat -n autorouter_test.go | grep -B 5 -A 5 "headerSent\|streaming" || echo "No matches found"
```

🏁 Script executed:

```shell
# Let's check if there's a way to detect if headers were sent in the standard library
# by examining the response type structure
python3 << 'EOF'
import subprocess
import os
goroot = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True).stdout.strip()
server_path = os.path.join(goroot, 'src', 'net', 'http', 'server.go')
# Search for any field or tracking of header being written
result = subprocess.run(['grep', '-n', 'wroteHeader\|headerWritten', server_path],
                        capture_output=True, text=True)
if result.stdout:
    print("Found header tracking fields:")
    print(result.stdout[:1500])
else:
    print("No wroteHeader/headerWritten fields found")
# Also check for write method behavior
result2 = subprocess.run(['grep', '-B 3', '-A 10', 'func (w \*response) Write', server_path],
                         capture_output=True, text=True)
print("\nWrite method implementation:")
print(result2.stdout[:1500])
EOF
```

🏁 Script executed:

```shell
# Let's check how http.Error actually behaves and what happens when called after writing
cat > /tmp/test_error.go << 'EOF'
package main
import (
    "net/http"
    "net/http/httptest"
)
func main() {
    // Simulate what happens with standard http.ResponseWriter
    w := httptest.NewRecorder()
    // Simulate streaming where some data was already written
    w.Write([]byte("some data"))
    // Now try to write an error
    http.Error(w, "error message", http.StatusInternalServerError)
    // Check what happened
    println("Status:", w.Code)
    println("Body length:", len(w.Body.String()))
    println("Body:", w.Body.String())
}
EOF
go run /tmp/test_error.go 2>&1 || echo "Test execution note"
```

🏁 Script executed:

```shell
# Let's look for how httptest.ResponseRecorder is used in tests to understand the issue better
rg "httptest\|ResponseRecorder" -B 2 -A 2 autorouter_test.go | head -50
```

🏁 Script executed:

```shell
# Check if http.ResponseWriter implementations in this codebase (or standard lib) implement WroteHeader
python3 << 'EOF'
import subprocess
# Try to find if any implementation defines WroteHeader
goroot = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True).stdout.strip()
# Search for WroteHeader definition anywhere in the stdlib
result = subprocess.run(['grep', '-r', 'func.*WroteHeader', goroot + '/src/net/http/'],
                        capture_output=True, text=True)
if result.stdout:
    print("Found WroteHeader implementations:")
    print(result.stdout)
else:
    print("WroteHeader not found in net/http package")
# Let's also check Go version - WroteHeader was added in Go 1.14
print("\nChecking Go version and http features...")
result2 = subprocess.run(['go', 'version'], capture_output=True, text=True)
print(result2.stdout)
EOF
```

🏁 Script executed:

```shell
# Let's check the actual behavior - read the response type structure
python3 << 'EOF'
import subprocess
import os
goroot = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True).stdout.strip()
server_path = os.path.join(goroot, 'src', 'net', 'http', 'server.go')
# Find the type response definition
result = subprocess.run(['grep', '-B 5', '-A 50', 'type response struct', server_path],
                        capture_output=True, text=True)
print("Response struct definition:")
print(result.stdout[:2000])
EOF
```
Fix headerSent() to properly detect when response headers have been written to prevent appending error messages to already-started streams.
The standard http.ResponseWriter interface does not expose WroteHeader(), so this function always returns false for standard library implementations. When ForwardStreaming fails after partial output, calling http.Error() will write an error message body into an already-started response, corrupting the stream output.
Recommended approaches:
- Track header writes by wrapping the `http.ResponseWriter`
- Bail out of error handling entirely if streaming has started (no recovery possible)
- Use a different ResponseWriter that exposes header state (e.g., `httputil.ResponseWriter` or similar)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@autorouter.go` around lines 455 - 462, The headerSent function incorrectly
assumes ResponseWriter has WroteHeader; replace this with a ResponseWriter
wrapper that tracks whether headers have been sent by intercepting WriteHeader
and Write (e.g., create a type like headerTrackingResponseWriter that embeds
http.ResponseWriter and sets a sent flag in WriteHeader and on first Write if
WriteHeader was not called), use that wrapper where ForwardStreaming and related
handlers accept w so you can reliably check the sent flag, and change headerSent
to check that wrapper's flag (or, if the writer is not the tracked type,
conservatively assume headers were sent and bail out of error handling instead
of calling http.Error). Ensure you update any call sites that pass w (e.g.,
ForwardStreaming) to wrap the original http.ResponseWriter with
headerTrackingResponseWriter so headerSent can detect stream start.
```go
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
```
🧩 Analysis chain

🏁 Script executed:

```shell
# First, let's look at the streaming_extractor.go file around lines 91-92
cat -n providers/openai_compatible/streaming_extractor.go | head -120 | tail -50
```

🏁 Script executed:

```shell
# Get the full structure of the file to understand the context
wc -l providers/openai_compatible/streaming_extractor.go
```

🏁 Script executed:

```shell
# Search for other SSE parser implementations in the codebase
fd -e go -e ts -e js | xargs rg -l "SSE|Server.Sent.Event|bufio\.Scanner|bufio\.Reader.*ReadBytes" | head -20
```

🏁 Script executed:

```shell
# Read the full streaming_extractor.go to see error handling
cat -n providers/openai_compatible/streaming_extractor.go
```

🏁 Script executed:

```shell
# Check if scanner errors are handled
rg "scanner\.(Err|Scan)" providers/openai_compatible/streaming_extractor.go -A 2 -B 1
```

🏁 Script executed:

```shell
# Look at other streaming extractors to see if they use similar patterns
cat providers/anthropic/streaming_extractor.go | head -150
```

🏁 Script executed:

```shell
# Search for any SSE parsing utilities in the codebase
rg "ParseOpenAISSEEvent|SSE|EventStream" -t go --max-count 5
```

🏁 Script executed:

```shell
# Check the SSEParser implementation to see if it avoids the buffer size issue
cat streaming.go
```

🏁 Script executed:

```shell
# Look for the llmproxy package structure
fd -type f streaming.go | xargs ls -la
```

🏁 Script executed:

```shell
# Check tool/function-call delta sizes in OpenAI documentation or examples
rg "tool_calls|function_call" -t go -A 2 -B 2 | head -40
```
Replace bufio.Scanner with SSEParser to handle multi-line SSE data fields.
The current implementation uses bufio.Scanner with a 1 MiB max token size (line 92), which will abort the stream with bufio.ErrTooLong if a single data: line exceeds this limit. This is a real risk for large tool call or function call responses where the entire JSON payload sits on one line.
The codebase already provides SSEParser (in streaming.go), which correctly handles multi-line SSE data fields and has no single-line size limitation. Switch to SSEParser.Next() instead of the line-by-line scanner; the other streaming extractors would benefit from the same change.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@providers/openai_compatible/streaming_extractor.go` around lines 91 - 92,
Replace the bufio.Scanner-based loop in streaming_extractor.go (where scanner :=
bufio.NewScanner(resp.Body) and scanner.Buffer(...) are used) with the existing
SSEParser from streaming.go: construct an SSEParser over resp.Body and call
SSEParser.Next() to iterate SSE events, handling each event's data field instead
of scanning lines; update any logic that expects scanner.Text() to use the event
payload returned by SSEParser.Next() and remove the scanner.Buffer code to avoid
bufio.ErrTooLong for large single-line JSON payloads (keep existing resp.Body
closing and error handling behavior).
…treaming tests

Replace whitelist-based provider switch with exclusion-list approach for stream_options injection. Providers with native streaming usage reporting (Anthropic, Bedrock, Google AI) are excluded; all others automatically get stream_options injected. This is more extensible for new providers.

Also adds comprehensive Anthropic streaming tests:
- TestParseAnthropicSSEEvent
- TestExtractUsageFromAnthropicEvent
- TestAnthropicSSEParser (full event stream)
- TestAutoRouter_AnthropicStreamingNoStreamOptions
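The exclusion-list injection described in this commit can be sketched roughly as follows (the provider names match the commit message, but the map shape, function name, and body representation are assumptions, not the repository's actual code):

```go
package main

import "fmt"

// Providers whose streams natively report usage; all others get
// stream_options.include_usage injected (illustrative set).
var nativeStreamingUsage = map[string]bool{
	"anthropic": true,
	"bedrock":   true,
	"googleai":  true,
}

// injectStreamOptions merges include_usage into any caller-supplied
// stream_options rather than overwriting them.
func injectStreamOptions(body map[string]any, provider string) {
	if nativeStreamingUsage[provider] {
		return // native usage reporting; nothing to inject
	}
	opts, ok := body["stream_options"].(map[string]any)
	if !ok {
		opts = map[string]any{}
		body["stream_options"] = opts
	}
	opts["include_usage"] = true
}

func main() {
	body := map[string]any{
		"stream":         true,
		"stream_options": map[string]any{"custom_flag": true},
	}
	injectStreamOptions(body, "openai")
	fmt.Println(body["stream_options"])
}
```

An exclusion set means a newly added provider gets billing-friendly injection by default, which is the extensibility the commit message claims.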
🧹 Nitpick comments (2)
DESIGN.md (2)
417-461: Add a language specifier to the ASCII diagram code block.

The streaming flow diagram is in a fenced code block without a language identifier. Add `text` (or leave the language empty with just triple backticks) for better rendering.

📝 Proposed fix

````diff
-```
+```text
 +------------------+
 | Incoming Request |
 |  stream: true    |
````

As per coding guidelines, the static analysis tool flagged: "Fenced code blocks should have a language specified (MD040)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@DESIGN.md` around lines 417 - 461, Add a language identifier to the fenced ASCII diagram block (e.g., change the opening triple-backticks to ```text) so the diagram is treated as plain text by the markdown linter; locate the fenced block that contains the diagram with labels like "Incoming Request", "AutoRouter / ForwardStreaming", "StreamingExtractor", and "BillingCalculator" and update its opening fence to include the language specifier (or an explicit "text") to satisfy MD040.
405-410: Add a language specifier to the code block.

The interface definition code block is missing a language identifier, which affects markdown rendering and syntax highlighting. Add `go` as the language specifier.

📝 Proposed fix

````diff
-```
+```go
 type StreamingResponseExtractor interface {
     ResponseExtractor
     ExtractStreamingWithController(resp, w, rc) -> (ResponseMetadata, error)
     IsStreamingResponse(resp) -> bool
 }
````

As per coding guidelines, the static analysis tool flagged: "Fenced code blocks should have a language specified (MD040)".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed.
In `@DESIGN.md` around lines 405 - 410, Add the missing language specifier for the fenced code block that contains the StreamingResponseExtractor interface definition so Markdown renders with Go syntax highlighting; update the opening fence to include `go` for the block that includes the StreamingResponseExtractor, ResponseExtractor, ExtractStreamingWithController, and IsStreamingResponse identifiers.
Nitpick comments:
In@DESIGN.md:
- Around line 417-461: Add a language identifier to the fenced ASCII diagram
block (e.g., change the opening triple-backticks to ```text) so the diagram is
treated as plain text by the markdown linter; locate the fenced block that
contains the diagram with labels like "Incoming Request", "AutoRouter /
ForwardStreaming", "StreamingExtractor", and "BillingCalculator" and update its
opening fence to include the language specifier (or an explicit "text") to
satisfy MD040.- Around line 405-410: Add the missing language specifier for the fenced code
block that contains the StreamingResponseExtractor interface definition so
Markdown renders with Go syntax highlighting; update the fence fromtogo
for the block that includes the StreamingResponseExtractor, ResponseExtractor,
ExtractStreamingWithController, and IsStreamingResponse identifiers.</details> --- <details> <summary>ℹ️ Review info</summary> <details> <summary>⚙️ Run configuration</summary> **Configuration used**: Organization UI **Review profile**: CHILL **Plan**: Pro **Run ID**: `13d38219-4a65-41e6-92ac-8424714a25ba` </details> <details> <summary>📥 Commits</summary> Reviewing files that changed from the base of the PR and between 7aba7c0236edcd888df9ccc49214f31cbe2b76d0 and bbefc9c83e46bf6d4bec6bece158600194abb688. </details> <details> <summary>📒 Files selected for processing (2)</summary> * `DESIGN.md` * `README.md` </details> <details> <summary>✅ Files skipped from review due to trivial changes (1)</summary> * README.md </details> </details> <details> <summary>📜 Review details</summary> <details> <summary>🧰 Additional context used</summary> <details> <summary>🪛 markdownlint-cli2 (0.22.0)</summary> <details> <summary>DESIGN.md</summary> [warning] 405-405: Fenced code blocks should have a language specified (MD040, fenced-code-language) --- [warning] 417-417: Fenced code blocks should have a language specified (MD040, fenced-code-language) </details> </details> </details> <details> <summary>🔇 Additional comments (2)</summary><blockquote> <details> <summary>DESIGN.md (2)</summary><blockquote> `397-492`: **LGTM! Clear and accurate streaming documentation.** The new Streaming section provides comprehensive documentation of the SSE streaming support. The interface definitions, flow diagrams, and examples accurately reflect the implementation in the codebase, including the automatic `stream_options` injection behavior when billing is enabled. --- `1019-1062`: **LGTM! Directory structure accurately reflects new files.** The directory structure updates correctly document the new streaming-related files and their purposes, including the billing calculator, streaming utilities, fast JSON extraction, and provider-specific streaming extractors. 
</blockquote></details> </blockquote></details> </details> <!-- This is an auto-generated comment by CodeRabbit for review status -->
Actionable comments posted: 2
🧹 Nitpick comments (1)
streaming_test.go (1)
138-140: Replace the no-op with real parse assertions.

Line 139 currently only references `chunk.ID` without validating behavior. This can let parsing regressions pass unnoticed.

Proposed test assertion upgrade

```diff
-    if chunk != nil && tt.input != nil && len(tt.input) > 0 {
-        _ = chunk.ID
-    }
+    if len(tt.input) == 0 {
+        if chunk != nil {
+            t.Error("expected nil chunk for empty input")
+        }
+        return
+    }
+
+    if !tt.expectError && !tt.expectDone {
+        if chunk == nil {
+            t.Fatal("expected non-nil chunk")
+        }
+        if chunk.ID == "" {
+            t.Error("expected parsed chunk ID")
+        }
+    }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@streaming_test.go` around lines 138 - 140, The current no-op "_ = chunk.ID" allows regressions; replace it with real assertions that validate parsing behavior: assert chunk is non-nil when tt.input is non-empty, assert chunk.ID is non-empty (or equals an expected value provided on the test case), and assert other parsed fields (e.g., chunk.Content or chunk.Text) match the expected values you add to the test case struct; for cases where tt.input is empty assert chunk is nil. Use the existing identifiers chunk and tt.input and add per-test expected fields (like expectedID/expectedContent) to drive the assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@streaming_test.go`:
- Around line 519-532: The test currently calls ParseAnthropicSSEEvent on
events[0].Data and events[5].Data and immediately dereferences startEvent and
deltaEvent without checking the returned error; update the test to capture and
check the error from ParseAnthropicSSEEvent for both calls (e.g., if err != nil
{ t.Fatalf("ParseAnthropicSSEEvent failed for events[%d]: %v", idx, err) })
before accessing fields like Message.Usage or Usage.OutputTokens so the test
fails cleanly and avoids panics when parsing fails.
- Around line 439-444: The check that accepts a non-nil zero-valued usage when
tt.expectedPrompt==0 && tt.expectedCompletion==0 is too lax; instead require
result to be strictly nil for the nil-expected cases. Update the branch in
streaming_test.go so that when tt.expectedPrompt == 0 && tt.expectedCompletion
== 0 you assert result == nil (and error if not), rather than allowing a
zero-valued result with PromptTokens/CompletionTokens == 0; use the existing
identifiers (tt.expectedPrompt, tt.expectedCompletion, result,
result.PromptTokens, result.CompletionTokens) to locate and tighten the
assertion.
---
Nitpick comments:
In `@streaming_test.go`:
- Around line 138-140: The current no-op "_ = chunk.ID" allows regressions;
replace it with real assertions that validate parsing behavior: assert chunk is
non-nil when tt.input is non-empty, assert chunk.ID is non-empty (or equals an
expected value provided on the test case), and assert other parsed fields (e.g.,
chunk.Content or chunk.Text) match the expected values you add to the test case
struct; for cases where tt.input is empty assert chunk is nil. Use the existing
identifiers chunk and tt.input and add per-test expected fields (like
expectedID/expectedContent) to drive the assertions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3ea95a6f-5294-4af8-88a5-a8944b182f5d
📒 Files selected for processing (3)
- autorouter.go
- autorouter_test.go
- streaming_test.go
✅ Files skipped from review due to trivial changes (1)
- autorouter.go
🚧 Files skipped from review as they are similar to previous changes (1)
- autorouter_test.go
📜 Review details
🔇 Additional comments (1)
streaming_test.go (1)
9-84: Good coverage for core SSE parser framing paths.

This table set (single/multi-event, multiline data, and `[DONE]`) gives a solid baseline for parser behavior.
- Add error checking for ParseAnthropicSSEEvent calls to prevent panics
- Require strict nil result for expected-nil test cases
- Replace no-op _ = chunk.ID with real assertions for ID and content
🧹 Nitpick comments (3)
streaming_test.go (3)
539-560: Previous error-handling feedback addressed; a minor nil-check robustness gap remains.

Error handling was properly added per past feedback. However, if the test input or implementation changes, accessing `startEvent.Message.Usage` (line 543) or `deltaEvent.Usage` (line 558) could panic without prior nil checks. This is low risk since the test input is controlled, but adding guards would make the test more resilient.

Optional defensive nil checks

```diff
 startEvent, err := ParseAnthropicSSEEvent(events[0].Data)
 if err != nil {
     t.Fatalf("ParseAnthropicSSEEvent failed for events[0]: %v", err)
 }
+if startEvent == nil || startEvent.Message == nil || startEvent.Message.Usage == nil {
+    t.Fatal("expected message_start with usage payload")
+}
 if startEvent.Message.Usage.InputTokens != 150 {
```

```diff
 deltaEvent, err := ParseAnthropicSSEEvent(events[5].Data)
 if err != nil {
     t.Fatalf("ParseAnthropicSSEEvent failed for events[5]: %v", err)
 }
+if deltaEvent == nil || deltaEvent.Usage == nil {
+    t.Fatal("expected message_delta with usage payload")
+}
 if deltaEvent.Usage.OutputTokens != 25 {
```
Verify each finding against the current code and only fix it if needed. In `@streaming_test.go` around lines 539 - 560, The test assumes startEvent.Message.Usage and deltaEvent.Usage are non-nil and may panic if they are nil; update the assertions around ParseAnthropicSSEEvent results (the startEvent and deltaEvent variables produced by ParseAnthropicSSEEvent) to first check that startEvent.Message and startEvent.Message.Usage are not nil before reading InputTokens/CacheReadInputTokens, and similarly ensure deltaEvent.Usage is not nil before asserting OutputTokens; if any of these are nil, call t.Fatalf or t.Fatalf with a clear message so the test fails safely instead of panicking.
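The message_start / message_delta accumulation these tests exercise can be sketched as follows (the event and usage structs are simplified assumptions, not the repository's actual types):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified shapes for the two Anthropic events that carry usage.
type anthropicUsage struct {
	InputTokens  int `json:"input_tokens"`
	OutputTokens int `json:"output_tokens"`
}

type anthropicEvent struct {
	Type    string `json:"type"`
	Message *struct {
		Usage *anthropicUsage `json:"usage"`
	} `json:"message,omitempty"`
	Usage *anthropicUsage `json:"usage,omitempty"`
}

// accumulate folds one event into the running totals: message_start
// carries input tokens, message_delta carries the output token count.
// Nil guards mirror the checks the review asks the tests to add.
func accumulate(total *anthropicUsage, raw []byte) error {
	var ev anthropicEvent
	if err := json.Unmarshal(raw, &ev); err != nil {
		return err
	}
	switch ev.Type {
	case "message_start":
		if ev.Message != nil && ev.Message.Usage != nil {
			total.InputTokens = ev.Message.Usage.InputTokens
		}
	case "message_delta":
		if ev.Usage != nil {
			total.OutputTokens = ev.Usage.OutputTokens
		}
	}
	return nil
}

func main() {
	var total anthropicUsage
	accumulate(&total, []byte(`{"type":"message_start","message":{"usage":{"input_tokens":150}}}`))
	accumulate(&total, []byte(`{"type":"message_delta","usage":{"output_tokens":25}}`))
	fmt.Println(total.InputTokens, total.OutputTokens) // prints 150 25
}
```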
477-487: Consider adding a negative assertion for `CacheUsage`.

Similar to the OpenAI test, this only validates `CacheUsage` when expected values are positive. A test case where the result is non-nil but `CacheUsage` should be nil (e.g., a `message_start` event without cache tokens) would improve coverage.

Proposed fix

```diff
 if tt.expectedCacheRead > 0 || tt.expectedCacheCreate > 0 {
     if result.CacheUsage == nil {
         t.Fatal("expected cache usage")
     }
     if result.CacheUsage.CacheReadInputTokens != tt.expectedCacheRead {
         t.Errorf("expected cache read %d, got %d", tt.expectedCacheRead, result.CacheUsage.CacheReadInputTokens)
     }
     if result.CacheUsage.CacheCreationInputTokens != tt.expectedCacheCreate {
         t.Errorf("expected cache create %d, got %d", tt.expectedCacheCreate, result.CacheUsage.CacheCreationInputTokens)
     }
+} else if result != nil && result.CacheUsage != nil {
+    t.Errorf("expected no CacheUsage, got %+v", result.CacheUsage)
 }
```
Verify each finding against the current code and only fix it if needed. In `@streaming_test.go` around lines 477 - 487, Add a negative assertion to ensure CacheUsage is nil when no cache tokens are expected: after the existing positive branch, add a complementary branch for when tt.expectedCacheRead and tt.expectedCacheCreate are both zero (or not > 0) that asserts result.CacheUsage == nil (use t.Fatal or t.Errorf) so cases like a message_start event which should not set CacheUsage fail if it is non-nil; reference result.CacheUsage, tt.expectedCacheRead and tt.expectedCacheCreate to locate where to insert this check.
244-250: Missing assertion when `CacheUsage` should be nil.

The test only validates `CacheUsage` when the expected value is non-nil. If the implementation incorrectly sets `CacheUsage` for the "chunk with basic usage" case, this test won't catch it.

Proposed fix to add negative assertion

```diff
 if tt.expected.CacheUsage != nil {
     if result.CacheUsage == nil {
         t.Error("expected CacheUsage, got nil")
     } else if result.CacheUsage.CachedTokens != tt.expected.CacheUsage.CachedTokens {
         t.Errorf("expected CachedTokens %d, got %d", tt.expected.CacheUsage.CachedTokens, result.CacheUsage.CachedTokens)
     }
+} else if result.CacheUsage != nil {
+    t.Errorf("expected no CacheUsage, got %+v", result.CacheUsage)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@streaming_test.go` around lines 244 - 250, The test currently only asserts CacheUsage when tt.expected.CacheUsage is non-nil; add the complementary negative assertion so that when tt.expected.CacheUsage is nil the test fails if result.CacheUsage is non-nil. Concretely, in the block that inspects tt.expected.CacheUsage and result.CacheUsage (the code referencing tt.expected.CacheUsage and result.CacheUsage), add an else branch that checks if result.CacheUsage != nil and calls t.Errorf("expected CacheUsage nil, got %+v", result.CacheUsage) to catch unexpected cache data.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@streaming_test.go`:
- Around line 539-560: The test assumes startEvent.Message.Usage and
deltaEvent.Usage are non-nil and may panic if they are nil; update the
assertions around ParseAnthropicSSEEvent results (the startEvent and deltaEvent
variables produced by ParseAnthropicSSEEvent) to first check that
startEvent.Message and startEvent.Message.Usage are not nil before reading
InputTokens/CacheReadInputTokens, and similarly ensure deltaEvent.Usage is not
nil before asserting OutputTokens; if any of these are nil, call t.Fatalf or
t.Fatalf with a clear message so the test fails safely instead of panicking.
- Around line 477-487: Add a negative assertion to ensure CacheUsage is nil when
no cache tokens are expected: after the existing positive branch, add a
complementary branch for when tt.expectedCacheRead and tt.expectedCacheCreate
are both zero (or not > 0) that asserts result.CacheUsage == nil (use t.Fatal or
t.Errorf) so cases like a message_start event which should not set CacheUsage
fail if it is non-nil; reference result.CacheUsage, tt.expectedCacheRead and
tt.expectedCacheCreate to locate where to insert this check.
- Around line 244-250: The test currently only asserts CacheUsage when
tt.expected.CacheUsage is non-nil; add the complementary negative assertion so
that when tt.expected.CacheUsage is nil the test fails if result.CacheUsage is
non-nil. Concretely, in the block that inspects tt.expected.CacheUsage and
result.CacheUsage (the code referencing tt.expected.CacheUsage and
result.CacheUsage), add an else branch that checks if result.CacheUsage != nil
and calls t.Errorf("expected CacheUsage nil, got %+v", result.CacheUsage) to
catch unexpected cache data.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 36537963-bd42-47a8-89ab-5f5286a9d291
📒 Files selected for processing (1)
streaming_test.go
📜 Review details
🔇 Additional comments (6)
streaming_test.go (6)
9-84: LGTM!

Good table-driven test coverage for the SSE parser including multi-line data aggregation, event types, and the OpenAI streaming format. The EOF handling and field comparisons are correct.

86-163: LGTM!

Solid coverage of OpenAI SSE event parsing including the `[DONE]` marker, empty input, and invalid JSON cases. Good use of early returns for special cases.

255-275: LGTM!

Good coverage of content-type detection including the charset variant and negative cases.

277-306: LGTM!

Covers both data-only and event+data SSE formatting scenarios correctly.

308-402: LGTM!

Comprehensive coverage of Anthropic SSE event types with proper nil checks before field access.

459-464: Previous feedback addressed.

The assertion now strictly requires `nil` when both expected token values are zero, which is the correct contract.
Critical fixes:
- ForwardStreaming now wraps upstream call with interceptor chain
- Use HTTP trailers for billing headers in streaming (fixes timing)
- SSE events now forwarded even when parsing fails
- stream_options merged instead of overwritten

Robustness improvements:
- Add nil guards in BillingCalculator.Calculate
- Increase SSEParser buffer to 1MB for large payloads
- FormatSSEEvent splits multi-line data correctly
- Add TotalTokens fallback in extractOpenAIStd
- Support Anthropic ephemeral cache token fields

Test improvements:
- Remove unused mockExtractor embedding
- Add cacheUsage nil assertion in tests
- Add error checking for ParseAnthropicSSEEvent calls

Dependency fix:
- Run go mod tidy to mark simdjson-go as direct dependency
🧹 Nitpick comments (1)
autorouter.go (1)
198-296: Consider extracting shared request preparation logic.

The request preparation logic (body parsing, provider detection, URL resolution, request enrichment) is duplicated between `Forward` (lines 79-164) and `ForwardStreaming` (lines 198-296). Consider extracting this into a shared helper to reduce the maintenance burden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@autorouter.go` around lines 198 - 296, The forwarding logic in ForwardStreaming and Forward is duplicated; extract the shared steps (reading/parsing body, detecting provider via detector.Detect and modelProviderLookup, stripping provider prefix, adding billing stream include_usage, determining apiType with DetectAPITypeFromPath/DetectAPITypeFromBodyAndProvider, calling provider.BodyParser().Parse to get meta, setting meta.Custom fields, resolving upstreamURL with provider.URLResolver().Resolve, creating upstream *http.Request and copying headers, calling provider.RequestEnricher().Enrich, and attaching MetaContextValue) into a single helper (e.g., prepareUpstreamRequest or buildUpstreamRequest) that returns the resolved Provider, meta, body bytes, upstream *http.Request (or upstreamURL plus request body) and any error; then have Forward and ForwardStreaming call that helper and only perform stream-specific adjustments (like streaming headers or stream_options) afterwards.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@autorouter.go`:
- Around line 198-296: The forwarding logic in ForwardStreaming and Forward is
duplicated; extract the shared steps (reading/parsing body, detecting provider
via detector.Detect and modelProviderLookup, stripping provider prefix, adding
billing stream include_usage, determining apiType with
DetectAPITypeFromPath/DetectAPITypeFromBodyAndProvider, calling
provider.BodyParser().Parse to get meta, setting meta.Custom fields, resolving
upstreamURL with provider.URLResolver().Resolve, creating upstream *http.Request
and copying headers, calling provider.RequestEnricher().Enrich, and attaching
MetaContextValue) into a single helper (e.g., prepareUpstreamRequest or
buildUpstreamRequest) that returns the resolved Provider, meta, body bytes,
upstream *http.Request (or upstreamURL plus request body) and any error; then
have Forward and ForwardStreaming call that helper and only perform
stream-specific adjustments (like streaming headers or stream_options)
afterwards.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2c9cd9ea-426a-4605-8d59-899443745244
📒 Files selected for processing (9)
- autorouter.go
- autorouter_test.go
- billing_calculator.go
- go.mod
- internal/fastjson/extractor.go
- internal/fastjson/extractor_test.go
- providers/anthropic/streaming_extractor.go
- providers/openai_compatible/streaming_extractor.go
- streaming.go
✅ Files skipped from review due to trivial changes (1)
- go.mod
🚧 Files skipped from review as they are similar to previous changes (3)
- autorouter_test.go
- internal/fastjson/extractor_test.go
- billing_calculator.go
📜 Review details
🔇 Additional comments (17)
streaming.go (4)
27-33: LGTM - Scanner buffer size issue addressed.

The scanner now correctly uses `scanner.Buffer(make([]byte, 64*1024), 1024*1024)` to handle large SSE payloads up to 1 MiB, matching the pattern used in the provider streaming extractors.

35-89: LGTM - SSE parsing follows spec correctly.

The `Next()` method properly handles SSE field parsing, comment lines (`:` prefix), multi-line `data:` aggregation with newline joining, and delivers events on blank lines per the SSE specification.

282-297: LGTM - Multi-line SSE data formatting issue addressed.

The function now correctly splits data on newlines and emits each segment with its own `data:` prefix, producing valid SSE frames for multi-line payloads.

91-280: LGTM - Usage extraction and event parsing.

The streaming usage structs, event parsers, and extraction functions correctly handle both OpenAI and Anthropic formats with proper nil guards and cache usage extraction. The `[DONE]` sentinel and `ErrStreamComplete` pattern for OpenAI streams is appropriately implemented.

internal/fastjson/extractor.go (3)
144-162: LGTM - TotalTokens consistency issue addressed.

The `extractOpenAIStd` path now correctly backfills `TotalTokens` (lines 150-152) when missing, matching the behavior of `extractOpenAISimd`.

171-286: LGTM - Anthropic ephemeral cache fields now extracted.

Both the SIMD (lines 226-235) and stdlib (lines 256-257, 276-282) paths now correctly extract `ephemeral_5m_input_tokens` and `ephemeral_1h_input_tokens`, aligning with the Anthropic provider extractor behavior.

11-26: LGTM - UsageExtractor design.

The dispatcher correctly gates SIMD parsing on both CPU support and a reasonable 1024-byte payload threshold, falling back to standard-library JSON for smaller payloads where the SIMD overhead wouldn't pay off.
providers/anthropic/streaming_extractor.go (3)
103-123: LGTM - Raw events now forwarded before parsing.

The past issue about dropping events when parsing fails is addressed. The code now writes the raw `data:` line to the client (lines 108-114) before attempting to parse (line 116), ensuring transparent forwarding even when metadata extraction fails.
35-77: LGTM - Non-streaming extraction with tee forwarding.

The approach of teeing the body for simultaneous extraction and client delivery is correct, with proper EOF/context cancellation handling and flush-per-chunk writes.
79-218: LGTM - Anthropic streaming extraction implementation.

The implementation correctly:
- Sets required SSE headers including `X-Accel-Buffering: no` for nginx proxies
- Uses a 1 MiB scanner buffer matching the SSEParser
- Accumulates usage across `message_start` and `message_delta` events
- Forwards raw `event:` and `data:` lines with correct SSE framing
- Extracts cache usage into `meta.Custom["cache_usage"]`

providers/openai_compatible/streaming_extractor.go (2)
116-132: LGTM - Raw events now forwarded before parsing.

The past issue about dropping events when parsing fails is addressed. The code now writes the raw line (lines 117-123) before attempting to parse (line 125), ensuring transparent forwarding regardless of parsing success.
80-193: LGTM - OpenAI-compatible streaming extraction.

The implementation correctly handles:

- SSE headers and scanner buffer sizing
- The `[DONE]` sentinel with explicit termination (lines 108-114)
- Usage extraction from the final chunk (OpenAI sends complete usage in the last chunk with `stream_options.include_usage`)
- Fallback to `lastChunk.Choices` when no usage is accumulated (lines 176-190)

autorouter.go (5)
241-252: LGTM - stream_options merge issue addressed.

The code now correctly retrieves existing `stream_options` (line 245), creates a new map only if absent (lines 246-248), and merges `include_usage` into it (line 250), preserving any caller-supplied keys.
298-312: LGTM - Interceptor chain now applied to streaming requests.

The streaming path now properly wraps the upstream call with the interceptor chain (lines 299-312), matching the `Forward` method pattern.
323-363: LGTM - HTTP trailers used for billing headers.

The implementation correctly declares trailers before `WriteHeader` (line 325) and sets them after streaming completes (lines 359-361). This is the proper approach for sending metadata after the response body in chunked/streaming responses.
488-496: `headerSent()` always returns `false` for standard `http.ResponseWriter` implementations.

The standard `http.ResponseWriter` interface does not expose `WroteHeader()`. This function will always return `false`, causing the error path in `ServeHTTP` (line 433) to call `http.Error()` after headers have already been written in `ForwardStreaming` (line 334), appending error text to the stream instead of handling the failure gracefully.

Consider tracking header state explicitly by wrapping `http.ResponseWriter` with a type that sets a flag in `WriteHeader` and on first `Write`.

🔧 Suggested approach

```go
type headerTrackingWriter struct {
	http.ResponseWriter
	headerWritten bool
}

func (w *headerTrackingWriter) WriteHeader(statusCode int) {
	w.headerWritten = true
	w.ResponseWriter.WriteHeader(statusCode)
}

func (w *headerTrackingWriter) Write(b []byte) (int, error) {
	w.headerWritten = true
	return w.ResponseWriter.Write(b)
}

func (w *headerTrackingWriter) Unwrap() http.ResponseWriter {
	return w.ResponseWriter
}
```

Then wrap `w` in `ServeHTTP` and check `w.headerWritten` instead of calling `headerSent()`.
508-514: LGTM - Native streaming usage provider configuration.

The map correctly identifies providers (Anthropic, Bedrock, GoogleAI) that include usage data in their streaming events without requiring `stream_options.include_usage` injection.
Previously both providers buffered entire responses before forwarding to the client, defeating SSE streaming. Now:

Resolvers:
- Google AI: Use :streamGenerateContent?alt=sse for streaming
- Bedrock: Use /converse-stream for streaming

Streaming extractors:
- Google AI: Incremental SSE parsing, forward each event immediately
- Bedrock: Binary AWS event stream parsing, forward each message immediately

Tests:
- Verify incremental streaming (data arrives before stream completes)
- Test metadata extraction from streams
- Test resolver endpoint selection based on stream flag

All 100+ tests pass.
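The resolver change amounts to branching on the stream flag; a minimal sketch follows (the endpoint suffixes come from the commit message, while the function signature and surrounding path segments are illustrative assumptions):

```go
package main

import "fmt"

// resolveEndpoint picks a provider's streaming or non-streaming path.
// Only the :streamGenerateContent?alt=sse and /converse-stream suffixes
// are taken from the commit; everything else is invented for the sketch.
func resolveEndpoint(provider, model string, stream bool) string {
	switch provider {
	case "googleai":
		if stream {
			return fmt.Sprintf("/v1beta/models/%s:streamGenerateContent?alt=sse", model)
		}
		return fmt.Sprintf("/v1beta/models/%s:generateContent", model)
	case "bedrock":
		if stream {
			return fmt.Sprintf("/model/%s/converse-stream", model)
		}
		return fmt.Sprintf("/model/%s/converse", model)
	}
	return ""
}

func main() {
	fmt.Println(resolveEndpoint("googleai", "gemini-pro", true))
	fmt.Println(resolveEndpoint("bedrock", "claude-3", false))
}
```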
Use thread-safe response writer with channel-based signaling instead of polling a shared buffer. This eliminates the race condition where one goroutine writes to the buffer while another reads its length.

- Added threadSafeResponseWriter with mutex-protected buffer
- Use channel to signal first write time (eliminates race on time.Time)
- Added Flush() method to satisfy http.Flusher interface
- Both bedrock and googleai streaming tests now pass with -race
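The race fix can be sketched as a mutex-guarded writer plus a channel closed exactly once on the first write (names simplified from the commit's threadSafeResponseWriter; the real type also implements http.Flusher):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// safeWriter guards its buffer with a mutex and closes firstWrite
// exactly once, so a reader goroutine can block on the signal
// instead of polling shared state.
type safeWriter struct {
	mu         sync.Mutex
	buf        bytes.Buffer
	once       sync.Once
	firstWrite chan struct{}
}

func newSafeWriter() *safeWriter {
	return &safeWriter{firstWrite: make(chan struct{})}
}

func (w *safeWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.once.Do(func() { close(w.firstWrite) })
	return w.buf.Write(p)
}

func (w *safeWriter) String() string {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.buf.String()
}

func main() {
	w := newSafeWriter()
	go w.Write([]byte("data: chunk\n\n"))
	<-w.firstWrite // blocks until the writer goroutine's first Write
	fmt.Println(len(w.String()) > 0)
}
```

Because the channel is closed while the mutex is held, any reader unblocked by `<-w.firstWrite` that then calls `String()` is guaranteed to observe the first chunk, with no data race under `-race`.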
Summary
This PR adds comprehensive SSE streaming support with efficient billing extraction for all LLM providers.
Key Changes
SSE Streaming Support
- `SSEParser` for parsing Server-Sent Events
- `StreamingResponseExtractor` interface for all providers
- `http.ResponseController` for efficient flushing after each chunk

Efficient Billing Extraction

- `simdjson-go` dependency for fast SIMD-accelerated JSON parsing
- `UsageExtractor` in `internal/fastjson` for streaming usage extraction

Auto-injection of Stream Options

- Applied when a `BillingCalculator` is configured and the request has `stream: true`
- Adds `stream_options: {"include_usage": true}` to the request

Provider Updates

- `StreamingExtractor` for streaming responses

Files Changed

- `streaming.go` - SSE parsing and usage extraction types
- `billing_calculator.go` - Reusable billing calculator for streaming/non-streaming
- `internal/fastjson/extractor.go` - Fast JSON parsing with simdjson
- `autorouter.go` - `ForwardStreaming()` method, auto stream_options injection
- `extractor.go` - `StreamingResponseExtractor` interface
Example Usage
Summary by CodeRabbit
New Features
Tests
Documentation