Skip to content

feat: add SSE streaming support with billing extraction#7

Merged
jhaynie merged 7 commits intomainfrom
feat/streaming-support
Apr 14, 2026
Merged

feat: add SSE streaming support with billing extraction#7
jhaynie merged 7 commits intomainfrom
feat/streaming-support

Conversation

@jhaynie
Copy link
Copy Markdown
Member

@jhaynie jhaynie commented Apr 14, 2026

Summary

This PR adds comprehensive SSE streaming support with efficient billing extraction for all LLM providers.

Key Changes

  1. SSE Streaming Support

    • Added SSEParser for parsing Server-Sent Events
    • Implemented StreamingResponseExtractor interface for all providers
    • Uses http.ResponseController for efficient flushing after each chunk
  2. Efficient Billing Extraction

    • Added simdjson-go dependency for fast SIMD-accelerated JSON parsing
    • Created UsageExtractor in internal/fastjson for streaming usage extraction
    • Extracts token usage from streaming responses without buffering entire body
  3. Auto-injection of Stream Options

    • When BillingCalculator is configured and request has stream: true
    • Automatically injects stream_options: {"include_usage": true} to request
    • Ensures OpenAI returns token usage in the final streaming chunk
  4. Provider Updates

    • All providers now use StreamingExtractor for streaming responses
    • Non-streaming responses use chunked read/write/flush pattern (512KB buffer)
    • Consistent behavior across OpenAI, Anthropic, Azure, Bedrock, GoogleAI, Perplexity

Files Changed

  • streaming.go - SSE parsing and usage extraction types
  • billing_calculator.go - Reusable billing calculator for streaming/non-streaming
  • internal/fastjson/extractor.go - Fast JSON parsing with simdjson
  • autorouter.go - ForwardStreaming() method, auto stream_options injection
  • extractor.go - StreamingResponseExtractor interface
  • All provider files updated with streaming extractors

Tests Added

  • SSE parser tests
  • Fast JSON extraction tests
  • Streaming extractor tests
  • Auto stream_options injection tests (4 test cases)

Example Usage

# Streaming now works with billing
curl -X POST http://localhost:8080/ \
  -H 'Content-Type: application/json' \
  -d '{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"Hello"}]}'
  
# Billing logged after stream completes:
# [INFO] Billing: provider=openai model=gpt-4 tokens=100/50 cost=$0.006000

Summary by CodeRabbit

  • New Features

    • SSE streaming support across providers with real-time forwarding, streaming-aware response handling, and post-stream billing calculation
    • Automatic token-usage extraction (including cache-aware counts) and a configurable billing calculator with result callback
    • Request enrichment to ensure stream usage reporting is enabled when billing is active, with provider-specific safeguards to avoid duplicates
  • Tests

    • Extensive unit tests and benchmarks for SSE parsing, streaming extractors, and billing logic
  • Documentation

    • README and design docs updated with streaming and billing guidance and examples

- Add streaming SSE parser for OpenAI and Anthropic responses
- Implement StreamingResponseExtractor interface for all providers
- Use http.ResponseController for efficient flushing
- Add simdjson-go for fast JSON parsing of usage data
- Auto-inject stream_options.include_usage for billing
- Extract token usage from streaming responses for cost calculation
- Add comprehensive unit tests for streaming functionality
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 14, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds end-to-end SSE streaming support: parser/formatters, streaming-capable response extractors and handlers, per-provider streaming extractor implementations, streaming-aware AutoRouter forwarding with optional BillingCalculator and usage injection, SIMD-accelerated usage extraction, tests, examples, and documentation updates.

Changes

Cohort / File(s) Summary
Core streaming primitives & tests
streaming.go, streaming_test.go
Add SSE parser/formatter, OpenAI/Anthropic stream types, streaming usage extraction helpers, IsSSEStream, ErrStreamComplete, and comprehensive unit tests validating parsing, formatting, and usage mapping.
Extraction interfaces & utilities
extractor.go
Introduce StreamingResponseExtractor and StreamingHandler abstractions, DefaultStreamingHandler, and TeeReader to support streaming extraction and teeing concurrent reads/writes.
AutoRouter streaming flow & tests
autorouter.go, autorouter_test.go
Add ForwardStreaming, streaming branch in ServeHTTP, WithAutoRouterBillingCalculator option and BillingCalculator() accessor, header-sent guard, nativeStreamUsageProviders map, conditional injection of stream_options.include_usage when billing is present, and tests covering injection and streaming paths.
Billing calculator & integration tests
billing_calculator.go, internal/fastjson/extractor_test.go
Add BillingCalculator type with constructor, Calculate logic resolving provider/model and cost lookup, stores billing_result in respMeta.Custom, invokes optional callback; tests verify calculation, callback invocation, and respMeta mutation.
Fast JSON usage extractor & tests
internal/fastjson/extractor.go, internal/fastjson/extractor_test.go
Add UsageExtractor with SIMD (simdjson) fast path and JSON fallback for OpenAI/Anthropic usage and cache extraction; include unit tests and benchmarks.
OpenAI-compatible streaming
providers/openai_compatible/streaming_extractor.go, providers/openai_compatible/streaming_extractor_test.go, providers/openai_compatible/multiapi.go, providers/openai_compatible/provider.go
Add StreamingExtractor and StreamingMultiAPIExtractor, SSE detection and streaming/non-stream tee fallbacks, chunked streaming to clients with flushes, and tests; update provider wiring to use streaming extractor variants.
Anthropic streaming
providers/anthropic/streaming_extractor.go, providers/anthropic/provider.go
Add Anthropic StreamingExtractor that handles SSE scanning/parsing and non-SSE teeing, accumulates metadata/usage (including cache usage), and switch provider wiring to streaming extractor.
Bedrock streaming
providers/bedrock/streaming_extractor.go, providers/bedrock/provider.go
Add Bedrock StreamingExtractor that tees for extraction and streams buffered content in chunks with flushes; switch provider wiring to streaming extractor.
Google AI streaming
providers/googleai/streaming_extractor.go, providers/googleai/provider.go
Add Google AI StreamingExtractor that buffers and streams while delegating parsing; switch provider wiring to streaming extractor.
Azure & Perplexity provider wiring
providers/azure/provider.go, providers/perplexity/provider.go
Switch Azure and Perplexity provider constructors to use the OpenAI-compatible streaming extractor variants.
Examples, docs & design
examples/basic/main.go, README.md, DESIGN.md
Update example startup logs and billing wiring to register BillingCalculator, add streaming curl examples, document automatic stream_options.include_usage injection when billing is enabled, and add streaming design notes.
Dependencies
go.mod
Introduce indirect dependencies required by SIMD JSON parsing and CPU detection: github.com/minio/simdjson-go, github.com/klauspost/compress, github.com/klauspost/cpuid/v2, and golang.org/x/sys.
🚥 Pre-merge checks | ✅ 1
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

🧹 Nitpick comments (3)
go.mod (1)

9-13: Run go mod tidy here.

github.com/minio/simdjson-go is imported directly by internal/fastjson/extractor.go, so keeping Line 11 as // indirect will just cause avoidable go.mod churn on the next tidy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go.mod` around lines 9 - 13, Run `go mod tidy` to ensure
github.com/minio/simdjson-go is recorded as a direct dependency (it’s directly
imported by internal/fastjson/extractor.go); this will update go.mod to remove
the // indirect marker for github.com/minio/simdjson-go and prevent churn on
future tidy runs.
autorouter_test.go (1)

78-82: Consider removing unused embedding.

The *mockExtractor embedding in mockStreamingExtractor appears unused since streaming tests only exercise ExtractStreamingWithController, and the Extract method fallback isn't tested. This could be simplified, though it's not blocking.

