145 changes: 143 additions & 2 deletions DESIGN.md
@@ -394,6 +394,138 @@ curl -X POST http://localhost:8080/v1/chat/completions \

---

## Streaming

The proxy fully supports Server-Sent Events (SSE) streaming, with efficient token usage extraction for billing.

### StreamingResponseExtractor

Extends `ResponseExtractor` for streaming responses:

```go
type StreamingResponseExtractor interface {
	ResponseExtractor

	// ExtractStreamingWithController streams the upstream response to the
	// client while extracting usage metadata for billing.
	ExtractStreamingWithController(resp *http.Response, w http.ResponseWriter, rc *http.ResponseController) (ResponseMetadata, error)

	// IsStreamingResponse reports whether resp is an SSE stream.
	IsStreamingResponse(resp *http.Response) bool
}
```

All built-in providers implement this interface for streaming support.

### Streaming Flow

```
+------------------+
| Incoming Request |
| stream: true |
+--------+---------+
|
Parse body, detect
stream: true flag
|
+--------v---------+
| AutoRouter |
| ForwardStreaming |
+--------+---------+
|
Auto-inject
stream_options:
{include_usage:true}
|
+--------v---------+
| HTTP Request |
| to upstream |
+--------+---------+
|
+--------v---------+
| Upstream Response|
| text/event-stream|
+--------+---------+
|
+--------v---------+
|StreamingExtractor|
| Parse SSE events |
| Extract usage    |
| Flush each chunk |
+--------+---------+
|
+--------v---------+
| BillingCalculator|
| Calculate cost |
+--------+---------+
|
+--------v---------+
| HTTP Response |
| to client |
+------------------+
```

### Usage Extraction

**OpenAI**: Usage is sent in the final chunk before `[DONE]`:

```json
data: {"id":"...","usage":{"prompt_tokens":100,"completion_tokens":50,"total_tokens":150}}
data: [DONE]
```

**Anthropic**: Usage is sent in `message_start` and `message_delta` events:

```json
data: {"type":"message_start","message":{"usage":{"input_tokens":100}}}
...
data: {"type":"message_delta","usage":{"output_tokens":50}}
data: {"type":"message_stop"}
```
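As an illustration, here is a minimal Go sketch of this extraction for the OpenAI shape. The real extractor parses with simdjson-go via `internal/fastjson`; the `usage` type and `extractUsage` helper below are hypothetical stand-ins.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// usage mirrors the OpenAI usage object carried in the final chunk.
type usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

// extractUsage scans SSE data lines and returns the last usage object seen.
func extractUsage(stream string) (usage, bool) {
	var u usage
	found := false
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := strings.TrimPrefix(sc.Text(), "data: ")
		if line == "[DONE]" || !strings.HasPrefix(line, "{") {
			continue // skip the terminator and non-JSON lines
		}
		var chunk struct {
			Usage *usage `json:"usage"`
		}
		if json.Unmarshal([]byte(line), &chunk) == nil && chunk.Usage != nil {
			u, found = *chunk.Usage, true
		}
	}
	return u, found
}

func main() {
	stream := "data: {\"id\":\"x\",\"usage\":{\"prompt_tokens\":100,\"completion_tokens\":50,\"total_tokens\":150}}\ndata: [DONE]\n"
	u, ok := extractUsage(stream)
	fmt.Println(ok, u.TotalTokens) // → true 150
}
```

Taking the *last* usage object seen matters for providers like Anthropic, where token counts arrive across multiple events.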

### Auto stream_options Injection

When `BillingCalculator` is configured and the request has `stream: true`, the proxy automatically injects:

```json
{
"stream": true,
"stream_options": { "include_usage": true }
}
```

This ensures OpenAI returns token usage in the streaming response for billing calculation.
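A hedged sketch of that injection: the proxy parses bodies with its fastjson internals, while this standalone version uses `encoding/json`, and the `injectStreamOptions` function name is illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// injectStreamOptions adds stream_options.include_usage to a streaming
// request body, mirroring what the proxy does when billing is configured.
func injectStreamOptions(body []byte) ([]byte, error) {
	var req map[string]any
	if err := json.Unmarshal(body, &req); err != nil {
		return nil, err
	}
	if stream, _ := req["stream"].(bool); !stream {
		return body, nil // only streaming requests need the option
	}
	req["stream_options"] = map[string]any{"include_usage": true}
	return json.Marshal(req)
}

func main() {
	out, _ := injectStreamOptions([]byte(`{"model":"gpt-4","stream":true}`))
	fmt.Println(string(out))
	// → {"model":"gpt-4","stream":true,"stream_options":{"include_usage":true}}
}
```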

### Efficient Flushing

Uses `http.ResponseController` for optimal streaming:

```go
rc := http.NewResponseController(w)

sc := bufio.NewScanner(resp.Body)
for sc.Scan() { // one SSE line per iteration
	w.Write(sc.Bytes())
	w.Write([]byte("\n"))
	rc.Flush() // immediate flush after each chunk
}
```

Non-streaming responses also use chunked read/write/flush with a 512KB buffer for better performance.

### Billing with Streaming

```go
adapter, _ := modelsdev.LoadFromURL()

billingCallback := func(r llmproxy.BillingResult) {
log.Printf("Cost: $%.6f", r.TotalCost)
}

router := llmproxy.NewAutoRouter(
llmproxy.WithAutoRouterBillingCalculator(
llmproxy.NewBillingCalculator(adapter.GetCostLookup(), billingCallback),
),
)
```

After the stream completes, the billing callback is invoked with the extracted token usage.
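The cost arithmetic behind that callback reduces to per-token prices times the extracted usage. A sketch with made-up prices follows; the `costInfo` struct is illustrative, not the library's `CostInfo`.

```go
package main

import "fmt"

// costInfo holds illustrative per-million-token prices.
type costInfo struct {
	InputPer1M  float64 // USD per 1M prompt tokens
	OutputPer1M float64 // USD per 1M completion tokens
}

// cost mirrors the arithmetic a billing calculator performs once the
// stream's usage has been extracted.
func cost(c costInfo, promptTokens, completionTokens int) float64 {
	return float64(promptTokens)/1e6*c.InputPer1M +
		float64(completionTokens)/1e6*c.OutputPer1M
}

func main() {
	c := costInfo{InputPer1M: 2.50, OutputPer1M: 10.00}
	fmt.Printf("Cost: $%.6f\n", cost(c, 100, 50)) // → Cost: $0.000750
}
```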

---

## Providers

Nine providers are included. Six share the OpenAI-compatible base; three have fully custom implementations.
@@ -884,19 +1016,24 @@ Matches the signature of `github.com/agentuity/go-common/logger` without requiring it
```
llmproxy/
├── apitype.go # API type detection and constants
├── autorouter.go # AutoRouter, provider/API auto-detection
├── autorouter.go # AutoRouter, provider/API auto-detection, streaming
├── billing.go # CostInfo, CostLookup, BillingResult, CalculateCost
├── billing_calculator.go # BillingCalculator for streaming/non-streaming
├── detection.go # Provider detection from model/header
├── enricher.go # RequestEnricher interface
├── extractor.go # ResponseExtractor interface
├── extractor.go # ResponseExtractor, StreamingResponseExtractor interface
├── interceptor.go # Interceptor, InterceptorChain, RoundTripFunc
├── internal/
│ └── fastjson/
│ └── extractor.go # Fast JSON parsing with simdjson-go
├── logger.go # Logger interface, LoggerFunc adapter
├── metadata.go # BodyMetadata, ResponseMetadata, Message, Usage, Choice
├── parser.go # BodyParser interface
├── provider.go # Provider interface, BaseProvider
├── proxy.go # Proxy struct, Forward method
├── registry.go # Registry interface, MapRegistry
├── resolver.go # URLResolver interface
├── streaming.go # SSE parser, streaming types, usage extraction
├── interceptors/
│ ├── addheader.go # AddHeaderInterceptor
│ ├── billing.go # BillingInterceptor
@@ -911,14 +1048,18 @@
│ └── adapter.go # models.dev pricing adapter
├── providers/
│ ├── anthropic/ # Anthropic Messages API
│ │ └── streaming_extractor.go # Anthropic SSE streaming
│ ├── azure/ # Azure OpenAI
│ ├── bedrock/ # AWS Bedrock Converse API
│ │ └── streaming_extractor.go # Bedrock streaming
│ ├── fireworks/ # Fireworks (OpenAI-compatible)
│ ├── googleai/ # Google AI Gemini
│ │ └── streaming_extractor.go # Google AI streaming
│ ├── groq/ # Groq (OpenAI-compatible)
│ ├── openai/ # OpenAI (Chat Completions + Responses)
│ ├── openai_compatible/ # Base for OpenAI-compatible providers
│ │ ├── multiapi.go # Multi-API parser/extractor
│ │ ├── streaming_extractor.go # SSE streaming with usage extraction
│ │ ├── responses_parser.go # Responses API parser
│ │ └── responses_extractor.go # Responses API extractor
│ └── xai/ # x.AI (OpenAI-compatible)
32 changes: 32 additions & 0 deletions README.md
@@ -108,6 +108,7 @@ curl -X POST http://localhost:8080/ \
- **9 Provider Implementations**: OpenAI, Anthropic, Groq, Fireworks, x.AI, Google AI, AWS Bedrock, Azure OpenAI, OpenAI-compatible base
- **AutoRouter**: Single endpoint with automatic provider/API detection
- **Responses API**: Full support for OpenAI's new Responses API
- **SSE Streaming**: Full streaming support with efficient token usage extraction
- **8 Built-in Interceptors**: Logging, Metrics, Retry, Billing, Tracing (OTel), HeaderBan, AddHeader, PromptCaching
- **Pricing Integration**: models.dev adapter with markup support
- **Prompt Caching**: prompt caching support for Anthropic, OpenAI, xAI, Fireworks, and Bedrock
@@ -153,6 +154,37 @@ curl -X POST http://localhost:8080/v1/chat/completions \
-d '{"model":"gpt-4","messages":[{"role":"user","content":"Hello"}]}'
```

## Streaming

SSE streaming is fully supported with automatic token usage extraction for billing:

```bash
# Streaming with automatic usage extraction
curl -X POST http://localhost:8080/ \
-H 'Content-Type: application/json' \
-d '{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"Hello"}]}'
```

**Key Features:**

- **Efficient flushing**: Uses `http.ResponseController` for immediate SSE delivery
- **Token extraction**: Extracts usage from streaming responses for billing
- **Auto stream_options**: Automatically injects `stream_options.include_usage` when billing is configured
- **Works with billing**: Billing is calculated after the stream completes

**Example with billing:**

```go
adapter, _ := modelsdev.LoadFromURL()
billingCallback := func(r llmproxy.BillingResult) {
log.Printf("Cost: $%.6f (tokens: %d/%d)", r.TotalCost, r.PromptTokens, r.CompletionTokens)
}

router := llmproxy.NewAutoRouter(
llmproxy.WithAutoRouterBillingCalculator(llmproxy.NewBillingCalculator(adapter.GetCostLookup(), billingCallback)),
)
```

## Providers

| Provider | Auth | API Format | Notes |