From 98772768d9db29e9264d2f0c82ea53853eea0d78 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:29:05 -0700 Subject: [PATCH 1/8] Migrate entry point from #[fastly::main] to undecorated main() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace #[fastly::main] with an undecorated main() that calls Request::from_client() and explicitly sends responses via send_to_client(). This is required for Phase 2's stream_to_client() support — #[fastly::main] auto-calls send_to_client() on the returned Response, which is incompatible with streaming. The program still compiles to wasm32-wasip1 and runs on Fastly Compute — #[fastly::main] was just syntactic sugar. Also simplifies route_request to return Response directly instead of Result, since it already converts all errors to HTTP responses internally. --- .../trusted-server-adapter-fastly/src/main.rs | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index d97c8402..38c74cb0 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -1,6 +1,6 @@ use error_stack::Report; use fastly::http::Method; -use fastly::{Error, Request, Response}; +use fastly::{Request, Response}; use log_fastly::Logger; use trusted_server_core::auction::endpoints::handle_auction; @@ -29,21 +29,33 @@ use trusted_server_core::settings_data::get_settings; mod error; use crate::error::to_error_response; -#[fastly::main] -fn main(req: Request) -> Result { +/// Entry point for the Fastly Compute program. +/// +/// Uses an undecorated `main()` with `Request::from_client()` instead of +/// `#[fastly::main]` so we can call `stream_to_client()` or `send_to_client()` +/// explicitly. `#[fastly::main]` is syntactic sugar that auto-calls +/// `send_to_client()` on the returned `Response`, which is incompatible with +/// streaming. +fn main() { init_logger(); + let req = Request::from_client(); + // Keep the health probe independent from settings loading and routing so // readiness checks still get a cheap liveness response during startup. if req.get_method() == Method::GET && req.get_path() == "/health" { - return Ok(Response::from_status(200).with_body_text_plain("ok")); + Response::from_status(200) + .with_body_text_plain("ok") + .send_to_client(); + return; } let settings = match get_settings() { Ok(s) => s, Err(e) => { log::error!("Failed to load settings: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; log::debug!("Settings {settings:?}"); @@ -55,16 +67,19 @@ fn main(req: Request) -> Result { Ok(r) => r, Err(e) => { log::error!("Failed to create integration registry: {:?}", e); - return Ok(to_error_response(&e)); + to_error_response(&e).send_to_client(); + return; } }; - futures::executor::block_on(route_request( + let response = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )) + )); + + response.send_to_client(); } async fn route_request( @@ -72,7 +87,7 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result { +) -> Response { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). @@ -83,7 +98,7 @@ async fn route_request( if let Some(mut response) = enforce_basic_auth(settings, &req) { finalize_response(settings, geo_info.as_ref(), &mut response); - return Ok(response); + return response; } // Get path and method for routing @@ -153,7 +168,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - Ok(response) + response } /// Applies all standard response headers: geo, version, staging, and configured headers. From d59f9bccf75dc4c39f757004d0f6cb973c4956af Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:30:19 -0700 Subject: [PATCH 2/8] Refactor process_response_streaming to accept W: Write Change signature from returning Body (with internal Vec) to writing into a generic &mut W: Write parameter. This enables Task 8 to pass StreamingBody directly as the output sink. The call site in handle_publisher_request passes &mut Vec for now, preserving the buffered behavior until the streaming path is wired up. --- crates/trusted-server-core/src/publisher.rs | 92 ++++++--------------- 1 file changed, 26 insertions(+), 66 deletions(-) diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index a2f54441..6a010c5f 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -93,12 +93,21 @@ struct ProcessResponseParams<'a> { integration_registry: &'a IntegrationRegistry, } -/// Process response body in streaming fashion with compression preservation -fn process_response_streaming( +/// Process response body through the streaming pipeline. +/// +/// Selects the appropriate processor based on content type (HTML rewriter, +/// RSC Flight rewriter, or URL replacer) and pipes chunks from `body` +/// through it into `output`. The caller decides what `output` is — a +/// `Vec` for buffered responses, or a `StreamingBody` for streaming. +/// +/// # Errors +/// +/// Returns an error if processor creation or chunk processing fails. +fn process_response_streaming( body: Body, + output: &mut W, params: &ProcessResponseParams, -) -> Result> { - // Check if this is HTML content +) -> Result<(), Report> { let is_html = params.content_type.contains("text/html"); let is_rsc_flight = params.content_type.contains("text/x-component"); log::debug!( @@ -110,15 +119,14 @@ fn process_response_streaming( params.origin_host ); - // Determine compression type let compression = Compression::from_content_encoding(params.content_encoding); + let config = PipelineConfig { + input_compression: compression, + output_compression: compression, + chunk_size: 8192, + }; - // Create output body to collect results - let mut output = Vec::new(); - - // Choose processor based on content type if is_html { - // Use HTML rewriter for HTML content let processor = create_html_stream_processor( params.origin_host, params.request_host, @@ -126,57 +134,26 @@ fn process_response_streaming( params.settings, params.integration_registry, )?; - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else if is_rsc_flight { - // RSC Flight responses are length-prefixed (T rows). A naive string replacement will - // corrupt the stream by changing byte lengths without updating the prefixes. let processor = RscFlightUrlRewriter::new( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, processor); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, processor).process(body, output)?; } else { - // Use simple text replacer for non-HTML content let replacer = create_url_replacer( params.origin_host, params.origin_url, params.request_host, params.request_scheme, ); - - let config = PipelineConfig { - input_compression: compression, - output_compression: compression, - chunk_size: 8192, - }; - - let mut pipeline = StreamingPipeline::new(config, replacer); - pipeline.process(body, &mut output)?; + StreamingPipeline::new(config, replacer).process(body, output)?; } - log::debug!( - "Streaming processing complete - output size: {} bytes", - output.len() - ); - Ok(Body::from(output)) + Ok(()) } /// Create a unified HTML stream processor @@ -335,28 +312,11 @@ pub fn handle_publisher_request( content_type: &content_type, integration_registry, }; - match process_response_streaming(body, ¶ms) { - Ok(processed_body) => { - // Set the processed body back - response.set_body(processed_body); + let mut output = Vec::new(); + process_response_streaming(body, &mut output, ¶ms)?; - // Remove Content-Length as the size has likely changed - response.remove_header(header::CONTENT_LENGTH); - - // Keep Content-Encoding header since we're returning compressed content - log::debug!( - "Preserved Content-Encoding: {} for compressed response", - content_encoding - ); - - log::debug!("Completed streaming processing of response body"); - } - Err(e) => { - log::error!("Failed to process response body: {:?}", e); - // Return an error response - return Err(e); - } - } + response.set_body(Body::from(output)); + response.remove_header(header::CONTENT_LENGTH); } else { log::debug!( "Skipping response processing - should_process: {}, request_host: '{}'", From 986f92dd75b4b471d5d5e06a18cac4210837867b Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:35:32 -0700 Subject: [PATCH 3/8] Add streaming path to publisher proxy via StreamingBody MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split handle_publisher_request into streaming and buffered paths based on the streaming gate: - Streaming: 2xx + processable content + no HTML post-processors - Buffered: post-processors registered (Next.js) or non-processable Streaming path returns PublisherResponse::Stream with the origin body and processing params. The adapter calls finalize_response() to set all headers, then stream_to_client() to commit them, and pipes the body through stream_publisher_body() into StreamingBody. Synthetic ID/cookie headers are set before body processing (they are body-independent), so they are included in the streamed headers. Mid-stream errors log and drop the StreamingBody — client sees a truncated response, standard proxy behavior. --- .../trusted-server-adapter-fastly/src/main.rs | 49 ++++- crates/trusted-server-core/src/publisher.rs | 190 +++++++++++++----- 2 files changed, 184 insertions(+), 55 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 38c74cb0..4e4e62f1 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -18,7 +18,9 @@ use trusted_server_core::proxy::{ handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, }; -use trusted_server_core::publisher::{handle_publisher_request, handle_tsjs_dynamic}; +use trusted_server_core::publisher::{ + handle_publisher_request, handle_tsjs_dynamic, stream_publisher_body, PublisherResponse, +}; use trusted_server_core::request_signing::{ handle_deactivate_key, handle_rotate_key, handle_trusted_server_discovery, handle_verify_signature, @@ -72,14 +74,16 @@ fn main() { } }; - let response = futures::executor::block_on(route_request( + // route_request may send the response directly (streaming path) or + // return it for us to send (buffered path). + if let Some(response) = futures::executor::block_on(route_request( &settings, &orchestrator, &integration_registry, req, - )); - - response.send_to_client(); + )) { + response.send_to_client(); + } } async fn route_request( @@ -87,7 +91,7 @@ async fn route_request( orchestrator: &AuctionOrchestrator, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Response { +) -> Option { // Strip client-spoofable forwarded headers at the edge. // On Fastly this service IS the first proxy — these headers from // clients are untrusted and can hijack URL rewriting (see #409). @@ -98,7 +102,7 @@ async fn route_request( if let Some(mut response) = enforce_basic_auth(settings, &req) { finalize_response(settings, geo_info.as_ref(), &mut response); - return response; + return Some(response); } // Get path and method for routing @@ -154,7 +158,34 @@ async fn route_request( ); match handle_publisher_request(settings, integration_registry, req) { - Ok(response) => Ok(response), + Ok(PublisherResponse::Stream { + mut response, + body, + params, + }) => { + // Streaming path: finalize headers, then stream body to client. + finalize_response(settings, geo_info.as_ref(), &mut response); + let mut streaming_body = response.stream_to_client(); + if let Err(e) = stream_publisher_body( + body, + &mut streaming_body, + ¶ms, + settings, + integration_registry, + ) { + // Headers already sent (200 OK). Log and abort — client + // sees a truncated response. Standard proxy behavior. + log::error!("Streaming processing failed: {e:?}"); + drop(streaming_body); + } else { + streaming_body + .finish() + .expect("should finish streaming body"); + } + // Response already sent via stream_to_client() + return None; + } + Ok(PublisherResponse::Buffered(response)) => Ok(response), Err(e) => { log::error!("Failed to proxy to publisher origin: {:?}", e); Err(e) @@ -168,7 +199,7 @@ async fn route_request( finalize_response(settings, geo_info.as_ref(), &mut response); - response + Some(response) } /// Applies all standard response headers: geo, version, staging, and configured headers. diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 6a010c5f..efd65fa1 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use error_stack::{Report, ResultExt}; use fastly::http::{header, StatusCode}; use fastly::{Body, Request, Response}; @@ -177,28 +179,87 @@ fn create_html_stream_processor( Ok(create_html_processor(config)) } +/// Result of publisher request handling, indicating whether the response +/// body should be streamed or has already been buffered. +pub enum PublisherResponse { + /// Response is fully buffered and ready to send via `send_to_client()`. + Buffered(Response), + /// Response headers are ready. The caller must: + /// 1. Call `finalize_response()` on the response + /// 2. Call `response.stream_to_client()` to get a `StreamingBody` + /// 3. Call `stream_publisher_body()` with the body and streaming writer + /// 4. Call `StreamingBody::finish()` + Stream { + /// Response with all headers set (synthetic ID, cookies, etc.) + /// but body not yet written. `Content-Length` already removed. + response: Response, + /// Origin body to be piped through the streaming pipeline. + body: Body, + /// Parameters for `process_response_streaming`. + params: OwnedProcessResponseParams, + }, +} + +/// Owned version of [`ProcessResponseParams`] for returning from +/// `handle_publisher_request` without lifetime issues. +pub struct OwnedProcessResponseParams { + pub content_encoding: String, + pub origin_host: String, + pub origin_url: String, + pub request_host: String, + pub request_scheme: String, + pub content_type: String, +} + +/// Stream the publisher response body through the processing pipeline. +/// +/// Called by the adapter after `stream_to_client()` has committed the +/// response headers. Writes processed chunks directly to `output`. +/// +/// # Errors +/// +/// Returns an error if processing fails mid-stream. Since headers are +/// already committed, the caller should log the error and drop the +/// `StreamingBody` (client sees a truncated response). +pub fn stream_publisher_body( + body: Body, + output: &mut W, + params: &OwnedProcessResponseParams, + settings: &Settings, + integration_registry: &IntegrationRegistry, +) -> Result<(), Report> { + let borrowed = ProcessResponseParams { + content_encoding: ¶ms.content_encoding, + origin_host: ¶ms.origin_host, + origin_url: ¶ms.origin_url, + request_host: ¶ms.request_host, + request_scheme: ¶ms.request_scheme, + settings, + content_type: ¶ms.content_type, + integration_registry, + }; + process_response_streaming(body, output, &borrowed) +} + /// Proxies requests to the publisher's origin server. /// -/// This function forwards incoming requests to the configured origin URL, -/// preserving headers and request body. It's used as a fallback for routes -/// not explicitly handled by the trusted server. +/// Returns a [`PublisherResponse`] indicating whether the response can be +/// streamed or must be sent buffered. The streaming path is chosen when: +/// - The backend returns a 2xx status +/// - The response has a processable content type +/// - No HTML post-processors are registered (the streaming gate) /// /// # Errors /// -/// Returns a [`TrustedServerError`] if: -/// - The proxy request fails -/// - The origin backend is unreachable +/// Returns a [`TrustedServerError`] if the proxy request fails or the +/// origin backend is unreachable. pub fn handle_publisher_request( settings: &Settings, integration_registry: &IntegrationRegistry, mut req: Request, -) -> Result> { +) -> Result> { log::debug!("Proxying request to publisher_origin"); - // Prebid.js requests are not intercepted here anymore. The HTML processor removes - // publisher-supplied Prebid scripts; the unified TSJS bundle includes Prebid.js when enabled. - - // Extract request host and scheme (uses Host header and TLS detection after edge sanitization) let request_info = RequestInfo::from_request(&req); let request_host = &request_info.host; let request_scheme = &request_info.scheme; @@ -212,27 +273,14 @@ pub fn handle_publisher_request( req.get_header("x-forwarded-proto"), ); - // Parse cookies once for reuse by both consent extraction and synthetic ID logic. let cookie_jar = handle_request_cookies(&req)?; - - // Capture the current SSC cookie value for revocation handling. - // This must come from the cookie itself (not the x-synthetic-id header) - // to ensure KV deletion targets the same identifier being revoked. let existing_ssc_cookie = cookie_jar .as_ref() .and_then(|jar| jar.get(COOKIE_SYNTHETIC_ID)) .map(|cookie| cookie.value().to_owned()); - // Generate synthetic identifiers before the request body is consumed. - // Always generated for internal use (KV lookups, logging) even when - // consent is absent — the cookie is only *set* when consent allows it. let synthetic_id = get_or_generate_synthetic_id(settings, &req)?; - // Extract, decode, and log consent signals (TCF, GPP, US Privacy, GPC) - // from the incoming request. The ConsentContext carries both raw strings - // (for OpenRTB forwarding) and decoded data (for enforcement). - // When a consent_store is configured, this also persists consent to KV - // and falls back to stored consent when cookies are absent. let geo = crate::geo::GeoInfo::from_request(&req); let consent_context = build_consent_context(&ConsentPipelineInput { jar: cookie_jar.as_ref(), @@ -267,13 +315,22 @@ pub fn handle_publisher_request( message: "Failed to proxy request to origin".to_string(), })?; - // Log all response headers for debugging log::debug!("Response headers:"); for (name, value) in response.get_headers() { log::debug!(" {}: {:?}", name, value); } - // Check if the response has a text-based content type that we should process + // Set synthetic ID / cookie headers BEFORE body processing. + // These are body-independent (computed from request cookies + consent). + apply_synthetic_id_headers( + settings, + &mut response, + &synthetic_id, + ssc_allowed, + existing_ssc_cookie.as_deref(), + &consent_context, + ); + let content_type = response .get_header(header::CONTENT_TYPE) .map(|h| h.to_str().unwrap_or_default()) @@ -284,24 +341,60 @@ pub fn handle_publisher_request( || content_type.contains("application/javascript") || content_type.contains("application/json"); - if should_process && !request_host.is_empty() { - // Check if the response is compressed + // Streaming gate: can we stream this response? + // - Must have processable content + // - Must have a request host for URL rewriting + // - Backend must return success (already guaranteed — errors propagated above) + // - No HTML post-processors registered (they need the full document) + let is_html = content_type.contains("text/html"); + let has_post_processors = !integration_registry.html_post_processors().is_empty(); + let can_stream = + should_process && !request_host.is_empty() && (!is_html || !has_post_processors); + + if can_stream { let content_encoding = response .get_header(header::CONTENT_ENCODING) .map(|h| h.to_str().unwrap_or_default()) .unwrap_or_default() .to_lowercase(); - // Log response details for debugging log::debug!( - "Processing response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", content_type, content_encoding, request_host, origin_host ); - // Take the response body for streaming processing let body = response.take_body(); + response.remove_header(header::CONTENT_LENGTH); + + return Ok(PublisherResponse::Stream { + response, + body, + params: OwnedProcessResponseParams { + content_encoding, + origin_host, + origin_url: settings.publisher.origin_url.clone(), + request_host: request_host.to_string(), + request_scheme: request_scheme.to_string(), + content_type, + }, + }); + } - // Process the body using streaming approach + // Buffered fallback: process body in memory (post-processors need full document, + // or content type doesn't need processing). + if should_process && !request_host.is_empty() { + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + + log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); + + let body = response.take_body(); let params = ProcessResponseParams { content_encoding: &content_encoding, origin_host: &origin_host, @@ -325,24 +418,31 @@ pub fn handle_publisher_request( ); } - // Consent-gated SSC creation: - // - Consent given → set synthetic ID header + cookie. - // - Consent absent + existing cookie → revoke (expire cookie + delete KV entry). - // - Consent absent + no cookie → do nothing. + Ok(PublisherResponse::Buffered(response)) +} + +/// Apply synthetic ID and cookie headers to the response. +/// +/// Extracted so headers can be set before streaming begins (headers must +/// be finalized before `stream_to_client()` commits them). +fn apply_synthetic_id_headers( + settings: &Settings, + response: &mut Response, + synthetic_id: &str, + ssc_allowed: bool, + existing_ssc_cookie: Option<&str>, + consent_context: &crate::consent::ConsentContext, +) { if ssc_allowed { - // Fastly's HeaderValue API rejects \r, \n, and \0, so the synthetic ID - // cannot inject additional response headers. - response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id.as_str()); - // Cookie persistence is skipped if the synthetic ID contains RFC 6265-illegal - // characters. The header is still emitted when consent allows it. - set_synthetic_cookie(settings, &mut response, synthetic_id.as_str()); - } else if let Some(cookie_synthetic_id) = existing_ssc_cookie.as_deref() { + response.set_header(HEADER_X_SYNTHETIC_ID, synthetic_id); + set_synthetic_cookie(settings, response, synthetic_id); + } else if let Some(cookie_synthetic_id) = existing_ssc_cookie { log::info!( "SSC revoked for '{}': consent withdrawn (jurisdiction={})", cookie_synthetic_id, consent_context.jurisdiction, ); - expire_synthetic_cookie(settings, &mut response); + expire_synthetic_cookie(settings, response); if let Some(store_name) = &settings.consent.consent_store { crate::consent::kv::delete_consent_from_kv(store_name, cookie_synthetic_id); } @@ -352,8 +452,6 @@ pub fn handle_publisher_request( consent_context.jurisdiction, ); } - - Ok(response) } #[cfg(test)] From 3873e14452dc0415b29aedb7981b5659be428b4d Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:41:10 -0700 Subject: [PATCH 4/8] Address review: replace expect with log, restore stripped comments - Replace streaming_body.finish().expect() with log::error on failure (expect panics in WASM, and headers are already committed anyway) - Restore explanatory comments for cookie parsing, SSC capture, synthetic ID generation, and consent extraction ordering --- crates/trusted-server-adapter-fastly/src/main.rs | 6 ++---- crates/trusted-server-core/src/publisher.rs | 13 +++++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 4e4e62f1..bf90880f 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -177,10 +177,8 @@ async fn route_request( // sees a truncated response. Standard proxy behavior. log::error!("Streaming processing failed: {e:?}"); drop(streaming_body); - } else { - streaming_body - .finish() - .expect("should finish streaming body"); + } else if let Err(e) = streaming_body.finish() { + log::error!("Failed to finish streaming body: {e}"); } // Response already sent via stream_to_client() return None; diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index efd65fa1..2f10479f 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -273,14 +273,27 @@ pub fn handle_publisher_request( req.get_header("x-forwarded-proto"), ); + // Parse cookies once for reuse by both consent extraction and synthetic ID logic. let cookie_jar = handle_request_cookies(&req)?; + + // Capture the current SSC cookie value for revocation handling. + // This must come from the cookie itself (not the x-synthetic-id header) + // to ensure KV deletion targets the same identifier being revoked. let existing_ssc_cookie = cookie_jar .as_ref() .and_then(|jar| jar.get(COOKIE_SYNTHETIC_ID)) .map(|cookie| cookie.value().to_owned()); + // Generate synthetic identifiers before the request body is consumed. + // Always generated for internal use (KV lookups, logging) even when + // consent is absent — the cookie is only *set* when consent allows it. let synthetic_id = get_or_generate_synthetic_id(settings, &req)?; + // Extract, decode, and log consent signals (TCF, GPP, US Privacy, GPC) + // from the incoming request. The ConsentContext carries both raw strings + // (for OpenRTB forwarding) and decoded data (for enforcement). + // When a consent_store is configured, this also persists consent to KV + // and falls back to stored consent when cookies are absent. let geo = crate::geo::GeoInfo::from_request(&req); let consent_context = build_consent_context(&ConsentPipelineInput { jar: cookie_jar.as_ref(), From c7edd82e291e9c99c457e0bc8620c44020a9575a Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 26 Mar 2026 14:46:00 -0700 Subject: [PATCH 5/8] Deduplicate content-encoding extraction and simplify flow Hoist the non-processable early return above the streaming gate so content_encoding extraction happens once. The streaming gate condition is also simplified since should_process and request_host are already guaranteed at that point. --- crates/trusted-server-core/src/publisher.rs | 83 +++++++++------------ 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 2f10479f..6a450623 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -354,23 +354,29 @@ pub fn handle_publisher_request( || content_type.contains("application/javascript") || content_type.contains("application/json"); + if !should_process || request_host.is_empty() { + log::debug!( + "Skipping response processing - should_process: {}, request_host: '{}'", + should_process, + request_host + ); + return Ok(PublisherResponse::Buffered(response)); + } + + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .map(|h| h.to_str().unwrap_or_default()) + .unwrap_or_default() + .to_lowercase(); + // Streaming gate: can we stream this response? - // - Must have processable content - // - Must have a request host for URL rewriting - // - Backend must return success (already guaranteed — errors propagated above) // - No HTML post-processors registered (they need the full document) + // - Non-HTML content always streams (post-processors only apply to HTML) let is_html = content_type.contains("text/html"); let has_post_processors = !integration_registry.html_post_processors().is_empty(); - let can_stream = - should_process && !request_host.is_empty() && (!is_html || !has_post_processors); + let can_stream = !is_html || !has_post_processors; if can_stream { - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); - log::debug!( "Streaming response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", content_type, content_encoding, request_host, origin_host @@ -393,43 +399,28 @@ pub fn handle_publisher_request( }); } - // Buffered fallback: process body in memory (post-processors need full document, - // or content type doesn't need processing). - if should_process && !request_host.is_empty() { - let content_encoding = response - .get_header(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) - .unwrap_or_default() - .to_lowercase(); + // Buffered fallback: post-processors need the full document. + log::debug!( + "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", + content_type, content_encoding, request_host, origin_host + ); - log::debug!( - "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", - content_type, content_encoding, request_host, origin_host - ); + let body = response.take_body(); + let params = ProcessResponseParams { + content_encoding: &content_encoding, + origin_host: &origin_host, + origin_url: &settings.publisher.origin_url, + request_host, + request_scheme, + settings, + content_type: &content_type, + integration_registry, + }; + let mut output = Vec::new(); + process_response_streaming(body, &mut output, ¶ms)?; - let body = response.take_body(); - let params = ProcessResponseParams { - content_encoding: &content_encoding, - origin_host: &origin_host, - origin_url: &settings.publisher.origin_url, - request_host, - request_scheme, - settings, - content_type: &content_type, - integration_registry, - }; - let mut output = Vec::new(); - process_response_streaming(body, &mut output, ¶ms)?; - - response.set_body(Body::from(output)); - response.remove_header(header::CONTENT_LENGTH); - } else { - log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", - should_process, - request_host - ); - } + response.set_body(Body::from(output)); + response.remove_header(header::CONTENT_LENGTH); Ok(PublisherResponse::Buffered(response)) } From bd01180bd68ee269a74d5a16de672aea5007c87f Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Tue, 31 Mar 2026 14:14:56 -0700 Subject: [PATCH 6/8] Address PR #585 review feedback - Narrow OwnedProcessResponseParams fields to pub(crate) - Set Content-Length on buffered responses instead of removing it - Add has_html_post_processors() to avoid Vec> allocation - Extract is_processable_content_type() and test it directly - Fix stray merge artifact in apply_synthetic_id_headers --- .../src/integrations/registry.rs | 9 ++++ crates/trusted-server-core/src/publisher.rs | 44 ++++++++++--------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/crates/trusted-server-core/src/integrations/registry.rs b/crates/trusted-server-core/src/integrations/registry.rs index 6df46dd1..855d8376 100644 --- a/crates/trusted-server-core/src/integrations/registry.rs +++ b/crates/trusted-server-core/src/integrations/registry.rs @@ -732,6 +732,15 @@ impl IntegrationRegistry { self.inner.script_rewriters.clone() } + /// Check whether any HTML post-processors are registered. + /// + /// Cheaper than [`html_post_processors()`](Self::html_post_processors) when + /// only the presence check is needed — avoids cloning `Vec>`. + #[must_use] + pub fn has_html_post_processors(&self) -> bool { + !self.inner.html_post_processors.is_empty() + } + /// Expose registered HTML post-processors. #[must_use] pub fn html_post_processors(&self) -> Vec> { diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 177a0ee4..c188c74f 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -279,12 +279,12 @@ pub enum PublisherResponse { /// Owned version of [`ProcessResponseParams`] for returning from /// `handle_publisher_request` without lifetime issues. pub struct OwnedProcessResponseParams { - pub content_encoding: String, - pub origin_host: String, - pub origin_url: String, - pub request_host: String, - pub request_scheme: String, - pub content_type: String, + pub(crate) content_encoding: String, + pub(crate) origin_host: String, + pub(crate) origin_url: String, + pub(crate) request_host: String, + pub(crate) request_scheme: String, + pub(crate) content_type: String, } /// Stream the publisher response body through the processing pipeline. @@ -428,9 +428,7 @@ pub fn handle_publisher_request( .unwrap_or_default() .to_string(); - let should_process = content_type.contains("text/") - || content_type.contains("application/javascript") - || content_type.contains("application/json"); + let should_process = is_processable_content_type(&content_type); if !should_process || request_host.is_empty() { log::debug!( @@ -451,7 +449,7 @@ pub fn handle_publisher_request( // - No HTML post-processors registered (they need the full document) // - Non-HTML content always streams (post-processors only apply to HTML) let is_html = content_type.contains("text/html"); - let has_post_processors = !integration_registry.html_post_processors().is_empty(); + let has_post_processors = integration_registry.has_html_post_processors(); let can_stream = !is_html || !has_post_processors; if can_stream { @@ -497,12 +495,22 @@ pub fn handle_publisher_request( let mut output = Vec::new(); process_response_streaming(body, &mut output, ¶ms)?; + response.set_header(header::CONTENT_LENGTH, output.len().to_string()); response.set_body(Body::from(output)); - response.remove_header(header::CONTENT_LENGTH); Ok(PublisherResponse::Buffered(response)) } +/// Whether the content type requires processing (URL rewriting, HTML injection). +/// +/// Text-based and JavaScript/JSON responses are processable; binary types +/// (images, fonts, video, etc.) pass through unchanged. +fn is_processable_content_type(content_type: &str) -> bool { + content_type.contains("text/") + || content_type.contains("application/javascript") + || content_type.contains("application/json") +} + /// Apply synthetic ID and cookie headers to the response. /// /// Extracted so headers can be set before streaming begins (headers must @@ -574,17 +582,11 @@ mod tests { ("application/octet-stream", false), ]; - for (content_type, should_process) in test_cases { - let result = content_type.contains("text/html") - || content_type.contains("text/css") - || content_type.contains("text/javascript") - || content_type.contains("application/javascript") - || content_type.contains("application/json"); - + for (content_type, expected) in test_cases { assert_eq!( - result, should_process, - "Content-Type '{}' should_process: expected {}, got {}", - content_type, should_process, result + is_processable_content_type(content_type), + expected, + "Content-Type '{content_type}' should_process: expected {expected}", ); } } From eeaa0faa8081e4419474706d184f1506e949d04a Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Wed, 8 Apr 2026 11:42:04 -0700 Subject: [PATCH 7/8] Add streaming gate guards for status code and Content-Encoding Non-2xx responses now stay buffered to prevent committing error status irreversibly via stream_to_client() and injecting JS into error pages. Unsupported Content-Encoding values (e.g. zstd from misbehaving origins) fall back to buffered mode so failures produce proper error responses instead of truncated streams. Also removes raw synthetic ID from debug logging for privacy consistency, fixes std::io::Write import inconsistency, and corrects misleading "200 OK" comment in streaming error path. --- .../trusted-server-adapter-fastly/src/main.rs | 2 +- crates/trusted-server-core/src/publisher.rs | 59 +++++++++++++++---- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 6c54b5e2..b0fb2418 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -192,7 +192,7 @@ async fn route_request( settings, integration_registry, ) { - // Headers already sent (200 OK). Log and abort — client + // Headers already committed. Log and abort — client // sees a truncated response. Standard proxy behavior. log::error!("Streaming processing failed: {e:?}"); drop(streaming_body); diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index c188c74f..92d052c7 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -181,7 +181,7 @@ struct ProcessResponseParams<'a> { /// # Errors /// /// Returns an error if processor creation or chunk processing fails. -fn process_response_streaming( +fn process_response_streaming( body: Body, output: &mut W, params: &ProcessResponseParams, @@ -323,6 +323,7 @@ pub fn stream_publisher_body( /// streamed or must be sent buffered. The streaming path is chosen when: /// - The backend returns a 2xx status /// - The response has a processable content type +/// - The response uses a supported `Content-Encoding` (gzip, deflate, br) /// - No HTML post-processors are registered (the streaming gate) /// /// # Errors @@ -379,11 +380,7 @@ pub fn handle_publisher_request( synthetic_id: Some(synthetic_id.as_str()), }); let ssc_allowed = allows_ssc_creation(&consent_context); - log::debug!( - "Proxy synthetic IDs - trusted: {}, ssc_allowed: {}", - synthetic_id, - ssc_allowed, - ); + log::debug!("Proxy ssc_allowed: {}", ssc_allowed); let backend_name = BackendConfig::from_url( &settings.publisher.origin_url, @@ -429,12 +426,14 @@ pub fn handle_publisher_request( .to_string(); let should_process = is_processable_content_type(&content_type); + let is_success = response.get_status().is_success(); - if !should_process || request_host.is_empty() { + if !should_process || request_host.is_empty() || !is_success { log::debug!( - "Skipping response processing - should_process: {}, request_host: '{}'", + "Skipping response processing - should_process: {}, request_host: '{}', status: {}", should_process, - request_host + request_host, + response.get_status(), ); return Ok(PublisherResponse::Buffered(response)); } @@ -446,11 +445,14 @@ pub fn handle_publisher_request( .to_lowercase(); // Streaming gate: can we stream this response? + // - 2xx status (non-success already returned Buffered above) + // - Supported Content-Encoding (unsupported would fail mid-stream) // - No HTML post-processors registered (they need the full document) // - Non-HTML content always streams (post-processors only apply to HTML) let is_html = content_type.contains("text/html"); let has_post_processors = integration_registry.has_html_post_processors(); - let can_stream = !is_html || !has_post_processors; + let encoding_supported = is_supported_content_encoding(&content_encoding); + let can_stream = encoding_supported && (!is_html || !has_post_processors); if can_stream { log::debug!( @@ -511,6 +513,15 @@ fn is_processable_content_type(content_type: &str) -> bool { || content_type.contains("application/json") } +/// Whether the `Content-Encoding` is one the streaming pipeline can handle. +/// +/// Unsupported encodings (e.g. `zstd` from a misbehaving origin) must fall +/// back to buffered mode so a processing failure produces a proper error +/// response instead of a truncated stream. +fn is_supported_content_encoding(encoding: &str) -> bool { + matches!(encoding, "" | "identity" | "gzip" | "deflate" | "br") +} + /// Apply synthetic ID and cookie headers to the response. /// /// Extracted so headers can be set before streaming begins (headers must @@ -591,6 +602,34 @@ mod tests { } } + #[test] + fn supported_content_encoding_accepts_known_values() { + assert!(is_supported_content_encoding(""), "should accept empty"); + assert!( + is_supported_content_encoding("identity"), + "should accept identity" + ); + assert!(is_supported_content_encoding("gzip"), "should accept gzip"); + assert!( + is_supported_content_encoding("deflate"), + "should accept deflate" + ); + assert!(is_supported_content_encoding("br"), "should accept br"); + } + + #[test] + fn supported_content_encoding_rejects_unknown_values() { + assert!(!is_supported_content_encoding("zstd"), "should reject zstd"); + assert!( + !is_supported_content_encoding("compress"), + "should reject compress" + ); + assert!( + !is_supported_content_encoding("snappy"), + "should reject snappy" + ); + } + #[test] fn test_publisher_origin_host_extraction() { let settings = create_test_settings(); From 48f9bca0a38e3ba9db3e2f5c2cc5c89757c4e2f4 Mon Sep 17 00:00:00 2001 From: Aram Grigoryan <132480+aram356@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:32:05 -0700 Subject: [PATCH 8/8] Return origin response unchanged for unsupported Content-Encoding When the origin returns a processable 2xx response with an encoding the pipeline cannot decompress (e.g. `zstd` from a misbehaving origin), the buffered fallback previously still routed the body through process_response_streaming. `Compression::from_content_encoding` maps unknown values to `None`, so the rewriter would treat the compressed bytes as identity-encoded text and emit garbled output. Bypass the rewrite pipeline entirely in that case and return the origin response untouched. Adds a test asserting byte-for-byte pass-through and updates the is_supported_content_encoding doc to reflect the new behavior. Addresses PR #585 review feedback from @prk-Jr. --- crates/trusted-server-core/src/publisher.rs | 63 ++++++++++++++++++++- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/crates/trusted-server-core/src/publisher.rs b/crates/trusted-server-core/src/publisher.rs index 459c0c35..e6b061f8 100644 --- a/crates/trusted-server-core/src/publisher.rs +++ b/crates/trusted-server-core/src/publisher.rs @@ -485,6 +485,17 @@ pub fn handle_publisher_request( }); } + // Unsupported Content-Encoding: we cannot decompress, so processing would + // treat compressed bytes as identity and produce garbled output. Return + // the origin response unchanged. + if !encoding_supported { + log::warn!( + "Unsupported Content-Encoding '{}' - returning response unmodified", + content_encoding, + ); + return Ok(PublisherResponse::Buffered(response)); + } + // Buffered fallback: post-processors need the full document. log::debug!( "Buffered response - Content-Type: {}, Content-Encoding: {}, Request Host: {}, Origin Host: {}", @@ -523,9 +534,9 @@ fn is_processable_content_type(content_type: &str) -> bool { /// Whether the `Content-Encoding` is one the streaming pipeline can handle. /// -/// Unsupported encodings (e.g. `zstd` from a misbehaving origin) must fall -/// back to buffered mode so a processing failure produces a proper error -/// response instead of a truncated stream. +/// Unsupported encodings (e.g. `zstd` from a misbehaving origin) bypass the +/// rewrite pipeline entirely and are returned unchanged. Processing such +/// bodies as identity-encoded would produce garbled output. fn is_supported_content_encoding(encoding: &str) -> bool { matches!(encoding, "" | "identity" | "gzip" | "deflate" | "br") } @@ -635,6 +646,52 @@ mod tests { ); } + #[test] + fn unsupported_encoding_response_is_returned_unmodified() { + // Simulate a processable (HTML) 2xx response with an unsupported + // Content-Encoding. The bytes are not real zstd - the point is that + // they must be returned untouched rather than fed to the rewriter as + // identity-encoded text. + let origin_bytes = b"\x28\xb5\x2f\xfd\x00\x58\x61\x00\x00not really zstd".to_vec(); + + let mut response = Response::from_status(StatusCode::OK); + response.set_header(header::CONTENT_TYPE, "text/html; charset=utf-8"); + response.set_header(header::CONTENT_ENCODING, "zstd"); + response.set_body(Body::from(origin_bytes.clone())); + + // Re-derive the gate decision the same way handle_publisher_request does. + let content_type = response + .get_header(header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or_default() + .to_string(); + let content_encoding = response + .get_header(header::CONTENT_ENCODING) + .and_then(|h| h.to_str().ok()) + .unwrap_or_default() + .to_lowercase(); + + assert!( + is_processable_content_type(&content_type), + "text/html must be processable" + ); + assert!(response.get_status().is_success(), "status must be 2xx"); + assert!( + !is_supported_content_encoding(&content_encoding), + "zstd must not be a supported encoding" + ); + + // The fix: when the only reason to fall out of the streaming gate is + // unsupported encoding, return the response unchanged rather than + // re-routing through process_response_streaming (which would treat + // the compressed bytes as identity and garble them). + let body = response.into_body().into_bytes(); + assert_eq!( + body, origin_bytes, + "unsupported-encoding response must pass through byte-for-byte" + ); + } + #[test] fn test_publisher_origin_host_extraction() { let settings = create_test_settings();