♻️ Simplify mock structure
 type mockStreamingExtractor struct {
-	*mockExtractor
 	isStreaming        bool
 	extractStreamingFn func(resp *http.Response, w http.ResponseWriter, rc *http.ResponseController) (ResponseMetadata, error)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@autorouter_test.go` around lines 78 - 82, The mockStreamingExtractor struct
currently embeds *mockExtractor which is unused by the streaming tests; remove
the embedding to simplify the mock type (delete the *mockExtractor field from
mockStreamingExtractor) and ensure tests and any usages reference
mockStreamingExtractor.ExtractStreamingWithController directly; if other tests
expect mockExtractor methods, either implement necessary methods on
mockStreamingExtractor or keep a named field (not embedded) to avoid accidental
method promotion from mockExtractor while preserving required behavior for
Extract and ExtractStreamingWithController.
internal/fastjson/extractor_test.go (1)

56-62: Consider validating cache is nil when not expected.

When expectedCached == 0, the test doesn't verify that cacheUsage is actually nil. This could mask unexpected cache values being returned.

♻️ Add nil check for non-cache cases
 			if tt.expectedCached > 0 {
 				if cacheUsage == nil {
 					t.Error("expected cache usage, got nil")
 				} else if cacheUsage.CachedTokens != tt.expectedCached {
 					t.Errorf("expected cached tokens %d, got %d", tt.expectedCached, cacheUsage.CachedTokens)
 				}
+			} else if cacheUsage != nil {
+				t.Errorf("expected nil cache usage, got %+v", cacheUsage)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/fastjson/extractor_test.go` around lines 56 - 62, The test currently
only checks cacheUsage when tt.expectedCached > 0; add an explicit assertion for
the opposite case so that when tt.expectedCached == 0 the test verifies
cacheUsage is nil. Update the test in extractor_test.go around the cacheUsage
checks (where tt.expectedCached and cacheUsage are used) to branch: if
tt.expectedCached > 0 keep the existing non-nil and CachedTokens checks,
otherwise assert cacheUsage == nil to surface unexpected cache values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@autorouter.go`:
- Around line 287-293: ForwardStreaming currently calls a.client.Do directly and
bypasses the interceptor chain; change it to invoke a.interceptors the same way
Forward does by wrapping the actual upstream call in a handler passed to
a.interceptors. Specifically, in AutoRouter.ForwardStreaming(), set the
MetaContextValue on upstreamReq (using MetaContextKey{} and
upstreamReq.WithContext), then call a.interceptors(upstreamReq, handler) where
handler performs the a.client.Do(upstreamReq) and returns the response/error so
auth/logging/retry/policy interceptors run for streaming requests as well;
ensure the handler adapts to the same return types used by Forward (upstreamResp
and err) and preserves streaming semantics.
- Around line 296-325: The response is being committed too early by calling
w.WriteHeader(upstreamResp.StatusCode) before running
provider.ResponseExtractor/StreamingResponseExtractor
(ExtractStreamingWithController) and before billingCalculator.Calculate, which
prevents SSE headers and X-Gateway-* billing headers from being sent; remove or
move the WriteHeader call so the extractor or streamResponseWithFlush can
set/commit headers when ready (or use HTTP trailers for end-of-stream billing
values), and ensure implementations of ExtractStreamingWithController and
streamResponseWithFlush call w.WriteHeader/upstream status at the appropriate
time; update code around ResponseExtractor(), StreamingResponseExtractor,
ExtractStreamingWithController, and streamResponseWithFlush so headers/trailers
are emitted after streaming/billing decisions.
- Around line 241-244: The code currently overwrites any caller-provided
stream_options when billing is enabled; update the block in autorouter where
a.billingCalculator is checked so that if raw["stream"] is true you merge
include_usage: true into the existing raw["stream_options"] map (if present and
a map[string]any / map[string]interface{}) and only create a new map when
stream_options is missing or not a map, preserving any existing keys; target the
raw map manipulation around the "stream" / "stream_options" usage to implement
the merge.
- Around line 455-462: The headerSent function incorrectly assumes
ResponseWriter has WroteHeader; replace this with a ResponseWriter wrapper that
tracks whether headers have been sent by intercepting WriteHeader and Write
(e.g., create a type like headerTrackingResponseWriter that embeds
http.ResponseWriter and sets a sent flag in WriteHeader and on first Write if
WriteHeader was not called), use that wrapper where ForwardStreaming and related
handlers accept w so you can reliably check the sent flag, and change headerSent
to check that wrapper's flag (or, if the writer is not the tracked type,
conservatively assume headers were sent and bail out of error handling instead
of calling http.Error). Ensure you update any call sites that pass w (e.g.,
ForwardStreaming) to wrap the original http.ResponseWriter with
headerTrackingResponseWriter so headerSent can detect stream start.

In `@billing_calculator.go`:
- Around line 8-53: In Calculate, guard against nil c.lookup and nil respMeta
before any dereference and return nil for bad billing setup: check if c.lookup
== nil and return nil early; check if respMeta == nil and return nil early (so
subsequent accesses to respMeta.Custom and respMeta.Usage are safe). Also guard
the cacheUsage extraction by only reading respMeta.Custom when respMeta != nil,
and ensure you handle missing respMeta.Usage (or zero values) when calling
CalculateCost; keep existing onResult handling. Update the Calculate method (and
any calls to c.lookup(provider, ...)) to skip lookup invocation when c.lookup is
nil and to return nil instead of panicking.

In `@internal/fastjson/extractor.go`:
- Around line 167-267: The fastjson extractors (extractAnthropicSimd and
extractAnthropicStd) are not populating CacheUsage.Ephemeral5mInputTokens and
CacheUsage.Ephemeral1hInputTokens; update both extractAnthropicSimd and
extractAnthropicStd to read the same JSON keys used by the Anthropic provider
extractor and set CacheUsage.Ephemeral5mInputTokens and
CacheUsage.Ephemeral1hInputTokens accordingly (for simdjson, add cases in the
usage object loop to parse the ephemeral fields into cacheUsage; for the std
json path, add the fields to the resp.Usage struct and populate the CacheUsage
struct), matching the exact key names and types used in the existing Anthropic
extractor.
- Around line 144-157: extractOpenAIStd() currently leaves usage.TotalTokens at
0 when total_tokens is missing, causing inconsistent metadata vs
extractOpenAISimd(); update the logic in extractOpenAIStd() (same place where
usage is built) to mirror extractOpenAISimd() by setting usage.TotalTokens =
resp.Usage.TotalTokens if >0 else resp.Usage.PromptTokens +
resp.Usage.CompletionTokens, and ensure the same check for nil
PromptTokensDetails -> CacheUsage is applied so both SIMD and stdlib paths
produce identical Usage and CacheUsage values.

In `@providers/anthropic/streaming_extractor.go`:
- Around line 107-114: The code currently drops SSE "data:" chunks when
ParseAnthropicSSEEvent returns an error or nil; change the handling in
streaming_extractor.go so that if llmproxy.ParseAnthropicSSEEvent(data) returns
an error or nil you still forward the original data chunk downstream (i.e.,
write the raw data as you would for other events) and only skip metadata
extraction, rather than continue/return early; specifically update the block
around ParseAnthropicSSEEvent, the err check and the event==nil check to fall
through to the existing write/send logic when parsing fails, while preserving
the successful-path metadata handling when event != nil.

In `@providers/bedrock/streaming_extractor.go`:
- Around line 27-72: The current extractNonStreamingWithController calls
e.Extractor.Extract with a TeeReader that forces Extract to consume the entire
resp.Body before any bytes are written to the client; change this to stream
concurrently by wiring resp.Body into an io.Pipe and using an io.MultiWriter to
forward bytes to both the pipe (for e.Extractor.Extract) and the HTTP client
(w), flushing via rc.Flush after writes. Concretely: in
extractNonStreamingWithController create pr, pw := io.Pipe(), start a goroutine
that io.Copy(io.MultiWriter(pw, &clientWriter{w, rc}), resp.Body) and closes pw
on completion/err; call e.Extractor.Extract with a synthetic *http.Response
whose Body is io.NopCloser(pr) so Extract can parse incrementally; propagate
errors from the copy goroutine and ensure pw/resp.Body are closed and rc.Flush
is called after each client write. Use the symbols
extractNonStreamingWithController, e.Extractor.Extract, resp.Body, rc.Flush and
ResponseWriter w to locate and implement the change.

In `@providers/googleai/streaming_extractor.go`:
- Around line 27-72: The current ExtractStreamingWithController routes all
responses into extractNonStreamingWithController which drains resp.Body into a
bytes.Buffer via io.TeeReader and calls Extractor.Extract on a fully buffered
body, preventing SSE/incremental events from being forwarded; change the
implementation to stream: read resp.Body in small chunks, immediately write each
chunk to w and call rc.Flush(), and simultaneously feed those chunks into the
extraction path incrementally (e.g., replace the full-body Extractor.Extract
call with an incremental extraction that consumes the same streaming bytes—use
an io.Pipe or a custom io.Writer that passes chunks to Extractor while you
Write/Flush to w) so Extractor.Extract (or its streaming variant) processes
bytes as they arrive rather than after the entire body is buffered.

In `@providers/openai_compatible/streaming_extractor.go`:
- Around line 116-123: The code currently skips forwarding the raw SSE line
whenever llmproxy.ParseOpenAISSEEvent(data) returns an error or chunk==nil;
change this so parsing is best-effort but the raw event is always forwarded.
Specifically, in the block around ParseOpenAISSEEvent(data) and the variables
chunk/err, remove the early continue on err and on chunk==nil and instead call
the existing upstream write method (e.g., upstream.Write/WriteLine/WriteString
used elsewhere in this file) to forward the original data payload before
continuing; keep any logging of the parse error but do not drop the raw event.
- Around line 91-92: Replace the bufio.Scanner-based loop in
streaming_extractor.go (where scanner := bufio.NewScanner(resp.Body) and
scanner.Buffer(...) are used) with the existing SSEParser from streaming.go:
construct an SSEParser over resp.Body and call SSEParser.Next() to iterate SSE
events, handling each event's data field instead of scanning lines; update any
logic that expects scanner.Text() to use the event payload returned by
SSEParser.Next() and remove the scanner.Buffer code to avoid bufio.ErrTooLong
for large single-line JSON payloads (keep existing resp.Body closing and error
handling behavior).

In `@streaming.go`:
- Around line 280-290: FormatSSEEvent currently writes the whole data blob as a
single "data: " line which breaks SSE when data contains newlines; modify
FormatSSEEvent to split the data (byte slice) on '\n' and write a separate
"data: " line for each segment (including empty segments for
trailing/newline-only lines) to the bytes.Buffer, preserving the existing
"event: " header when event is non-empty and ending the frame with the required
blank line; use the FormatSSEEvent function name and the existing bytes.Buffer
to locate and update the logic.
- Around line 27-30: NewSSEParser currently constructs a bufio.Scanner with the
default 64 KiB limit which can cause bufio.ErrTooLong for large LLM SSE data
lines; update NewSSEParser to call scanner.Buffer(make([]byte, 64*1024),
1024*1024) on the created scanner (use the existing scanner variable) so the
maximum token size is increased to 1 MiB, keeping the rest of the SSEParser
construction the same.

---

Nitpick comments:
In `@autorouter_test.go`:
- Around line 78-82: The mockStreamingExtractor struct currently embeds
*mockExtractor which is unused by the streaming tests; remove the embedding to
simplify the mock type (delete the *mockExtractor field from
mockStreamingExtractor) and ensure tests and any usages reference
mockStreamingExtractor.ExtractStreamingWithController directly; if other tests
expect mockExtractor methods, either implement necessary methods on
mockStreamingExtractor or keep a named field (not embedded) to avoid accidental
method promotion from mockExtractor while preserving required behavior for
Extract and ExtractStreamingWithController.

In `@go.mod`:
- Around line 9-13: Run `go mod tidy` to ensure github.com/minio/simdjson-go is
recorded as a direct dependency (it’s directly imported by
internal/fastjson/extractor.go); this will update go.mod to remove the //
indirect marker for github.com/minio/simdjson-go and prevent churn on future
tidy runs.

In `@internal/fastjson/extractor_test.go`:
- Around line 56-62: The test currently only checks cacheUsage when
tt.expectedCached > 0; add an explicit assertion for the opposite case so that
when tt.expectedCached == 0 the test verifies cacheUsage is nil. Update the test
in extractor_test.go around the cacheUsage checks (where tt.expectedCached and
cacheUsage are used) to branch: if tt.expectedCached > 0 keep the existing
non-nil and CachedTokens checks, otherwise assert cacheUsage == nil to surface
unexpected cache values.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8f349475-c48e-47d1-809c-a9ab7689d910

📥 Commits

Reviewing files that changed from the base of the PR and between b1ae20e and 7aba7c0.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (22)
  • autorouter.go
  • autorouter_test.go
  • billing_calculator.go
  • examples/basic/main.go
  • extractor.go
  • go.mod
  • internal/fastjson/extractor.go
  • internal/fastjson/extractor_test.go
  • providers/anthropic/provider.go
  • providers/anthropic/streaming_extractor.go
  • providers/azure/provider.go
  • providers/bedrock/provider.go
  • providers/bedrock/streaming_extractor.go
  • providers/googleai/provider.go
  • providers/googleai/streaming_extractor.go
  • providers/openai_compatible/multiapi.go
  • providers/openai_compatible/provider.go
  • providers/openai_compatible/streaming_extractor.go
  • providers/openai_compatible/streaming_extractor_test.go
  • providers/perplexity/provider.go
  • streaming.go
  • streaming_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (17)
providers/azure/provider.go (1)

69-69: LGTM!

The Azure provider is correctly updated to use the new NewStreamingExtractor() for streaming-capable response extraction, consistent with the PR's streaming support additions. Both constructor functions (New and NewWithDynamicDeployment) are updated uniformly.

Also applies to: 95-95

providers/anthropic/provider.go (1)

21-21: LGTM!

The Anthropic provider is correctly updated to use NewStreamingExtractor() for streaming-aware response extraction, consistent with other providers in this PR.

Also applies to: 37-37

examples/basic/main.go (2)

221-226: LGTM!

Good refactoring to extract the billing callback into a reusable variable. The dual registration pattern (interceptor for non-streaming + billing calculator for streaming) correctly handles both response paths with consistent billing logic.


273-289: LGTM!

Helpful streaming examples added to the startup logs, demonstrating the new SSE streaming capabilities with proper stream_options.include_usage usage.

autorouter_test.go (3)

420-487: LGTM!

Good test coverage for the stream_options.include_usage auto-injection behavior when billing calculator is configured. The test correctly verifies that streaming requests get the option injected into the upstream request body.


489-556: LGTM!

Important test case verifying that user-specified stream_options.include_usage=false is correctly overridden to true when billing is enabled, ensuring billing data is always available for cost tracking.


558-668: LGTM!

Good negative test cases ensuring stream_options is only injected when appropriate:

  • Not injected without billing calculator (lines 558-613)
  • Not injected for non-streaming requests (lines 615-668)
internal/fastjson/extractor_test.go (1)

145-197: LGTM!

Good integration test for BillingCalculator covering token extraction, cost calculation formula, callback invocation, and metadata storage.

providers/openai_compatible/streaming_extractor_test.go (3)

14-61: LGTM!

Comprehensive test for SSE stream extraction covering multi-chunk parsing, metadata extraction (ID, Model), usage/token counting, and verifying the SSE framing is preserved in the output.


89-109: LGTM!

Good fallback test ensuring non-streaming JSON responses are handled gracefully even when using the streaming extractor.


111-147: LGTM!

Important test validating cache token extraction from streaming responses and proper storage in the Custom["cache_usage"] map.

providers/openai_compatible/multiapi.go (2)

79-100: LGTM!

Clean implementation of StreamingMultiAPIExtractor with proper routing between SSE streaming and non-streaming paths based on content-type detection.


102-144: LGTM!

The extractNonStreamingWithController correctly uses TeeReader to capture the response body during metadata extraction, then writes it to the client in 512KB chunks with flushes for responsive delivery. Error handling covers EOF, context cancellation, and write failures appropriately.

streaming_test.go (3)

9-84: LGTM!

Comprehensive SSE parser tests covering single events, event types, multiple events, multiline data aggregation, and OpenAI-style streaming format with [DONE] terminator.


145-233: LGTM!

Thorough tests for ExtractUsageFromOpenAIChunk with good null-safety coverage (nil chunk, missing usage) and validation of cache token extraction via PromptTokensDetails.


257-286: LGTM!

Good tests for FormatSSEEvent validating correct SSE wire format with and without event type prefix.

providers/anthropic/streaming_extractor.go (1)

90-91: The Anthropic stream parser has a 1 MiB buffer ceiling that affects both anthropic and openai_compatible extractors.

The scanner.Buffer() configuration at line 91 limits SSE lines to 1 MiB. Any line exceeding this will cause bufio.ErrTooLong and terminate the stream. This constraint applies identically in providers/openai_compatible/streaming_extractor.go at line 92.

Comment thread autorouter.go
Comment thread autorouter.go
Comment thread autorouter.go
Comment thread autorouter.go
Comment on lines +455 to +462
func headerSent(w http.ResponseWriter) bool {
type headerChecker interface {
WroteHeader() bool
}
if hc, ok := w.(headerChecker); ok {
return hc.WroteHeader()
}
return false
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's find and examine the headerSent function in context
cd /repo && cat -n autorouter.go | sed -n '450,470p'

Repository: agentuity/llmproxy

Length of output: 117


🏁 Script executed:

# Search for uses of headerSent to understand the impact
cd /repo && rg "headerSent" -B 3 -A 3

Repository: agentuity/llmproxy

Length of output: 117


🏁 Script executed:

# Check if there's any import or usage of http.Error
cd /repo && rg "http\.Error" -B 2 -A 2

Repository: agentuity/llmproxy

Length of output: 117


🏁 Script executed:

# Let's verify the Go http.ResponseWriter interface definition
python3 << 'EOF'
import subprocess
import re

# The Go http.ResponseWriter interface is part of the standard library
# Let's check what methods it actually exposes
result = subprocess.run(['go', 'doc', 'net/http.ResponseWriter'], 
                       capture_output=True, text=True)
print("=== net/http.ResponseWriter interface ===")
print(result.stdout)
print(result.stderr)
EOF

Repository: agentuity/llmproxy

Length of output: 3611


🏁 Script executed:

# Also check if http.response (the concrete type) has WroteHeader
python3 << 'EOF'
import subprocess

# Try to get info on http.response internal type
result = subprocess.run(['go', 'doc', 'net/http.response'], 
                       capture_output=True, text=True)
print("=== net/http.response ===")
print(result.stdout)
if result.stderr:
    print("STDERR:", result.stderr)
    
# Let's try another approach - check Go source for WroteHeader
result2 = subprocess.run(['grep', '-r', 'WroteHeader', '/usr/lib/go'], 
                        capture_output=True, text=True)
print("\n=== WroteHeader in Go stdlib ===")
print(result2.stdout[:2000] if result2.stdout else "Not found in /usr/lib/go")
EOF

Repository: agentuity/llmproxy

Length of output: 4225


🏁 Script executed:

git ls-files | head -20

Repository: agentuity/llmproxy

Length of output: 372


🏁 Script executed:

# Find autorouter.go
find . -name "autorouter.go" -o -name "*.go" | head -20

Repository: agentuity/llmproxy

Length of output: 509


🏁 Script executed:

# Search for WroteHeader usage in the codebase
rg "WroteHeader" -B 3 -A 3

Repository: agentuity/llmproxy

Length of output: 387


🏁 Script executed:

# Search for headerSent function and its usage
rg "headerSent" -B 5 -A 5

Repository: agentuity/llmproxy

Length of output: 814


🏁 Script executed:

# Search for http.Error usage to understand the context of the risk
rg "http\.Error" -B 3 -A 3

Repository: agentuity/llmproxy

Length of output: 1253


🏁 Script executed:

# Let's check the Go http package source to see if http.response has WroteHeader
python3 << 'EOF'
import subprocess
import os

# Find the Go stdlib location
result = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True)
goroot = result.stdout.strip()
print(f"GOROOT: {goroot}")

# Check the net/http/server.go file for the response type implementation
server_path = os.path.join(goroot, 'src', 'net', 'http', 'server.go')
if os.path.exists(server_path):
    result = subprocess.run(['grep', '-n', 'WroteHeader', server_path], 
                           capture_output=True, text=True)
    if result.stdout:
        print("Found WroteHeader in server.go:")
        print(result.stdout)
    else:
        print("WroteHeader not found in server.go")
        
    # Also search for type response and its methods
    result2 = subprocess.run(['grep', '-n', 'func (w \*response)', server_path], 
                            capture_output=True, text=True, timeout=10)
    lines = result2.stdout.split('\n')[:30]
    print("\nMethods on *response type (first 30):")
    print('\n'.join(lines))
else:
    print(f"server.go not found at {server_path}")
EOF

Repository: agentuity/llmproxy

Length of output: 1297


🏁 Script executed:

# Let's examine the autorouter.go file more closely to understand the full context
cat -n autorouter.go | sed -n '440,475p'

Repository: agentuity/llmproxy

Length of output: 929


🏁 Script executed:

# Let's also check if there are any tests that might show how headerSent is expected to work
cat -n autorouter_test.go | grep -B 5 -A 5 "headerSent\|streaming" || echo "No matches found"

Repository: agentuity/llmproxy

Length of output: 2426


🏁 Script executed:

# Let's check if there's a way to detect if headers were sent in the standard library
# by examining the response type structure
python3 << 'EOF'
import subprocess
import os

goroot = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True).stdout.strip()
server_path = os.path.join(goroot, 'src', 'net', 'http', 'server.go')

# Search for any field or tracking of header being written
result = subprocess.run(['grep', '-n', 'wroteHeader\|headerWritten', server_path], 
                       capture_output=True, text=True)
if result.stdout:
    print("Found header tracking fields:")
    print(result.stdout[:1500])
else:
    print("No wroteHeader/headerWritten fields found")

# Also check for write method behavior
result2 = subprocess.run(['grep', '-B 3', '-A 10', 'func (w \*response) Write', server_path],
                        capture_output=True, text=True)
print("\nWrite method implementation:")
print(result2.stdout[:1500])
EOF

Repository: agentuity/llmproxy

Length of output: 2117


🏁 Script executed:

# Let's check how http.Error actually behaves and what happens when called after writing
cat > /tmp/test_error.go << 'EOF'
package main

import (
	"net/http"
	"net/http/httptest"
)

func main() {
	// Simulate what happens with standard http.ResponseWriter
	w := httptest.NewRecorder()
	
	// Simulate streaming where some data was already written
	w.Write([]byte("some data"))
	
	// Now try to write an error
	http.Error(w, "error message", http.StatusInternalServerError)
	
	// Check what happened
	println("Status:", w.Code)
	println("Body length:", len(w.Body.String()))
	println("Body:", w.Body.String())
}
EOF
go run /tmp/test_error.go 2>&1 || echo "Test execution note"

Repository: agentuity/llmproxy

Length of output: 119


🏁 Script executed:

# Let's look for how httptest.ResponseRecorder is used in tests to understand the issue better
rg "httptest\|ResponseRecorder" -B 2 -A 2 autorouter_test.go | head -50

Repository: agentuity/llmproxy

Length of output: 44


🏁 Script executed:

# Check if http.ResponseWriter implementations in this codebase (or standard lib) implement WroteHeader
python3 << 'EOF'
import subprocess

# Try to find if any implementation defines WroteHeader
goroot = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True).stdout.strip()

# Search for WroteHeader definition anywhere in the stdlib
result = subprocess.run(['grep', '-r', 'func.*WroteHeader', goroot + '/src/net/http/'],
                       capture_output=True, text=True)

if result.stdout:
    print("Found WroteHeader implementations:")
    print(result.stdout)
else:
    print("WroteHeader not found in net/http package")
    
# Let's also check Go version - WroteHeader was added in Go 1.14
print("\nChecking Go version and http features...")
result2 = subprocess.run(['go', 'version'], capture_output=True, text=True)
print(result2.stdout)
EOF

Repository: agentuity/llmproxy

Length of output: 333


🏁 Script executed:

# Let's check the actual behavior - read the response type structure
python3 << 'EOF'
import subprocess
import os

goroot = subprocess.run(['go', 'env', 'GOROOT'], capture_output=True, text=True).stdout.strip()
server_path = os.path.join(goroot, 'src', 'net', 'http', 'server.go')

# Find the type response definition
result = subprocess.run(['grep', '-B 5', '-A 50', 'type response struct', server_path],
                       capture_output=True, text=True)
print("Response struct definition:")
print(result.stdout[:2000])
EOF

Repository: agentuity/llmproxy

Length of output: 2090


Fix headerSent() to properly detect when response headers have been written to prevent appending error messages to already-started streams.

The standard http.ResponseWriter interface does not expose WroteHeader(), so this function always returns false for standard library implementations. When ForwardStreaming fails after partial output, calling http.Error() will write an error message body into an already-started response, corrupting the stream output.

Recommended approaches:

  • Track header writes by wrapping the http.ResponseWriter
  • Bail out of error handling entirely if streaming has started (no recovery possible)
  • Use a different ResponseWriter that exposes header state (e.g., httputil.ResponseWriter or similar)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@autorouter.go` around lines 455 - 462, The headerSent function incorrectly
assumes ResponseWriter has WroteHeader; replace this with a ResponseWriter
wrapper that tracks whether headers have been sent by intercepting WriteHeader
and Write (e.g., create a type like headerTrackingResponseWriter that embeds
http.ResponseWriter and sets a sent flag in WriteHeader and on first Write if
WriteHeader was not called), use that wrapper where ForwardStreaming and related
handlers accept w so you can reliably check the sent flag, and change headerSent
to check that wrapper's flag (or, if the writer is not the tracked type,
conservatively assume headers were sent and bail out of error handling instead
of calling http.Error). Ensure you update any call sites that pass w (e.g.,
ForwardStreaming) to wrap the original http.ResponseWriter with
headerTrackingResponseWriter so headerSent can detect stream start.

