diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index a4566ad3..27ff7697 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -1,6 +1,6 @@ use error_stack::Report; use fastly::http::Method; -use fastly::{Error, Request, Response}; +use fastly::{Request, Response}; use log_fastly::Logger; use trusted_server_core::auction::endpoints::handle_auction; @@ -19,7 +19,9 @@ use trusted_server_core::proxy::{ handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, }; -use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic}; +use trusted_server_core::publisher::{ + handle_publisher_request, handle_tsjs_dynamic, stream_publisher_body, PublisherResponse, +}; use trusted_server_core::request_signing::{ handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery, handle_verify_signature, @@ -35,31 +37,44 @@ mod route_tests; use crate::error::to_error_response; use crate::platform::{build_runtime_services, open_kv_store, UnavailableKvStore}; -#[fastly::main] -fn main(req: Request) -> Result { +/// Entry point for the Fastly Compute program. +/// +/// Uses an undecorated `main()` with `Request::from_client()` instead of +/// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` +/// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls +/// `send_to_client()` on the returned `Response`, which is incompatible with +/// streaming. +fn main() { init_logger(); + let req = Request::from_client(); + // Keep the health probe independent from settings loading and routing so // readiness checks still get a cheap liveness response during startup. 
if req.get_method() == Method::GET && req.get_path() == "/health" { - return Ok(Response::from_status(200).with_body_text_plain("ok")); + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; } let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; log::debug!("Settings {settings:?}"); // Build the auction orchestrator once at startup let orchestrator = match build_orchestrator(&settings) { - Ok(orchestrator) => orchestrator, + Ok(o) => o, Err(e) => { log::error!("Failed to build auction orchestrator: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; @@ -67,7 +82,8 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; @@ -78,13 +94,17 @@ fn main(req: Request) -> Result { as std::sync::Arc; let runtime_services = build_runtime_services(&req, kv_store); - futures::executor::block_on(route_request( + // route_request may send the response directly (streaming path) or + // return it for us to send (buffered path). + if let Some(response) = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, &runtime_services, req, - )) + )) { + response.send_to_client(); + } } async fn route_request( @@ -93,7 +113,7 @@ async fn route_request( integration_registry: &IntegrationRegistry, runtime_services: &RuntimeServices, mut req: Request, -) -> Result { +) -> Option { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). 
@@ -115,14 +135,14 @@ async fn route_request( match enforce_basic_auth(settings, &req) { Ok(Some(mut response)) => { finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Some(response); } Ok(None) => {} Err(e) => { log::error!("Failed to evaluate basic auth: {:?}", e); let mut response = to_error_response(&e); finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return Some(response); } } @@ -197,7 +217,32 @@ async fn route_request( &publisher_services, req, ) { - Ok(response) => Ok(response), + Ok(PublisherResponse::Stream { + mut response, + body, + params, + }) => { + // Streaming path: finalize headers, then stream body to client. + finalize_response(settings, geo_info.as_ref(), &mut response); + let mut streaming_body = response.stream_to_client(); + if let Err(e) = stream_publisher_body( + body, + &mut streaming_body, + ¶ms, + settings, + integration_registry, + ) { + // Headers already committed. Log and abort — client + // sees a truncated response. Standard proxy behavior. 
+ log::error!("Streaming processing failed: {e:?}"); + drop(streaming_body); + } else if let Err(e) = streaming_body.finish() { + log::error!("Failed to finish streaming body: {e}"); + } + // Response already sent via stream_to_client() + return None; + } + Ok(PublisherResponse::Buffered(response)) => Ok(response), Err(e) => { log::error!("Failed to proxy to publisher origin: {:?}", e); Err(e) @@ -214,7 +259,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - Ok(response) + Some(response) } fn runtime_services_for_consent_route( diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index ab5ff72a..a201a59d 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -33,9 +33,22 @@ use crate::settings::Settings; use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; use crate::tsjs; +/// Wraps [`HtmlRewriterAdapter`] with optional post-processing. +/// +/// When `post_processors` is empty (the common streaming path), chunks pass +/// through immediately with no extra copying. When post-processors are +/// registered, intermediate output is accumulated in `accumulated_output` +/// until `is_last`, then post-processors run on the full document. This adds +/// an extra copy per chunk compared to the pre-streaming adapter (which +/// accumulated raw input instead of rewriter output). The overhead is +/// acceptable because the post-processor path is already fully buffered — +/// the real streaming win comes from the empty-post-processor path in Phase 2. struct HtmlWithPostProcessing { inner: HtmlRewriterAdapter, post_processors: Vec>, + /// Buffer that accumulates all intermediate output when post-processors + /// need the full document. Left empty on the streaming-only path. 
+ accumulated_output: Vec, origin_host: String, request_host: String, request_scheme: String, @@ -45,12 +58,26 @@ struct HtmlWithPostProcessing { impl StreamProcessor for HtmlWithPostProcessing { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { let output = self.inner.process_chunk(chunk, is_last)?; - if !is_last || output.is_empty() || self.post_processors.is_empty() { + + // Streaming-optimized path: no post-processors, pass through immediately. + if self.post_processors.is_empty() { return Ok(output); } - let Ok(output_str) = std::str::from_utf8(&output) else { - return Ok(output); + // Post-processors need the full document. Accumulate until the last chunk. + self.accumulated_output.extend_from_slice(&output); + if !is_last { + return Ok(Vec::new()); + } + + // Final chunk: run post-processors on the full accumulated output. + let full_output = std::mem::take(&mut self.accumulated_output); + if full_output.is_empty() { + return Ok(full_output); + } + + let Ok(output_str) = std::str::from_utf8(&full_output) else { + return Ok(full_output); }; let ctx = IntegrationHtmlContext { @@ -66,10 +93,10 @@ impl StreamProcessor for HtmlWithPostProcessing { .iter() .any(|p| p.should_process(output_str, &ctx)) { - return Ok(output); + return Ok(full_output); } - let mut html = String::from_utf8(output).map_err(|e| { + let mut html = String::from_utf8(full_output).map_err(|e| { io::Error::other(format!( "HTML post-processing expected valid UTF-8 output: {e}" )) @@ -93,10 +120,11 @@ impl StreamProcessor for HtmlWithPostProcessing { Ok(html.into_bytes()) } - fn reset(&mut self) { - self.inner.reset(); - self.document_state.clear(); - } + /// No-op. `HtmlWithPostProcessing` wraps a single-use + /// [`HtmlRewriterAdapter`] that cannot be reset. Clearing auxiliary + /// state without resetting the rewriter would leave the processor + /// in an inconsistent state, so this method intentionally does nothing. 
+ fn reset(&mut self) {} } /// Configuration for HTML processing @@ -480,9 +508,12 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso ..RewriterSettings::default() }; + let inner = HtmlRewriterAdapter::new(rewriter_settings); + HtmlWithPostProcessing { - inner: HtmlRewriterAdapter::new(rewriter_settings), + inner, post_processors, + accumulated_output: Vec::new(), origin_host: config.origin_host, request_host: config.request_host, request_scheme: config.request_scheme, @@ -1007,4 +1038,151 @@ mod tests { .collect::() ); } + + #[test] + fn post_processors_accumulate_while_streaming_path_passes_through() { + use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; + use lol_html::Settings; + + // --- Streaming path: no post-processors → output emitted per chunk --- + let mut streaming = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: Vec::new(), + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + let chunk1 = streaming + .process_chunk(b"", false) + .expect("should process chunk1"); + let chunk2 = streaming + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let chunk3 = streaming + .process_chunk(b"", true) + .expect("should process final chunk"); + + assert!( + !chunk1.is_empty() || !chunk2.is_empty(), + "should emit intermediate output on streaming path" + ); + + let mut streaming_all = chunk1; + streaming_all.extend_from_slice(&chunk2); + streaming_all.extend_from_slice(&chunk3); + + // --- Buffered path: post-processor registered → accumulates until is_last --- + struct NoopPostProcessor; + impl IntegrationHtmlPostProcessor for NoopPostProcessor { + fn integration_id(&self) -> &'static str { + "test-noop" + } + fn post_process(&self, _html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool { + false + } + } + + let mut buffered = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: vec![Arc::new(NoopPostProcessor)], + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + let buf1 = buffered + .process_chunk(b"", false) + .expect("should process chunk1"); + let buf2 = buffered + .process_chunk(b"

hello

", false) + .expect("should process chunk2"); + let buf3 = buffered + .process_chunk(b"", true) + .expect("should process final chunk"); + + assert!( + buf1.is_empty() && buf2.is_empty(), + "should return empty for intermediate chunks when post-processors are registered" + ); + assert!( + !buf3.is_empty(), + "should emit all output in final chunk when post-processors are registered" + ); + + // Both paths should produce identical output + let streaming_str = + String::from_utf8(streaming_all).expect("streaming output should be valid UTF-8"); + let buffered_str = String::from_utf8(buf3).expect("buffered output should be valid UTF-8"); + assert_eq!( + streaming_str, buffered_str, + "streaming and buffered paths should produce identical output" + ); + } + + #[test] + fn active_post_processor_receives_full_document_and_mutates_output() { + use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor}; + use lol_html::Settings; + + struct AppendCommentProcessor; + impl IntegrationHtmlPostProcessor for AppendCommentProcessor { + fn integration_id(&self) -> &'static str { + "test-append" + } + fn should_process(&self, html: &str, _ctx: &IntegrationHtmlContext<'_>) -> bool { + html.contains("") + } + fn post_process(&self, html: &mut String, _ctx: &IntegrationHtmlContext<'_>) -> bool { + html.push_str(""); + true + } + } + + let mut processor = HtmlWithPostProcessing { + inner: HtmlRewriterAdapter::new(Settings::default()), + post_processors: vec![Arc::new(AppendCommentProcessor)], + accumulated_output: Vec::new(), + origin_host: String::new(), + request_host: String::new(), + request_scheme: String::new(), + document_state: IntegrationDocumentState::default(), + }; + + // Feed multiple chunks + let r1 = processor + .process_chunk(b"", false) + .expect("should process chunk1"); + let r2 = processor + .process_chunk(b"

content

", false) + .expect("should process chunk2"); + let r3 = processor + .process_chunk(b"", true) + .expect("should process final chunk"); + + // Intermediate chunks return empty (buffered for post-processor) + assert!( + r1.is_empty() && r2.is_empty(), + "should buffer intermediate chunks" + ); + + // Final chunk contains the full document with post-processor mutation + let output = String::from_utf8(r3).expect("should be valid UTF-8"); + assert!( + output.contains("

content

"), + "should contain original content" + ); + assert!( + output.contains(""), + "should contain complete document" + ); + assert!( + output.contains(""), + "should contain post-processor mutation" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/google_tag_manager.rs b/crates/trusted-server-core/src/integrations/google_tag_manager.rs index aaf256a5..650fb4aa 100644 --- a/crates/trusted-server-core/src/integrations/google_tag_manager.rs +++ b/crates/trusted-server-core/src/integrations/google_tag_manager.rs @@ -12,7 +12,7 @@ //! | `GET/POST` | `.../collect` | Proxies GA analytics beacons | //! | `GET/POST` | `.../g/collect` | Proxies GA4 analytics beacons | -use std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; use async_trait::async_trait; use error_stack::{Report, ResultExt}; @@ -133,11 +133,22 @@ fn validate_container_id(container_id: &str) -> Result<(), validator::Validation pub struct GoogleTagManagerIntegration { config: GoogleTagManagerConfig, + /// Accumulates text fragments when `lol_html` splits a text node across + /// chunk boundaries. Drained on `is_last_in_text_node`. + /// + /// Uses `Mutex` to satisfy the `Sync` bound on `IntegrationScriptRewriter`. + /// The pipeline is single-threaded (`lol_html::HtmlRewriter` is `!Send`), + /// so the lock is uncontended. `lol_html` delivers text chunks sequentially + /// per element — the buffer is always empty when a new element's text begins. 
+ accumulated_text: Mutex, } impl GoogleTagManagerIntegration { fn new(config: GoogleTagManagerConfig) -> Arc { - Arc::new(Self { config }) + Arc::new(Self { + config, + accumulated_text: Mutex::new(String::new()), + }) } fn error(message: impl Into) -> TrustedServerError { @@ -490,14 +501,40 @@ impl IntegrationScriptRewriter for GoogleTagManagerIntegration { "script" // Match all scripts to find inline GTM snippets } - fn rewrite(&self, content: &str, _ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction { + fn rewrite(&self, content: &str, ctx: &IntegrationScriptContext<'_>) -> ScriptRewriteAction { + let mut buf = self + .accumulated_text + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + if !ctx.is_last_in_text_node { + // Intermediate fragment — accumulate and suppress output. + buf.push_str(content); + return ScriptRewriteAction::RemoveNode; + } + + // Last fragment. If we accumulated prior fragments, combine them. + let full_content: Option = if buf.is_empty() { + None + } else { + buf.push_str(content); + Some(std::mem::take(&mut *buf)) + }; + let text = full_content.as_deref().unwrap_or(content); + // Look for the GTM snippet pattern. // Standard snippet contains: "googletagmanager.com/gtm.js" // Note: analytics.google.com is intentionally excluded — gtag.js stores // that domain as a bare string and constructs URLs dynamically, so // rewriting it in scripts produces broken URLs. - if content.contains("googletagmanager.com") || content.contains("google-analytics.com") { - return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(content)); + if text.contains("googletagmanager.com") || text.contains("google-analytics.com") { + return ScriptRewriteAction::replace(Self::rewrite_gtm_urls(text)); + } + + // No GTM content — if we accumulated fragments, emit them unchanged. + // Intermediate fragments were already suppressed via RemoveNode. 
+ if full_content.is_some() { + return ScriptRewriteAction::replace(text.to_string()); } ScriptRewriteAction::keep() @@ -1635,4 +1672,224 @@ container_id = "GTM-DEFAULT" other => panic!("Expected Integration error, got {:?}", other), } } + + #[test] + fn fragmented_gtm_snippet_is_accumulated_and_rewritten() { + let config = GoogleTagManagerConfig { + enabled: true, + container_id: "GTM-FRAG1".to_string(), + upstream_url: "https://www.googletagmanager.com".to_string(), + cache_max_age: default_cache_max_age(), + max_beacon_body_size: default_max_beacon_body_size(), + }; + let integration = GoogleTagManagerIntegration::new(config); + + let document_state = IntegrationDocumentState::default(); + + // Simulate lol_html splitting the GTM snippet mid-domain. + let fragment1 = r#"(function(w,d,s,l,i){j.src='https://www.google"#; + let fragment2 = r#"tagmanager.com/gtm.js?id='+i;f.parentNode.insertBefore(j,f);})(window,document,'script','dataLayer','GTM-FRAG1');"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script", + request_host: "publisher.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + // Intermediate fragment: should be suppressed. + let action1 = + IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate fragment" + ); + + // Last fragment: should emit full rewritten content. + let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("/integrations/google_tag_manager/gtm.js"), + "should rewrite GTM URL. 
Got: {rewritten}" + ); + assert!( + !rewritten.contains("googletagmanager.com"), + "should not contain original GTM domain. Got: {rewritten}" + ); + } + other => panic!("expected Replace for fragmented GTM, got {other:?}"), + } + } + + #[test] + fn non_gtm_fragmented_script_is_passed_through() { + let config = GoogleTagManagerConfig { + enabled: true, + container_id: "GTM-PASS1".to_string(), + upstream_url: "https://www.googletagmanager.com".to_string(), + cache_max_age: default_cache_max_age(), + max_beacon_body_size: default_max_beacon_body_size(), + }; + let integration = GoogleTagManagerIntegration::new(config); + + let document_state = IntegrationDocumentState::default(); + + let fragment1 = "console.log('hel"; + let fragment2 = "lo world');"; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script", + request_host: "publisher.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = + IntegrationScriptRewriter::rewrite(&*integration, fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate" + ); + + // Last fragment: should emit full unchanged content since it's not GTM. 
+ let action2 = IntegrationScriptRewriter::rewrite(&*integration, fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(content) => { + assert_eq!( + content, "console.log('hello world');", + "should emit full accumulated non-GTM content" + ); + } + other => panic!("expected Replace with passthrough, got {other:?}"), + } + } + + /// Verify the accumulation buffer drains correctly between two consecutive + /// `"#; + + let mut output = Vec::new(); + pipeline + .process(Cursor::new(html_input.as_bytes()), &mut output) + .expect("should process with small chunks"); + let processed = String::from_utf8_lossy(&output); + + assert!( + processed.contains("/integrations/google_tag_manager/gtm.js"), + "should rewrite fragmented GTM URL. Got: {processed}" + ); + assert!( + !processed.contains("googletagmanager.com"), + "should not contain original GTM domain. Got: {processed}" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/nextjs/mod.rs b/crates/trusted-server-core/src/integrations/nextjs/mod.rs index 50244438..6524ee58 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/mod.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/mod.rs @@ -599,4 +599,55 @@ mod tests { final_html ); } + + /// Regression test: with a small chunk size, `lol_html` fragments the + /// `__NEXT_DATA__` text node across chunks. The rewriter must accumulate + /// fragments and produce correct output. + #[test] + fn small_chunk_next_data_rewrite_survives_fragmentation() { + // Build a __NEXT_DATA__ payload large enough to cross a 32-byte chunk boundary. 
+ let html = r#""#; + + let mut settings = create_test_settings(); + settings + .integrations + .insert_config( + "nextjs", + &json!({ + "enabled": true, + "rewrite_attributes": ["href", "link", "url"], + }), + ) + .expect("should update nextjs config"); + let registry = IntegrationRegistry::new(&settings).expect("should create registry"); + let config = config_from_settings(&settings, ®istry); + let processor = create_html_processor(config); + + // Use a very small chunk size to force fragmentation. + let pipeline_config = PipelineConfig { + input_compression: Compression::None, + output_compression: Compression::None, + chunk_size: 32, + }; + let mut pipeline = StreamingPipeline::new(pipeline_config, processor); + + let mut output = Vec::new(); + pipeline + .process(Cursor::new(html.as_bytes()), &mut output) + .expect("should process with small chunks"); + + let processed = String::from_utf8_lossy(&output); + assert!( + processed.contains("test.example.com") && processed.contains("/reviews"), + "should rewrite fragmented __NEXT_DATA__ href. Got: {processed}" + ); + assert!( + !processed.contains("origin.example.com/reviews"), + "should not contain original origin href. Got: {processed}" + ); + assert!( + processed.contains("Hello World"), + "should preserve non-URL content. Got: {processed}" + ); + } } diff --git a/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs b/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs index 1aa0b391..10101a70 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/rsc_placeholders.rs @@ -54,12 +54,13 @@ impl IntegrationScriptRewriter for NextJsRscPlaceholderRewriter { return ScriptRewriteAction::keep(); } - // Only process complete (unfragmented) scripts during streaming. - // Fragmented scripts are handled by the post-processor which re-parses the final HTML. 
- // This avoids corrupting non-RSC scripts that happen to be fragmented during streaming. + // Deliberately does not accumulate fragments (unlike NextJsNextDataRewriter + // and GoogleTagManagerIntegration which use Mutex buffers). RSC + // placeholder processing has a post-processor fallback that re-parses + // the final HTML at end-of-document, so fragmented scripts are safely + // deferred. Accumulation here would also risk corrupting non-RSC scripts + // that happen to be fragmented during streaming. if !ctx.is_last_in_text_node { - // Script is fragmented - skip placeholder processing. - // The post-processor will handle RSC scripts at end-of-document. return ScriptRewriteAction::keep(); } diff --git a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs index 72617c3e..eaf00a16 100644 --- a/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs +++ b/crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use error_stack::Report; use regex::{escape, Regex}; @@ -14,6 +14,14 @@ use super::{NextJsIntegrationConfig, NEXTJS_INTEGRATION_ID}; pub(super) struct NextJsNextDataRewriter { config: Arc, rewriter: UrlRewriter, + /// Accumulates text fragments when `lol_html` splits a text node across + /// chunk boundaries. Drained on `is_last_in_text_node`. + /// + /// Uses `Mutex` to satisfy the `Sync` bound on `IntegrationScriptRewriter`. + /// The pipeline is single-threaded (`lol_html::HtmlRewriter` is `!Send`), + /// so the lock is uncontended. `lol_html` delivers text chunks sequentially + /// per element — the buffer is always empty when a new element's text begins. 
+ accumulated_text: Mutex, } impl NextJsNextDataRewriter { @@ -23,6 +31,7 @@ impl NextJsNextDataRewriter { Ok(Self { rewriter: UrlRewriter::new(&config.rewrite_attributes)?, config, + accumulated_text: Mutex::new(String::new()), }) } @@ -65,7 +74,33 @@ impl IntegrationScriptRewriter for NextJsNextDataRewriter { return ScriptRewriteAction::keep(); } - self.rewrite_structured(content, ctx) + let mut buf = self + .accumulated_text + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + + if !ctx.is_last_in_text_node { + // Intermediate fragment — accumulate and suppress output. + buf.push_str(content); + return ScriptRewriteAction::RemoveNode; + } + + // Last fragment. If nothing was accumulated, process directly. + if buf.is_empty() { + return self.rewrite_structured(content, ctx); + } + + // Complete the accumulated text and process the full content. + // If rewrite_structured returns Keep, we must still emit the full + // accumulated text via Replace — intermediate fragments were already + // removed from lol_html's output via RemoveNode. 
+ buf.push_str(content); + let full_content = std::mem::take(&mut *buf); + let action = self.rewrite_structured(&full_content, ctx); + if matches!(action, ScriptRewriteAction::Keep) { + return ScriptRewriteAction::replace(full_content); + } + action } } @@ -464,4 +499,119 @@ mod tests { assert!(rewritten.contains("https://proxy.example.com/news")); assert!(rewritten.contains("//proxy.example.com/assets/logo.png")); } + + #[test] + fn fragmented_next_data_is_accumulated_and_rewritten() { + let rewriter = NextJsNextDataRewriter::new(test_config()).expect("should build rewriter"); + let document_state = IntegrationDocumentState::default(); + + let fragment1 = r#"{"props":{"pageProps":{"href":"https://origin."#; + let fragment2 = r#"example.com/reviews"}}}"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = rewriter.rewrite(fragment1, &ctx_intermediate); + assert_eq!( + action1, + ScriptRewriteAction::RemoveNode, + "should suppress intermediate fragment" + ); + + let action2 = rewriter.rewrite(fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("ts.example.com"), + "should rewrite origin to proxy host. Got: {rewritten}" + ); + assert!( + rewritten.contains("/reviews"), + "should preserve path. Got: {rewritten}" + ); + assert!( + !rewritten.contains("origin.example.com"), + "should not contain original host. 
Got: {rewritten}" + ); + } + other => panic!("expected Replace, got {other:?}"), + } + } + + #[test] + fn unfragmented_next_data_works_without_accumulation() { + let rewriter = NextJsNextDataRewriter::new(test_config()).expect("should build rewriter"); + let document_state = IntegrationDocumentState::default(); + let payload = r#"{"props":{"pageProps":{"href":"https://origin.example.com/page"}}}"#; + + let ctx_single = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: true, + document_state: &document_state, + }; + + let action = rewriter.rewrite(payload, &ctx_single); + match action { + ScriptRewriteAction::Replace(rewritten) => { + assert!( + rewritten.contains("ts.example.com"), + "should rewrite. Got: {rewritten}" + ); + } + other => panic!("expected Replace, got {other:?}"), + } + } + + #[test] + fn fragmented_next_data_without_rewritable_urls_preserves_content() { + let rewriter = NextJsNextDataRewriter::new(test_config()).expect("should build rewriter"); + let document_state = IntegrationDocumentState::default(); + + // __NEXT_DATA__ JSON with no origin URLs — rewrite_structured returns Keep. + let fragment1 = r#"{"props":{"pageProps":{"title":"Hello"#; + let fragment2 = r#" World","count":42}}}"#; + + let ctx_intermediate = IntegrationScriptContext { + selector: "script#__NEXT_DATA__", + request_host: "ts.example.com", + request_scheme: "https", + origin_host: "origin.example.com", + is_last_in_text_node: false, + document_state: &document_state, + }; + let ctx_last = IntegrationScriptContext { + is_last_in_text_node: true, + ..ctx_intermediate + }; + + let action1 = rewriter.rewrite(fragment1, &ctx_intermediate); + assert_eq!(action1, ScriptRewriteAction::RemoveNode); + + // Last fragment: even though no URLs to rewrite, must emit full content + // because intermediate fragments were removed. 
+ let action2 = rewriter.rewrite(fragment2, &ctx_last); + match action2 { + ScriptRewriteAction::Replace(content) => { + let expected = format!("{fragment1}{fragment2}"); + assert_eq!( + content, expected, + "should emit full accumulated content unchanged" + ); + } + other => panic!("expected Replace with passthrough, got {other:?}"), + } + } } diff --git a/crates/trusted-server-core/src/integrations/registry.rs b/crates/trusted-server-core/src/integrations/registry.rs index 302574cf..8b55493b 100644 --- a/crates/trusted-server-core/src/integrations/registry.rs +++ b/crates/trusted-server-core/src/integrations/registry.rs @@ -740,6 +740,15 @@ impl IntegrationRegistry { self.inner.script_rewriters.clone() } + /// Check whether any HTML post-processors are registered. + /// + /// Cheaper than [`html_post_processors()`](Self::html_post_processors) when + /// only the presence check is needed — avoids cloning `Vec>`. + #[must_use] + pub fn has_html_post_processors(&self) -> bool { + !self.inner.html_post_processors.is_empty() + } + /// Expose registered HTML post-processors. #[must_use] pub fn html_post_processors(&self) -> Vec> { diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 9ec3685d..5b2350d7 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -7,6 +7,8 @@ //! all other `fastly::Request`/`Response`/`Body` migrations. It is not a //! content-rewriting concern. +use std::io::Write; + use error_stack::{Report, ResultExt}; use fastly::http::{header, StatusCode}; use fastly::{Body, Request, Response}; @@ -178,12 +180,21 @@ struct ProcessResponseParams<'a> { integration_registry: &'a IntegrationRegistry, } -/// Process response body in streaming fashion with compression preservation -fn process_response_streaming( +/// Process response body through the streaming pipeline. 
+/// +/// Selects the appropriate processor based on content type (HTML rewriter, +/// RSC Flight rewriter, or URL replacer) and pipes chunks from `body` +/// through it into `output`. The caller decides what `output` is — a +/// `Vec` for buffered responses, or a `StreamingBody` for streaming. +/// +/// # Errors +/// +/// Returns an error if processor creation or chunk processing fails. +fn process_response_streaming( body: Body, + output: &mut W, params: &ProcessResponseParams, -) -> Result> { - // Check if this is HTML content +) -> Result<(), Report> { let is_html = params.content_type.contains("text/html"); let is_rsc_flight = params.content_type.contains("text/x-component"); log::debug!( @@ -195,15 +206,14 @@ fn process_response_streaming( params.origin_host ); - // Determine compression type let compression = Compression::from_content_encoding(params.content_encoding); + let config = PipelineConfig { + input_compression: compression, + output_compression: compression, + chunk_size: 8192, + }; - // Create output body to collect results - let mut output = Vec::new(); - - // Choose processor based on content type if is_html { - // Use HTML rewriter for HTML content let processor = create_html_stream_processor( params.origin_host, params.request_host, @@ -211,57 +221,26 @@ fn process_response_streaming( params.settings, params.integration_registry, )?; - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else if is_rsc_flight { - // RSC Flight responses are length-prefixed (T rows). A naive string replacement will - // corrupt the stream by changing byte lengths without updating the prefixes. 
let processor = RscFlightUrlRewriter::new( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else { - // Use simple text replacer for non-HTML content let replacer = create_url_replacer( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, replacer); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, replacer).process(body, output)?; } - log::debug!( - "Streaming processing complete - output size: {} bytes", - output.len() - ); - Ok(Body::from(output)) + Ok(()) } /// Create a unified HTML stream processor @@ -285,23 +264,87 @@ fn create_html_stream_processor( Ok(create_html_processor(config)) } +/// Result of publisher request handling, indicating whether the response +/// body should be streamed or has already been buffered. +pub enum PublisherResponse { + /// Response is fully buffered and ready to send via `send_to_client()`. + Buffered(Response), + /// Response headers are ready. The caller must: + /// 1. Call `finalize_response()` on the response + /// 2. Call `response.stream_to_client()` to get a `StreamingBody` + /// 3. Call `stream_publisher_body()` with the body and streaming writer + /// 4. Call `StreamingBody::finish()` + Stream { + /// Response with all headers set (EC ID, cookies, etc.) + /// but body not yet written. `Content-Length` already removed. + response: Response, + /// Origin body to be piped through the streaming pipeline. 
+ body: Body, + /// Parameters for `process_response_streaming`. + params: OwnedProcessResponseParams, + }, +} + +/// Owned version of [`ProcessResponseParams`] for returning from +/// `handle_publisher_request` without lifetime issues. +pub struct OwnedProcessResponseParams { + pub(crate) content_encoding: String, + pub(crate) origin_host: String, + pub(crate) origin_url: String, + pub(crate) request_host: String, + pub(crate) request_scheme: String, + pub(crate) content_type: String, +} + +/// Stream the publisher response body through the processing pipeline. +/// +/// Called by the adapter after `stream_to_client()` has committed the +/// response headers. Writes processed chunks directly to `output`. +/// +/// # Errors +/// +/// Returns an error if processing fails mid-stream. Since headers are +/// already committed, the caller should log the error and drop the +/// `StreamingBody` (client sees a truncated response). +pub fn stream_publisher_body( + body: Body, + output: &mut W, + params: &OwnedProcessResponseParams, + settings: &Settings, + integration_registry: &IntegrationRegistry, +) -> Result<(), Report> { + let borrowed = ProcessResponseParams { + content_encoding: ¶ms.content_encoding, + origin_host: ¶ms.origin_host, + origin_url: ¶ms.origin_url, + request_host: ¶ms.request_host, + request_scheme: ¶ms.request_scheme, + settings, + content_type: ¶ms.content_type, + integration_registry, + }; + process_response_streaming(body, output, &borrowed) +} + /// Proxies requests to the publisher's origin server. /// -/// This function forwards incoming requests to the configured origin URL, -/// preserving headers and request body. It's used as a fallback for routes -/// not explicitly handled by the trusted server. +/// Returns a [`PublisherResponse`] indicating whether the response can be +/// streamed or must be sent buffered. 
The streaming path is chosen when: +/// - The backend returns a 2xx status +/// - The response has a processable content type +/// - The response uses a supported `Content-Encoding` (gzip, deflate, br) +/// - No HTML post-processors are registered (the streaming gate) /// /// # Errors /// -/// Returns a [`TrustedServerError`] if: -/// - The proxy request fails -/// - The origin backend is unreachable +/// Returns a [`TrustedServerError`] if the proxy request fails or the +/// origin backend is unreachable. pub fn handle_publisher_request( settings: &Settings, integration_registry: &IntegrationRegistry, services: &RuntimeServices, mut req: Request, -) -> Result> { +) -> Result> { log::debug!("Proxying request to publisher_origin"); // Prebid.js requests are not intercepted here anymore. The HTML processor removes @@ -362,7 +405,7 @@ pub fn handle_publisher_request( .map(|_| services.kv_store()), }); let ec_allowed = allows_ec_creation(&consent_context); - log::trace!("Proxy EC ID: {}, ec_allowed: {}", ec_id, ec_allowed); + log::debug!("Proxy ec_allowed: {}", ec_allowed); let backend_name = BackendConfig::from_url( &settings.publisher.origin_url, @@ -385,99 +428,169 @@ pub fn handle_publisher_request( message: "Failed to proxy request to origin".to_string(), })?; - // Log all response headers for debugging log::debug!("Response headers:"); for (name, value) in response.get_headers() { log::debug!(" {}: {:?}", name, value); } - // Check if the response has a text-based content type that we should process + // Set EC ID / cookie headers BEFORE body processing. + // These are body-independent (computed from request cookies + consent). 
+ apply_ec_headers( + settings, + services, + &mut response, + &ec_id, + ec_allowed, + existing_ec_cookie.as_deref(), + &consent_context, + ); + let content_type = response .get_header(header::CONTENT_TYPE) .map(|h| h.to_str().unwrap_or_default()) .unwrap_or_default() .to_string(); - let should_process = content_type.contains("text/") - || content_type.contains("application/javascript") - || content_type.contains("application/json"); + let should_process = is_processable_content_type(&content_type); + let is_success = response.get_status().is_success(); - if should_process && !request_host.is_empty() { - // Check if the response is compressed - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); + if !should_process || request_host.is_empty() || !is_success { + log::debug!( + "Skipping response processing - should_process: {}, request_host: '{}', status: {}", + should_process, + request_host, + response.get_status(), + ); + return Ok(PublisherResponse::Buffered(response)); + } - // Log response details for debugging + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + + // Streaming gate: can we stream this response? 
+ // - 2xx status (non-success already returned Buffered above) + // - Supported Content-Encoding (unsupported would fail mid-stream) + // - No HTML post-processors registered (they need the full document) + // - Non-HTML content always streams (post-processors only apply to HTML) + let is_html = content_type.contains("text/html"); + let has_post_processors = integration_registry.has_html_post_processors(); + let encoding_supported = is_supported_content_encoding(&content_encoding); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + + if can_stream { log::debug!( - "Processing response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", content_type, content_encoding, request_host, origin_host ); - // Take the response body for streaming processing let body = response.take_body(); + response.remove_header(header::CONTENT_LENGTH); + + return Ok(PublisherResponse::Stream { + response, + body, + params: OwnedProcessResponseParams { + content_encoding, + origin_host, + origin_url: settings.publisher.origin_url.clone(), + request_host: request_host.to_string(), + request_scheme: request_scheme.to_string(), + content_type, + }, + }); + } - // Process the body using streaming approach - let params = ProcessResponseParams { - content_encoding: &content_encoding, - origin_host: &origin_host, - origin_url: &settings.publisher.origin_url, - request_host, - request_scheme, - settings, - content_type: &content_type, - integration_registry, - }; - match process_response_streaming(body, ¶ms) { - Ok(processed_body) => { - // Set the processed body back - response.set_body(processed_body); + // Unsupported Content-Encoding: we cannot decompress, so processing would + // treat compressed bytes as identity and produce garbled output. Return + // the origin response unchanged. 
+ if !encoding_supported { + log::warn!( + "Unsupported Content-Encoding '{}' - returning response unmodified", + content_encoding, + ); + return Ok(PublisherResponse::Buffered(response)); + } - // Remove Content-Length as the size has likely changed - response.remove_header(header::CONTENT_LENGTH); + // Buffered fallback: post-processors need the full document. + log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); - // Keep Content-Encoding header since we're returning compressed content - log::debug!( - "Preserved Content-Encoding: {} for compressed response", - content_encoding - ); + let body = response.take_body(); + let params = ProcessResponseParams { + content_encoding: &content_encoding, + origin_host: &origin_host, + origin_url: &settings.publisher.origin_url, + request_host, + request_scheme, + settings, + content_type: &content_type, + integration_registry, + }; + let mut output = Vec::new(); + process_response_streaming(body, &mut output, ¶ms)?; - log::debug!("Completed streaming processing of response body"); - } - Err(e) => { - log::error!("Failed to process response body: {:?}", e); - // Return an error response - return Err(e); - } - } - } else { - log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", - should_process, - request_host - ); - } + response.set_header(header::CONTENT_LENGTH, output.len().to_string()); + response.set_body(Body::from(output)); + + Ok(PublisherResponse::Buffered(response)) +} + +/// Whether the content type requires processing (URL rewriting, HTML injection). +/// +/// Text-based and JavaScript/JSON responses are processable; binary types +/// (images, fonts, video, etc.) pass through unchanged. 
+fn is_processable_content_type(content_type: &str) -> bool { + content_type.contains("text/") + || content_type.contains("application/javascript") + || content_type.contains("application/json") +} - // Consent-gated EC creation: - // - Consent given → set EC ID header + cookie. - // - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). - // - Consent absent + no cookie → do nothing. +/// Whether the `Content-Encoding` is one the streaming pipeline can handle. +/// +/// Unsupported encodings (e.g. `zstd` from a misbehaving origin) bypass the +/// rewrite pipeline entirely and are returned unchanged. Processing such +/// bodies as identity-encoded would produce garbled output. +fn is_supported_content_encoding(encoding: &str) -> bool { + matches!(encoding, "" | "identity" | "gzip" | "deflate" | "br") +} + +/// Apply EC ID and cookie headers to the response. +/// +/// Extracted so headers can be set before streaming begins (headers must +/// be finalized before `stream_to_client()` commits them). +/// +/// Consent-gated EC creation: +/// - Consent given → set EC ID header + cookie. +/// - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). +/// - Consent absent + no cookie → do nothing. +fn apply_ec_headers( + settings: &Settings, + services: &RuntimeServices, + response: &mut Response, + ec_id: &str, + ec_allowed: bool, + existing_ec_cookie: Option<&str>, + consent_context: &crate::consent::ConsentContext, +) { if ec_allowed { // Fastly's HeaderValue API rejects \r, \n, and \0, so the EC ID // cannot inject additional response headers. - response.set_header(HEADER_X_TS_EC, ec_id.as_str()); + response.set_header(HEADER_X_TS_EC, ec_id); // Cookie persistence is skipped if the EC ID contains RFC 6265-illegal // characters. The header is still emitted when consent allows it. 
- set_ec_cookie(settings, &mut response, ec_id.as_str()); - } else if let Some(cookie_ec_id) = existing_ec_cookie.as_deref() { + set_ec_cookie(settings, response, ec_id); + } else if let Some(cookie_ec_id) = existing_ec_cookie { log::info!( "EC revoked for '{}': consent withdrawn (jurisdiction={})", cookie_ec_id, consent_context.jurisdiction, ); - expire_ec_cookie(settings, &mut response); + expire_ec_cookie(settings, response); if settings.consent.consent_store.is_some() { crate::consent::kv::delete_consent_from_kv(services.kv_store(), cookie_ec_id); } @@ -487,8 +600,6 @@ pub fn handle_publisher_request( consent_context.jurisdiction, ); } - - Ok(response) } #[cfg(test)] @@ -517,21 +628,89 @@ mod tests { ("application/octet-stream", false), ]; - for (content_type, should_process) in test_cases { - let result = content_type.contains("text/html") - || content_type.contains("text/css") - || content_type.contains("text/javascript") - || content_type.contains("application/javascript") - || content_type.contains("application/json"); - + for (content_type, expected) in test_cases { assert_eq!( - result, should_process, - "Content-Type '{}' should_process: expected {}, got {}", - content_type, should_process, result + is_processable_content_type(content_type), + expected, + "Content-Type '{content_type}' should_process: expected {expected}", ); } } + #[test] + fn supported_content_encoding_accepts_known_values() { + assert!(is_supported_content_encoding(""), "should accept empty"); + assert!( + is_supported_content_encoding("identity"), + "should accept identity" + ); + assert!(is_supported_content_encoding("gzip"), "should accept gzip"); + assert!( + is_supported_content_encoding("deflate"), + "should accept deflate" + ); + assert!(is_supported_content_encoding("br"), "should accept br"); + } + + #[test] + fn supported_content_encoding_rejects_unknown_values() { + assert!(!is_supported_content_encoding("zstd"), "should reject zstd"); + assert!( + 
!is_supported_content_encoding("compress"), + "should reject compress" + ); + assert!( + !is_supported_content_encoding("snappy"), + "should reject snappy" + ); + } + + #[test] + fn unsupported_encoding_response_is_returned_unmodified() { + // Simulate a processable (HTML) 2xx response with an unsupported + // Content-Encoding. The bytes are not real zstd - the point is that + // they must be returned untouched rather than fed to the rewriter as + // identity-encoded text. + let origin_bytes = b"\x28\xb5\x2f\xfd\x00\x58\x61\x00\x00not really zstd".to_vec(); + + let mut response = Response::from_status(StatusCode::OK); + response.set_header(header::CONTENT_TYPE, "text/html; charset=utf-8"); + response.set_header(header::CONTENT_ENCODING, "zstd"); + response.set_body(Body::from(origin_bytes.clone())); + + // Re-derive the gate decision the same way handle_publisher_request does. + let content_type = response + .get_header(header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or_default() + .to_string(); + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .and_then(|h| h.to_str().ok()) + .unwrap_or_default() + .to_lowercase(); + + assert!( + is_processable_content_type(&content_type), + "text/html must be processable" + ); + assert!(response.get_status().is_success(), "status must be 2xx"); + assert!( + !is_supported_content_encoding(&content_encoding), + "zstd must not be a supported encoding" + ); + + // The fix: when the only reason to fall out of the streaming gate is + // unsupported encoding, return the response unchanged rather than + // re-routing through process_response_streaming (which would treat + // the compressed bytes as identity and garble them). 
+ let body = response.into_body().into_bytes(); + assert_eq!( + body, origin_bytes, + "unsupported-encoding response must pass through byte-for-byte" + ); + } + #[test] fn test_publisher_origin_host_extraction() { let settings = create_test_settings(); @@ -583,11 +762,52 @@ mod tests { } } - // Note: test_streaming_compressed_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. + #[test] + fn streaming_gate_allows_2xx_html_without_post_processors() { + let is_html = true; + let has_post_processors = false; + let encoding_supported = is_supported_content_encoding("gzip"); + assert!( + encoding_supported && (!is_html || !has_post_processors), + "should stream 2xx HTML without post-processors" + ); + } + + #[test] + fn streaming_gate_blocks_html_with_post_processors() { + let is_html = true; + let has_post_processors = true; + let encoding_supported = is_supported_content_encoding("gzip"); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + assert!( + !can_stream, + "should not stream HTML when post-processors are registered" + ); + } - // Note: test_streaming_brotli_content removed as it directly tested private function - // process_response_streaming. The functionality is tested through handle_publisher_request. 
+ #[test] + fn streaming_gate_allows_non_html_with_post_processors() { + let is_html = false; + let has_post_processors = true; + let encoding_supported = is_supported_content_encoding("gzip"); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + assert!( + can_stream, + "should stream non-HTML even with post-processors (they only apply to HTML)" + ); + } + + #[test] + fn streaming_gate_blocks_unsupported_encoding() { + let is_html = false; + let has_post_processors = false; + let encoding_supported = is_supported_content_encoding("zstd"); + let can_stream = encoding_supported && (!is_html || !has_post_processors); + assert!( + !can_stream, + "should not stream when content-encoding is unsupported" + ); + } #[test] fn test_content_encoding_detection() { @@ -838,4 +1058,56 @@ mod tests { "should reject unknown module names" ); } + + #[test] + fn stream_publisher_body_preserves_gzip_round_trip() { + use flate2::write::GzEncoder; + use std::io::Write; + + let settings = create_test_settings(); + let registry = + IntegrationRegistry::new(&settings).expect("should create integration registry"); + + // Compress CSS containing an origin URL that should be rewritten. + // CSS uses the text URL replacer (not lol_html), so inline URLs are rewritten. 
+ let html = b"body { background: url('https://origin.example.com/page'); }"; + let mut compressed = Vec::new(); + { + let mut encoder = GzEncoder::new(&mut compressed, flate2::Compression::default()); + encoder.write_all(html).expect("should compress"); + encoder.finish().expect("should finish compression"); + } + + let body = Body::from(compressed); + let params = OwnedProcessResponseParams { + content_encoding: "gzip".to_string(), + origin_host: "origin.example.com".to_string(), + origin_url: "https://origin.example.com".to_string(), + request_host: "proxy.example.com".to_string(), + request_scheme: "https".to_string(), + content_type: "text/css".to_string(), + }; + + let mut output = Vec::new(); + stream_publisher_body(body, &mut output, ¶ms, &settings, ®istry) + .expect("should process gzip CSS"); + + // Decompress output + use flate2::read::GzDecoder; + use std::io::Read; + let mut decoder = GzDecoder::new(&output[..]); + let mut decompressed = String::new(); + decoder + .read_to_string(&mut decompressed) + .expect("should decompress output"); + + assert!( + decompressed.contains("proxy.example.com"), + "should rewrite origin to proxy. Got: {decompressed}" + ); + assert!( + !decompressed.contains("origin.example.com"), + "should not contain original host. Got: {decompressed}" + ); + } } diff --git a/crates/trusted-server-core/src/streaming_processor.rs b/crates/trusted-server-core/src/streaming_processor.rs index 44fd7af3..c4c901e7 100644 --- a/crates/trusted-server-core/src/streaming_processor.rs +++ b/crates/trusted-server-core/src/streaming_processor.rs @@ -19,8 +19,11 @@ //! streaming interface. See `crate::platform` module doc for the //! authoritative note. 
-use error_stack::{Report, ResultExt}; +use std::cell::RefCell; use std::io::{self, Read, Write}; +use std::rc::Rc; + +use error_stack::{Report, ResultExt}; use crate::error::TrustedServerError; @@ -45,7 +48,7 @@ pub trait StreamProcessor { } /// Compression type for the stream -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Compression { None, Gzip, @@ -66,7 +69,21 @@ impl Compression { } } -/// Configuration for the streaming pipeline +/// Configuration for the streaming pipeline. +/// +/// # Supported compression combinations +/// +/// | Input | Output | Behavior | +/// |-------|--------|----------| +/// | None | None | Pass-through processing | +/// | Gzip | Gzip | Decompress → process → recompress | +/// | Gzip | None | Decompress → process | +/// | Deflate | Deflate | Decompress → process → recompress | +/// | Deflate | None | Decompress → process | +/// | Brotli | Brotli | Decompress → process → recompress | +/// | Brotli | None | Decompress → process | +/// +/// All other combinations return an error at runtime. pub struct PipelineConfig { /// Input compression type pub input_compression: Compression, @@ -104,6 +121,10 @@ impl StreamingPipeline

{ /// Process a stream from input to output /// + /// Handles all supported compression transformations by wrapping the raw + /// reader/writer in the appropriate decoder/encoder, then delegating to + /// [`Self::process_chunks`]. + /// /// # Errors /// /// Returns an error if the compression transformation is unsupported or if reading/writing fails. @@ -116,44 +137,104 @@ impl StreamingPipeline

{ self.config.input_compression, self.config.output_compression, ) { - (Compression::None, Compression::None) => self.process_uncompressed(input, output), - (Compression::Gzip, Compression::Gzip) => self.process_gzip_to_gzip(input, output), - (Compression::Gzip, Compression::None) => self.process_gzip_to_none(input, output), + (Compression::None, Compression::None) => self.process_chunks(input, output), + (Compression::Gzip, Compression::Gzip) => { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let decoder = GzDecoder::new(input); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + self.process_chunks(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize gzip encoder".to_string(), + })?; + Ok(()) + } + (Compression::Gzip, Compression::None) => { + use flate2::read::GzDecoder; + + self.process_chunks(GzDecoder::new(input), output) + } (Compression::Deflate, Compression::Deflate) => { - self.process_deflate_to_deflate(input, output) + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let decoder = ZlibDecoder::new(input); + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + self.process_chunks(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize deflate encoder".to_string(), + })?; + Ok(()) } (Compression::Deflate, Compression::None) => { - self.process_deflate_to_none(input, output) + use flate2::read::ZlibDecoder; + + self.process_chunks(ZlibDecoder::new(input), output) } (Compression::Brotli, Compression::Brotli) => { - self.process_brotli_to_brotli(input, output) + use brotli::enc::writer::CompressorWriter; + use brotli::enc::BrotliEncoderParams; + use brotli::Decompressor; + + let decoder = Decompressor::new(input, 4096); + let params = BrotliEncoderParams { + quality: 4, + lgwin: 22, + ..Default::default() + }; + let mut encoder = 
CompressorWriter::with_params(output, 4096, ¶ms); + self.process_chunks(decoder, &mut encoder)?; + // CompressorWriter emits the brotli stream trailer via flush(), + // which process_chunks already called. into_inner() avoids a + // redundant flush on drop and makes finalization explicit. + // Note: unlike flate2's finish(), CompressorWriter has no + // fallible finalization method — flush() is the only option. + let _ = encoder.into_inner(); + Ok(()) + } + (Compression::Brotli, Compression::None) => { + use brotli::Decompressor; + + self.process_chunks(Decompressor::new(input, 4096), output) } - (Compression::Brotli, Compression::None) => self.process_brotli_to_none(input, output), _ => Err(Report::new(TrustedServerError::Proxy { message: "Unsupported compression transformation".to_string(), })), } } - /// Process uncompressed stream - fn process_uncompressed( + /// Read chunks from `reader`, pass each through the processor, and write output to `writer`. + /// + /// This is the single unified chunk loop used by all compression paths. + /// The method calls `writer.flush()` before returning. For the `None → None` + /// path this is the only finalization needed. For compressed paths, the caller + /// must still call the encoder's type-specific finalization after this returns: + /// - **flate2** (`GzEncoder`, `ZlibEncoder`): call `finish()` — `flush()` does + /// not write the gzip/deflate trailer. + /// - **brotli** (`CompressorWriter`): `flush()` does finalize the stream, so + /// the caller only needs `into_inner()` to reclaim the writer. + /// + /// # Errors + /// + /// Returns an error if reading, processing, or writing any chunk fails. 
+ fn process_chunks( &mut self, - mut input: R, - mut output: W, + mut reader: R, + mut writer: W, ) -> Result<(), Report> { let mut buffer = vec![0u8; self.config.chunk_size]; loop { - match input.read(&mut buffer) { + match reader.read(&mut buffer) { Ok(0) => { - // End of stream - process any remaining data let final_chunk = self.processor.process_chunk(&[], true).change_context( TrustedServerError::Proxy { message: "Failed to process final chunk".to_string(), }, )?; if !final_chunk.is_empty() { - output.write_all(&final_chunk).change_context( + writer.write_all(&final_chunk).change_context( TrustedServerError::Proxy { message: "Failed to write final chunk".to_string(), }, @@ -162,7 +243,6 @@ impl StreamingPipeline

{ break; } Ok(n) => { - // Process this chunk let processed = self .processor .process_chunk(&buffer[..n], false) @@ -170,7 +250,7 @@ impl StreamingPipeline

{ message: "Failed to process chunk".to_string(), })?; if !processed.is_empty() { - output + writer .write_all(&processed) .change_context(TrustedServerError::Proxy { message: "Failed to write processed chunk".to_string(), @@ -179,309 +259,99 @@ impl StreamingPipeline

{ } Err(e) => { return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from input: {}", e), + message: format!("Failed to read: {e}"), })); } } } - output.flush().change_context(TrustedServerError::Proxy { + writer.flush().change_context(TrustedServerError::Proxy { message: "Failed to flush output".to_string(), })?; Ok(()) } +} - /// Process gzip compressed stream - fn process_gzip_to_gzip( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::GzDecoder; - use flate2::write::GzEncoder; - use flate2::Compression; - - // Decompress input - let mut decoder = GzDecoder::new(input); - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: "Failed to decompress gzip".to_string(), - })?; - - log::info!("Decompressed size: {} bytes", decompressed.len()); - - // Process the decompressed content - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("Processed size: {} bytes", processed.len()); - - // Recompress the output - let mut encoder = GzEncoder::new(output, Compression::default()); - encoder - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write to gzip encoder".to_string(), - })?; - encoder.finish().change_context(TrustedServerError::Proxy { - message: "Failed to finish gzip encoder".to_string(), - })?; - - Ok(()) - } - - /// Decompress input, process content, and write uncompressed output. 
- fn decompress_and_process( - &mut self, - mut decoder: R, - mut output: W, - codec_name: &str, - ) -> Result<(), Report> { - let mut decompressed = Vec::new(); - decoder - .read_to_end(&mut decompressed) - .change_context(TrustedServerError::Proxy { - message: format!("Failed to decompress {codec_name}"), - })?; - - log::info!( - "{codec_name} decompressed size: {} bytes", - decompressed.len() - ); - - let processed = self - .processor - .process_chunk(&decompressed, true) - .change_context(TrustedServerError::Proxy { - message: "Failed to process content".to_string(), - })?; - - log::info!("{codec_name} processed size: {} bytes", processed.len()); - - output - .write_all(&processed) - .change_context(TrustedServerError::Proxy { - message: "Failed to write output".to_string(), - })?; - - Ok(()) - } - - /// Process gzip compressed input to uncompressed output (decompression only) - fn process_gzip_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::GzDecoder; - - self.decompress_and_process(GzDecoder::new(input), output, "gzip") - } - - /// Process deflate compressed stream - fn process_deflate_to_deflate( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::ZlibDecoder; - use flate2::write::ZlibEncoder; - use flate2::Compression; - - let decoder = ZlibDecoder::new(input); - let encoder = ZlibEncoder::new(output, Compression::default()); - - self.process_through_compression(decoder, encoder) - } - - /// Process deflate compressed input to uncompressed output (decompression only) - fn process_deflate_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use flate2::read::ZlibDecoder; - - self.decompress_and_process(ZlibDecoder::new(input), output, "deflate") - } - - /// Process brotli compressed stream - fn process_brotli_to_brotli( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use brotli::enc::writer::CompressorWriter; - use 
brotli::enc::BrotliEncoderParams; - use brotli::Decompressor; - - let decoder = Decompressor::new(input, 4096); - let params = BrotliEncoderParams { - quality: 4, - lgwin: 22, - ..Default::default() - }; - let encoder = CompressorWriter::with_params(output, 4096, ¶ms); - - self.process_through_compression(decoder, encoder) - } - - /// Process brotli compressed input to uncompressed output (decompression only) - fn process_brotli_to_none( - &mut self, - input: R, - output: W, - ) -> Result<(), Report> { - use brotli::Decompressor; - - self.decompress_and_process(Decompressor::new(input, 4096), output, "brotli") - } - - /// Generic processing through compression layers - fn process_through_compression( - &mut self, - mut decoder: R, - mut encoder: W, - ) -> Result<(), Report> { - let mut buffer = vec![0u8; self.config.chunk_size]; - - loop { - match decoder.read(&mut buffer) { - Ok(0) => { - // End of stream - let final_chunk = self.processor.process_chunk(&[], true).change_context( - TrustedServerError::Proxy { - message: "Failed to process final chunk".to_string(), - }, - )?; - if !final_chunk.is_empty() { - encoder.write_all(&final_chunk).change_context( - TrustedServerError::Proxy { - message: "Failed to write final chunk".to_string(), - }, - )?; - } - break; - } - Ok(n) => { - let processed = self - .processor - .process_chunk(&buffer[..n], false) - .change_context(TrustedServerError::Proxy { - message: "Failed to process chunk".to_string(), - })?; - if !processed.is_empty() { - encoder.write_all(&processed).change_context( - TrustedServerError::Proxy { - message: "Failed to write processed chunk".to_string(), - }, - )?; - } - } - Err(e) => { - return Err(Report::new(TrustedServerError::Proxy { - message: format!("Failed to read from decoder: {}", e), - })); - } - } - } - - // Flush encoder (this also finishes compression) - encoder.flush().change_context(TrustedServerError::Proxy { - message: "Failed to flush encoder".to_string(), - })?; - - // For GzEncoder 
and similar, we need to finish() to properly close the stream - // The flush above might not be enough - drop(encoder); +/// Shared output buffer used as an [`lol_html::OutputSink`]. +/// +/// The `HtmlRewriter` invokes [`OutputSink::handle_chunk`] synchronously during +/// each [`HtmlRewriter::write`] call, so the buffer is drained after every +/// `process_chunk` invocation to emit output incrementally. +struct RcVecSink(Rc>>); - Ok(()) +impl lol_html::OutputSink for RcVecSink { + fn handle_chunk(&mut self, chunk: &[u8]) { + self.0.borrow_mut().extend_from_slice(chunk); } } -/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor` -/// Important: Due to `lol_html`'s ownership model, we must accumulate input -/// and process it all at once when the stream ends. This is a limitation -/// of the `lol_html` library's API design. +/// Adapter to use `lol_html` [`HtmlRewriter`](lol_html::HtmlRewriter) as a [`StreamProcessor`]. +/// +/// Output is emitted incrementally on every [`process_chunk`](StreamProcessor::process_chunk) +/// call. Script rewriters that receive text from `lol_html` must be fragment-safe — +/// they accumulate text fragments internally until `is_last_in_text_node` is true. +/// +/// The adapter is single-use: one adapter per request. Calling [`StreamProcessor::reset`] +/// is a no-op because the rewriter consumes its settings on construction. pub struct HtmlRewriterAdapter { - settings: lol_html::Settings<'static, 'static>, - accumulated_input: Vec, + rewriter: Option>, + output: Rc>>, } impl HtmlRewriterAdapter { - /// Create a new HTML rewriter adapter + /// Create a new HTML rewriter adapter that streams output per chunk. 
#[must_use] pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self { + let output = Rc::new(RefCell::new(Vec::new())); + let sink = RcVecSink(Rc::clone(&output)); + let rewriter = lol_html::HtmlRewriter::new(settings, sink); Self { - settings, - accumulated_input: Vec::new(), + rewriter: Some(rewriter), + output, } } } impl StreamProcessor for HtmlRewriterAdapter { fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result, io::Error> { - // Accumulate input chunks - self.accumulated_input.extend_from_slice(chunk); - - if !chunk.is_empty() { - log::debug!( - "Buffering chunk: {} bytes, total buffered: {} bytes", - chunk.len(), - self.accumulated_input.len() - ); + match &mut self.rewriter { + Some(rewriter) => { + if !chunk.is_empty() { + rewriter.write(chunk).map_err(|e| { + log::error!("Failed to process HTML chunk: {e}"); + io::Error::other(format!("HTML processing failed: {e}")) + })?; + } + } + None if !chunk.is_empty() => { + log::warn!( + "HtmlRewriterAdapter: {} bytes received after finalization, data will be lost", + chunk.len() + ); + } + None => {} } - // Only process when we have all the input if is_last { - log::info!( - "Processing complete document: {} bytes", - self.accumulated_input.len() - ); - - // Process all accumulated input at once - let mut output = Vec::new(); - - // Create rewriter with output sink - let mut rewriter = lol_html::HtmlRewriter::new( - std::mem::take(&mut self.settings), - |chunk: &[u8]| { - output.extend_from_slice(chunk); - }, - ); - - // Process the entire document - rewriter.write(&self.accumulated_input).map_err(|e| { - log::error!("Failed to process HTML: {}", e); - io::Error::other(format!("HTML processing failed: {}", e)) - })?; - - // Finalize the rewriter - rewriter.end().map_err(|e| { - log::error!("Failed to finalize: {}", e); - io::Error::other(format!("HTML finalization failed: {}", e)) - })?; - - log::debug!("Output size: {} bytes", output.len()); - self.accumulated_input.clear(); - 
Ok(output) - } else { - // Return empty until we have all input - // This is a limitation of lol_html's API - Ok(Vec::new()) + if let Some(rewriter) = self.rewriter.take() { + rewriter.end().map_err(|e| { + log::error!("Failed to finalize HTML: {e}"); + io::Error::other(format!("HTML finalization failed: {e}")) + })?; + } } - } - fn reset(&mut self) { - self.accumulated_input.clear(); + // Drain whatever lol_html produced since the last call + Ok(std::mem::take(&mut *self.output.borrow_mut())) } + + /// No-op. `HtmlRewriterAdapter` is single-use: the rewriter consumes its + /// [`Settings`](lol_html::Settings) on construction and cannot be recreated. + /// Calling [`process_chunk`](StreamProcessor::process_chunk) after + /// [`process_chunk`](StreamProcessor::process_chunk) with `is_last = true` + /// will produce empty output. + fn reset(&mut self) {} } /// Adapter to use our existing `StreamingReplacer` as a `StreamProcessor` @@ -498,6 +368,57 @@ mod tests { use super::*; use crate::streaming_replacer::{Replacement, StreamingReplacer}; + /// Verify that `lol_html` fragments text nodes when input chunks split + /// mid-text-node. Script rewriters must be fragment-safe — they accumulate + /// text fragments internally until `is_last_in_text_node` is true. 
+ #[test] + fn lol_html_fragments_text_across_chunk_boundaries() { + use std::cell::RefCell; + use std::rc::Rc; + + let fragments: Rc>> = Rc::new(RefCell::new(Vec::new())); + let fragments_clone = Rc::clone(&fragments); + + let mut rewriter = lol_html::HtmlRewriter::new( + lol_html::Settings { + element_content_handlers: vec![lol_html::text!("script", move |text| { + fragments_clone + .borrow_mut() + .push((text.as_str().to_string(), text.last_in_text_node())); + Ok(()) + })], + ..lol_html::Settings::default() + }, + |_chunk: &[u8]| {}, + ); + + // Split "googletagmanager.com/gtm.js" across two chunks + rewriter + .write(b"") + .expect("should write chunk2"); + rewriter.end().expect("should end"); + + let frags = fragments.borrow(); + // lol_html should emit at least 2 text fragments since input was split + assert!( + frags.len() >= 2, + "should fragment text across chunk boundaries, got {} fragments: {:?}", + frags.len(), + *frags + ); + // No single fragment should contain the full domain + assert!( + !frags + .iter() + .any(|(text, _)| text.contains("googletagmanager.com")), + "no individual fragment should contain the full domain when split across chunks: {:?}", + *frags + ); + } + #[test] fn test_uncompressed_pipeline() { let replacer = StreamingReplacer::new(vec![Replacement { @@ -547,7 +468,7 @@ mod tests { } #[test] - fn test_html_rewriter_adapter_accumulates_until_last() { + fn test_html_rewriter_adapter_streams_incrementally() { use lol_html::{element, Settings}; // Create a simple HTML rewriter that replaces text @@ -561,32 +482,40 @@ mod tests { let mut adapter = HtmlRewriterAdapter::new(settings); - // Test that intermediate chunks return empty let chunk1 = b""; let result1 = adapter .process_chunk(chunk1, false) .expect("should process chunk1"); - assert_eq!(result1.len(), 0, "Should return empty for non-last chunk"); let chunk2 = b"

<p>original</p>

"; let result2 = adapter .process_chunk(chunk2, false) .expect("should process chunk2"); - assert_eq!(result2.len(), 0, "Should return empty for non-last chunk"); - // Test that last chunk processes everything let chunk3 = b""; let result3 = adapter .process_chunk(chunk3, true) .expect("should process final chunk"); + + // Concatenate all outputs and verify the final HTML is correct + let mut all_output = result1; + all_output.extend_from_slice(&result2); + all_output.extend_from_slice(&result3); + assert!( - !result3.is_empty(), - "Should return processed content for last chunk" + !all_output.is_empty(), + "should produce non-empty concatenated output" ); - let output = String::from_utf8(result3).expect("output should be valid UTF-8"); - assert!(output.contains("replaced"), "Should have replaced content"); - assert!(output.contains(""), "Should have complete HTML"); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("replaced"), + "should have replaced content in concatenated output" + ); + assert!( + output.contains(""), + "should have complete HTML in concatenated output" + ); } #[test] @@ -603,59 +532,294 @@ mod tests { } large_html.push_str(""); - // Process in chunks + // Process in chunks and collect all output let chunk_size = 1024; let bytes = large_html.as_bytes(); - let mut chunks = bytes.chunks(chunk_size); - let mut last_chunk = chunks.next().unwrap_or(&[]); + let mut chunks = bytes.chunks(chunk_size).peekable(); + let mut all_output = Vec::new(); - for chunk in chunks { + while let Some(chunk) = chunks.next() { + let is_last = chunks.peek().is_none(); let result = adapter - .process_chunk(last_chunk, false) - .expect("should process intermediate chunk"); - assert_eq!(result.len(), 0, "Intermediate chunks should return empty"); - last_chunk = chunk; + .process_chunk(chunk, is_last) + .expect("should process chunk"); + all_output.extend_from_slice(&result); } - // Process last chunk - let 
result = adapter - .process_chunk(last_chunk, true) - .expect("should process last chunk"); - assert!(!result.is_empty(), "Last chunk should return content"); + assert!( + !all_output.is_empty(), + "should produce non-empty output for large document" + ); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); assert!( output.contains("Paragraph 999"), - "Should contain all content" + "should contain all content from large document" ); } #[test] - fn test_html_rewriter_adapter_reset() { + fn test_html_rewriter_adapter_reset_then_finalize() { use lol_html::Settings; let settings = Settings::default(); let mut adapter = HtmlRewriterAdapter::new(settings); - // Process some content - adapter - .process_chunk(b"", false) - .expect("should process html tag"); - adapter - .process_chunk(b"test", false) - .expect("should process body"); + let result1 = adapter + .process_chunk(b"test", false) + .expect("should process html"); - // Reset should clear accumulated input + // reset() is a documented no-op — adapter is single-use adapter.reset(); - // After reset, adapter should be ready for new input - let result = adapter - .process_chunk(b"

<p>new</p>

", true) - .expect("should process new content after reset"); - let output = String::from_utf8(result).expect("output should be valid UTF-8"); + // Finalize still works; the rewriter is still alive + let result2 = adapter + .process_chunk(b"", true) + .expect("should finalize after reset"); + + let mut all_output = result1; + all_output.extend_from_slice(&result2); + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains("test"), + "should produce correct output despite no-op reset" + ); + } + + #[test] + fn test_deflate_round_trip_produces_valid_output() { + // Verify that deflate-to-deflate produces valid output that decompresses + // correctly, confirming that encoder finalization works. + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + use std::io::{Read as _, Write as _}; + + let input_data = b"hello world"; + + // Compress input + let mut compressed_input = Vec::new(); + { + let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Deflate, + output_compression: Compression::Deflate, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process deflate-to-deflate"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + ZlibDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + 
"should have replaced content through deflate round-trip" + ); + } + + #[test] + fn test_gzip_to_gzip_produces_correct_output() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + use std::io::{Read as _, Write as _}; + + // Arrange + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + // Act + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-gzip"); + + // Assert + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through gzip round-trip" + ); + } + + #[test] + fn test_gzip_to_none_produces_correct_output() { + use flate2::write::GzEncoder; + use std::io::Write as _; + + // Arrange + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: 
Compression::Gzip, + output_compression: Compression::None, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + // Act + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-none"); + + // Assert + let result = String::from_utf8(output).expect("should be valid UTF-8 uncompressed output"); + assert_eq!( + result, "hi world", + "should have replaced content after gzip decompression" + ); + } + + #[test] + fn test_brotli_round_trip_produces_valid_output() { + use brotli::enc::writer::CompressorWriter; + use brotli::Decompressor; + use std::io::{Read as _, Write as _}; + + let input_data = b"hello world"; + + // Compress input with brotli + let mut compressed_input = Vec::new(); + { + let mut enc = CompressorWriter::new(&mut compressed_input, 4096, 4, 22); + enc.write_all(input_data) + .expect("should compress test input"); + enc.flush().expect("should flush brotli encoder"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Brotli, + output_compression: Compression::Brotli, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process brotli-to-brotli"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + Decompressor::new(&output[..], 4096) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + assert_eq!( - output, "

<p>new</p>

", - "Should only contain new input after reset" + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through brotli round-trip" + ); + } + + #[test] + fn test_html_rewriter_adapter_emits_output_per_chunk() { + use lol_html::Settings; + + let settings = Settings::default(); + let mut adapter = HtmlRewriterAdapter::new(settings); + + // Send three chunks — lol_html may buffer internally, so individual + // chunk outputs may vary by version. The contract is that concatenated + // output is correct, and that output is not deferred entirely to is_last. + let result1 = adapter + .process_chunk(b"", false) + .expect("should process chunk1"); + let result2 = adapter + .process_chunk(b"

<p>hello</p>

", false) + .expect("should process chunk2"); + let result3 = adapter + .process_chunk(b"", true) + .expect("should process final chunk"); + + // At least one intermediate chunk should produce output (verifies + // we're not deferring everything to is_last like the old adapter). + assert!( + !result1.is_empty() || !result2.is_empty(), + "should emit some output before is_last" + ); + + // Concatenated output must be correct + let mut all_output = result1; + all_output.extend_from_slice(&result2); + all_output.extend_from_slice(&result3); + + let output = String::from_utf8(all_output).expect("output should be valid UTF-8"); + assert!( + output.contains(""), + "should contain html tag in concatenated output" + ); + assert!( + output.contains("

<p>hello</p>

"), + "should contain paragraph in concatenated output" + ); + assert!( + output.contains(""), + "should contain closing html tag in concatenated output" ); } @@ -696,4 +860,61 @@ mod tests { "Should not contain original URL" ); } + + #[test] + fn test_gzip_pipeline_with_html_rewriter() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + use lol_html::{element, Settings}; + use std::io::{Read as _, Write as _}; + + let settings = Settings { + element_content_handlers: vec![element!("a[href]", |el| { + if let Some(href) = el.get_attribute("href") { + if href.contains("example.com") { + el.set_attribute("href", &href.replace("example.com", "test.com"))?; + } + } + Ok(()) + })], + ..Settings::default() + }; + + let input = b"Link"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input).expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let adapter = HtmlRewriterAdapter::new(settings); + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + let mut pipeline = StreamingPipeline::new(config, adapter); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("pipeline should process gzip HTML"); + + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output"); + + let result = String::from_utf8(decompressed).expect("output should be valid UTF-8"); + assert!( + result.contains("https://test.com"), + "should have replaced URL through gzip HTML pipeline" + ); + assert!( + !result.contains("example.com"), + "should not contain original URL after gzip HTML pipeline" + ); + } } diff --git a/docs/superpowers/plans/2026-03-25-streaming-response.md b/docs/superpowers/plans/2026-03-25-streaming-response.md new file mode 
100644 index 00000000..39f9914a --- /dev/null +++ b/docs/superpowers/plans/2026-03-25-streaming-response.md @@ -0,0 +1,1120 @@ +# Streaming Response Optimization — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use +> superpowers:subagent-driven-development (recommended) or +> superpowers:executing-plans to implement this plan task-by-task. Steps use +> checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Stream HTTP responses through the publisher proxy instead of buffering +them, reducing peak memory from ~4x response size to constant and improving +time-to-last-byte. + +**Architecture:** Two independent phases. Phase 1 makes the internal streaming +pipeline truly chunk-emitting (HtmlRewriterAdapter, compression paths, encoder +finalization). Phase 2 wires up Fastly's `StreamingBody` API so processed chunks +flow directly to the client. Each phase is shippable independently. + +**Tech Stack:** Rust 1.91.1, Fastly Compute SDK 0.11.12 +(`stream_to_client`/`send_to_client`/`StreamingBody`), `lol_html` (HTML +rewriting), `flate2` (gzip/deflate), `brotli` (brotli compression). 
+ +**Spec:** `docs/superpowers/specs/2026-03-25-streaming-response-design.md` +**Issue:** #563 + +--- + +## File Map + +| File | Role | Phase | +| ------------------------------------------------------- | -------------------------------------------------------------------------------------- | ----- | +| `crates/trusted-server-core/src/streaming_processor.rs` | `HtmlRewriterAdapter` rewrite, compression path fixes, encoder finalization | 1 | +| `crates/trusted-server-core/src/publisher.rs` | `process_response_streaming` refactor to `W: Write`, streaming gate, header reordering | 2 | +| `crates/trusted-server-adapter-fastly/src/main.rs` | Entry point migration from `#[fastly::main]` to raw `main()`, response routing | 2 | + +--- + +## Phase 1: Make the Pipeline Chunk-Emitting + +> **Implementation note (2026-03-26):** Tasks 1-3 were implemented as planned, +> then followed by a refactor that unified all 9 `process_*_to_*` methods into +> a single `process_chunks` method with inline decoder/encoder creation in +> `process()`. This eliminated ~150 lines of duplication. The refactor was +> committed as "Unify compression paths into single process_chunks method". +> Tasks 1-3 descriptions below reflect the original plan; the final code is +> cleaner than described. + +### Task 1: Fix encoder finalization in `process_through_compression` + +This is the prerequisite for Task 2. The current code calls `flush()` then +`drop(encoder)`, silently swallowing finalization errors. Must fix before +moving gzip to this path. 
+ +**Files:** + +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:334-393` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +- [ ] **Step 1: Write a test verifying deflate round-trip correctness** + +Add to the `#[cfg(test)]` module at the bottom of +`streaming_processor.rs`: + +```rust +#[test] +fn test_deflate_round_trip_produces_valid_output() { + // Verify that deflate-to-deflate (which uses process_through_compression) + // produces valid output that decompresses correctly. This establishes the + // correctness contract before we change the finalization path. + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let input_data = b"hello world"; + + // Compress input + let mut compressed_input = Vec::new(); + { + let mut enc = ZlibEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Deflate, + output_compression: Compression::Deflate, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process deflate-to-deflate"); + + // Decompress output and verify correctness + let mut decompressed = Vec::new(); + ZlibDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress output — implies encoder was finalized correctly"); + + assert_eq!( + String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through deflate round-trip" + ); +} +``` + +- [ ] **Step 2: Run test to verify it passes (baseline)** + +Run: `cargo test --package trusted-server-core 
test_deflate_round_trip_produces_valid_output` + +Expected: PASS (current code happens to work for this case since +`ZlibEncoder::drop` calls `finish` internally — the test establishes the +contract). + +- [ ] **Step 3: Change `process_through_compression` to take `&mut W` and remove `drop(encoder)`** + +`finish()` is not on the `Write` trait — each encoder type +(`GzEncoder`, `ZlibEncoder`, `CompressorWriter`) has its own `finish()`. +The fix: change the signature to take `&mut W` so the caller retains +ownership and calls `finish()` explicitly. + +Change signature (line 335-338): + +```rust + fn process_through_compression( + &mut self, + mut decoder: R, + encoder: &mut W, + ) -> Result<(), Report> { +``` + +Replace lines 383-393 (the `flush` + `drop` block): + +```rust + encoder.flush().change_context(TrustedServerError::Proxy { + message: "Failed to flush encoder".to_string(), + })?; + + // Caller owns encoder and must call finish() after this returns. + Ok(()) + } +``` + +Then update `process_deflate_to_deflate` (lines 276-289): + +```rust + fn process_deflate_to_deflate( + &mut self, + input: R, + output: W, + ) -> Result<(), Report> { + use flate2::read::ZlibDecoder; + use flate2::write::ZlibEncoder; + + let decoder = ZlibDecoder::new(input); + let mut encoder = ZlibEncoder::new(output, flate2::Compression::default()); + self.process_through_compression(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize deflate encoder".to_string(), + })?; + Ok(()) + } +``` + +And update `process_brotli_to_brotli` (lines 303-321): + +```rust + fn process_brotli_to_brotli( + &mut self, + input: R, + output: W, + ) -> Result<(), Report> { + use brotli::enc::writer::CompressorWriter; + use brotli::enc::BrotliEncoderParams; + use brotli::Decompressor; + + let decoder = Decompressor::new(input, 4096); + let mut params = BrotliEncoderParams::default(); + params.quality = 4; + params.lgwin = 22; + let mut encoder = 
CompressorWriter::with_params(output, 4096, ¶ms); + self.process_through_compression(decoder, &mut encoder)?; + // CompressorWriter finalizes on flush (already called) and into_inner + encoder.into_inner(); + Ok(()) + } +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: All existing tests pass plus the new one. + +- [ ] **Step 5: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Fix encoder finalization: explicit finish instead of drop" +``` + +--- + +### Task 2: Convert `process_gzip_to_gzip` to chunk-based processing + +**Files:** + +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:183-225` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +- [ ] **Step 1: Write a test for gzip chunk-based round-trip** + +```rust +#[test] +fn test_gzip_to_gzip_produces_correct_output() { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let input_data = b"hello world"; + + // Compress input as gzip + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::Gzip, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-gzip"); + + // Decompress and verify + let mut decompressed = Vec::new(); + GzDecoder::new(&output[..]) + .read_to_end(&mut decompressed) + .expect("should decompress gzip output"); + + assert_eq!( + 
String::from_utf8(decompressed).expect("should be valid UTF-8"), + "hi world", + "should have replaced content through gzip round-trip" + ); +} +``` + +- [ ] **Step 2: Run test to verify it passes (baseline)** + +Run: `cargo test --package trusted-server-core test_gzip_to_gzip_produces_correct_output` + +Expected: PASS (current code works, just buffers everything). + +- [ ] **Step 3: Rewrite `process_gzip_to_gzip` to use `process_through_compression`** + +Replace `process_gzip_to_gzip` (lines 183-225): + +```rust + fn process_gzip_to_gzip( + &mut self, + input: R, + output: W, + ) -> Result<(), Report> { + use flate2::read::GzDecoder; + use flate2::write::GzEncoder; + + let decoder = GzDecoder::new(input); + let mut encoder = GzEncoder::new(output, flate2::Compression::default()); + self.process_through_compression(decoder, &mut encoder)?; + encoder.finish().change_context(TrustedServerError::Proxy { + message: "Failed to finalize gzip encoder".to_string(), + })?; + Ok(()) + } +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. + +- [ ] **Step 5: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Convert process_gzip_to_gzip to chunk-based processing" +``` + +--- + +### Task 3: Convert `decompress_and_process` to chunk-based processing + +**Files:** + +- Modify: `crates/trusted-server-core/src/streaming_processor.rs:227-262` +- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module) + +Note: the `*_to_none` callers (`process_gzip_to_none`, +`process_deflate_to_none`, `process_brotli_to_none` at lines 264-332) do +not need changes — they call `decompress_and_process` with the same +signature. 
+ +- [ ] **Step 1: Write a test for gzip-to-none chunk-based processing** + +```rust +#[test] +fn test_gzip_to_none_produces_correct_output() { + use flate2::write::GzEncoder; + + let input_data = b"hello world"; + + let mut compressed_input = Vec::new(); + { + let mut enc = GzEncoder::new(&mut compressed_input, flate2::Compression::default()); + enc.write_all(input_data) + .expect("should compress test input"); + enc.finish().expect("should finish compression"); + } + + let replacer = StreamingReplacer::new(vec![Replacement { + find: "hello".to_string(), + replace_with: "hi".to_string(), + }]); + + let config = PipelineConfig { + input_compression: Compression::Gzip, + output_compression: Compression::None, + chunk_size: 8192, + }; + + let mut pipeline = StreamingPipeline::new(config, replacer); + let mut output = Vec::new(); + + pipeline + .process(&compressed_input[..], &mut output) + .expect("should process gzip-to-none"); + + assert_eq!( + String::from_utf8(output).expect("should be valid UTF-8"), + "hi world", + "should have replaced content and output uncompressed" + ); +} +``` + +- [ ] **Step 2: Run test to verify baseline** + +Run: `cargo test --package trusted-server-core test_gzip_to_none_produces_correct_output` + +Expected: PASS. 
+ +- [ ] **Step 3: Rewrite `decompress_and_process` to use chunk loop** + +Replace `decompress_and_process` (lines 227-262) with a chunk-based +version that mirrors `process_uncompressed`: + +```rust + fn decompress_and_process( + &mut self, + mut decoder: R, + mut output: W, + _codec_name: &str, + ) -> Result<(), Report> { + let mut buffer = vec![0u8; self.config.chunk_size]; + + loop { + match decoder.read(&mut buffer) { + Ok(0) => { + let final_chunk = + self.processor.process_chunk(&[], true).change_context( + TrustedServerError::Proxy { + message: "Failed to process final chunk".to_string(), + }, + )?; + if !final_chunk.is_empty() { + output.write_all(&final_chunk).change_context( + TrustedServerError::Proxy { + message: "Failed to write final chunk".to_string(), + }, + )?; + } + break; + } + Ok(n) => { + let processed = self + .processor + .process_chunk(&buffer[..n], false) + .change_context(TrustedServerError::Proxy { + message: "Failed to process chunk".to_string(), + })?; + if !processed.is_empty() { + output.write_all(&processed).change_context( + TrustedServerError::Proxy { + message: "Failed to write processed chunk".to_string(), + }, + )?; + } + } + Err(e) => { + return Err(Report::new(TrustedServerError::Proxy { + message: format!("Failed to read from decoder: {e}"), + })); + } + } + } + + output.flush().change_context(TrustedServerError::Proxy { + message: "Failed to flush output".to_string(), + })?; + + Ok(()) + } +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. 
+
+- [ ] **Step 5: Commit**
+
+```
+git add crates/trusted-server-core/src/streaming_processor.rs
+git commit -m "Convert decompress_and_process to chunk-based processing"
+```
+
+---
+
+### Task 4: Rewrite `HtmlRewriterAdapter` for incremental streaming
+
+**Files:**
+
+- Modify: `crates/trusted-server-core/src/streaming_processor.rs:396-472`
+- Test: `crates/trusted-server-core/src/streaming_processor.rs` (test module)
+
+Important context: `create_html_processor` in `html_processor.rs` returns
+`HtmlWithPostProcessing`, which wraps `HtmlRewriterAdapter`. The wrapper's
+`process_chunk` (line 31-34 of `html_processor.rs`) returns intermediate
+output immediately for `!is_last` chunks — it passes output through rather
+than swallowing it. When the post-processor list is empty (streaming gate
+condition), the wrapper is a no-op passthrough. No changes needed to
+`html_processor.rs`.
+
+- [ ] **Step 1: Write a test proving incremental output**
+
+```rust
+#[test]
+fn test_html_rewriter_adapter_emits_output_per_chunk() {
+    use lol_html::Settings;
+
+    let settings = Settings::default();
+    let mut adapter = HtmlRewriterAdapter::new(settings);
+
+    // First chunk should produce output (not empty)
+    let result1 = adapter
+        .process_chunk(b"<!DOCTYPE html><html>", false)
+        .expect("should process chunk 1");
+    assert!(
+        !result1.is_empty(),
+        "should emit output for non-last chunk, got empty"
+    );
+
+    // Second chunk should also produce output
+    let result2 = adapter
+        .process_chunk(b"
<body>
hello
</body>
", false)
+        .expect("should process chunk 2");
+    assert!(
+        !result2.is_empty(),
+        "should emit output for second non-last chunk, got empty"
+    );
+
+    // Final chunk
+    let result3 = adapter
+        .process_chunk(b"</html>", true)
+        .expect("should process final chunk");
+
+    // Concatenated output should be the full document
+    let mut full_output = result1;
+    full_output.extend_from_slice(&result2);
+    full_output.extend_from_slice(&result3);
+    let output_str = String::from_utf8(full_output).expect("should be valid UTF-8");
+    assert!(
+        output_str.contains("<html>") && output_str.contains("hello"),
+        "should contain complete document, got: {output_str}"
+    );
+}
+```
+
+- [ ] **Step 2: Run test to verify it fails (current code returns empty for non-last chunks)**
+
+Run: `cargo test --package trusted-server-core test_html_rewriter_adapter_emits_output_per_chunk`
+
+Expected: FAIL — assertion `should emit output for non-last chunk` fails.
+
+- [ ] **Step 3: Rewrite `HtmlRewriterAdapter` to stream incrementally**
+
+Replace the struct and impl (lines 396-472):
+
+```rust
+/// Adapter to use `lol_html` `HtmlRewriter` as a `StreamProcessor`.
+///
+/// Creates the rewriter eagerly and emits output on every `process_chunk`
+/// call. Single-use: `reset()` is a no-op since `Settings` are consumed
+/// by the rewriter constructor.
+pub struct HtmlRewriterAdapter {
+    rewriter: Option<lol_html::HtmlRewriter<'static, RcVecSink>>,
+    output: Rc<RefCell<Vec<u8>>>,
+}
+
+/// Output sink that appends to a shared `Vec<u8>`.
+struct RcVecSink(Rc<RefCell<Vec<u8>>>);
+
+impl lol_html::OutputSink for RcVecSink {
+    fn handle_chunk(&mut self, chunk: &[u8]) {
+        self.0.borrow_mut().extend_from_slice(chunk);
+    }
+}
+
+impl HtmlRewriterAdapter {
+    /// Create a new HTML rewriter adapter.
+    ///
+    /// The rewriter is created immediately, consuming the settings.
+    #[must_use]
+    pub fn new(settings: lol_html::Settings<'static, 'static>) -> Self {
+        let output = Rc::new(RefCell::new(Vec::new()));
+        let sink = RcVecSink(Rc::clone(&output));
+        let rewriter = lol_html::HtmlRewriter::new(settings, sink);
+        Self {
+            rewriter: Some(rewriter),
+            output,
+        }
+    }
+}
+
+impl StreamProcessor for HtmlRewriterAdapter {
+    fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
+        if let Some(rewriter) = &mut self.rewriter {
+            if !chunk.is_empty() {
+                rewriter.write(chunk).map_err(|e| {
+                    log::error!("Failed to process HTML chunk: {e}");
+                    io::Error::other(format!("HTML processing failed: {e}"))
+                })?;
+            }
+        }
+
+        if is_last {
+            if let Some(rewriter) = self.rewriter.take() {
+                rewriter.end().map_err(|e| {
+                    log::error!("Failed to finalize HTML: {e}");
+                    io::Error::other(format!("HTML finalization failed: {e}"))
+                })?;
+            }
+        }
+
+        // Drain whatever lol_html produced since last call.
+        // Safe: sink borrow released before we borrow here.
+        Ok(std::mem::take(&mut *self.output.borrow_mut()))
+    }
+
+    fn reset(&mut self) {
+        // No-op: rewriter consumed Settings on construction.
+        // Single-use by design (one per request).
+    }
+}
+```
+
+Add these imports at the top of `streaming_processor.rs`:
+
+```rust
+use std::cell::RefCell;
+use std::rc::Rc;
+```
+
+- [ ] **Step 4: Run all tests**
+
+Run: `cargo test --package trusted-server-core`
+
+Expected: The new per-chunk test passes. Some existing tests that assert
+"intermediate chunks return empty" will now fail and need updating.
+
+- [ ] **Step 5: Update existing tests for new behavior**
+
+Update `test_html_rewriter_adapter_accumulates_until_last` — it currently
+asserts empty output for non-last chunks. Change assertions to expect
+non-empty intermediate output and verify the concatenated result.
+
+Update `test_html_rewriter_adapter_handles_large_input` — same: remove
+assertions that intermediate chunks are empty.
+ +Update `test_html_rewriter_adapter_reset` — `reset()` is now a no-op. +Remove or update this test since the adapter is single-use. + +- [ ] **Step 6: Run all tests again** + +Run: `cargo test --package trusted-server-core` + +Expected: All tests pass. + +- [ ] **Step 7: Run clippy** + +Run: `cargo clippy --workspace --all-targets --all-features -- -D warnings` + +Expected: No warnings. + +- [ ] **Step 8: Commit** + +``` +git add crates/trusted-server-core/src/streaming_processor.rs +git commit -m "Rewrite HtmlRewriterAdapter for incremental lol_html streaming" +``` + +--- + +### Task 5: Phase 1 full verification + +- [ ] **Step 1: Run full test suite** + +Run: `cargo test --workspace` + +Expected: All tests pass. + +- [ ] **Step 2: Run JS tests** + +Run: `cd crates/js/lib && npx vitest run` + +Expected: All tests pass. + +- [ ] **Step 3: Run clippy and fmt** + +Run: `cargo clippy --workspace --all-targets --all-features -- -D warnings && cargo fmt --all -- --check` + +Expected: Clean. + +- [ ] **Step 4: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds successfully. + +--- + +## Phase 2: Stream Response to Client + +> **Note:** Phase 2 may need adjustment to align with the EC (Edge Compute) +> implementation. Coordinate with the EC work before finalizing the approach. + +### Task 6: Migrate entry point from `#[fastly::main]` to raw `main()` + +**Files:** + +- Modify: `crates/trusted-server-adapter-fastly/src/main.rs:32-68` + +- [ ] **Step 1: Rewrite `main` function** + +Replace lines 32-68: + +```rust +fn main() { + init_logger(); + + let req = Request::from_client(); + + // Health probe: independent from settings/routing. 
+ if req.get_method() == Method::GET && req.get_path() == "/health" { + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; + } + + let settings = match get_settings() { + Ok(s) => s, + Err(e) => { + log::error!("Failed to load settings: {:?}", e); + to_error_response(&e).send_to_client(); + return; + } + }; + log::debug!("Settings {settings:?}"); + + let orchestrator = build_orchestrator(&settings); + + let integration_registry = match IntegrationRegistry::new(&settings) { + Ok(r) => r, + Err(e) => { + log::error!("Failed to create integration registry: {:?}", e); + to_error_response(&e).send_to_client(); + return; + } + }; + + let response = futures::executor::block_on(route_request( + &settings, + &orchestrator, + &integration_registry, + req, + )); + + match response { + Ok(resp) => resp.send_to_client(), + Err(e) => to_error_response(&e).send_to_client(), + } +} +``` + +- [ ] **Step 2: Run all tests** + +Run: `cargo test --workspace` + +Expected: All tests pass. + +- [ ] **Step 3: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds successfully. + +- [ ] **Step 4: Commit** + +``` +git add crates/trusted-server-adapter-fastly/src/main.rs +git commit -m "Migrate entry point from #[fastly::main] to raw main()" +``` + +--- + +### Task 7: Refactor `process_response_streaming` to accept `W: Write` + +**Files:** + +- Modify: `crates/trusted-server-core/src/publisher.rs:97-180` + +- [ ] **Step 1: Change signature to accept generic writer** + +Change `process_response_streaming` from returning `Body` to writing into +a generic `W: Write`: + +```rust +fn process_response_streaming( + body: Body, + output: &mut W, + params: &ProcessResponseParams, +) -> Result<(), Report> { +``` + +Remove `let mut output = Vec::new();` (line 117) and +`Ok(Body::from(output))` (line 179). The caller passes the output writer. 
+
+- [ ] **Step 2: Update the call site in `handle_publisher_request`**
+
+In `handle_publisher_request`, replace the current call (lines 338-341):
+
+```rust
+// Before:
+match process_response_streaming(body, &params) {
+    Ok(processed_body) => {
+        response.set_body(processed_body);
+
+// After:
+let mut output = Vec::new();
+match process_response_streaming(body, &mut output, &params) {
+    Ok(()) => {
+        response.set_body(Body::from(output));
+```
+
+This preserves existing behavior — the buffered path still works.
+
+- [ ] **Step 3: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass (behavior unchanged).
+
+- [ ] **Step 4: Commit**
+
+```
+git add crates/trusted-server-core/src/publisher.rs
+git commit -m "Refactor process_response_streaming to accept generic writer"
+```
+
+---
+
+### Task 8: Add streaming path to publisher proxy
+
+**Files:**
+
+- Modify: `crates/trusted-server-core/src/publisher.rs`
+- Modify: `crates/trusted-server-adapter-fastly/src/main.rs`
+
+This is the core change. `handle_publisher_request` needs to support two
+modes: buffered (returns `Response`) and streaming (sends response directly
+via `StreamingBody`). The streaming path requires access to Fastly-specific
+types (`StreamingBody`, `send_to_client`), but `publisher.rs` lives in
+`trusted-server-core` which is platform-agnostic.
+
+**Approach:** Add a `ResponseMode` enum or callback that
+`handle_publisher_request` uses to decide how to send the response. The
+simplest approach: split into a preparation phase (returns headers + body
+stream + processing params) and a send phase (in the fastly adapter).
+
+Alternatively, since `StreamingPipeline::process` already takes `W: Write`,
+the adapter can call `process_response_streaming` with a `StreamingBody`
+directly. The key is that the adapter needs to:
+
+1. Call `handle_publisher_request` logic up to the point of body processing
+2. Decide buffered vs streaming
+3.
Either buffer or stream
+
+This task is complex — the implementer should read the spec's Step 2
+carefully and adapt the approach to minimize changes. The plan provides the
+structure; exact code depends on how the publisher function is decomposed.
+
+- [ ] **Step 1: Export `finalize_response` or its logic for use before streaming**
+
+In `main.rs`, make `finalize_response` callable from the publisher path.
+Either make it `pub` and move to `trusted-server-core`, or pass a
+pre-finalized response to the streaming path.
+
+- [ ] **Step 2: Add `has_html_post_processors()` to `IntegrationRegistry`**
+
+Add a method that returns `bool` to avoid the allocation that
+`html_post_processors()` incurs (cloning the boxed post-processor `Vec`):
+
+```rust
+pub fn has_html_post_processors(&self) -> bool {
+    !self.inner.html_post_processors.is_empty()
+}
+```
+
+**File:** `crates/trusted-server-core/src/integrations/registry.rs`
+
+- [ ] **Step 3: Add streaming gate check**
+
+Add a helper in `publisher.rs`:
+
+```rust
+fn should_stream(
+    status: u16,
+    content_type: &str,
+    integration_registry: &IntegrationRegistry,
+) -> bool {
+    if !(200..300).contains(&status) {
+        return false;
+    }
+    // Use has_html_post_processors() to avoid cloning the post-processor
+    // Vec just to check emptiness.
+    // Only html_post_processors gate streaming — NOT script_rewriters.
+    // Script rewriters (Next.js, GTM) run inside lol_html element handlers
+    // during streaming and do not require full-document buffering.
+    // Currently only Next.js registers a post-processor.
+    let is_html = content_type.contains("text/html");
+    if is_html && integration_registry.has_html_post_processors() {
+        return false;
+    }
+    true
+}
+```
+
+- [ ] **Step 4: Restructure `handle_publisher_request` to support streaming**
+
+Split the function into:
+
+1. Pre-processing: request info, cookies, synthetic ID, consent, backend
+   request — everything before `response.take_body()`
+2.
Header finalization: synthetic ID/cookie headers, `finalize_response()`
+   headers, Content-Length removal
+3. Body processing: either buffered (`Vec<u8>`) or streaming
+   (`StreamingBody`)
+
+The streaming path in the fastly adapter:
+
+```rust
+// After header finalization, before body processing:
+if should_stream {
+    let body = response.take_body();
+    response.remove_header(header::CONTENT_LENGTH);
+    let mut streaming_body = response.stream_to_client();
+
+    match process_response_streaming(body, &mut streaming_body, &params) {
+        Ok(()) => {
+            streaming_body.finish()
+                .expect("should finish streaming body");
+        }
+        Err(e) => {
+            log::error!("Streaming processing failed: {:?}", e);
+            // StreamingBody dropped → client sees abort
+        }
+    }
+} else {
+    // Existing buffered path
+}
+```
+
+- [ ] **Step 5: Handle binary pass-through in streaming path**
+
+For non-text content when streaming is enabled:
+
+```rust
+if !should_process {
+    let mut body = response.take_body();
+    response.remove_header(header::CONTENT_LENGTH);
+    let mut streaming_body = response.stream_to_client();
+    io::copy(&mut body, &mut streaming_body)
+        .expect("should copy body to streaming output");
+    streaming_body.finish()
+        .expect("should finish streaming body");
+}
+```
+
+- [ ] **Step 6: Run all tests**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+
+- [ ] **Step 7: Build for WASM target**
+
+Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1`
+
+Expected: Builds successfully.
+
+- [ ] **Step 8: Commit**
+
+```
+git add crates/trusted-server-core/src/publisher.rs \
+    crates/trusted-server-adapter-fastly/src/main.rs
+git commit -m "Add streaming response path for publisher proxy"
+```
+
+---
+
+### Task 9: Phase 2 full verification
+
+- [ ] **Step 1: Run full test suite**
+
+Run: `cargo test --workspace`
+
+Expected: All tests pass.
+ +- [ ] **Step 2: Run clippy, fmt, JS tests** + +```bash +cargo clippy --workspace --all-targets --all-features -- -D warnings +cargo fmt --all -- --check +cd crates/js/lib && npx vitest run +``` + +Expected: All clean. + +- [ ] **Step 3: Build for WASM target** + +Run: `cargo build --package trusted-server-adapter-fastly --release --target wasm32-wasip1` + +Expected: Builds. + +- [ ] **Step 4: Manual verification with Viceroy** + +Run: `fastly compute serve` + +Test: + +- `curl -s http://localhost:7676/ | sha256sum` — compare with baseline +- `curl -sI http://localhost:7676/` — verify headers present (geo, version, + synthetic ID cookie if consent configured) +- `curl -s http://localhost:7676/static/tsjs=tsjs-unified.min.js` — verify + static routes still work via `send_to_client` + +- [ ] **Step 5: Chrome DevTools MCP performance capture** + +Follow the measurement protocol in the spec's "Performance measurement via +Chrome DevTools MCP" section. Compare against baseline captured on `main`. + +--- + +### Task 10: Chrome DevTools MCP baseline + comparison + +- [ ] **Step 1: Capture baseline on `main`** + +Follow spec section "Baseline capture" — use `navigate_page`, +`list_network_requests`, `lighthouse_audit`, `performance_start_trace` / +`performance_stop_trace`, `performance_analyze_insight`, +`take_memory_snapshot`. Record median TTFB, TTLB, LCP, Speed Index across +5 runs. + +- [ ] **Step 2: Capture metrics on feature branch** + +Repeat the same measurements after building the feature branch. + +- [ ] **Step 3: Compare and document results** + +Create a comparison table and save to PR description or a results file. +Check for: + +- TTLB improvement (primary goal) +- No TTFB regression +- Identical response body hash (correctness) +- LCP/Speed Index improvement (secondary) + +--- + +## Phase 3: Make Script Rewriters Fragment-Safe (PR #591) + +> **Implementation note (2026-03-27):** All tasks completed. 
Script rewriters
+> accumulate text fragments via `Mutex<String>` until `last_in_text_node` is
+> true. Buffered mode removed from `HtmlRewriterAdapter`. 2xx streaming gate
+> added. Small-chunk (32 byte) pipeline regression tests added for both
+> NextJS `__NEXT_DATA__` and GTM inline scripts.
+
+### Task 11: Make `NextJsNextDataRewriter` fragment-safe
+
+**Files:** `crates/trusted-server-core/src/integrations/nextjs/script_rewriter.rs`
+
+- [x] Add `accumulated_text: Mutex<String>` field
+- [x] Accumulate intermediate fragments, return `RemoveNode`
+- [x] On last fragment, process full accumulated text
+- [x] Handle Keep-after-accumulation (emit `Replace(full_content)`)
+- [x] Add regression tests
+
+### Task 12: Make `GoogleTagManagerIntegration` rewrite fragment-safe
+
+**Files:** `crates/trusted-server-core/src/integrations/google_tag_manager.rs`
+
+- [x] Add `accumulated_text: Mutex<String>` field
+- [x] Accumulate intermediate fragments, return `RemoveNode`
+- [x] On last fragment, match and rewrite on complete text
+- [x] Non-GTM accumulated scripts emitted unchanged via `Replace`
+- [x] Add regression tests
+
+### Task 13: Remove buffered mode from `HtmlRewriterAdapter`
+
+**Files:** `crates/trusted-server-core/src/streaming_processor.rs`
+
+- [x] Delete `new_buffered()`, `buffered` flag, `accumulated_input`
+- [x] Simplify `process_chunk` to streaming-only path
+- [x] Remove `buffered_adapter_prevents_text_fragmentation` test
+- [x] Update doc comments
+
+### Task 14: Always use streaming adapter in `create_html_processor`
+
+**Files:** `crates/trusted-server-core/src/html_processor.rs`
+
+- [x] Remove `has_script_rewriters` check
+- [x] Always call `HtmlRewriterAdapter::new(settings)`
+
+### Task 15: Full verification, regression tests, and performance measurement
+
+- [x] Add 2xx streaming gate (`response.get_status().is_success()`)
+- [x] Add streaming gate unit tests (5 tests)
+- [x] Add `stream_publisher_body` gzip round-trip test
+- [x] Add small-chunk (32 byte) pipeline
tests for NextJS and GTM +- [x] `cargo test --workspace` — 766 passed +- [x] `cargo clippy` — clean +- [x] `cargo fmt --check` — clean +- [x] WASM release build — success +- [x] Staging performance comparison (see results below) + +### Performance Results (getpurpose.ai, median over 5 runs, Chrome 1440x900) + +| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta | +| -------------------------- | --------------------------- | ------------------------- | ------------------ | +| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** | +| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) | +| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** | + +--- + +## Phase 4: Stream Binary Pass-Through Responses + +Non-processable content (images, fonts, video, `application/octet-stream`) +currently passes through `handle_publisher_request` unchanged via the +`Buffered` path. This buffers the entire response body in memory — wasteful +for large binaries that need no processing. Phase 4 adds a `PassThrough` +variant that streams the body directly via `io::copy` into `StreamingBody`. 
+ +### Task 16: Stream binary pass-through responses via `io::copy` + +**Files:** + +- `crates/trusted-server-core/src/publisher.rs` +- `crates/trusted-server-adapter-fastly/src/main.rs` + +- [ ] Add `PublisherResponse::PassThrough { response, body }` variant +- [ ] Return `PassThrough` when `!should_process` and backend returned 2xx +- [ ] Handle in `main.rs`: `stream_to_client()` + `io::copy(body, &mut streaming_body)` +- [ ] Keep `Buffered` for non-2xx responses and `request_host.is_empty()` +- [ ] Preserve `Content-Length` for pass-through (body is unmodified) + +### Task 17: Binary pass-through tests and verification + +- [ ] Publisher-level test: image content type returns `PassThrough` +- [ ] Publisher-level test: 4xx image stays `Buffered` +- [ ] `cargo test --workspace` +- [ ] `cargo clippy` + `cargo fmt --check` +- [ ] WASM release build +- [ ] Staging performance comparison (DOM Complete for image-heavy pages) diff --git a/docs/superpowers/specs/2026-03-25-streaming-response-design.md b/docs/superpowers/specs/2026-03-25-streaming-response-design.md index 27650636..414c4954 100644 --- a/docs/superpowers/specs/2026-03-25-streaming-response-design.md +++ b/docs/superpowers/specs/2026-03-25-streaming-response-design.md @@ -27,10 +27,12 @@ JS, auction, discovery). Before committing to `stream_to_client()`, check: 1. Backend status is success (2xx). -2. For HTML content: `html_post_processors()` is empty — no registered - post-processors. Non-HTML content types (text/JSON, RSC Flight, binary) can - always stream regardless of post-processor registration, since - post-processors only apply to HTML. +2. For HTML content: `has_html_post_processors()` returns false — no registered + post-processors. This method returns a `bool` directly, avoiding the + allocation of cloning the `Vec>` that + `html_post_processors()` performs. 
Non-HTML content types (text/JSON, RSC + Flight, binary) can always stream regardless of post-processor registration, + since post-processors only apply to HTML. If either check fails for the given content type, fall back to the current buffered path. This keeps the optimization transparent: same behavior for all @@ -81,9 +83,11 @@ compressed paths should follow the same structure. #### C) `process_through_compression` finalization — prerequisite for B -`process_through_compression` currently uses `drop(encoder)` which silently -swallows errors. Today this affects deflate and brotli (which already use this -path). The current `process_gzip_to_gzip` calls `encoder.finish()` explicitly — +`process_through_compression` currently calls `flush()` (with error +propagation) then `drop(encoder)` for finalization. The `flush()` only flushes +buffered data but does not write compression trailers/footers — `drop()` +handles finalization but silently swallows errors. Today this affects deflate +and brotli (which already use this path). The current `process_gzip_to_gzip` calls `encoder.finish()` explicitly — but Step 1B moves gzip to `process_through_compression`, which would **regress** gzip from working `finish()` to broken `drop()`. This fix prevents that regression and also fixes the pre-existing issue for deflate/brotli. @@ -93,13 +97,16 @@ before or with Step 1B. ### Step 2: Stream response to client +> **Note:** Step 2 may need adjustment to align with the EC (Edge Compute) +> implementation. Coordinate with the EC work before finalizing the approach. + Change the publisher proxy path to use Fastly's `StreamingBody` API: 1. Fetch from origin, receive response headers. 2. Validate status — if backend error, return buffered error response via `send_to_client()`. -3. Check streaming gate — if `html_post_processors()` is non-empty, fall back - to buffered path. +3. Check streaming gate — if `has_html_post_processors()` returns true, fall + back to buffered path. 4. 
Finalize all response headers. This requires reordering two things: - **Synthetic ID/cookie headers**: today set _after_ body processing in `handle_publisher_request`. Since they are body-independent (computed from @@ -184,7 +191,7 @@ No decompression, no processing. Body streams through as read. ### Buffered fallback path (error responses or post-processors present) ``` -Origin returns 4xx/5xx OR html_post_processors() is non-empty +Origin returns 4xx/5xx OR has_html_post_processors() is true → Current buffered path unchanged → send_to_client() with proper status and full body ``` @@ -233,9 +240,29 @@ remains in place — no need to bypass it. Clarification: `script_rewriters` (used by Next.js and GTM) are distinct from `html_post_processors`. Script rewriters run inside `lol_html` element handlers -during streaming — they do not require buffering and are unaffected by this -change. The streaming gate checks only `html_post_processors().is_empty()`, not -script rewriters. Currently only Next.js registers a post-processor. +during streaming and are now fragment-safe (resolved in +[Phase 3](#text-node-fragmentation-phase-3)). `html_post_processors` require +the full document for post-processing. The streaming gate checks +`has_html_post_processors()` for the post-processor path. Currently only +Next.js registers a post-processor. + +## Text Node Fragmentation (Phase 3) + +`lol_html` fragments text nodes across input chunk boundaries when processing +HTML incrementally. Script rewriters (`NextJsNextDataRewriter`, +`GoogleTagManagerIntegration`) expect complete text content — if a domain string +is split across chunks, the rewrite silently fails. + +**Resolved in Phase 3**: Each script rewriter is now fragment-safe. They +accumulate text fragments internally via `Mutex` until +`is_last_in_text_node` is true, then process the complete text. 
Intermediate +fragments return `RemoveNode` (suppressed from output); the final fragment +emits the full rewritten content via `Replace`. If no rewrite is needed, +the full accumulated content is still emitted via `Replace` (since +intermediate fragments were already removed from the output). + +The `HtmlRewriterAdapter` buffered mode (`new_buffered()`) has been removed. +`create_html_processor` always uses the streaming adapter. ## Rollback Strategy @@ -258,9 +285,9 @@ improvements. ### Integration tests (publisher.rs) -- Streaming gate: when `html_post_processors()` is non-empty, response is +- Streaming gate: when `has_html_post_processors()` is true, response is buffered. -- Streaming gate: when `html_post_processors()` is empty, response streams. +- Streaming gate: when `has_html_post_processors()` is false, response streams. - Backend error (4xx/5xx) returns buffered error response with correct status. - Binary content passes through without processing. @@ -271,8 +298,128 @@ improvements. - Compare response bodies before/after to confirm byte-identical output for HTML, text, and binary. -### Measurement (post-deploy) +### Performance measurement via Chrome DevTools MCP + +Capture before/after metrics using Chrome DevTools MCP against Viceroy locally +and staging. Run each measurement set on `main` (baseline) and the feature +branch, then compare. + +#### Baseline capture (before — on `main`) + +1. Start local server: `fastly compute serve` +2. Navigate to publisher proxy URL via `navigate_page` +3. Capture network timing: + - `list_network_requests` — record TTFB (`responseStart - requestStart`), + total time (`responseEnd - requestStart`), and transfer size for the + document request + - Filter for the main document (`resourceType: Document`) +4. Run Lighthouse audit: + - `lighthouse_audit` with categories `["performance"]` + - Record TTFB, LCP, Speed Index, Total Blocking Time +5. 
Capture performance trace: + - `performance_start_trace` → load page → `performance_stop_trace` + - `performance_analyze_insight` — extract "Time to First Byte" and + "Network requests" insights +6. Take memory snapshot: + - `take_memory_snapshot` — record JS heap size as a secondary check + (WASM heap is measured separately via Fastly dashboard) +7. Repeat 3-5 times for stable medians + +#### Post-implementation capture (after — on feature branch) + +Repeat the same steps on the feature branch. Compare: + +| Metric | Source | Expected change | +| ------------------ | ------------------------------ | ----------------------------------------------------- | +| TTFB (document) | Network timing | Minimal change (gated by backend response time) | +| Time to last byte | Network timing (`responseEnd`) | Reduced — body streams incrementally | +| LCP | Lighthouse | Improved — browser receives `` resources sooner | +| Speed Index | Lighthouse | Improved — progressive rendering starts earlier | +| Transfer size | Network timing | Unchanged (same content, same compression) | +| Response body hash | `evaluate_script` with hash | Identical — correctness check | + +#### Automated comparison script + +Use `evaluate_script` to compute a response body hash in the browser for +correctness verification: + +```js +// Run via evaluate_script after page load +const response = await fetch(location.href) +const buffer = await response.arrayBuffer() +const hash = await crypto.subtle.digest('SHA-256', buffer) +const hex = [...new Uint8Array(hash)] + .map((b) => b.toString(16).padStart(2, '0')) + .join('') +hex // compare this between baseline and feature branch +``` + +#### What to watch for -- Compare TTFB and time-to-last-byte on staging before and after. +- **TTFB regression**: If TTFB increases, the header finalization reordering + may be adding latency. Investigate `finalize_response()` and synthetic ID + computation timing. 
+- **Body mismatch**: If response body hashes differ between baseline and + feature branch, the streaming pipeline is producing different output. + Bisect between Step 1 and Step 2 to isolate. +- **LCP unchanged**: If LCP doesn't improve, the `` content may not be + reaching the browser earlier. Check whether `lol_html` emits the `` + injection in the first chunk or buffers until more input arrives. + +### Measurement (post-deploy to staging) + +- Repeat Chrome DevTools MCP measurements against staging URL. +- Compare against Viceroy results to account for real network conditions. - Monitor WASM heap usage via Fastly dashboard. - Verify no regressions on static endpoints or auction. + +### Results (getpurpose.ai, median over 5 runs, Chrome 1440x900) + +Measured via Chrome DevTools Protocol against prod (v135, buffered) and +staging (v136, streaming). Chrome `--host-resolver-rules` used to route +`getpurpose.ai` to the staging Fastly edge (167.82.83.52). + +| Metric | Production (v135, buffered) | Staging (v136, streaming) | Delta | +| -------------------------- | --------------------------- | ------------------------- | ------------------ | +| **TTFB** | 54 ms | 35 ms | **-19 ms (-35%)** | +| **First Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **First Contentful Paint** | 186 ms | 160 ms | -26 ms (-14%) | +| **DOM Content Loaded** | 286 ms | 282 ms | -4 ms (~same) | +| **DOM Complete** | 1060 ms | 663 ms | **-397 ms (-37%)** | + +## Phase 4: Binary Pass-Through Streaming + +Non-processable content (images, fonts, video, `application/octet-stream`) +currently passes through `handle_publisher_request` unchanged via the +`Buffered` path, buffering the entire body in memory before sending. For +large binaries (1-10 MB images), this is wasteful. + +Phase 4 adds a `PublisherResponse::PassThrough` variant that signals the +adapter to stream the body directly via `io::copy` into `StreamingBody` +with no processing pipeline. 
This eliminates peak memory for binary +responses and improves DOM Complete for image-heavy pages. + +### Streaming gate (updated) + +``` +is_success (2xx) +├── should_process && (!is_html || !has_post_processors) → Stream (pipeline) +├── should_process && is_html && has_post_processors → Buffered (post-processors) +└── !should_process → PassThrough (io::copy) + +!is_success +└── any content type → Buffered (error page) +``` + +### `PublisherResponse` enum (updated) + +```rust +pub enum PublisherResponse { + Buffered(Response), + Stream { response, body, params }, + PassThrough { response, body }, +} +``` + +`Content-Length` is preserved for `PassThrough` since the body is +unmodified — no need for chunked transfer encoding.