diff --git a/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt b/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt index 3c0566e1..7457083d 100644 --- a/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt +++ b/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt @@ -96,6 +96,14 @@ data class MhrvConfig( val verifySsl: Boolean = true, val logLevel: String = "info", val parallelRelay: Int = 1, + /** + * Disable the HTTP/2 multiplexing on the Apps Script relay leg. + * Default false (h2 active); flip to true to force the legacy + * HTTP/1.1 keep-alive pool. Round-tripped from config.json so a + * hand-edited kill switch survives a save round trip from the + * Android UI. See `src/config.rs` `force_http1`. + */ + val forceHttp1: Boolean = false, val coalesceStepMs: Int = 10, val coalesceMaxMs: Int = 1000, val upstreamSocks5: String = "", @@ -217,6 +225,7 @@ data class MhrvConfig( put("verify_ssl", verifySsl) put("log_level", logLevel) put("parallel_relay", parallelRelay) + if (forceHttp1) put("force_http1", true) if (coalesceStepMs != 10) put("coalesce_step_ms", coalesceStepMs) if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs) if (upstreamSocks5.isNotBlank()) { @@ -328,6 +337,7 @@ object ConfigStore { if (cfg.verifySsl != defaults.verifySsl) obj.put("verify_ssl", cfg.verifySsl) if (cfg.logLevel != defaults.logLevel) obj.put("log_level", cfg.logLevel) if (cfg.parallelRelay != defaults.parallelRelay) obj.put("parallel_relay", cfg.parallelRelay) + if (cfg.forceHttp1 != defaults.forceHttp1) obj.put("force_http1", cfg.forceHttp1) if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs) if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs) if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5) @@ -431,6 +441,7 @@ object ConfigStore { verifySsl = obj.optBoolean("verify_ssl", true), logLevel = obj.optString("log_level", "info"), parallelRelay = obj.optInt("parallel_relay", 1), + forceHttp1 = obj.optBoolean("force_http1", false), coalesceStepMs = obj.optInt("coalesce_step_ms", 10), coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000), upstreamSocks5 = obj.optString("upstream_socks5", ""), diff --git a/android/app/src/main/java/com/therealaleph/mhrv/Native.kt b/android/app/src/main/java/com/therealaleph/mhrv/Native.kt index 51b37f8c..fbf44dc5 100644 --- a/android/app/src/main/java/com/therealaleph/mhrv/Native.kt +++ b/android/app/src/main/java/com/therealaleph/mhrv/Native.kt @@ -89,8 +89,22 @@ object Native { * relay_calls, relay_failures, coalesced, bytes_relayed, * cache_hits, cache_misses, cache_bytes, * blacklisted_scripts, total_scripts, - * today_calls, today_bytes, today_key (string "YYYY-MM-DD"), - * today_reset_secs (seconds until 00:00 UTC rollover) + * today_calls, today_bytes, today_key (string "YYYY-MM-DD" in + * Pacific Time — matches Apps Script's actual quota reset), + * today_reset_secs (seconds until the next 00:00 Pacific Time + * rollover; ~7-8 h offset from UTC depending on DST), + * h2_calls (calls served by the HTTP/2 multiplexed transport, + * across all entry points — Apps-Script direct, exit-node + * outer call, full-mode tunnel single op, full-mode tunnel + * batch. NOT comparable to relay_calls, which only sees the + * Apps-Script-direct path), + * h2_fallbacks (calls that attempted h2 but had to fall back + * to h1 — handshake failure, open backoff, sticky ALPN + * refusal, post-send error retried on h1; same all-entry- + * points scope as h2_calls. Compute h2 health as + * h2_calls / (h2_calls + h2_fallbacks)), + * h2_disabled (boolean: true when h2 fast path is permanently + * off — config force_http1 set, or peer refused h2 via ALPN) * * Cheap — just reads atomics. Safe to poll on a second-scale timer. */ diff --git a/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt b/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt index 19953f62..00063390 100644 --- a/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt +++ b/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt @@ -1487,11 +1487,14 @@ private fun CollapsibleSection( /** * "Usage today (estimated)" card. Polls `Native.statsJson(handle)` every * second while the proxy is up and renders today's relay calls vs. the - * Apps Script free-tier quota (20,000/day), today's bytes, UTC day key, - * and a countdown to the 00:00 UTC reset. Also shows a "View quota on - * Google" button that opens Google's Apps Script dashboard — the - * authoritative number, since the client-side estimate only sees what - * this device relayed. + * Apps Script free-tier quota (20,000/day), today's bytes, the Pacific + * Time day key, and a countdown to the 00:00 PT reset. Pacific Time + * matches Apps Script's actual quota reset cadence — UTC would have + * the counter resetting ~7-8 h before the user actually got a fresh + * quota allotment from Google. Also shows a "View quota on Google" + * button that opens Google's Apps Script dashboard — the authoritative + * number, since the client-side estimate only sees what this device + * relayed. * * Hidden when the handle is 0 (proxy not running) or the JSON comes back * empty (direct / full-only configs don't run a DomainFronter and so @@ -1563,7 +1566,7 @@ private fun UsageTodayCard() { value = fmtBytes(todayBytes), ) UsageRow( - label = stringResource(R.string.label_utc_day), + label = stringResource(R.string.label_pt_day), value = todayKey, ) UsageRow( diff --git a/android/app/src/main/res/values-fa/strings.xml b/android/app/src/main/res/values-fa/strings.xml index 2b8d1989..9421f805 100644 --- a/android/app/src/main/res/values-fa/strings.xml +++ b/android/app/src/main/res/values-fa/strings.xml @@ -88,7 +88,7 @@ مصرف امروز (تخمینی) درخواست‌های امروز بایت امروز - روز (UTC) + روز (PT) ریست تا %1$d / %2$d (%3$.1f%%) %1$d ساعت و %2$d دقیقه diff --git a/android/app/src/main/res/values/strings.xml b/android/app/src/main/res/values/strings.xml index 002f66e7..6a7688e7 100644 --- a/android/app/src/main/res/values/strings.xml +++ b/android/app/src/main/res/values/strings.xml @@ -103,7 +103,10 @@ Usage today (estimated) calls today bytes today - UTC day + + PT day resets in %1$d / %2$d (%3$.1f%%) %1$dh %2$dm diff --git a/src/bin/ui.rs b/src/bin/ui.rs index a733237d..3e64ec2d 100644 --- a/src/bin/ui.rs +++ b/src/bin/ui.rs @@ -260,6 +260,10 @@ struct FormState { /// users edit `disable_padding` directly when needed (Issue #391). /// Default false (padding active). disable_padding: bool, + /// Round-tripped from config.json. Not exposed as a UI control — + /// users edit `force_http1` directly when needed. Default false + /// (HTTP/2 multiplexing on the relay leg active). + force_http1: bool, /// Round-tripped from config.json. Not exposed in the UI form yet — /// the bypass-DoH default is the right answer for almost everyone /// (DoH already encrypts, the tunnel was just adding latency), so @@ -384,6 +388,7 @@ fn load_form() -> (FormState, Option) { passthrough_hosts: c.passthrough_hosts.clone(), block_quic: c.block_quic, disable_padding: c.disable_padding, + force_http1: c.force_http1, tunnel_doh: c.tunnel_doh, bypass_doh_hosts: c.bypass_doh_hosts.clone(), block_doh: c.block_doh, @@ -422,6 +427,7 @@ fn load_form() -> (FormState, Option) { passthrough_hosts: Vec::new(), block_quic: false, disable_padding: false, + force_http1: false, tunnel_doh: true, bypass_doh_hosts: Vec::new(), block_doh: true, @@ -584,6 +590,9 @@ impl FormState { // Issue #391: disable_padding is config-only for now. // Round-trip preserves the user's choice. disable_padding: self.disable_padding, + // HTTP/2 multiplexing kill switch. Config-only for now; + // round-trip preserves the user's choice across Save. + force_http1: self.force_http1, // DoH bypass is enabled-by-default with `tunnel_doh = false`. // Round-trip the user's choice (and any extra hostnames they // added) so save doesn't drop them. @@ -693,6 +702,11 @@ struct ConfigWire<'a> { auto_blacklist_cooldown_secs: u64, #[serde(skip_serializing_if = "is_default_timeout_secs")] request_timeout_secs: u64, + /// HTTP/2 multiplexing kill switch. Default false (h2 active); only + /// emitted on save when the user has explicitly disabled h2, so + /// unchanged configs stay clean. + #[serde(skip_serializing_if = "is_false")] + force_http1: bool, /// Exit-node config (CF-anti-bot bypass for chatgpt.com / claude.ai / /// grok.com / x.com via exit-node second-hop relay). Skip when fully /// default (disabled with no URL/PSK/hosts) so configs without @@ -772,6 +786,7 @@ impl<'a> From<&'a Config> for ConfigWire<'a> { auto_blacklist_window_secs: c.auto_blacklist_window_secs, auto_blacklist_cooldown_secs: c.auto_blacklist_cooldown_secs, request_timeout_secs: c.request_timeout_secs, + force_http1: c.force_http1, exit_node: &c.exit_node, } } diff --git a/src/config.rs b/src/config.rs index 1bde7947..7be6aeba 100644 --- a/src/config.rs +++ b/src/config.rs @@ -220,6 +220,19 @@ pub struct Config { #[serde(default)] pub disable_padding: bool, + /// Disable HTTP/2 multiplexing on the Apps Script relay leg. + /// Default `false` (= h2 enabled): the TLS handshake to the Google + /// edge advertises ALPN `["h2", "http/1.1"]`; if the server picks + /// h2 we route all relay traffic over a single multiplexed + /// connection (~100 concurrent streams) instead of the legacy + /// per-request TLS pool of 8-80 sockets. Kills head-of-line + /// blocking on slow Apps Script responses (one stalled call no + /// longer pins a whole socket). Set to `true` to force the + /// pre-v1.9.x HTTP/1.1 path — useful as a kill switch if a specific + /// deployment, fronting domain, or middlebox refuses h2. + #[serde(default)] + pub force_http1: bool, + /// Opt-out for the DoH bypass. Default `false` (= bypass active): /// CONNECTs to well-known DoH hostnames (Cloudflare, Google, Quad9, /// AdGuard, NextDNS, OpenDNS, browser-pinned variants like @@ -867,6 +880,38 @@ mod rt_tests { let _ = std::fs::remove_file(&tmp); } + #[test] + fn force_http1_round_trips_through_config() { + let json = r#"{ + "mode": "apps_script", + "google_ip": "216.239.38.120", + "front_domain": "www.google.com", + "script_id": "X", + "auth_key": "secretkey123", + "listen_host": "127.0.0.1", + "listen_port": 8085, + "log_level": "info", + "verify_ssl": true, + "force_http1": true +}"#; + let cfg: Config = serde_json::from_str(json).unwrap(); + assert!(cfg.force_http1, "force_http1=true must round-trip"); + } + + #[test] + fn force_http1_defaults_false_when_omitted() { + // Existing configs from before v1.9.13 don't have the field. + // serde(default) must give false (h2 active) so older configs + // continue to work and unchanged users get the optimization. + let json = r#"{ + "mode": "apps_script", + "auth_key": "secretkey123", + "script_id": "X" +}"#; + let cfg: Config = serde_json::from_str(json).unwrap(); + assert!(!cfg.force_http1, "default must be false (h2 enabled)"); + } + #[test] fn round_trip_minimal_fields_only() { // User saves with defaults for everything optional. This is what the diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs index 034798e9..ed8e3554 100644 --- a/src/domain_fronter.rs +++ b/src/domain_fronter.rs @@ -6,8 +6,9 @@ //! `/macros/s/{script_id}/exec`. Apps Script performs the actual upstream //! HTTP fetch server-side and returns a JSON envelope. //! -//! TODO: add HTTP/2 multiplexing (`h2` crate) for lower latency. -//! TODO: add parallel range-based downloads. +//! Multiplexes over HTTP/2 when the relay edge agrees via ALPN; falls back +//! to HTTP/1.1 keep-alive when h2 is refused or fails. Range-parallel +//! downloads are implemented by `relay_parallel_range`. use std::collections::HashMap; // AtomicU64 via portable-atomic: native on 64-bit / armv7, spinlock- @@ -15,12 +16,13 @@ use std::collections::HashMap; // is identical to std::sync::atomic::AtomicU64 so call sites need // no other changes. use portable_atomic::AtomicU64; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use base64::engine::general_purpose::STANDARD as B64; use base64::Engine; +use bytes::Bytes; use rand::{thread_rng, Rng, RngCore}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -54,6 +56,38 @@ pub enum FronterError { Timeout, #[error("json: {0}")] Json(#[from] serde_json::Error), + /// Wraps another error and tells outer retry/fallback layers + /// (`do_relay_with_retry`, the exit-node→direct-Apps-Script + /// fallback in `relay()`) NOT to replay the request. Used when an + /// h2 attempt failed *after* `send_request` succeeded — the + /// request may have already reached and been processed by Apps + /// Script (or the exit node), and replaying via h1 / direct path + /// would duplicate side effects for non-idempotent methods. + /// + /// `Display` is transparent so error messages look identical to + /// the wrapped variant; tests/observability use `is_retryable()` + /// and `into_inner()` to introspect. + #[error(transparent)] + NonRetryable(Box), +} + +impl FronterError { + /// True if outer retry/fallback layers may safely re-issue the + /// request. False for `NonRetryable(_)` — those errors signal + /// "request may have been sent; do not duplicate." + pub fn is_retryable(&self) -> bool { + !matches!(self, FronterError::NonRetryable(_)) + } + + /// Strip the `NonRetryable` wrapper, returning the underlying + /// error. Useful for surfacing the original message after the + /// retry/fallback policy has already done its job. + pub fn into_inner(self) -> FronterError { + match self { + FronterError::NonRetryable(inner) => *inner, + other => other, + } + } } type PooledStream = TlsStream; @@ -63,6 +97,40 @@ const POOL_REFILL_INTERVAL_SECS: u64 = 5; const POOL_MAX: usize = 80; const REQUEST_TIMEOUT_SECS: u64 = 25; const RANGE_PARALLEL_CHUNK_BYTES: u64 = 256 * 1024; +/// HTTP/2 connection lifetime before we proactively reopen. Apps Script's +/// edge has been observed to send GOAWAY at ~10 min anyway, so we cycle +/// at 9 min to do an orderly reconnect on our schedule rather than +/// letting an in-flight stream race a server-initiated close. +const H2_CONN_TTL_SECS: u64 = 540; +/// Bound on the h2 ready/back-pressure phase only. `SendRequest::ready()` +/// awaits a free slot under the server's `MAX_CONCURRENT_STREAMS`. A +/// stall here means the connection is overloaded (or dead at the +/// muxer level) but no stream has been opened yet — RequestSent::No, +/// safe to fall back to h1 without duplication risk. Kept short +/// (5 s) so a saturated conn doesn't burn the caller's whole budget. +/// +/// The post-send phase (response headers + body drain) uses the +/// caller-supplied `response_deadline` instead — see +/// `h2_round_trip`. This way a slow but legitimate Apps Script call +/// isn't cut off at an arbitrary fixed cap, and Full-mode batches can +/// honor the user's `request_timeout_secs` setting. +const H2_READY_TIMEOUT_SECS: u64 = 5; +/// Default response-phase deadline used by `relay_uncoalesced` callers +/// (the Apps-Script direct path). Sized to be just under the outer +/// `REQUEST_TIMEOUT_SECS` (25 s) so an h2 timeout still leaves a few +/// seconds of outer budget for an h1 fallback round-trip when the +/// caller chose to retry. +const H2_RESPONSE_DEADLINE_DEFAULT_SECS: u64 = 20; +/// Bound on the TCP connect + TLS handshake + h2 handshake phase. A +/// blackholed `connect_host:443` previously stalled `ensure_h2` until +/// the outer 25 s timeout fired (returning 504 without ever falling +/// back). With this bound, a slow open trips after 8 s and the caller +/// drops to h1 with ~17 s of outer budget to spare. +const H2_OPEN_TIMEOUT_SECS: u64 = 8; +/// After an h2 open failure, suppress further open attempts for this +/// long. Prevents every concurrent caller during an h2 outage from +/// paying its own full handshake-timeout cost in turn. +const H2_OPEN_FAILURE_BACKOFF_SECS: u64 = 15; /// Cadence for Apps Script container keepalive pings. Apps Script /// containers go cold after ~5min idle and cost 1-3s on the first /// request to wake back up — most painful on YouTube / streaming where @@ -78,6 +146,69 @@ struct PoolEntry { created: Instant, } +/// Single shared HTTP/2 connection to the Google edge. One TCP/TLS +/// socket carries up to ~100 concurrent streams (server's +/// `MAX_CONCURRENT_STREAMS` setting); each relay request takes a clone +/// of the `SendRequest` handle and opens its own stream. Cheaper than +/// the legacy per-request socket pool — no head-of-line blocking when +/// a single Apps Script call stalls. +/// +/// `generation` is monotonic per fronter and lets `poison_h2_if_gen` +/// avoid the race where task A's stale failure clears task B's +/// freshly-reopened healthy cell. +struct H2Cell { + send: h2::client::SendRequest, + created: Instant, + generation: u64, +} + +/// "Did this request reach Apps Script?" signal carried out of every +/// h2 failure so callers know whether replaying via h1 is safe. +/// +/// - `No`: the failure occurred before `send_request` returned. The +/// stream was never opened on the wire; replaying through h1 is +/// guaranteed not to duplicate any side effect. +/// - `Maybe`: `send_request` succeeded (headers queued for sending) +/// but a later step failed — server may have already received the +/// request and may already be processing it. Replaying a +/// non-idempotent op (POST/PUT/DELETE, tunnel write, batch ops) +/// risks duplicating side effects. Only safe to retry for methods +/// that are idempotent by HTTP semantics. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum RequestSent { + No, + Maybe, +} + +/// Typed errors from `open_h2`. Used so `ensure_h2` can recognize the +/// "peer refused h2 in ALPN" outcome and sticky-disable the fast path +/// without resorting to string matching across function boundaries. +#[derive(Debug, thiserror::Error)] +enum OpenH2Error { + #[error("ALPN did not negotiate h2; peer prefers http/1.1")] + AlpnRefused, + #[error("io: {0}")] + Io(#[from] std::io::Error), + #[error("tls: {0}")] + Tls(#[from] rustls::Error), + #[error("dns: {0}")] + Dns(#[from] rustls::pki_types::InvalidDnsNameError), + #[error("h2 handshake: {0}")] + Handshake(String), +} + +impl From for FronterError { + fn from(e: OpenH2Error) -> Self { + match e { + OpenH2Error::Io(e) => FronterError::Io(e), + OpenH2Error::Tls(e) => FronterError::Tls(e), + OpenH2Error::Dns(e) => FronterError::Dns(e), + OpenH2Error::AlpnRefused => FronterError::Relay("alpn refused h2".into()), + OpenH2Error::Handshake(m) => FronterError::Relay(format!("h2 handshake: {}", m)), + } + } +} + pub struct DomainFronter { connect_host: String, /// Pool of SNI domains to rotate through per outbound connection. All of @@ -104,8 +235,45 @@ pub struct DomainFronter { /// Set once we've emitted the "UnknownIssuer means ISP MITM" hint, /// so we don't spam it every time a cert-validation error repeats. cert_hint_shown: std::sync::atomic::AtomicBool, + /// Connector used by `open_h2`: advertises ALPN `["h2", "http/1.1"]` + /// when the h2 fast path is enabled, else just `["http/1.1"]`. Never + /// used by the h1 pool path — see `tls_connector_h1`. tls_connector: TlsConnector, + /// Connector used by `open()` (h1 pool warm/refill/acquire). ALPN + /// is forced to `["http/1.1"]` so a Google edge that would have + /// preferred h2 still negotiates h1 here. Without this, pooled + /// sockets could end up speaking h2 frames after handshake, and + /// the `write_all(b"GET / HTTP/1.1\r\n...")` fallback would land + /// on a server that has no idea what we're doing. + tls_connector_h1: TlsConnector, pool: Arc>>, + /// HTTP/2 fast path. `None` until first relay opens it; cleared on + /// connection failure or expiry so the next call reopens. Skipped + /// entirely when `force_http1` is set or when the peer refused h2 + /// during ALPN (sticky `h2_disabled`). + h2_cell: Arc>>, + /// Serializes "open a new h2 connection" attempts so that during + /// an outage, only one task pays the handshake cost — concurrent + /// callers see the lock contended via `try_lock` and fall through + /// to h1 immediately rather than queueing behind a slow handshake. + /// Distinct from `h2_cell` so the cell mutex is never held across + /// network I/O. + h2_open_lock: Arc>, + /// Wall-clock timestamp of the last failed `open_h2`. While within + /// `H2_OPEN_FAILURE_BACKOFF_SECS` of this, `ensure_h2` returns None + /// without retrying — prevents thundering-herd handshake attempts + /// during transient h2 outages. + h2_open_failed_at: Arc>>, + /// Monotonic counter for `H2Cell::generation`. Each successful + /// `open_h2` increments and tags the new cell so `poison_h2_if_gen` + /// can avoid the race where a stale failure clears a freshly-opened + /// cell that another task just installed. + h2_generation: Arc, + /// Set when ALPN negotiates http/1.1 (peer refused h2) or when + /// `force_http1` is true. Sticky for the lifetime of the fronter: + /// once we know this peer doesn't speak h2, don't keep retrying + /// the handshake on every relay call. + h2_disabled: Arc, cache: Arc, inflight: Arc>>>>, coalesced: AtomicU64, @@ -120,6 +288,29 @@ pub struct DomainFronter { relay_calls: AtomicU64, relay_failures: AtomicU64, bytes_relayed: AtomicU64, + /// Relay calls that successfully completed over the h2 fast path, + /// across **all** entry points: Apps-Script direct relays, + /// exit-node outer calls, full-mode tunnel single ops, and + /// full-mode tunnel batches. + /// + /// **Not** comparable to `relay_calls`: that counter only counts + /// the Apps-Script-direct path (incremented in `relay_uncoalesced`). + /// The other three paths bypass `relay_uncoalesced` entirely, so in + /// full-mode deployments `h2_calls` can exceed `relay_calls` — + /// reading their ratio as a "% on h2" gives a wrong number. + /// + /// To gauge h2 health, compute `h2_calls / (h2_calls + h2_fallbacks)`. + /// That's the success ratio across all transports; a healthy + /// deployment shows > 95 %. + h2_calls: AtomicU64, + /// Relay calls that attempted h2 but had to fall back to h1 + /// (transient handshake failure, mid-stream error, conn poisoned, + /// open backoff, or `RequestSent::No` failure that the call site + /// chose to retry on h1). Same all-entry-points scope as + /// `h2_calls`. A persistently high `h2_fallbacks / (h2_calls + + /// h2_fallbacks)` ratio indicates an unhealthy h2 conn or a flaky + /// middlebox eating h2 frames; consider `force_http1: true`. + h2_fallbacks: AtomicU64, /// Per-host breakdown of traffic going through this fronter. Keyed by /// the host of the URL (e.g. "api.x.com"). Read-mostly; only touched /// on the slow path (once per relayed request), so a plain Mutex is @@ -275,19 +466,43 @@ impl DomainFronter { if script_ids.is_empty() { return Err(FronterError::Relay("no script_id configured".into())); } - let tls_config = if config.verify_ssl { - let mut roots = rustls::RootCertStore::empty(); - roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); - ClientConfig::builder() - .with_root_certificates(roots) - .with_no_client_auth() - } else { - ClientConfig::builder() - .dangerous() - .with_custom_certificate_verifier(Arc::new(NoVerify)) - .with_no_client_auth() + // Helper that builds a fresh ClientConfig with the verifier + // policy from config. We need two of these so the h2-capable + // and h1-only paths can advertise different ALPN sets without + // mutating one shared config across calls. + let build_tls_config = || { + if config.verify_ssl { + let mut roots = rustls::RootCertStore::empty(); + roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + ClientConfig::builder() + .with_root_certificates(roots) + .with_no_client_auth() + } else { + ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoVerify)) + .with_no_client_auth() + } }; - let tls_connector = TlsConnector::from(Arc::new(tls_config)); + + // Connector for `open_h2`: advertises h2 first (or just h1 if + // the kill switch is set, in which case both connectors end up + // identical — fine, just slightly redundant). + let mut tls_h2 = build_tls_config(); + if !config.force_http1 { + tls_h2.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + } else { + tls_h2.alpn_protocols = vec![b"http/1.1".to_vec()]; + } + let tls_connector = TlsConnector::from(Arc::new(tls_h2)); + + // Connector for `open()` (h1 pool path). ALPN is forced to + // http/1.1 so a Google edge that would otherwise prefer h2 + // still negotiates h1 here — pooled sockets always speak the + // protocol the fallback path expects. + let mut tls_h1 = build_tls_config(); + tls_h1.alpn_protocols = vec![b"http/1.1".to_vec()]; + let tls_connector_h1 = TlsConnector::from(Arc::new(tls_h1)); Ok(Self { connect_host: config.google_ip.clone(), @@ -304,7 +519,13 @@ impl DomainFronter { script_ids, script_idx: AtomicUsize::new(0), tls_connector, + tls_connector_h1, pool: Arc::new(Mutex::new(Vec::new())), + h2_cell: Arc::new(Mutex::new(None)), + h2_open_lock: Arc::new(Mutex::new(())), + h2_open_failed_at: Arc::new(Mutex::new(None)), + h2_generation: Arc::new(AtomicU64::new(0)), + h2_disabled: Arc::new(AtomicBool::new(config.force_http1)), cache: Arc::new(ResponseCache::with_default()), inflight: Arc::new(Mutex::new(HashMap::new())), coalesced: AtomicU64::new(0), @@ -313,6 +534,8 @@ impl DomainFronter { relay_calls: AtomicU64::new(0), relay_failures: AtomicU64::new(0), bytes_relayed: AtomicU64::new(0), + h2_calls: AtomicU64::new(0), + h2_fallbacks: AtomicU64::new(0), per_site: Arc::new(std::sync::Mutex::new(HashMap::new())), today_calls: AtomicU64::new(0), today_bytes: AtomicU64::new(0), @@ -461,6 +684,9 @@ impl DomainFronter { today_bytes: self.today_bytes.load(Ordering::Relaxed), today_key, today_reset_secs: seconds_until_pacific_midnight(), + h2_calls: self.h2_calls.load(Ordering::Relaxed), + h2_fallbacks: self.h2_fallbacks.load(Ordering::Relaxed), + h2_disabled: self.h2_disabled.load(Ordering::Relaxed), } } @@ -642,18 +868,40 @@ impl DomainFronter { let _ = tcp.set_nodelay(true); let sni = self.next_sni(); let name = ServerName::try_from(sni)?; - let tls = self.tls_connector.connect(name, tcp).await?; + // Always use the h1-only connector here — the pool only holds + // sockets that the raw HTTP/1.1 fallback path can write to. + // Using the shared connector would let some pooled sockets + // negotiate h2, which would then misframe every fallback + // request that lands on them. + let tls = self.tls_connector_h1.connect(name, tcp).await?; Ok(tls) } - /// Open `n` outbound TLS connections sequentially (500 ms apart) and - /// park them in the pool. Staggered so we don't burst N TLS handshakes - /// at Google edge simultaneously, and each connection gets an 8 s + /// Open outbound TLS connections eagerly so the first relay request + /// doesn't pay a cold handshake. + /// + /// When h2 is enabled, attempts to open the multiplexed h2 cell + /// first. Success there means one TCP/TLS handshake serves all + /// future requests, so we only need a tiny fallback h1 pool + /// (clamped to 2) instead of the full `n` requested. On h2 failure + /// (ALPN refusal, network error), falls back to the legacy + /// behavior: warm the full `n` h1 sockets. + /// + /// Staggered 500 ms apart so we don't burst N TLS handshakes at the + /// Google edge simultaneously, and each connection gets an 8 s /// expiry offset so they roll off gradually instead of all hitting /// POOL_TTL_SECS at once. pub async fn warm(self: &Arc, n: usize) { + // Try to bring up the h2 fast path first. If that succeeds, + // shrink the h1 pool warm count to the fallback minimum — the + // multiplexed h2 conn handles all real traffic, so the h1 pool + // only needs to cover the rare case where h2 dies mid-session. + let h2_alive = !self.h2_disabled.load(Ordering::Relaxed) + && self.ensure_h2().await.is_some(); + let h1_target = if h2_alive { 2.min(n) } else { n }; + let mut warmed = 0usize; - for i in 0..n { + for i in 0..h1_target { if i > 0 { tokio::time::sleep(Duration::from_millis(500)).await; } @@ -674,18 +922,34 @@ impl DomainFronter { } } } - if warmed > 0 { + if h2_alive { + tracing::info!( + "h2 fast path active; h1 fallback pool pre-warmed with {} connection(s)", + warmed + ); + } else if warmed > 0 { tracing::info!("pool pre-warmed with {} connection(s)", warmed); } } - /// Background loop that keeps at least `POOL_MIN` valid connections - /// ready. A connection only counts toward the minimum if it has at - /// least 20 s of TTL remaining — nearly-expired entries don't help. + /// Background loop that keeps the h1 fallback pool warm. + /// + /// Target depends on whether the h2 fast path is active: + /// - h2 disabled (or peer refused ALPN h2): keep `POOL_MIN` (8) + /// sockets so the per-request acquire never pays a cold handshake + /// — the pre-h2 default behavior. + /// - h2 active: keep just `POOL_MIN_H2_FALLBACK` (2). All real + /// traffic rides the multiplexed h2 connection; the h1 pool only + /// exists to cover the case where h2 dies and we need to fall + /// back instantly without a cold handshake. + /// + /// A connection only counts toward the minimum if it has at least + /// 20 s of TTL remaining — nearly-expired entries don't help. /// Checks every `POOL_REFILL_INTERVAL_SECS`, evicts expired entries, /// and opens replacements one at a time so there's no burst. pub async fn run_pool_refill(self: Arc) { const MIN_REMAINING_SECS: u64 = 20; + const POOL_MIN_H2_FALLBACK: usize = 2; loop { tokio::time::sleep(Duration::from_secs(POOL_REFILL_INTERVAL_SECS)).await; @@ -695,6 +959,22 @@ impl DomainFronter { pool.retain(|e| e.created.elapsed().as_secs() < POOL_TTL_SECS); } + // Decide target. We treat "h2 active right now" as having a + // fresh, non-poisoned cell. h2_disabled is the sticky flag + // (peer never agreed to h2); a transient cell-poison after + // h2 success briefly drops back to the larger target until + // ensure_h2 reopens. + let target = if self.h2_disabled.load(Ordering::Relaxed) { + POOL_MIN + } else { + let cell = self.h2_cell.lock().await; + let h2_alive = cell + .as_ref() + .map(|c| c.created.elapsed().as_secs() < H2_CONN_TTL_SECS) + .unwrap_or(false); + if h2_alive { POOL_MIN_H2_FALLBACK } else { POOL_MIN } + }; + // Count only connections with enough life left. // Refill one at a time to avoid bursting TLS handshakes. loop { @@ -707,7 +987,7 @@ impl DomainFronter { }) .count() }; - if healthy >= POOL_MIN { + if healthy >= target { break; } match self.open().await { @@ -731,13 +1011,17 @@ impl DomainFronter { /// Keep the Apps Script container warm with a periodic HEAD ping. /// - /// `acquire()` keeps the *TCP/TLS pool* warm but does nothing for the - /// V8 container Apps Script runs in: that goes cold ~5min after the - /// last UrlFetchApp call and costs 1-3s to spin back up. The symptom - /// is "first request after a quiet period stalls" — most visible on - /// YouTube where the player gives up on a 1.5s `googlevideo.com` + /// The TCP/TLS pool stays warm via `run_pool_refill`, but the V8 + /// container Apps Script runs in goes cold ~5min after the last + /// `UrlFetchApp` call and costs 1-3s to spin back up. The symptom + /// is "first request after a quiet period stalls" — most visible + /// on YouTube where the player gives up on a 1.5s `googlevideo.com` /// chunk that's actually waiting on a cold-start. /// + /// Transport-agnostic: the underlying call goes through the same + /// `relay_uncoalesced` path everything else uses, so when h2 is + /// up the keepalive rides the multiplexed connection too. + /// /// Bypasses the response cache (`cache_key_opt = None`) and the /// inflight coalescer — otherwise the second iteration would just /// hit the cached response from the first and never reach Apps @@ -749,7 +1033,7 @@ impl DomainFronter { /// quota-exhausted account doesn't spam warnings every 4 minutes. /// Loops forever — caller is expected to drop the JoinHandle on /// shutdown (the task lives as long as the process). - pub async fn run_h1_keepalive(self: Arc) { + pub async fn run_keepalive(self: Arc) { loop { tokio::time::sleep(Duration::from_secs(H1_KEEPALIVE_INTERVAL_SECS)).await; let t0 = Instant::now(); @@ -762,7 +1046,7 @@ impl DomainFronter { .relay_uncoalesced("HEAD", "http://example.com/", &[], &[], None) .await; tracing::debug!( - "H1 container keepalive: {}ms", + "container keepalive: {}ms", t0.elapsed().as_millis() ); } @@ -801,6 +1085,543 @@ impl DomainFronter { } } + /// Return a cloned `SendRequest` handle (paired with its cell + /// generation) to the active HTTP/2 connection, opening a new one + /// if needed. `None` means the h2 fast path is unavailable for + /// this call — the caller should fall through to the h1 path. + /// + /// Reasons we may return `None`: + /// - `force_http1` set, or peer previously refused h2 via ALPN + /// (sticky `h2_disabled`). + /// - We're inside the `H2_OPEN_FAILURE_BACKOFF_SECS` cooldown + /// after a recent open failure. + /// - Another task is currently opening a connection and we + /// don't want to pile on (`try_lock` on `h2_open_lock`). + /// - The open we just attempted timed out within + /// `H2_OPEN_TIMEOUT_SECS` or otherwise failed. + /// + /// The lock on `h2_cell` is *never* held across network I/O — + /// that's the whole point of `h2_open_lock`. Concurrent first-time + /// callers compete for `h2_open_lock` via `try_lock`; the loser + /// returns None immediately and uses h1 rather than serializing + /// behind a slow handshake. + /// + /// The returned generation lets the caller later + /// `poison_h2_if_gen(gen)` to clear *only* this specific cell on + /// per-stream error, avoiding the race where a stale failure + /// clobbers a freshly-reopened healthy cell. + async fn ensure_h2(&self) -> Option<(h2::client::SendRequest, u64)> { + if self.h2_disabled.load(Ordering::Relaxed) { + return None; + } + + // Fast path: existing cell, within TTL. Clone (Arc bump) and + // return without touching the open machinery. We can't peek at + // SendRequest liveness directly (h2 0.4 doesn't expose + // `is_closed`), so a request against a dead conn fails at + // `ready()`/`send_request` and the caller poisons by + // generation from there. + { + let cell = self.h2_cell.lock().await; + if let Some(c) = cell.as_ref() { + if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS { + return Some((c.send.clone(), c.generation)); + } + } + } + + // Backoff check — recent open failure means h2 is currently + // unhealthy; don't pile on retries until the window expires. + { + let last = self.h2_open_failed_at.lock().await; + if let Some(t) = *last { + if t.elapsed().as_secs() < H2_OPEN_FAILURE_BACKOFF_SECS { + return None; + } + } + } + + // Open dedup: only one task does the actual handshake at a + // time. Concurrent callers see the lock contended and fall + // through to h1 immediately — preserves cold-start latency + // for the burst that arrives during a slow open. + let _open_guard = match self.h2_open_lock.try_lock() { + Ok(g) => g, + Err(_) => return None, + }; + + // Re-check the cell under open_lock — another task may have + // just stored a fresh connection while we were arbitrating. + { + let cell = self.h2_cell.lock().await; + if let Some(c) = cell.as_ref() { + if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS { + return Some((c.send.clone(), c.generation)); + } + } + } + + // Bounded handshake. A blackholed connect target can stall + // for many seconds otherwise, eating the outer budget that + // should be reserved for an h1 fallback round-trip. + let open_result = + tokio::time::timeout(Duration::from_secs(H2_OPEN_TIMEOUT_SECS), self.open_h2()) + .await; + + let send = match open_result { + Ok(Ok(s)) => s, + Ok(Err(OpenH2Error::AlpnRefused)) => { + // Definitive: this peer doesn't speak h2. Sticky-disable + // so we never re-attempt the handshake. + self.h2_disabled.store(true, Ordering::Relaxed); + tracing::info!( + "relay peer refused h2 via ALPN; staying on http/1.1" + ); + *self.h2_cell.lock().await = None; + return None; + } + Ok(Err(e)) => { + tracing::debug!("h2 open failed: {} — falling back to h1", e); + *self.h2_open_failed_at.lock().await = Some(Instant::now()); + *self.h2_cell.lock().await = None; + return None; + } + Err(_) => { + tracing::debug!( + "h2 open timed out after {}s — falling back to h1", + H2_OPEN_TIMEOUT_SECS + ); + *self.h2_open_failed_at.lock().await = Some(Instant::now()); + *self.h2_cell.lock().await = None; + return None; + } + }; + + // Open succeeded. Tag with a fresh generation, store, return. + // Clear any stale backoff timestamp. + let generation = self.h2_generation.fetch_add(1, Ordering::Relaxed) + 1; + *self.h2_open_failed_at.lock().await = None; + let mut cell = self.h2_cell.lock().await; + *cell = Some(H2Cell { + send: send.clone(), + created: Instant::now(), + generation, + }); + Some((send, generation)) + } + + /// Open one TLS connection and run the h2 handshake. Returns a + /// typed `OpenH2Error` so the caller can recognize ALPN refusal + /// (sticky disable) without string-matching across boundaries. + async fn open_h2(&self) -> Result, OpenH2Error> { + let tcp = TcpStream::connect((self.connect_host.as_str(), 443u16)).await?; + let _ = tcp.set_nodelay(true); + let sni = self.next_sni(); + let name = ServerName::try_from(sni)?; + let tls = self.tls_connector.connect(name, tcp).await?; + Self::h2_handshake_post_tls(tls).await + } + + /// Post-TLS portion of the h2 open path: ALPN check + h2 handshake + /// + connection-driver task spawn. Split out from `open_h2` so + /// tests can drive it with a TLS stream from any local server, + /// bypassing the hard-coded `connect_host:443` target. + async fn h2_handshake_post_tls( + tls: PooledStream, + ) -> Result, OpenH2Error> { + let alpn_h2 = tls + .get_ref() + .1 + .alpn_protocol() + .map(|p| p == b"h2") + .unwrap_or(false); + if !alpn_h2 { + return Err(OpenH2Error::AlpnRefused); + } + // Larger initial windows mean we don't have to call + // `release_capacity` on every chunk for typical Apps Script + // payloads (usually < 1 MB; range chunks are 256 KB). We still + // release capacity in the body-read loop for safety on larger + // bodies. + let (send, conn) = h2::client::Builder::new() + .initial_window_size(4 * 1024 * 1024) + .initial_connection_window_size(8 * 1024 * 1024) + .handshake(tls) + .await + .map_err(|e| OpenH2Error::Handshake(e.to_string()))?; + // The connection task drives frame I/O independently of any + // SendRequest handle. When it ends (GOAWAY, network error, TTL), + // existing handles will start failing on `ready()` / `send_request` + // and `ensure_h2` will reopen on the next call. + tokio::spawn(async move { + if let Err(e) = conn.await { + tracing::debug!("h2 connection closed: {}", e); + } + }); + tracing::info!("h2 connection established to relay edge"); + Ok(send) + } + + /// React to an h2-fronting-incompatibility HTTP response (status + /// matched by `is_h2_fronting_refusal_status`) by: + /// * sticky-disabling the h2 fast path so subsequent calls go + /// straight to h1 without re-paying the handshake / refusal, + /// * clearing any current cell so the SendRequest is dropped, + /// * rebalancing the h2 stat counters so this request shows + /// up as a fallback, not a successful h2 call. (The + /// `run_h2_relay_with_send` Ok path bumps `h2_calls` for any + /// completed round-trip; for a 421 we want it counted as + /// `h2_fallbacks` instead since the request will take the + /// h1 path.) + /// Logs at info because this is a meaningful state transition for + /// the deployment, not a per-request hiccup. + async fn sticky_disable_h2_for_fronting_refusal(&self, status: u16, context: &str) { + if !self.h2_disabled.swap(true, Ordering::Relaxed) { + tracing::info!( + "h2 returned HTTP {} for {} — likely :authority/SNI mismatch via \ + domain fronting. Disabling h2 fast path for this fronter and \ + falling back to http/1.1.", + status, + context, + ); + } + *self.h2_cell.lock().await = None; + // Reclassify: undo the h2_calls increment from + // run_h2_relay_with_send and bill this attempt as a fallback. + // saturating_sub-style guard: only decrement if non-zero so a + // direct caller of this helper from a non-Ok path can't + // underflow the counter. + let _ = self.h2_calls.fetch_update( + Ordering::Relaxed, + Ordering::Relaxed, + |c| if c > 0 { Some(c - 1) } else { None }, + ); + self.h2_fallbacks.fetch_add(1, Ordering::Relaxed); + } + + /// Clear the h2 cell *only if* its generation matches the one the + /// caller observed. Prevents the race where: + /// 1. Task A holds SendRequest from generation N + /// 2. Generation N's connection dies; Task B reopens → cell now + /// holds generation N+1 (healthy) + /// 3. Task A's stale stream errors → unconditionally clearing + /// the cell would kill the healthy N+1 + /// With generation matching, A's poison is a no-op against N+1. + async fn poison_h2_if_gen(&self, generation: u64) { + let mut cell = self.h2_cell.lock().await; + if let Some(c) = cell.as_ref() { + if c.generation == generation { + *cell = None; + } + } + } + + /// Send one POST through the active h2 connection, follow up to 5 + /// redirects, and return `(status, headers, body)` — the same shape + /// the h1 path's `read_http_response` produces, so callers can stay + /// transport-agnostic from this point on. + /// + /// `path` is the HTTP path including the leading slash. The Host / + /// :authority header is taken from `self.http_host` for the initial + /// request and from the `Location` URL on redirect. `payload` is the + /// body bytes; `content_type` is set when non-None (for the JSON + /// envelope). Empty body + None content_type → GET (used for redirect + /// follow-up). + /// Run one h2 stream and return `(status, headers, body)`. Errors + /// carry a `RequestSent` flag so the caller can distinguish "never + /// sent" (safe to retry on h1) from "may have been processed by + /// origin" (only safe to retry for idempotent methods). + /// + /// Two phases, two timeouts: + /// * **Ready (back-pressure):** bounded by `H2_READY_TIMEOUT_SECS` + /// (5 s constant). A stall here means the conn is saturated + /// under `MAX_CONCURRENT_STREAMS` (or dead at the muxer level) + /// but no stream has opened — `RequestSent::No`. + /// * **Response (post-send):** bounded by the caller-provided + /// `response_deadline`. After `send_request` returns Ok the + /// headers are queued; we conservatively treat any later + /// failure or timeout as `RequestSent::Maybe`. Caller picks + /// the deadline so legitimate slow Apps Script calls and + /// Full-mode batches with custom `request_timeout_secs` aren't + /// cut off at an arbitrary fixed cap. + async fn h2_round_trip( + &self, + send: h2::client::SendRequest, + method: &str, + path: &str, + host: &str, + payload: Bytes, + content_type: Option<&str>, + response_deadline: Duration, + ) -> Result<(u16, Vec<(String, String)>, Vec), (FronterError, RequestSent)> { + // h2 requires absolute-form URIs with the :authority pseudo-header + // populated from the Host. http::Request's URI parser accepts + // `https://{host}{path}` for that. + let uri = format!("https://{}{}", host, path); + let mut builder = http::Request::builder().method(method).uri(uri); + // Apps Script accepts gzip on the response; mirror the h1 path so + // payloads stay small. + builder = builder.header("accept-encoding", "gzip"); + if let Some(ct) = content_type { + builder = builder.header("content-type", ct); + } + let req = builder.body(()).map_err(|e| { + ( + FronterError::Relay(format!("h2 request build: {}", e)), + RequestSent::No, + ) + })?; + + // Phase 1: ready/back-pressure. Bounded short. Timeout here + // means saturation, not server-side processing — the stream + // hasn't even opened, so `RequestSent::No`. + let ready_result = tokio::time::timeout( + Duration::from_secs(H2_READY_TIMEOUT_SECS), + send.ready(), + ) + .await; + let mut send = match ready_result { + Ok(Ok(s)) => s, + Ok(Err(e)) => { + return Err(( + FronterError::Relay(format!("h2 ready: {}", e)), + RequestSent::No, + )); + } + Err(_) => { + return Err((FronterError::Timeout, RequestSent::No)); + } + }; + + let has_body = !payload.is_empty(); + // send_request is synchronous; it queues the HEADERS frame. + // After this returns Ok we conservatively assume the request + // reached the server. An Err here means the stream couldn't + // be opened (e.g. connection-level GOAWAY), safe to retry. + let (response_fut, mut body_tx) = send.send_request(req, !has_body).map_err(|e| { + ( + FronterError::Relay(format!("h2 send_request: {}", e)), + RequestSent::No, + ) + })?; + + if has_body { + // body_tx errors here are RequestSent::Maybe — headers were + // already queued, so we may have invoked Apps Script's doPost + // even if the body never finished. + body_tx.send_data(payload, true).map_err(|e| { + ( + FronterError::Relay(format!("h2 send_data: {}", e)), + RequestSent::Maybe, + ) + })?; + } + + // Phase 2: response headers + body drain. Bounded by the + // caller's deadline. Errors and timeout here are + // `RequestSent::Maybe` — the request is on the wire and may + // already have side effects. + let response_phase = async { + let response = response_fut.await.map_err(|e| { + ( + FronterError::Relay(format!("h2 response: {}", e)), + RequestSent::Maybe, + ) + })?; + let (parts, mut body) = response.into_parts(); + let status = parts.status.as_u16(); + + // Convert headers to the (String, String) Vec the rest of + // the codebase expects. Multi-valued headers (set-cookie, + // etc.) are expanded one entry per value, matching + // httparse's emission. + let mut headers: Vec<(String, String)> = Vec::with_capacity(parts.headers.len()); + for (name, value) in parts.headers.iter() { + if let Ok(v) = value.to_str() { + headers.push((name.as_str().to_string(), v.to_string())); + } + } + + // Drain body. Release flow-control credit per chunk so + // large responses don't stall after the initial 4 MB window. + let mut buf: Vec = Vec::new(); + while let Some(chunk) = body.data().await { + let chunk = chunk.map_err(|e| { + ( + FronterError::Relay(format!("h2 body chunk: {}", e)), + RequestSent::Maybe, + ) + })?; + let n = chunk.len(); + buf.extend_from_slice(&chunk); + let _ = body.flow_control().release_capacity(n); + } + Ok::<_, (FronterError, RequestSent)>((status, headers, buf)) + }; + + let (status, headers, mut buf) = match tokio::time::timeout( + response_deadline, + response_phase, + ) + .await + { + Ok(Ok(t)) => t, + Ok(Err(e)) => return Err(e), + Err(_) => return Err((FronterError::Timeout, RequestSent::Maybe)), + }; + + // Mirror `read_http_response`: if the server gzipped the body + // (we asked for it via accept-encoding), decompress before + // handing back so downstream JSON / envelope parsers see plain + // bytes regardless of transport. + if let Some(enc) = header_get(&headers, "content-encoding") { + if enc.eq_ignore_ascii_case("gzip") { + if let Ok(decoded) = decode_gzip(&buf) { + buf = decoded; + } + } + } + + Ok((status, headers, buf)) + } + + /// Run a full relay round-trip over h2: initial POST + up to 5 + /// redirect hops. `path` is the Apps Script `/macros/s/{id}/exec` + /// path. Returns the same `(status, headers, body)` triple as the + /// h1 path on success. + /// + /// `response_deadline` bounds the post-send phase of each round + /// trip (response headers + body drain). The ready/back-pressure + /// phase has its own short bound (`H2_READY_TIMEOUT_SECS`). + /// Caller picks the deadline based on its own outer budget: + /// * Apps-Script direct (`relay_uncoalesced`): a few seconds + /// under `REQUEST_TIMEOUT_SECS` (25 s) so an h2 timeout still + /// leaves room for an h1 fallback. + /// * Full-mode tunnel (`tunnel_request` / `tunnel_batch_request_to`): + /// `self.batch_timeout` so the user's + /// `request_timeout_secs` setting actually applies. + /// + /// On error, the second tuple field is `RequestSent::No` if the + /// request never reached Apps Script (safe to retry on h1) or + /// `RequestSent::Maybe` if it may have been processed (replaying + /// risks duplicating side effects for non-idempotent methods). + /// `ensure_h2` returning None always reports `RequestSent::No`. + /// + /// Takes `payload` as `Bytes` so callers can clone (Arc bump, + /// not memcpy) when they want to retain a copy for h1 fallback. + async fn h2_relay_request( + &self, + path: &str, + payload: Bytes, + response_deadline: Duration, + ) -> Result<(u16, Vec<(String, String)>, Vec), (FronterError, RequestSent)> { + let (send, generation) = match self.ensure_h2().await { + Some(s) => s, + None => { + // ensure_h2 returning None covers: + // 1. force_http1 / sticky-disabled — never tried h2 + // this call. NOT a fallback, don't count. + // 2. open_h2 just failed / timed out / backoff active. + // We DID attempt h2 and lost it; count as fallback + // so the stat reflects reality. `ensure_h2` itself + // sets the backoff timestamp on failure. + if !self.h2_disabled.load(Ordering::Relaxed) { + self.h2_fallbacks.fetch_add(1, Ordering::Relaxed); + } + return Err(( + FronterError::Relay("h2 unavailable".into()), + RequestSent::No, + )); + } + }; + + self.run_h2_relay_with_send(send, generation, path, payload, response_deadline) + .await + } + + /// Inner h2 relay loop — split out so tests can inject a + /// `SendRequest` (from a local h2c test server) without going + /// through `ensure_h2`'s real-network handshake. + /// + /// Each h2_round_trip uses its own internal phase-split timeouts + /// (ready=5s constant, response=`response_deadline`). No outer + /// wrap is needed here — the inner timeouts are what poisons the + /// cell on stall. + async fn run_h2_relay_with_send( + &self, + send: h2::client::SendRequest, + generation: u64, + path: &str, + payload: Bytes, + response_deadline: Duration, + ) -> Result<(u16, Vec<(String, String)>, Vec), (FronterError, RequestSent)> { + let mut current_host = self.http_host.to_string(); + let mut current_path = path.to_string(); + + let res = self + .h2_round_trip( + send.clone(), + "POST", + ¤t_path, + ¤t_host, + payload, + Some("application/json"), + response_deadline, + ) + .await; + let (mut status, mut hdrs, mut body) = match res { + Ok(t) => t, + Err((e, sent)) => { + self.poison_h2_if_gen(generation).await; + self.h2_fallbacks.fetch_add(1, Ordering::Relaxed); + return Err((e, sent)); + } + }; + + // The initial POST already succeeded — the request reached + // Apps Script. From here on, redirect-follow failures are + // RequestSent::Maybe regardless of where they land in the + // chain, because the *original* Apps Script call may have + // already executed. + for _ in 0..5 { + if !matches!(status, 301 | 302 | 303 | 307 | 308) { + break; + } + let Some(loc) = header_get(&hdrs, "location") else { + break; + }; + let (rpath, rhost) = parse_redirect(&loc); + current_host = rhost.unwrap_or(current_host); + current_path = rpath; + let res = self + .h2_round_trip( + send.clone(), + "GET", + ¤t_path, + ¤t_host, + Bytes::new(), + None, + response_deadline, + ) + .await; + match res { + Ok((s, h, b)) => { + status = s; + hdrs = h; + body = b; + } + Err((e, _)) => { + self.poison_h2_if_gen(generation).await; + self.h2_fallbacks.fetch_add(1, Ordering::Relaxed); + return Err((e, RequestSent::Maybe)); + } + } + } + + self.h2_calls.fetch_add(1, Ordering::Relaxed); + Ok((status, hdrs, body)) + } + /// Relay an HTTP request through Apps Script. /// Returns a raw HTTP/1.1 response (status line + headers + body) suitable /// for writing back to the browser over an MITM'd TLS stream. @@ -847,6 +1668,22 @@ impl DomainFronter { ); return bytes; } + Err(e) if !e.is_retryable() => { + // The exit node may have already processed this + // request (h2 post-send failure on a POST etc.). + // Don't fall through to the direct path — that + // would re-send to the same destination via Apps + // Script and duplicate the side effect. + tracing::warn!( + "exit node failed for {} and request was already sent ({}); not falling back to direct Apps Script", + url, + e, + ); + self.relay_failures.fetch_add(1, Ordering::Relaxed); + let inner = e.into_inner(); + self.record_site(url, false, 0, t0.elapsed().as_nanos() as u64); + return error_response(502, &format!("Relay error: {}", inner)); + } Err(e) => { tracing::warn!( "exit node failed for {}: {} — falling back to direct Apps Script", @@ -1195,9 +2032,24 @@ impl DomainFronter { return self.do_relay_parallel(method, url, headers, body, fan).await; } - // Sequential path: one retry on connection failure. + // Sequential path: one retry on connection failure, *unless* + // the failure is `FronterError::NonRetryable` — that wrapper + // says "the request may have already reached the server, do + // not duplicate." Without this guard, an h2 post-send failure + // on a non-idempotent method (POST/PUT/PATCH/DELETE) that the + // h2 layer correctly refused to replay on h1 would be + // re-issued here anyway, defeating the safety policy. match self.do_relay_once(method, url, headers, body).await { Ok(v) => Ok(v), + Err(e) if !e.is_retryable() => { + tracing::warn!( + "relay attempt 1 failed and is non-retryable ({}); not duplicating {} {}", + e, + method, + url, + ); + Err(e.into_inner()) + } Err(e) => { tracing::debug!("relay attempt 1 failed: {}; retrying", e); self.do_relay_once(method, url, headers, body).await @@ -1255,9 +2107,102 @@ impl DomainFronter { headers: &[(String, String)], body: &[u8], ) -> Result, FronterError> { - let payload = self.build_payload_json(method, url, headers, body)?; + // Build once, wrap in Bytes (zero-copy move). h2 takes a clone + // (Arc bump, not memcpy); h1 fallback uses the same Bytes via + // Deref<&[u8]>. Saves a full payload allocation+copy per call + // — meaningful on range-parallel fan-out where N copies fire + // in parallel for one user-facing GET. + let payload: Bytes = Bytes::from(self.build_payload_json(method, url, headers, body)?); let path = format!("/macros/s/{}/exec", script_id); + // h2 fast path: one shared TCP/TLS connection multiplexes all + // streams. + // + // The h2 layer reports `RequestSent::No` when it can prove + // the request never reached Apps Script (ensure_h2 unavailable, + // ready/back-pressure timeout, send_request error). In that + // case we fall through to h1 unconditionally — there's no + // duplication risk. + // + // For `RequestSent::Maybe` (anything after send_request + // succeeded) we only fall through for HTTP-idempotent methods. + // POST / PUT / PATCH / DELETE get wrapped in + // `FronterError::NonRetryable` so `do_relay_with_retry`'s + // outer retry also skips replay — without that wrap, the + // outer retry would re-issue the request anyway and the + // safety policy would be illusory. + match self + .h2_relay_request( + &path, + payload.clone(), + Duration::from_secs(H2_RESPONSE_DEADLINE_DEFAULT_SECS), + ) + .await + { + Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => { + // Edge rejected the fronted h2 request before + // forwarding to Apps Script. Sticky-disable h2, + // log once, fall through to h1 — this request is + // safe to replay because it never reached Apps Script. + self.sticky_disable_h2_for_fronting_refusal( + status, + &format!("relay {} {}", method, url), + ) + .await; + // fall through to h1 + } + Ok((status, _hdrs, resp_body)) => { + if status != 200 { + let body_txt = String::from_utf8_lossy(&resp_body) + .chars() + .take(200) + .collect::(); + if should_blacklist(status, &body_txt) { + self.blacklist_script(&script_id, &format!("HTTP {}", status)); + } + return Err(FronterError::Relay(format!( + "Apps Script HTTP {}: {}", + status, body_txt + ))); + } + return parse_relay_json(&resp_body).map_err(|e| { + if let FronterError::Relay(ref msg) = e { + if looks_like_quota_error(msg) { + self.blacklist_script(&script_id, msg); + } + } + e + }); + } + Err((e, RequestSent::No)) => { + tracing::debug!("h2 pre-send failure: {} — falling back to h1", e); + } + Err((e, RequestSent::Maybe)) => { + if is_method_safe_for_fanout(method) { + tracing::debug!( + "h2 post-send failure for safe method {}: {} — falling back to h1", + method, + e + ); + } else { + tracing::warn!( + "h2 post-send failure for non-idempotent {} {}: {} — \ + marking non-retryable to prevent duplicating side effects", + method, + url, + e + ); + // NonRetryable wrapper bubbles all the way through + // do_relay_once_with → do_relay_with_retry, where + // the retry loop skips its second attempt. Without + // this wrap, returning a plain Err would let + // do_relay_with_retry re-issue the request via h1 + // (or a fresh h2 cell), defeating the safety policy. + return Err(FronterError::NonRetryable(Box::new(e))); + } + } + } + let mut entry = self.acquire().await?; let reuse_ok = { let write_res = async { @@ -1397,8 +2342,9 @@ impl DomainFronter { "Content-Type".to_string(), "application/json".to_string(), )]; - let outer_payload = - self.build_payload_json("POST", &exit_url, &outer_headers, &inner_json)?; + let outer_payload: Bytes = Bytes::from( + self.build_payload_json("POST", &exit_url, &outer_headers, &inner_json)?, + ); // Send the outer payload through the relay machinery and get back // Apps Script's response body (which is exit-node's JSON envelope). @@ -1465,11 +2411,70 @@ impl DomainFronter { /// has a different shape from Code.gs's raw-HTTP wrapping. async fn send_prebuilt_payload_through_relay( &self, - payload: Vec, + payload: Bytes, ) -> Result, FronterError> { let script_id = self.next_script_id(); let path = format!("/macros/s/{}/exec", script_id); + // h2 fast path. The exit-node outer call is always POST and + // carries the inner relay payload — replaying on h1 after the + // outer reached Apps Script duplicates the inner request to + // the exit node. Only fall back when h2 definitely never sent. + // Same default response deadline as the direct path; the + // exit-node leg ultimately exits via Apps Script too. + match self + .h2_relay_request( + &path, + payload.clone(), + Duration::from_secs(H2_RESPONSE_DEADLINE_DEFAULT_SECS), + ) + .await + { + Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => { + // Same fronting-refusal path as the direct relay. + // Safe to fall back: 421 means the edge rejected + // before invoking the exit node. + self.sticky_disable_h2_for_fronting_refusal( + status, + "exit-node outer call", + ) + .await; + // fall through to h1 + } + Ok((status, _hdrs, resp_body)) => { + if status != 200 { + let body_txt = String::from_utf8_lossy(&resp_body) + .chars() + .take(200) + .collect::(); + return Err(FronterError::Relay(format!( + "Apps Script HTTP {} (exit-node outer call): {}", + status, body_txt + ))); + } + return Ok(resp_body); + } + Err((e, RequestSent::No)) => { + tracing::debug!( + "h2 exit-node outer call pre-send failure: {} — falling back to h1", + e + ); + } + Err((e, RequestSent::Maybe)) => { + tracing::warn!( + "h2 exit-node outer call post-send failure: {} — \ + marking non-retryable to prevent duplicating the inner request", + e + ); + // NonRetryable propagates back to relay()'s exit-node + // match arm, which will *not* fall through to the + // direct Apps Script path (that fall-through would + // re-send the outer call and could also re-trigger + // the inner request to the destination). + return Err(FronterError::NonRetryable(Box::new(e))); + } + } + let mut entry = self.acquire().await?; let req_head = format!( "POST {path} HTTP/1.1\r\n\ @@ -1597,10 +2602,55 @@ impl DomainFronter { sid: Option<&str>, data: Option, ) -> Result { - let payload = self.build_tunnel_payload(op, host, port, sid, data)?; + let payload: Bytes = + Bytes::from(self.build_tunnel_payload(op, host, port, sid, data)?); let script_id = self.next_script_id(); let path = format!("/macros/s/{}/exec", script_id); + // h2 fast path. Tunnel ops are stateful — a `connect` may + // have opened an upstream socket; a `data` op may have + // forwarded bytes. Replaying on h1 after the op reached + // Apps Script can corrupt the tunnel session. Only fall back + // when h2 definitely never sent. + // Use the user-configured batch_timeout so Full-mode tuning + // (`request_timeout_secs`) is honored — a fixed cap would let + // legitimately slow batches incorrectly trip strike counters + // on healthy deployments at tunnel_client::fire_batch. + match self + .h2_relay_request(&path, payload.clone(), self.batch_timeout) + .await + { + Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => { + // Edge rejected the fronted h2 request. Safe to fall + // back to h1 — the tunnel op never executed because + // Apps Script never received the request. + self.sticky_disable_h2_for_fronting_refusal( + status, + &format!("tunnel op {}", op), + ) + .await; + // fall through to h1 + } + Ok((status, _hdrs, resp_body)) => { + return self.finalize_tunnel_response(&script_id, status, resp_body); + } + Err((e, RequestSent::No)) => { + tracing::debug!( + "h2 tunnel request pre-send failure: {} — falling back to h1", + e + ); + } + Err((e, RequestSent::Maybe)) => { + tracing::warn!( + "h2 tunnel request post-send failure (op={}): {} — \ + not replaying on h1 to avoid corrupting the tunnel session", + op, + e + ); + return Err(e); + } + } + let mut entry = self.acquire().await?; let req_head = format!( @@ -1648,42 +2698,55 @@ impl DomainFronter { resp_body = b; } + let resp = self.finalize_tunnel_response(&script_id, status, resp_body)?; + self.release(entry).await; + Ok(resp) + } + + /// Validate a tunnel-protocol response (status check + Apps-Script + /// HTML-prefix tolerance + JSON parse). Used by both the h2 and h1 + /// branches of `tunnel_request` so the parsing logic doesn't drift + /// across transports. + fn finalize_tunnel_response( + &self, + script_id: &str, + status: u16, + resp_body: Vec, + ) -> Result { if status != 200 { let body_txt = String::from_utf8_lossy(&resp_body) .chars() .take(200) .collect::(); if should_blacklist(status, &body_txt) { - self.blacklist_script(&script_id, &format!("HTTP {}", status)); + self.blacklist_script(script_id, &format!("HTTP {}", status)); } return Err(FronterError::Relay(format!( "tunnel HTTP {}: {}", status, body_txt ))); } - - // Parse tunnel response JSON let text = std::str::from_utf8(&resp_body) .map_err(|_| FronterError::BadResponse("non-utf8 tunnel response".into()))? .trim(); - - // Apps Script may prepend HTML; extract first {...} + // Apps Script may prepend HTML on cold-start or quota-exceeded + // pages; extract the first {...} block tolerantly so we don't + // bail on a recoverable warning frame. let json_str = if text.starts_with('{') { text } else { let start = text.find('{').ok_or_else(|| { - FronterError::BadResponse(format!("no json in tunnel response: {}", &text[..text.len().min(200)])) + FronterError::BadResponse(format!( + "no json in tunnel response: {}", + &text[..text.len().min(200)] + )) })?; let end = text.rfind('}').ok_or_else(|| { FronterError::BadResponse("no json end in tunnel response".into()) })?; &text[start..=end] }; - - let resp: TunnelResponse = serde_json::from_str(json_str)?; - - self.release(entry).await; - Ok(resp) + Ok(serde_json::from_str(json_str)?) } fn build_tunnel_payload( @@ -1742,10 +2805,48 @@ impl DomainFronter { if !self.disable_padding { add_random_pad(&mut map); } - let payload = serde_json::to_vec(&Value::Object(map))?; + let payload: Bytes = Bytes::from(serde_json::to_vec(&Value::Object(map))?); let path = format!("/macros/s/{}/exec", script_id); + // h2 fast path. A batch carries N stateful tunnel ops — each + // `data`/`udp_data`/`connect` may have already executed + // upstream when the response framing failed. Replaying the + // whole batch on h1 risks duplicating every op in it. Only + // fall back when h2 definitely never sent. Honors + // user-configured batch_timeout so a slow but legitimate + // batch isn't cut off at an arbitrary fixed cap. + match self + .h2_relay_request(&path, payload.clone(), self.batch_timeout) + .await + { + Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => { + // Edge rejected the batch before forwarding. Safe to + // fall back: no batched op reached Apps Script, so + // replaying via h1 won't double-fire any of them. + self.sticky_disable_h2_for_fronting_refusal(status, "tunnel batch") + .await; + // fall through to h1 + } + Ok((status, _hdrs, resp_body)) => { + return self.finalize_batch_response(script_id, status, resp_body); + } + Err((e, RequestSent::No)) => { + tracing::debug!( + "h2 batch request pre-send failure: {} — falling back to h1", + e + ); + } + Err((e, RequestSent::Maybe)) => { + tracing::warn!( + "h2 batch request post-send failure: {} — \ + not replaying on h1 to avoid duplicating batched ops", + e + ); + return Err(e); + } + } + let mut entry = self.acquire().await?; let req_head = format!( @@ -1782,41 +2883,94 @@ impl DomainFronter { status = s; resp_headers = h; resp_body = b; } + // Route through the same `finalize_batch_response` helper the + // h2 path uses. This keeps the redacted-logging policy in + // exactly one place — the previous inline parse here logged + // raw payload at debug AND error level, which leaked the + // base64-encoded tunneled bytes (TCP/UDP packets, possibly + // app data or credentials) into bug-report logs. Both + // transports now emit only `status=` + `body_len=`, with the + // raw body gated behind RUST_LOG=trace. + let resp = self.finalize_batch_response(script_id, status, resp_body)?; + self.release(entry).await; + Ok(resp) + } + + /// Parse a batch-tunnel response body once we already have it in + /// hand — used by the h2 fast path in `tunnel_batch_request_to`, + /// where the response is read off a multiplexed stream rather than + /// drained from a checked-out socket. Mirrors the validate-and-parse + /// tail of the h1 path (status check + JSON extraction + + /// quota-blacklist book-keeping). + fn finalize_batch_response( + &self, + script_id: &str, + status: u16, + resp_body: Vec, + ) -> Result { if status != 200 { - let body_txt = String::from_utf8_lossy(&resp_body).chars().take(200).collect::(); + let body_txt = String::from_utf8_lossy(&resp_body) + .chars() + .take(200) + .collect::(); if should_blacklist(status, &body_txt) { - self.blacklist_script(&script_id, &format!("HTTP {}", status)); + self.blacklist_script(script_id, &format!("HTTP {}", status)); } - return Err(FronterError::Relay(format!("batch tunnel HTTP {}: {}", status, body_txt))); + return Err(FronterError::Relay(format!( + "batch tunnel HTTP {}: {}", + status, body_txt + ))); } - let text = std::str::from_utf8(&resp_body) .map_err(|_| FronterError::BadResponse("non-utf8 batch response".into()))? .trim(); - let json_str = if text.starts_with('{') { text } else { let start = text.find('{').ok_or_else(|| { - FronterError::BadResponse(format!("no json in batch response: {}", &text[..text.len().min(200)])) + FronterError::BadResponse(format!( + "no json in batch response: {}", + &text[..text.len().min(200)] + )) })?; let end = text.rfind('}').ok_or_else(|| { FronterError::BadResponse("no json end in batch response".into()) })?; &text[start..=end] }; - - tracing::debug!("batch response body: {}", &json_str[..json_str.len().min(500)]); - - let resp: BatchTunnelResponse = match serde_json::from_str(json_str) { - Ok(v) => v, + // Don't log payload content. Batch responses carry base64-encoded + // tunneled bytes (TCP/UDP packets, possibly app data, possibly + // credentials), and even at debug level a leaked log line ends + // up in user-shared bug reports. Status + length are sufficient + // for diagnosis; full body is available behind RUST_LOG=trace. + tracing::debug!( + "batch response: status={} body_len={}", + status, + json_str.len() + ); + tracing::trace!( + "batch response body (trace only): {}", + &json_str[..json_str.len().min(500)] + ); + match serde_json::from_str(json_str) { + Ok(v) => Ok(v), Err(e) => { - tracing::error!("batch JSON parse error: {} — body: {}", e, &json_str[..json_str.len().min(300)]); - return Err(FronterError::Json(e)); + // Same redaction policy on the error path. Length and + // the serde error message are enough to locate the + // parse failure (offset / unexpected-token info comes + // from `e` itself); the raw body is trace-only. + tracing::error!( + "batch JSON parse error: {} (body_len={})", + e, + json_str.len() + ); + tracing::trace!( + "batch parse-error body (trace only): {}", + &json_str[..json_str.len().min(300)] + ); + Err(FronterError::Json(e)) } - }; - self.release(entry).await; - Ok(resp) + } } } @@ -2730,6 +3884,32 @@ fn is_method_safe_for_fanout(method: &str) -> bool { matches!(method.to_ascii_uppercase().as_str(), "GET" | "HEAD" | "OPTIONS") } +/// Recognize HTTP statuses from the h2 path that mean "this edge +/// won't accept your fronted h2 request, but might accept the same +/// request over h1." Used to trigger an automatic sticky-disable of +/// the h2 fast path + h1 fallback. +/// +/// 421 (Misdirected Request) is the spec signal: per RFC 7540 +/// §9.1.2, the server returns it when the connection's authority is +/// not appropriate for the request URI. With domain fronting that +/// means the edge enforced "TLS SNI must match :authority" — true +/// on h2 (the server sees both pseudo-headers in cleartext) but +/// historically lenient on h1 (the encrypted Host header is what +/// the bypass relies on). Treating 421 as h2-fallback rather than +/// "Apps Script error" prevents h2 default-on from breaking +/// previously-working h1 deployments. +/// +/// Other edge-level rejects (403, etc.) are ambiguous — could be a +/// real Apps Script geoblock or a real upstream — so we don't +/// blanket-treat them. +/// +/// The h2 layer treats this as a "request not sent upstream" +/// outcome (the edge rejected before forwarding to Apps Script), +/// so falling back to h1 is safe with no duplication risk. +fn is_h2_fronting_refusal_status(status: u16) -> bool { + status == 421 +} + /// Parse the JSON envelope from Apps Script and build a raw HTTP response. fn parse_relay_json(body: &[u8]) -> Result, FronterError> { let text = std::str::from_utf8(body) @@ -2946,6 +4126,22 @@ pub struct StatsSnapshot { /// Seconds until the next 00:00 PT rollover. Convenient for the UI /// to render "Resets in Xh Ym" without importing time libraries. pub today_reset_secs: u64, + /// Calls served by the HTTP/2 multiplexed transport, across all + /// entry points (Apps-Script direct, exit-node outer call, + /// full-mode tunnel single op, full-mode tunnel batch). + /// + /// Not comparable to `relay_calls` — that counter only sees the + /// Apps-Script-direct path. To gauge h2 health, compute + /// `h2_calls / (h2_calls + h2_fallbacks)`. + pub h2_calls: u64, + /// Calls that attempted h2 but had to fall back to h1 (per-call + /// failures, open timeout, backoff, sticky ALPN refusal). Same + /// all-entry-points scope as `h2_calls`. + pub h2_fallbacks: u64, + /// True when h2 is permanently off for this fronter (config kill + /// switch set, or peer refused h2 during ALPN). All traffic on the + /// h1 path. + pub h2_disabled: bool, } impl StatsSnapshot { @@ -2959,8 +4155,27 @@ impl StatsSnapshot { } pub fn fmt_line(&self) -> String { + // h2 segment is the success ratio across all transports + // (h2_calls + h2_fallbacks). Showing "X/Y" against relay_calls + // would mislead — relay_calls only counts the Apps-Script + // direct path, while h2_calls also includes exit-node and + // tunnel paths that bypass relay_uncoalesced. + let h2_seg = if self.h2_disabled { + " h2=off".to_string() + } else { + let total = self.h2_calls + self.h2_fallbacks; + if total == 0 { + String::new() + } else { + let pct = (self.h2_calls as f64 / total as f64) * 100.0; + format!( + " h2-success={}/{} ({:.0}%)", + self.h2_calls, total, pct + ) + } + }; format!( - "stats: relay={} ({}KB) failures={} coalesced={} cache={}/{} ({:.0}% hit, {}KB) scripts={}/{} active", + "stats: relay={} ({}KB) failures={} coalesced={} cache={}/{} ({:.0}% hit, {}KB) scripts={}/{} active{}", self.relay_calls, self.bytes_relayed / 1024, self.relay_failures, @@ -2971,6 +4186,7 @@ impl StatsSnapshot { self.cache_bytes / 1024, self.total_scripts - self.blacklisted_scripts, self.total_scripts, + h2_seg, ) } @@ -2983,7 +4199,7 @@ impl StatsSnapshot { s.replace('\\', "\\\\").replace('"', "\\\"") } format!( - r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{}}}"#, + r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{},"h2_calls":{},"h2_fallbacks":{},"h2_disabled":{}}}"#, self.relay_calls, self.relay_failures, self.coalesced, @@ -2997,6 +4213,9 @@ impl StatsSnapshot { self.today_bytes, esc(&self.today_key), self.today_reset_secs, + self.h2_calls, + self.h2_fallbacks, + self.h2_disabled, ) } } @@ -3809,4 +5028,662 @@ hello"; other => panic!("unexpected error: {}", other), } } + + // ─── h2 transport ────────────────────────────────────────────────── + + /// Generous response-phase deadline used by transport tests. We + /// pick something well above any expected latency on a localhost + /// h2c hop so test flakiness can't be confused with a real timeout + /// firing. Tests that *want* to observe a timeout pick a small + /// value explicitly. + const TEST_RESPONSE_DEADLINE: Duration = Duration::from_secs(10); + + /// Build a minimal valid `DomainFronter` for unit tests. The + /// `connect_host` is unused unless a test actually opens a socket; + /// `verify_ssl=true` and a placeholder `google_ip` are fine because + /// `DomainFronter::new` doesn't touch the network. + fn fronter_for_test(force_http1: bool) -> DomainFronter { + let json = format!( + r#"{{ + "mode": "apps_script", + "google_ip": "127.0.0.1", + "front_domain": "www.google.com", + "script_id": "TEST", + "auth_key": "test_auth_key", + "listen_host": "127.0.0.1", + "listen_port": 8085, + "log_level": "info", + "verify_ssl": true, + "force_http1": {} + }}"#, + force_http1 + ); + let cfg: Config = serde_json::from_str(&json).unwrap(); + DomainFronter::new(&cfg).expect("test fronter must construct") + } + + #[tokio::test(flavor = "current_thread")] + async fn force_http1_disables_h2_at_construction() { + // The kill switch: force_http1=true must mark the fronter as + // h2-disabled before the first call so ensure_h2 short-circuits + // without ever trying ALPN. + let fronter = fronter_for_test(true); + assert!( + fronter.h2_disabled.load(Ordering::Relaxed), + "force_http1=true must set h2_disabled at construction" + ); + assert!( + fronter.ensure_h2().await.is_none(), + "ensure_h2 must return None when h2 is disabled" + ); + } + + #[tokio::test(flavor = "current_thread")] + async fn force_http1_false_leaves_h2_enabled() { + let fronter = fronter_for_test(false); + assert!( + !fronter.h2_disabled.load(Ordering::Relaxed), + "default must leave h2 enabled" + ); + } + + #[tokio::test(flavor = "current_thread")] + async fn poison_h2_if_gen_is_noop_when_cell_is_empty() { + // Defensive: we call poison on every per-request error; cell + // may already be None due to a concurrent poison. Must not + // panic or wedge. + let fronter = fronter_for_test(false); + fronter.poison_h2_if_gen(0).await; + let cell = fronter.h2_cell.lock().await; + assert!(cell.is_none()); + } + + #[tokio::test(flavor = "current_thread")] + async fn poison_h2_if_gen_only_clears_matching_generation() { + // Race protection: task A holds gen=1 SendRequest, gen=1 dies, + // task B reopens → cell now gen=2 (healthy). Task A's + // poison(1) MUST NOT clear gen=2. Without generation matching + // the previous code unconditionally cleared the cell, causing + // connection churn during recovery. + let (addr, server_handle) = spawn_h2c_server(|_req| { + let resp = http::Response::builder().status(200).body(()).unwrap(); + (resp, Vec::new()) + }) + .await; + let send_v2 = h2c_client(addr).await; + + let fronter = fronter_for_test(false); + // Seed the cell with gen=2 (simulating "task B just reopened"). + { + let mut cell = fronter.h2_cell.lock().await; + *cell = Some(H2Cell { + send: send_v2.clone(), + created: Instant::now(), + generation: 2, + }); + } + // Task A poisons with stale gen=1. + fronter.poison_h2_if_gen(1).await; + // gen=2 cell must survive. + let cell = fronter.h2_cell.lock().await; + assert!( + cell.is_some(), + "poison_h2_if_gen(1) must not clear gen=2 cell" + ); + assert_eq!(cell.as_ref().unwrap().generation, 2); + drop(cell); + + // And matching gen=2 actually does clear. + fronter.poison_h2_if_gen(2).await; + let cell = fronter.h2_cell.lock().await; + assert!(cell.is_none(), "poison_h2_if_gen(2) must clear gen=2 cell"); + + server_handle.abort(); + } + + #[tokio::test(flavor = "current_thread")] + async fn ensure_h2_skips_reopen_during_failure_backoff() { + // After an open failure, ensure_h2 must return None for at + // least H2_OPEN_FAILURE_BACKOFF_SECS without attempting a + // new handshake — otherwise concurrent callers each pay the + // full handshake-timeout cost during an outage. + let fronter = fronter_for_test(false); + // Simulate a recent open failure. + *fronter.h2_open_failed_at.lock().await = Some(Instant::now()); + + // ensure_h2 must return None immediately, without trying open_h2 + // (open_h2 would try TCP-connect to 127.0.0.1:443 which would + // either fail slowly or succeed against an unrelated service — + // either way, this test would observably take longer if backoff + // wasn't honored). + let t0 = Instant::now(); + let result = fronter.ensure_h2().await; + assert!(result.is_none(), "must return None during backoff"); + assert!( + t0.elapsed() < Duration::from_millis(100), + "must return immediately without open attempt; took {:?}", + t0.elapsed() + ); + } + + /// Spawn a minimal local h2c server (plaintext h2, no TLS) on a + /// random port. The handler closure builds the response from the + /// incoming request — used by `h2_round_trip_*` tests below. + /// Returns the bound address and the JoinHandle so the test can + /// `abort()` the server when done. + async fn spawn_h2c_server( + handler: F, + ) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) + where + F: Fn(http::Request) -> (http::Response<()>, Vec) + + Send + + Sync + + 'static, + { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handler = Arc::new(handler); + let handle = tokio::spawn(async move { + // Single-connection server is enough for these tests. + let (sock, _) = listener.accept().await.unwrap(); + let mut connection = h2::server::handshake(sock).await.unwrap(); + while let Some(result) = connection.accept().await { + let (req, mut respond) = match result { + Ok(v) => v, + Err(_) => break, + }; + let (resp, body) = handler(req); + let has_body = !body.is_empty(); + let mut send = respond + .send_response(resp, !has_body) + .expect("send_response in test"); + if has_body { + send.send_data(Bytes::from(body), true) + .expect("send_data in test"); + } + } + }); + (addr, handle) + } + + /// Variant that gives the handler async access to the request body + /// before producing the response. Needed to assert what the client + /// actually sent (rather than relying on the request's existence). + async fn spawn_h2c_echo_server() -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handle = tokio::spawn(async move { + let (sock, _) = listener.accept().await.unwrap(); + let mut connection = h2::server::handshake(sock).await.unwrap(); + while let Some(result) = connection.accept().await { + let (req, mut respond) = match result { + Ok(v) => v, + Err(_) => break, + }; + let mut body = req.into_body(); + let mut received = Vec::new(); + while let Some(chunk) = body.data().await { + let chunk = match chunk { + Ok(c) => c, + Err(_) => break, + }; + let n = chunk.len(); + received.extend_from_slice(&chunk); + let _ = body.flow_control().release_capacity(n); + } + let resp = http::Response::builder().status(200).body(()).unwrap(); + let mut send = respond.send_response(resp, false).unwrap(); + send.send_data(Bytes::from(received), true).unwrap(); + } + }); + (addr, handle) + } + + /// Open a plaintext h2c connection to `addr` and return a usable + /// `SendRequest`. The connection driver is spawned in the + /// background and lives for the test's scope. + async fn h2c_client(addr: std::net::SocketAddr) -> h2::client::SendRequest { + let stream = TcpStream::connect(addr).await.unwrap(); + let (send, conn) = h2::client::handshake(stream).await.unwrap(); + tokio::spawn(async move { + let _ = conn.await; + }); + send + } + + #[tokio::test(flavor = "current_thread")] + async fn h2_round_trip_actually_transmits_post_body() { + // Server reads the request body and echoes it. We assert the + // server received the exact bytes we passed — proves the + // send_data path works, not just that 200 came back. + let (addr, server_handle) = spawn_h2c_echo_server().await; + + let send = h2c_client(addr).await; + let fronter = fronter_for_test(false); + let req_body = b"the-actual-payload-sent-by-h2_round_trip"; + let (status, _hdrs, echoed) = fronter + .h2_round_trip( + send, + "POST", + "/echo", + "127.0.0.1", + Bytes::from_static(req_body), + Some("application/json"), + TEST_RESPONSE_DEADLINE, + ) + .await + .expect("h2 round trip should succeed"); + assert_eq!(status, 200); + assert_eq!( + echoed, req_body, + "server must have received the exact bytes we sent" + ); + server_handle.abort(); + } + + #[tokio::test(flavor = "current_thread")] + async fn h2_round_trip_decodes_gzip_responses() { + // Mirror the h1 read_http_response behavior: gzip-encoded + // bodies must be transparently decompressed before we hand + // them back, so downstream JSON parsers see plain bytes + // regardless of transport. + use flate2::write::GzEncoder; + use flate2::Compression; + use std::io::Write; + + let plain = b"{\"hello\":\"world\"}"; + let mut enc = GzEncoder::new(Vec::new(), Compression::default()); + enc.write_all(plain).unwrap(); + let gzipped = enc.finish().unwrap(); + let gzipped_arc = Arc::new(gzipped); + + let g = gzipped_arc.clone(); + let (addr, server_handle) = spawn_h2c_server(move |_req| { + let resp = http::Response::builder() + .status(200) + .header("content-encoding", "gzip") + .body(()) + .unwrap(); + (resp, (*g).clone()) + }) + .await; + + let send = h2c_client(addr).await; + let fronter = fronter_for_test(false); + let (status, _hdrs, body) = fronter + .h2_round_trip(send, "GET", "/", "127.0.0.1", Bytes::new(), None, TEST_RESPONSE_DEADLINE) + .await + .unwrap(); + assert_eq!(status, 200); + assert_eq!(body, plain, "gzip body must be decoded transparently"); + server_handle.abort(); + } + + #[tokio::test(flavor = "current_thread")] + async fn run_h2_relay_with_send_follows_redirect_chain() { + // Now exercises run_h2_relay_with_send (the testable inner + // of h2_relay_request) so the production redirect loop — + // including timeout, RequestSent classification, and per-hop + // poison-by-gen — is actually under test, not a hand-rolled + // duplicate. + let counter = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let c = counter.clone(); + let (addr, server_handle) = spawn_h2c_server(move |req| { + let n = c.fetch_add(1, Ordering::Relaxed); + if n == 0 { + let resp = http::Response::builder() + .status(302) + .header("location", "/next") + .body(()) + .unwrap(); + (resp, Vec::new()) + } else { + assert_eq!(req.uri().path(), "/next", "second hop must follow Location"); + let resp = http::Response::builder().status(200).body(()).unwrap(); + (resp, b"final".to_vec()) + } + }) + .await; + + let send = h2c_client(addr).await; + let fronter = fronter_for_test(false); + + let (status, _hdrs, body) = fronter + .run_h2_relay_with_send( + send, + /* generation */ 1, + "/start", + Bytes::new(), + TEST_RESPONSE_DEADLINE, + ) + .await + .expect("h2 relay should follow redirect to 200"); + assert_eq!(status, 200); + assert_eq!(body, b"final"); + // Successful round-trip must increment h2_calls. + assert_eq!(fronter.h2_calls.load(Ordering::Relaxed), 1); + assert_eq!(fronter.h2_fallbacks.load(Ordering::Relaxed), 0); + server_handle.abort(); + } + + #[tokio::test(flavor = "current_thread")] + async fn run_h2_relay_with_send_reports_request_sent_no_on_dead_connection() { + // Set up an h2c client whose connection is severed before we + // call run_h2_relay_with_send. The first `send.ready().await` + // inside h2_round_trip should fail — RequestSent::No is the + // correct classification (stream never opened on the wire). + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + // Accept the connection, do the h2 handshake, then drop. + // After drop the client's SendRequest will fail at ready(). + let (sock, _) = listener.accept().await.unwrap(); + let _connection = h2::server::handshake(sock).await.unwrap(); + // Hold briefly so client can complete handshake, then drop. + tokio::time::sleep(Duration::from_millis(50)).await; + }); + + let send = h2c_client(addr).await; + // Wait for server to drop. + server_task.await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let fronter = fronter_for_test(false); + let result = fronter + .run_h2_relay_with_send( + send, + 1, + "/x", + Bytes::from_static(b"some-body"), + TEST_RESPONSE_DEADLINE, + ) + .await; + match result { + Err((_, RequestSent::No)) => {} // expected + Err((e, RequestSent::Maybe)) => { + panic!("dead-conn failure classified as Maybe (unsafe to retry): {}", e) + } + Ok(_) => panic!("expected error against dropped server"), + } + // Failure must increment h2_fallbacks counter. + assert_eq!(fronter.h2_fallbacks.load(Ordering::Relaxed), 1); + assert_eq!(fronter.h2_calls.load(Ordering::Relaxed), 0); + } + + #[tokio::test(flavor = "current_thread")] + async fn run_h2_relay_with_send_reports_request_sent_maybe_on_post_send_reset() { + // Server accepts headers (so the request reaches it) and then + // resets the stream. The client sees a stream error AFTER + // send_request returned Ok. RequestSent::Maybe is the only + // safe classification — Apps Script may have started executing. + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let (sock, _) = listener.accept().await.unwrap(); + let mut connection = h2::server::handshake(sock).await.unwrap(); + if let Some(Ok((_req, mut respond))) = connection.accept().await { + // Reset the stream after receiving headers — simulates + // the server starting to process and then bailing + // (matches the "Apps Script started UrlFetchApp then + // failed" scenario). + respond.send_reset(h2::Reason::INTERNAL_ERROR); + } + // Keep the connection alive briefly so the client sees the + // RST_STREAM rather than a connection-level close. + tokio::time::sleep(Duration::from_millis(100)).await; + }); + + let send = h2c_client(addr).await; + let fronter = fronter_for_test(false); + let result = fronter + .run_h2_relay_with_send( + send, + 1, + "/x", + Bytes::from_static(b"body"), + TEST_RESPONSE_DEADLINE, + ) + .await; + match result { + Err((_, RequestSent::Maybe)) => {} // expected + Err((e, RequestSent::No)) => panic!( + "post-send RST classified as No — would let caller \ + unsafely replay non-idempotent request: {}", + e + ), + Ok(_) => panic!("expected error against RST_STREAM"), + } + + server_task.await.unwrap(); + } + + // ─── NonRetryable wrapper + retry/fallback policy ──────────────────── + + #[test] + fn nonretryable_wrapper_is_not_retryable_other_variants_are() { + // Surfaces the contract that do_relay_with_retry and the + // exit-node fallback rely on. If this ever flips, those + // sites would silently start re-issuing post-send failures. + let plain = FronterError::Relay("transient".into()); + assert!(plain.is_retryable(), "plain Relay error must be retryable"); + + let plain2 = FronterError::Timeout; + assert!(plain2.is_retryable(), "Timeout must be retryable"); + + let wrapped = FronterError::NonRetryable(Box::new(FronterError::Relay("post-send".into()))); + assert!(!wrapped.is_retryable(), "NonRetryable must not be retryable"); + + // Display must be transparent so log lines look identical. + let inner_msg = "h2 response: stream RST".to_string(); + let inner = FronterError::Relay(inner_msg.clone()); + let wrapped = FronterError::NonRetryable(Box::new(inner)); + let displayed = wrapped.to_string(); + assert!( + displayed.contains(&inner_msg), + "transparent Display should surface inner: got {}", + displayed + ); + + // into_inner unwraps once. + let inner_again = wrapped.into_inner(); + assert!(matches!(inner_again, FronterError::Relay(_))); + assert!(inner_again.is_retryable(), "unwrapped error is retryable"); + } + + // Note on test coverage gap: we don't have a deterministic test + // that the ready/back-pressure phase's timeout reports + // `RequestSent::No`. h2 client enforces remote + // `MAX_CONCURRENT_STREAMS` at `send_request` time rather than at + // `ready` time, so a "saturate the slots, expect ready to block" + // setup actually races down the response-phase path instead. + // The ready-arm code in `h2_round_trip` is small (single match + // arm with `RequestSent::No` literally written next to the + // timeout error) and covered by review. Other safety properties + // (post-send Maybe via stream RST, pre-send No via dead conn, + // NonRetryable wrap propagation) are covered by the tests above + // and below. + + #[tokio::test(flavor = "current_thread")] + async fn run_h2_relay_with_send_does_not_wrap_pre_send_in_nonretryable() { + // Regression guard: the NonRetryable wrap is the *call site's* + // job (do_relay_once_with applies it for unsafe methods only). + // run_h2_relay_with_send returns the raw RequestSent::No so + // the call site can decide. If h2_relay_request started + // wrapping unconditionally, even safe-method requests would + // become non-retryable on transient pre-send failures. + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server_task = tokio::spawn(async move { + let (sock, _) = listener.accept().await.unwrap(); + let _connection = h2::server::handshake(sock).await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + }); + let send = h2c_client(addr).await; + server_task.await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let fronter = fronter_for_test(false); + let result = fronter + .run_h2_relay_with_send( + send, + 1, + "/x", + Bytes::from_static(b"x"), + TEST_RESPONSE_DEADLINE, + ) + .await; + match result { + Err((e, RequestSent::No)) => { + assert!( + e.is_retryable(), + "pre-send error must be raw FronterError, not pre-wrapped NonRetryable; got {:?}", + e + ); + } + other => panic!("expected (Err, RequestSent::No); got {:?}", other), + } + } + + #[tokio::test(flavor = "current_thread")] + async fn sticky_disable_h2_for_fronting_refusal_flips_disabled_and_clears_cell() { + // Verify the helper that runs from each call site's 421 arm: + // sets h2_disabled, clears the cell, rebalances counters + // (h2_calls -=1 since the round-trip already counted; h2_fallbacks +=1). + // Tests the helper directly so we don't depend on a real h2 + // server returning 421 — call sites already exercise the + // status-match wiring through code review. + let (addr, server_handle) = spawn_h2c_server(|_req| { + let resp = http::Response::builder().status(200).body(()).unwrap(); + (resp, Vec::new()) + }) + .await; + let send = h2c_client(addr).await; + let fronter = fronter_for_test(false); + // Seed the cell so we can verify it gets cleared. + { + let mut cell = fronter.h2_cell.lock().await; + *cell = Some(H2Cell { + send: send.clone(), + created: Instant::now(), + generation: 7, + }); + } + // Pretend a round-trip just incremented h2_calls (which is + // what run_h2_relay_with_send does on Ok before the call site + // sees the 421 status). + fronter.h2_calls.fetch_add(1, Ordering::Relaxed); + + fronter + .sticky_disable_h2_for_fronting_refusal(421, "test context") + .await; + + assert!(fronter.h2_disabled.load(Ordering::Relaxed), "must sticky-disable"); + let cell = fronter.h2_cell.lock().await; + assert!(cell.is_none(), "cell must be cleared"); + assert_eq!( + fronter.h2_calls.load(Ordering::Relaxed), + 0, + "the h2_calls increment from the failed round-trip must be reversed" + ); + assert_eq!( + fronter.h2_fallbacks.load(Ordering::Relaxed), + 1, + "must count as a fallback" + ); + drop(cell); + + // Subsequent ensure_h2 must short-circuit to None without + // attempting to open. + let t0 = Instant::now(); + assert!(fronter.ensure_h2().await.is_none()); + assert!( + t0.elapsed() < Duration::from_millis(100), + "sticky-disabled ensure_h2 must return immediately" + ); + + // Calling the helper a second time must not log again or + // double-count fallbacks beyond +1 per call. + fronter + .sticky_disable_h2_for_fronting_refusal(421, "test context") + .await; + // h2_calls would underflow without the saturating guard; assert + // it stays at 0. + assert_eq!(fronter.h2_calls.load(Ordering::Relaxed), 0); + // h2_fallbacks goes up unconditionally (this is "another + // attempt that ended up on h1") — that's fine. + assert_eq!(fronter.h2_fallbacks.load(Ordering::Relaxed), 2); + + server_handle.abort(); + } + + #[test] + fn is_h2_fronting_refusal_status_only_matches_421() { + // Guard against the helper accidentally matching ambiguous + // edge statuses (403 could be a real Apps Script geoblock, + // 4xx generally is not a "this is h2's fault" signal). + assert!(is_h2_fronting_refusal_status(421)); + for s in [200, 301, 400, 403, 404, 429, 500, 502, 503] { + assert!( + !is_h2_fronting_refusal_status(s), + "status {} must NOT trigger sticky h2 disable", + s + ); + } + } + + #[tokio::test(flavor = "current_thread")] + async fn h2_handshake_post_tls_returns_alpn_refused_when_peer_picks_h1() { + // Verify the OpenH2Error::AlpnRefused path: if the TLS layer + // negotiated http/1.1 (not h2), the post-TLS helper must + // return the typed sentinel that ensure_h2 uses to sticky- + // disable. We construct a fake TlsStream by short-circuiting + // through a real local TLS server that only advertises h1. + // + // This needs a real TLS handshake (rustls + a self-signed + // cert), so we set up the smallest possible test server with + // ALPN forced to ["http/1.1"]. + let cert = rcgen::generate_simple_self_signed(vec!["127.0.0.1".to_string()]).unwrap(); + let cert_der = rustls::pki_types::CertificateDer::from(cert.cert.der().to_vec()); + let key_der = rustls::pki_types::PrivateKeyDer::Pkcs8( + rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()), + ); + + let mut server_cfg = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(vec![cert_der], key_der) + .unwrap(); + server_cfg.alpn_protocols = vec![b"http/1.1".to_vec()]; + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_cfg)); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { + let (sock, _) = listener.accept().await.unwrap(); + // Drive the handshake; the test only needs the negotiation + // to complete with ALPN=h1. After that we can drop. + let _tls = acceptor.accept(sock).await.unwrap(); + tokio::time::sleep(Duration::from_millis(50)).await; + }); + + // Client side: open TLS with ALPN advertising h2 + h1.1; the + // server picks h1 → alpn_protocol() returns "http/1.1" not "h2". + let mut client_cfg = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(NoVerify)) + .with_no_client_auth(); + client_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + let connector = tokio_rustls::TlsConnector::from(Arc::new(client_cfg)); + + let tcp = TcpStream::connect(addr).await.unwrap(); + let name = rustls::pki_types::ServerName::try_from("127.0.0.1").unwrap(); + let tls = connector.connect(name, tcp).await.unwrap(); + + let result = DomainFronter::h2_handshake_post_tls(tls).await; + match result { + Err(OpenH2Error::AlpnRefused) => {} // expected + Err(other) => panic!("expected AlpnRefused, got {:?}", other), + Ok(_) => panic!("expected AlpnRefused, got Ok"), + } + server.await.unwrap(); + } } diff --git a/src/proxy_server.rs b/src/proxy_server.rs index a1be2780..0d29082f 100644 --- a/src/proxy_server.rs +++ b/src/proxy_server.rs @@ -587,7 +587,7 @@ impl ProxyServer { // accept loops. let keepalive_task = if let Some(keepalive_fronter) = self.fronter.clone() { tokio::spawn(async move { - keepalive_fronter.run_h1_keepalive().await; + keepalive_fronter.run_keepalive().await; }) } else { tokio::spawn(async move { std::future::pending::<()>().await })