Comment thread billing_calculator.go
Comment thread providers/googleai/streaming_extractor.go
Comment on lines +91 to +92
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the streaming_extractor.go file around lines 91-92
cat -n providers/openai_compatible/streaming_extractor.go | head -120 | tail -50

Repository: agentuity/llmproxy

Length of output: 1650


🏁 Script executed:

# Get the full structure of the file to understand the context
wc -l providers/openai_compatible/streaming_extractor.go

Repository: agentuity/llmproxy

Length of output: 116


🏁 Script executed:

# Search for other SSE parser implementations in the codebase
fd -e go -e ts -e js | xargs rg -l "SSE|Server.Sent.Event|bufio\.Scanner|bufio\.Reader.*ReadBytes" | head -20

Repository: agentuity/llmproxy

Length of output: 365


🏁 Script executed:

# Read the full streaming_extractor.go to see error handling
cat -n providers/openai_compatible/streaming_extractor.go

Repository: agentuity/llmproxy

Length of output: 5781


🏁 Script executed:

# Check if scanner errors are handled
rg "scanner\.(Err|Scan)" providers/openai_compatible/streaming_extractor.go -A 2 -B 1

Repository: agentuity/llmproxy

Length of output: 176


🏁 Script executed:

# Look at other streaming extractors to see if they use similar patterns
cat providers/anthropic/streaming_extractor.go | head -150

Repository: agentuity/llmproxy

Length of output: 3979


🏁 Script executed:

# Search for any SSE parsing utilities in the codebase
rg "ParseOpenAISSEEvent|SSE|EventStream" -t go --max-count 5

Repository: agentuity/llmproxy

Length of output: 1343


🏁 Script executed:

# Check the SSEParser implementation to see if it avoids the buffer size issue
cat streaming.go

Repository: agentuity/llmproxy

Length of output: 7283


🏁 Script executed:

# Look for the llmproxy package structure
fd -type f streaming.go | xargs ls -la

Repository: agentuity/llmproxy

Length of output: 2630


🏁 Script executed:

# Check tool/function-call delta sizes in OpenAI documentation or examples
rg "tool_calls|function_call" -t go -A 2 -B 2 | head -40

Repository: agentuity/llmproxy

Length of output: 592


Replace bufio.Scanner with SSEParser to handle multi-line SSE data fields.

The current implementation uses bufio.Scanner with a 1 MiB max token size (line 92), which will abort the stream with bufio.ErrTooLong if a single data: line exceeds this limit. This is a real risk for large tool call or function call responses where the entire JSON payload sits on one line.

The codebase already provides SSEParser (in streaming.go) which correctly handles multi-line SSE data fields and avoids this single-line size limitation. Switch to using SSEParser.Next() instead of the line-by-line scanner approach, similar to how other streaming extractors could benefit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@providers/openai_compatible/streaming_extractor.go` around lines 91 - 92,
Replace the bufio.Scanner-based loop in streaming_extractor.go (where scanner :=
bufio.NewScanner(resp.Body) and scanner.Buffer(...) are used) with the existing
SSEParser from streaming.go: construct an SSEParser over resp.Body and call
SSEParser.Next() to iterate SSE events, handling each event's data field instead
of scanning lines; update any logic that expects scanner.Text() to use the event
payload returned by SSEParser.Next() and remove the scanner.Buffer code to avoid
bufio.ErrTooLong for large single-line JSON payloads (keep existing resp.Body
closing and error handling behavior).

Comment thread providers/openai_compatible/streaming_extractor.go
Comment thread streaming.go
Comment thread streaming.go
…treaming tests

Replace whitelist-based provider switch with exclusion list approach for
stream_options injection. Providers with native streaming usage reporting
(Anthropic, Bedrock, Google AI) are excluded; all others automatically get
stream_options injected. This is more extensible for new providers.

Also adds comprehensive Anthropic streaming tests:
- TestParseAnthropicSSEEvent
- TestExtractUsageFromAnthropicEvent
- TestAnthropicSSEParser (full event stream)
- TestAutoRouter_AnthropicStreamingNoStreamOptions
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (2)
DESIGN.md (2)

417-461: Add language specifier to ASCII diagram code block.

The streaming flow diagram is in a fenced code block without a language identifier. Add text or leave the language empty with just triple backticks for better rendering.

📝 Proposed fix
-```
+```text
 +------------------+
 | Incoming Request |
 |  stream: true    |

As per coding guidelines, the static analysis tool flagged: "Fenced code blocks should have a language specified (MD040)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@DESIGN.md` around lines 417 - 461, Add a language identifier to the fenced
ASCII diagram block (e.g., change the opening triple-backticks to ```text) so
the diagram is treated as plain text by the markdown linter; locate the fenced
block that contains the diagram with labels like "Incoming Request", "AutoRouter
/ ForwardStreaming", "StreamingExtractor", and "BillingCalculator" and update
its opening fence to include the language specifier (or an explicit "text") to
satisfy MD040.

405-410: Add language specifier to code block.

The interface definition code block is missing a language identifier, which affects markdown rendering and syntax highlighting. Add go as the language specifier.

📝 Proposed fix
-```
+```go
 type StreamingResponseExtractor interface {
     ResponseExtractor
     ExtractStreamingWithController(resp, w, rc) -> (ResponseMetadata, error)
     IsStreamingResponse(resp) -> bool
 }

</details>

As per coding guidelines, the static analysis tool flagged: "Fenced code blocks should have a language specified (MD040)".

<details>
<summary>🤖 Prompt for AI Agents</summary>

Verify each finding against the current code and only fix it if needed.

In @DESIGN.md around lines 405 - 410, Add the missing language specifier for the
fenced code block that contains the StreamingResponseExtractor interface
definition so Markdown renders with Go syntax highlighting; update the fence
from togo for the block that includes the StreamingResponseExtractor,
ResponseExtractor, ExtractStreamingWithController, and IsStreamingResponse
identifiers.


</details>

</blockquote></details>

</blockquote></details>

<details>
<summary>🤖 Prompt for all review comments with AI agents</summary>

Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In @DESIGN.md:

  • Around line 417-461: Add a language identifier to the fenced ASCII diagram
    block (e.g., change the opening triple-backticks to ```text) so the diagram is
    treated as plain text by the markdown linter; locate the fenced block that
    contains the diagram with labels like "Incoming Request", "AutoRouter /
    ForwardStreaming", "StreamingExtractor", and "BillingCalculator" and update its
    opening fence to include the language specifier (or an explicit "text") to
    satisfy MD040.
  • Around line 405-410: Add the missing language specifier for the fenced code
    block that contains the StreamingResponseExtractor interface definition so
    Markdown renders with Go syntax highlighting; update the fence from togo
    for the block that includes the StreamingResponseExtractor, ResponseExtractor,
    ExtractStreamingWithController, and IsStreamingResponse identifiers.

</details>

---

<details>
<summary>ℹ️ Review info</summary>

<details>
<summary>⚙️ Run configuration</summary>

**Configuration used**: Organization UI

**Review profile**: CHILL

**Plan**: Pro

**Run ID**: `13d38219-4a65-41e6-92ac-8424714a25ba`

</details>

<details>
<summary>📥 Commits</summary>

Reviewing files that changed from the base of the PR and between 7aba7c0236edcd888df9ccc49214f31cbe2b76d0 and bbefc9c83e46bf6d4bec6bece158600194abb688.

</details>

<details>
<summary>📒 Files selected for processing (2)</summary>

* `DESIGN.md`
* `README.md`

</details>

<details>
<summary>✅ Files skipped from review due to trivial changes (1)</summary>

* README.md

</details>

</details>

<details>
<summary>📜 Review details</summary>

<details>
<summary>🧰 Additional context used</summary>

<details>
<summary>🪛 markdownlint-cli2 (0.22.0)</summary>

<details>
<summary>DESIGN.md</summary>

[warning] 405-405: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

---

[warning] 417-417: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

</details>

</details>

</details>

<details>
<summary>🔇 Additional comments (2)</summary><blockquote>

<details>
<summary>DESIGN.md (2)</summary><blockquote>

`397-492`: **LGTM! Clear and accurate streaming documentation.**

The new Streaming section provides comprehensive documentation of the SSE streaming support. The interface definitions, flow diagrams, and examples accurately reflect the implementation in the codebase, including the automatic `stream_options` injection behavior when billing is enabled.

---

`1019-1062`: **LGTM! Directory structure accurately reflects new files.**

The directory structure updates correctly document the new streaming-related files and their purposes, including the billing calculator, streaming utilities, fast JSON extraction, and provider-specific streaming extractors.

</blockquote></details>

</blockquote></details>

</details>

<!-- This is an auto-generated comment by CodeRabbit for review status -->

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
streaming_test.go (1)

138-140: Replace the no-op with real parse assertions.

Line 139 currently only references chunk.ID without validating behavior. This can let parsing regressions pass unnoticed.

Proposed test assertion upgrade
-			if chunk != nil && tt.input != nil && len(tt.input) > 0 {
-				_ = chunk.ID
-			}
+			if len(tt.input) == 0 {
+				if chunk != nil {
+					t.Error("expected nil chunk for empty input")
+				}
+				return
+			}
+
+			if !tt.expectError && !tt.expectDone {
+				if chunk == nil {
+					t.Fatal("expected non-nil chunk")
+				}
+				if chunk.ID == "" {
+					t.Error("expected parsed chunk ID")
+				}
+			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@streaming_test.go` around lines 138 - 140, The current no-op "_ = chunk.ID"
allows regressions; replace it with real assertions that validate parsing
behavior: assert chunk is non-nil when tt.input is non-empty, assert chunk.ID is
non-empty (or equals an expected value provided on the test case), and assert
other parsed fields (e.g., chunk.Content or chunk.Text) match the expected
values you add to the test case struct; for cases where tt.input is empty assert
chunk is nil. Use the existing identifiers chunk and tt.input and add per-test
expected fields (like expectedID/expectedContent) to drive the assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@streaming_test.go`:
- Around line 519-532: The test currently calls ParseAnthropicSSEEvent on
events[0].Data and events[5].Data and immediately dereferences startEvent and
deltaEvent without checking the returned error; update the test to capture and
check the error from ParseAnthropicSSEEvent for both calls (e.g., if err != nil
{ t.Fatalf("ParseAnthropicSSEEvent failed for events[%d]: %v", idx, err) })
before accessing fields like Message.Usage or Usage.OutputTokens so the test
fails cleanly and avoids panics when parsing fails.
- Around line 439-444: The check that accepts a non-nil zero-valued usage when
tt.expectedPrompt==0 && tt.expectedCompletion==0 is too lax; instead require
result to be strictly nil for the nil-expected cases. Update the branch in
streaming_test.go so that when tt.expectedPrompt == 0 && tt.expectedCompletion
== 0 you assert result == nil (and error if not), rather than allowing a
zero-valued result with PromptTokens/CompletionTokens == 0; use the existing
identifiers (tt.expectedPrompt, tt.expectedCompletion, result,
result.PromptTokens, result.CompletionTokens) to locate and tighten the
assertion.

---

Nitpick comments:
In `@streaming_test.go`:
- Around line 138-140: The current no-op "_ = chunk.ID" allows regressions;
replace it with real assertions that validate parsing behavior: assert chunk is
non-nil when tt.input is non-empty, assert chunk.ID is non-empty (or equals an
expected value provided on the test case), and assert other parsed fields (e.g.,
chunk.Content or chunk.Text) match the expected values you add to the test case
struct; for cases where tt.input is empty assert chunk is nil. Use the existing
identifiers chunk and tt.input and add per-test expected fields (like
expectedID/expectedContent) to drive the assertions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3ea95a6f-5294-4af8-88a5-a8944b182f5d

📥 Commits

Reviewing files that changed from the base of the PR and between bbefc9c and 35b9213.

📒 Files selected for processing (3)
  • autorouter.go
  • autorouter_test.go
  • streaming_test.go
✅ Files skipped from review due to trivial changes (1)
  • autorouter.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • autorouter_test.go
📜 Review details
🔇 Additional comments (1)
streaming_test.go (1)

9-84: Good coverage for core SSE parser framing paths.

This table set (single/multi-event, multiline data, and [DONE]) gives a solid baseline for parser behavior.

Comment thread streaming_test.go
Comment thread streaming_test.go Outdated
- Add error checking for ParseAnthropicSSEEvent calls to prevent panics
- Require strict nil result for expected-nil test cases
- Replace no-op _ = chunk.ID with real assertions for ID and content
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
streaming_test.go (3)

539-560: Previous error-handling feedback addressed; minor nil-check robustness gap remains.

Error handling was properly added per past feedback. However, if the test input or implementation changes, accessing startEvent.Message.Usage (line 543) or deltaEvent.Usage (line 558) could panic without prior nil checks. This is low risk since the test input is controlled, but adding guards would make the test more resilient.

Optional defensive nil checks
 	startEvent, err := ParseAnthropicSSEEvent(events[0].Data)
 	if err != nil {
 		t.Fatalf("ParseAnthropicSSEEvent failed for events[0]: %v", err)
 	}
+	if startEvent == nil || startEvent.Message == nil || startEvent.Message.Usage == nil {
+		t.Fatal("expected message_start with usage payload")
+	}
 	if startEvent.Message.Usage.InputTokens != 150 {
 	deltaEvent, err := ParseAnthropicSSEEvent(events[5].Data)
 	if err != nil {
 		t.Fatalf("ParseAnthropicSSEEvent failed for events[5]: %v", err)
 	}
+	if deltaEvent == nil || deltaEvent.Usage == nil {
+		t.Fatal("expected message_delta with usage payload")
+	}
 	if deltaEvent.Usage.OutputTokens != 25 {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@streaming_test.go` around lines 539 - 560, The test assumes
startEvent.Message.Usage and deltaEvent.Usage are non-nil and may panic if they
are nil; update the assertions around ParseAnthropicSSEEvent results (the
startEvent and deltaEvent variables produced by ParseAnthropicSSEEvent) to first
check that startEvent.Message and startEvent.Message.Usage are not nil before
reading InputTokens/CacheReadInputTokens, and similarly ensure deltaEvent.Usage
is not nil before asserting OutputTokens; if any of these are nil, call t.Fatalf
or t.Fatalf with a clear message so the test fails safely instead of panicking.

477-487: Consider adding negative assertion for CacheUsage.

Similar to the OpenAI test, this only validates CacheUsage when expected values are positive. A test case where result is non-nil but CacheUsage should be nil (e.g., a message_start event without cache tokens) would improve coverage.

Proposed fix
 			if tt.expectedCacheRead > 0 || tt.expectedCacheCreate > 0 {
 				if result.CacheUsage == nil {
 					t.Fatal("expected cache usage")
 				}
 				if result.CacheUsage.CacheReadInputTokens != tt.expectedCacheRead {
 					t.Errorf("expected cache read %d, got %d", tt.expectedCacheRead, result.CacheUsage.CacheReadInputTokens)
 				}
 				if result.CacheUsage.CacheCreationInputTokens != tt.expectedCacheCreate {
 					t.Errorf("expected cache create %d, got %d", tt.expectedCacheCreate, result.CacheUsage.CacheCreationInputTokens)
 				}
+			} else if result != nil && result.CacheUsage != nil {
+				t.Errorf("expected no CacheUsage, got %+v", result.CacheUsage)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@streaming_test.go` around lines 477 - 487, Add a negative assertion to ensure
CacheUsage is nil when no cache tokens are expected: after the existing positive
branch, add a complementary branch for when tt.expectedCacheRead and
tt.expectedCacheCreate are both zero (or not > 0) that asserts result.CacheUsage
== nil (use t.Fatal or t.Errorf) so cases like a message_start event which
should not set CacheUsage fail if it is non-nil; reference result.CacheUsage,
tt.expectedCacheRead and tt.expectedCacheCreate to locate where to insert this
check.

244-250: Missing assertion when CacheUsage should be nil.

The test only validates CacheUsage when expected is non-nil. If the implementation incorrectly sets CacheUsage for the "chunk with basic usage" case, this test won't catch it.

Proposed fix to add negative assertion
 			if tt.expected.CacheUsage != nil {
 				if result.CacheUsage == nil {
 					t.Error("expected CacheUsage, got nil")
 				} else if result.CacheUsage.CachedTokens != tt.expected.CacheUsage.CachedTokens {
 					t.Errorf("expected CachedTokens %d, got %d", tt.expected.CacheUsage.CachedTokens, result.CacheUsage.CachedTokens)
 				}
+			} else if result.CacheUsage != nil {
+				t.Errorf("expected no CacheUsage, got %+v", result.CacheUsage)
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@streaming_test.go` around lines 244 - 250, The test currently only asserts
CacheUsage when tt.expected.CacheUsage is non-nil; add the complementary
negative assertion so that when tt.expected.CacheUsage is nil the test fails if
result.CacheUsage is non-nil. Concretely, in the block that inspects
tt.expected.CacheUsage and result.CacheUsage (the code referencing
tt.expected.CacheUsage and result.CacheUsage), add an else branch that checks if
result.CacheUsage != nil and calls t.Errorf("expected CacheUsage nil, got %+v",
result.CacheUsage) to catch unexpected cache data.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@streaming_test.go`:
- Around line 539-560: The test assumes startEvent.Message.Usage and
deltaEvent.Usage are non-nil and may panic if they are nil; update the
assertions around ParseAnthropicSSEEvent results (the startEvent and deltaEvent
variables produced by ParseAnthropicSSEEvent) to first check that
startEvent.Message and startEvent.Message.Usage are not nil before reading
InputTokens/CacheReadInputTokens, and similarly ensure deltaEvent.Usage is not
nil before asserting OutputTokens; if any of these are nil, call t.Fatalf or
t.Fatalf with a clear message so the test fails safely instead of panicking.
- Around line 477-487: Add a negative assertion to ensure CacheUsage is nil when
no cache tokens are expected: after the existing positive branch, add a
complementary branch for when tt.expectedCacheRead and tt.expectedCacheCreate
are both zero (or not > 0) that asserts result.CacheUsage == nil (use t.Fatal or
t.Errorf) so cases like a message_start event which should not set CacheUsage
fail if it is non-nil; reference result.CacheUsage, tt.expectedCacheRead and
tt.expectedCacheCreate to locate where to insert this check.
- Around line 244-250: The test currently only asserts CacheUsage when
tt.expected.CacheUsage is non-nil; add the complementary negative assertion so
that when tt.expected.CacheUsage is nil the test fails if result.CacheUsage is
non-nil. Concretely, in the block that inspects tt.expected.CacheUsage and
result.CacheUsage (the code referencing tt.expected.CacheUsage and
result.CacheUsage), add an else branch that checks if result.CacheUsage != nil
and calls t.Errorf("expected CacheUsage nil, got %+v", result.CacheUsage) to
catch unexpected cache data.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 36537963-bd42-47a8-89ab-5f5286a9d291

📥 Commits

Reviewing files that changed from the base of the PR and between 35b9213 and e2ce42f.

📒 Files selected for processing (1)
  • streaming_test.go
📜 Review details
🔇 Additional comments (6)
streaming_test.go (6)

9-84: LGTM!

Good table-driven test coverage for the SSE parser including multi-line data aggregation, event types, and the OpenAI streaming format. The EOF handling and field comparisons are correct.


86-163: LGTM!

Solid coverage of OpenAI SSE event parsing including the [DONE] marker, empty input, and invalid JSON cases. Good use of early returns for special cases.


255-275: LGTM!

Good coverage of content-type detection including the charset variant and negative cases.


277-306: LGTM!

Covers both data-only and event+data SSE formatting scenarios correctly.


308-402: LGTM!

Comprehensive coverage of Anthropic SSE event types with proper nil checks before field access.


459-464: Previous feedback addressed.

The assertion now strictly requires nil when both expected token values are zero, which is the correct contract.

Critical fixes:
- ForwardStreaming now wraps upstream call with interceptor chain
- Use HTTP trailers for billing headers in streaming (fixes timing)
- SSE events now forwarded even when parsing fails
- stream_options merged instead of overwritten

Robustness improvements:
- Add nil guards in BillingCalculator.Calculate
- Increase SSEParser buffer to 1MB for large payloads
- FormatSSEEvent splits multi-line data correctly
- Add TotalTokens fallback in extractOpenAIStd
- Support Anthropic ephemeral cache token fields

Test improvements:
- Remove unused mockExtractor embedding
- Add cacheUsage nil assertion in tests
- Add error checking for ParseAnthropicSSEEvent calls

Dependency fix:
- Run go mod tidy to mark simdjson-go as direct dependency
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
autorouter.go (1)

198-296: Consider extracting shared request preparation logic.

The request preparation logic (body parsing, provider detection, URL resolution, request enrichment) is duplicated between Forward (lines 79-164) and ForwardStreaming (lines 198-296). Consider extracting this into a shared helper to reduce maintenance burden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@autorouter.go` around lines 198 - 296, The forwarding logic in
ForwardStreaming and Forward is duplicated; extract the shared steps
(reading/parsing body, detecting provider via detector.Detect and
modelProviderLookup, stripping provider prefix, adding billing stream
include_usage, determining apiType with
DetectAPITypeFromPath/DetectAPITypeFromBodyAndProvider, calling
provider.BodyParser().Parse to get meta, setting meta.Custom fields, resolving
upstreamURL with provider.URLResolver().Resolve, creating upstream *http.Request
and copying headers, calling provider.RequestEnricher().Enrich, and attaching
MetaContextValue) into a single helper (e.g., prepareUpstreamRequest or
buildUpstreamRequest) that returns the resolved Provider, meta, body bytes,
upstream *http.Request (or upstreamURL plus request body) and any error; then
have Forward and ForwardStreaming call that helper and only perform
stream-specific adjustments (like streaming headers or stream_options)
afterwards.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@autorouter.go`:
- Around line 198-296: The forwarding logic in ForwardStreaming and Forward is
duplicated; extract the shared steps (reading/parsing body, detecting provider
via detector.Detect and modelProviderLookup, stripping provider prefix, adding
billing stream include_usage, determining apiType with
DetectAPITypeFromPath/DetectAPITypeFromBodyAndProvider, calling
provider.BodyParser().Parse to get meta, setting meta.Custom fields, resolving
upstreamURL with provider.URLResolver().Resolve, creating upstream *http.Request
and copying headers, calling provider.RequestEnricher().Enrich, and attaching
MetaContextValue) into a single helper (e.g., prepareUpstreamRequest or
buildUpstreamRequest) that returns the resolved Provider, meta, body bytes,
upstream *http.Request (or upstreamURL plus request body) and any error; then
have Forward and ForwardStreaming call that helper and only perform
stream-specific adjustments (like streaming headers or stream_options)
afterwards.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2c9cd9ea-426a-4605-8d59-899443745244

📥 Commits

Reviewing files that changed from the base of the PR and between e2ce42f and 04537ca.

📒 Files selected for processing (9)
  • autorouter.go
  • autorouter_test.go
  • billing_calculator.go
  • go.mod
  • internal/fastjson/extractor.go
  • internal/fastjson/extractor_test.go
  • providers/anthropic/streaming_extractor.go
  • providers/openai_compatible/streaming_extractor.go
  • streaming.go
✅ Files skipped from review due to trivial changes (1)
  • go.mod
🚧 Files skipped from review as they are similar to previous changes (3)
  • autorouter_test.go
  • internal/fastjson/extractor_test.go
  • billing_calculator.go
📜 Review details
🔇 Additional comments (17)
streaming.go (4)

27-33: LGTM - Scanner buffer size issue addressed.

The scanner now correctly uses scanner.Buffer(make([]byte, 64*1024), 1024*1024) to handle large SSE payloads up to 1 MiB, matching the pattern used in provider streaming extractors.


35-89: LGTM - SSE parsing follows spec correctly.

The Next() method properly handles SSE field parsing, comment lines (: prefix), multi-line data: aggregation with newline joining, and delivers events on blank lines per the SSE specification.


282-297: LGTM - Multi-line SSE data formatting issue addressed.

The function now correctly splits data on newlines and emits each segment with its own data: prefix, producing valid SSE frames for multi-line payloads.


91-280: LGTM - Usage extraction and event parsing.

The streaming usage structs, event parsers, and extraction functions correctly handle both OpenAI and Anthropic formats with proper nil guards and cache usage extraction. The [DONE] sentinel and ErrStreamComplete pattern for OpenAI streams is appropriately implemented.

internal/fastjson/extractor.go (3)

144-162: LGTM - TotalTokens consistency issue addressed.

The extractOpenAIStd path now correctly backfills TotalTokens (lines 150-152) when missing, matching the behavior of extractOpenAISimd.


171-286: LGTM - Anthropic ephemeral cache fields now extracted.

Both SIMD (lines 226-235) and stdlib (lines 256-257, 276-282) paths now correctly extract ephemeral_5m_input_tokens and ephemeral_1h_input_tokens, aligning with the Anthropic provider extractor behavior.


11-26: LGTM - UsageExtractor design.

The dispatcher correctly gates SIMD parsing on both CPU support and a reasonable 1024-byte payload threshold, falling back to standard library JSON for smaller payloads where SIMD overhead wouldn't pay off.

providers/anthropic/streaming_extractor.go (3)

103-123: LGTM - Raw events now forwarded before parsing.

The past issue about dropping events when parsing fails is addressed. The code now writes the raw data: line to the client (lines 108-114) before attempting to parse (line 116), ensuring transparent forwarding even when metadata extraction fails.


35-77: LGTM - Non-streaming extraction with tee forwarding.

The approach of teeing the body for simultaneous extraction and client delivery is correct, with proper EOF/context cancellation handling and flush-per-chunk writes.


79-218: LGTM - Anthropic streaming extraction implementation.

The implementation correctly:

  • Sets required SSE headers including X-Accel-Buffering: no for nginx proxies
  • Uses 1 MiB scanner buffer matching the SSEParser
  • Accumulates usage across message_start and message_delta events
  • Forwards raw event: and data: lines with correct SSE framing
  • Extracts cache usage into meta.Custom["cache_usage"]
providers/openai_compatible/streaming_extractor.go (2)

116-132: LGTM - Raw events now forwarded before parsing.

The past issue about dropping events when parsing fails is addressed. The code now writes the raw line (lines 117-123) before attempting to parse (line 125), ensuring transparent forwarding regardless of parsing success.


80-193: LGTM - OpenAI-compatible streaming extraction.

The implementation correctly handles:

  • SSE headers and scanner buffer sizing
  • The [DONE] sentinel with explicit termination (lines 108-114)
  • Usage extraction from the final chunk (OpenAI sends complete usage in the last chunk with stream_options.include_usage)
  • Fallback to lastChunk.Choices when no usage is accumulated (lines 176-190)
autorouter.go (5)

241-252: LGTM - stream_options merge issue addressed.

The code now correctly retrieves existing stream_options (line 245), creates a new map only if absent (lines 246-248), and merges include_usage into it (line 250), preserving any caller-supplied keys.


298-312: LGTM - Interceptor chain now applied to streaming requests.

The streaming path now properly wraps the upstream call with the interceptor chain (lines 299-312), matching the Forward method pattern.


323-363: LGTM - HTTP trailers used for billing headers.

The implementation correctly declares trailers before WriteHeader (line 325) and sets them after streaming completes (lines 359-361). This is the proper approach for sending metadata after the response body in chunked/streaming responses.


488-496: headerSent() always returns false for standard http.ResponseWriter implementations.

The standard http.ResponseWriter interface does not expose WroteHeader(). This function will always return false, causing the error path in ServeHTTP (line 433) to call http.Error() after headers have already been written in ForwardStreaming (line 334), appending error text to the stream instead of handling the failure gracefully.

Consider tracking header state explicitly by wrapping http.ResponseWriter with a type that sets a flag in WriteHeader and first Write.

[duplicate_comment, raise_major_issue]

🔧 Suggested approach
type headerTrackingWriter struct {
	http.ResponseWriter
	headerWritten bool
}

func (w *headerTrackingWriter) WriteHeader(statusCode int) {
	w.headerWritten = true
	w.ResponseWriter.WriteHeader(statusCode)
}

func (w *headerTrackingWriter) Write(b []byte) (int, error) {
	w.headerWritten = true
	return w.ResponseWriter.Write(b)
}

func (w *headerTrackingWriter) Unwrap() http.ResponseWriter {
	return w.ResponseWriter
}

Then wrap w in ServeHTTP and check w.headerWritten instead of calling headerSent().


508-514: LGTM - Native streaming usage provider configuration.

The map correctly identifies providers (Anthropic, Bedrock, GoogleAI) that include usage data in their streaming events without requiring stream_options.include_usage injection.

jhaynie added 2 commits April 13, 2026 23:59
Previously both providers buffered entire responses before forwarding to
client, defeating SSE streaming. Now:

Resolvers:
- Google AI: Use :streamGenerateContent?alt=sse for streaming
- Bedrock: Use /converse-stream for streaming

Streaming extractors:
- Google AI: Incremental SSE parsing, forward each event immediately
- Bedrock: Binary AWS event stream parsing, forward each message immediately

Tests:
- Verify incremental streaming (data arrives before stream completes)
- Test metadata extraction from streams
- Test resolver endpoint selection based on stream flag

All 100+ tests pass.
Use thread-safe response writer with channel-based signaling instead of
polling shared buffer. This eliminates the race condition where one
goroutine writes to the buffer while another reads its length.

- Added threadSafeResponseWriter with mutex-protected buffer
- Use channel to signal first write time (eliminates race on time.Time)
- Added Flush() method to satisfy http.Flusher interface
- Both bedrock and googleai streaming tests now pass with -race
@jhaynie jhaynie merged commit b213390 into main Apr 14, 2026
2 checks passed
@jhaynie jhaynie deleted the feat/streaming-support branch April 14, 2026 05:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant