From 8d8f11e15a506c02103fa9d4a803d7cc755b8271 Mon Sep 17 00:00:00 2001
From: dazzling-no-more <278675588+dazzling-no-more@users.noreply.github.com>
Date: Wed, 6 May 2026 10:04:25 +0400
Subject: [PATCH] feat: HTTP/2 multiplexing on relay leg with idempotency-safe
h1 fallback
---
.../java/com/therealaleph/mhrv/ConfigStore.kt | 11 +
.../main/java/com/therealaleph/mhrv/Native.kt | 18 +-
.../com/therealaleph/mhrv/ui/HomeScreen.kt | 15 +-
.../app/src/main/res/values-fa/strings.xml | 2 +-
android/app/src/main/res/values/strings.xml | 5 +-
src/bin/ui.rs | 15 +
src/config.rs | 45 +
src/domain_fronter.rs | 2013 ++++++++++++++++-
src/proxy_server.rs | 2 +-
9 files changed, 2047 insertions(+), 79 deletions(-)
diff --git a/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt b/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt
index 3c0566e1..7457083d 100644
--- a/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt
+++ b/android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt
@@ -96,6 +96,14 @@ data class MhrvConfig(
val verifySsl: Boolean = true,
val logLevel: String = "info",
val parallelRelay: Int = 1,
+ /**
+ * Disable HTTP/2 multiplexing on the Apps Script relay leg.
+ * Default false (h2 active); flip to true to force the legacy
+ * HTTP/1.1 keep-alive pool. Round-tripped from config.json so a
+ * hand-edited kill switch survives a save round trip from the
+ * Android UI. See `src/config.rs` `force_http1`.
+ */
+ val forceHttp1: Boolean = false,
val coalesceStepMs: Int = 10,
val coalesceMaxMs: Int = 1000,
val upstreamSocks5: String = "",
@@ -217,6 +225,7 @@ data class MhrvConfig(
put("verify_ssl", verifySsl)
put("log_level", logLevel)
put("parallel_relay", parallelRelay)
+ if (forceHttp1) put("force_http1", true)
if (coalesceStepMs != 10) put("coalesce_step_ms", coalesceStepMs)
if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs)
if (upstreamSocks5.isNotBlank()) {
@@ -328,6 +337,7 @@ object ConfigStore {
if (cfg.verifySsl != defaults.verifySsl) obj.put("verify_ssl", cfg.verifySsl)
if (cfg.logLevel != defaults.logLevel) obj.put("log_level", cfg.logLevel)
if (cfg.parallelRelay != defaults.parallelRelay) obj.put("parallel_relay", cfg.parallelRelay)
+ if (cfg.forceHttp1 != defaults.forceHttp1) obj.put("force_http1", cfg.forceHttp1)
if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs)
if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs)
if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5)
@@ -431,6 +441,7 @@ object ConfigStore {
verifySsl = obj.optBoolean("verify_ssl", true),
logLevel = obj.optString("log_level", "info"),
parallelRelay = obj.optInt("parallel_relay", 1),
+ forceHttp1 = obj.optBoolean("force_http1", false),
coalesceStepMs = obj.optInt("coalesce_step_ms", 10),
coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000),
upstreamSocks5 = obj.optString("upstream_socks5", ""),
diff --git a/android/app/src/main/java/com/therealaleph/mhrv/Native.kt b/android/app/src/main/java/com/therealaleph/mhrv/Native.kt
index 51b37f8c..fbf44dc5 100644
--- a/android/app/src/main/java/com/therealaleph/mhrv/Native.kt
+++ b/android/app/src/main/java/com/therealaleph/mhrv/Native.kt
@@ -89,8 +89,22 @@ object Native {
* relay_calls, relay_failures, coalesced, bytes_relayed,
* cache_hits, cache_misses, cache_bytes,
* blacklisted_scripts, total_scripts,
- * today_calls, today_bytes, today_key (string "YYYY-MM-DD"),
- * today_reset_secs (seconds until 00:00 UTC rollover)
+ * today_calls, today_bytes, today_key (string "YYYY-MM-DD" in
+ * Pacific Time — matches Apps Script's actual quota reset),
+ * today_reset_secs (seconds until the next 00:00 Pacific Time
+ * rollover; ~7-8 h offset from UTC depending on DST),
+ * h2_calls (calls served by the HTTP/2 multiplexed transport,
+ * across all entry points — Apps-Script direct, exit-node
+ * outer call, full-mode tunnel single op, full-mode tunnel
+ * batch. NOT comparable to relay_calls, which only sees the
+ * Apps-Script-direct path),
+ * h2_fallbacks (calls that attempted h2 but had to fall back
+ * to h1 — handshake failure, open backoff, sticky ALPN
+ * refusal, post-send error retried on h1; same all-entry-
+ * points scope as h2_calls. Compute h2 health as
+ * h2_calls / (h2_calls + h2_fallbacks)),
+ * h2_disabled (boolean: true when h2 fast path is permanently
+ * off — config force_http1 set, or peer refused h2 via ALPN)
*
* Cheap — just reads atomics. Safe to poll on a second-scale timer.
*/
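A minimal Rust sketch of the health computation this contract prescribes. The field names come from the documented stats payload; the parsing scaffold (serde_json, the `stats_json` input) is illustrative, not patch code:

    use serde_json::Value;

    /// h2 health = h2_calls / (h2_calls + h2_fallbacks), per the
    /// statsJson() contract above. Returns 1.0 when nothing has
    /// been attempted yet (no evidence of unhealthiness).
    fn h2_health(stats_json: &str) -> f64 {
        let v: Value = serde_json::from_str(stats_json).unwrap_or(Value::Null);
        let calls = v["h2_calls"].as_u64().unwrap_or(0);
        let fallbacks = v["h2_fallbacks"].as_u64().unwrap_or(0);
        let attempts = calls + fallbacks;
        if attempts == 0 { 1.0 } else { calls as f64 / attempts as f64 }
    }

Per the counter docs later in this patch, a healthy deployment sits above roughly 0.95; a persistently lower ratio suggests setting `force_http1: true`.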
diff --git a/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt b/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt
index 19953f62..00063390 100644
--- a/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt
+++ b/android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt
@@ -1487,11 +1487,14 @@ private fun CollapsibleSection(
/**
* "Usage today (estimated)" card. Polls `Native.statsJson(handle)` every
* second while the proxy is up and renders today's relay calls vs. the
- * Apps Script free-tier quota (20,000/day), today's bytes, UTC day key,
- * and a countdown to the 00:00 UTC reset. Also shows a "View quota on
- * Google" button that opens Google's Apps Script dashboard — the
- * authoritative number, since the client-side estimate only sees what
- * this device relayed.
+ * Apps Script free-tier quota (20,000/day), today's bytes, the Pacific
+ * Time day key, and a countdown to the 00:00 PT reset. Pacific Time
+ * matches Apps Script's actual quota reset cadence — UTC would have
+ * the counter resetting ~7-8 h before the user actually got a fresh
+ * quota allotment from Google. Also shows a "View quota on Google"
+ * button that opens Google's Apps Script dashboard — the authoritative
+ * number, since the client-side estimate only sees what this device
+ * relayed.
*
* Hidden when the handle is 0 (proxy not running) or the JSON comes back
* empty (direct / full-only configs don't run a DomainFronter and so
@@ -1563,7 +1566,7 @@ private fun UsageTodayCard() {
value = fmtBytes(todayBytes),
)
UsageRow(
- label = stringResource(R.string.label_utc_day),
+ label = stringResource(R.string.label_pt_day),
value = todayKey,
)
UsageRow(
diff --git a/android/app/src/main/res/values-fa/strings.xml b/android/app/src/main/res/values-fa/strings.xml
index 2b8d1989..9421f805 100644
--- a/android/app/src/main/res/values-fa/strings.xml
+++ b/android/app/src/main/res/values-fa/strings.xml
@@ -88,7 +88,7 @@
مصرف امروز (تخمینی)
درخواستهای امروز
بایت امروز
- روز (UTC)
+ روز (PT)
ریست تا
%1$d / %2$d (%3$.1f%%)
%1$d ساعت و %2$d دقیقه
diff --git a/android/app/src/main/res/values/strings.xml b/android/app/src/main/res/values/strings.xml
index 002f66e7..6a7688e7 100644
--- a/android/app/src/main/res/values/strings.xml
+++ b/android/app/src/main/res/values/strings.xml
@@ -103,7 +103,10 @@
Usage today (estimated)
calls today
bytes today
- UTC day
+ PT day
resets in
%1$d / %2$d (%3$.1f%%)
%1$dh %2$dm
diff --git a/src/bin/ui.rs b/src/bin/ui.rs
index a733237d..3e64ec2d 100644
--- a/src/bin/ui.rs
+++ b/src/bin/ui.rs
@@ -260,6 +260,10 @@ struct FormState {
/// users edit `disable_padding` directly when needed (Issue #391).
/// Default false (padding active).
disable_padding: bool,
+ /// Round-tripped from config.json. Not exposed as a UI control —
+ /// users edit `force_http1` directly when needed. Default false
+ /// (HTTP/2 multiplexing on the relay leg active).
+ force_http1: bool,
/// Round-tripped from config.json. Not exposed in the UI form yet —
/// the bypass-DoH default is the right answer for almost everyone
/// (DoH already encrypts, the tunnel was just adding latency), so
@@ -384,6 +388,7 @@ fn load_form() -> (FormState, Option) {
passthrough_hosts: c.passthrough_hosts.clone(),
block_quic: c.block_quic,
disable_padding: c.disable_padding,
+ force_http1: c.force_http1,
tunnel_doh: c.tunnel_doh,
bypass_doh_hosts: c.bypass_doh_hosts.clone(),
block_doh: c.block_doh,
@@ -422,6 +427,7 @@ fn load_form() -> (FormState, Option) {
passthrough_hosts: Vec::new(),
block_quic: false,
disable_padding: false,
+ force_http1: false,
tunnel_doh: true,
bypass_doh_hosts: Vec::new(),
block_doh: true,
@@ -584,6 +590,9 @@ impl FormState {
// Issue #391: disable_padding is config-only for now.
// Round-trip preserves the user's choice.
disable_padding: self.disable_padding,
+ // HTTP/2 multiplexing kill switch. Config-only for now;
+ // round-trip preserves the user's choice across Save.
+ force_http1: self.force_http1,
// DoH bypass is enabled-by-default with `tunnel_doh = false`.
// Round-trip the user's choice (and any extra hostnames they
// added) so save doesn't drop them.
@@ -693,6 +702,11 @@ struct ConfigWire<'a> {
auto_blacklist_cooldown_secs: u64,
#[serde(skip_serializing_if = "is_default_timeout_secs")]
request_timeout_secs: u64,
+ /// HTTP/2 multiplexing kill switch. Default false (h2 active); only
+ /// emitted on save when the user has explicitly disabled h2, so
+ /// unchanged configs stay clean.
+ #[serde(skip_serializing_if = "is_false")]
+ force_http1: bool,
/// Exit-node config (CF-anti-bot bypass for chatgpt.com / claude.ai /
/// grok.com / x.com via exit-node second-hop relay). Skip when fully
/// default (disabled with no URL/PSK/hosts) so configs without
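The `is_false` predicate named in the `skip_serializing_if` attribute above is not shown in this hunk; the conventional serde helper it presumably refers to looks like this (a sketch, not patch code):

    /// serde `skip_serializing_if` callback: elide the field when false.
    fn is_false(b: &bool) -> bool {
        !*b
    }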
@@ -772,6 +786,7 @@ impl<'a> From<&'a Config> for ConfigWire<'a> {
auto_blacklist_window_secs: c.auto_blacklist_window_secs,
auto_blacklist_cooldown_secs: c.auto_blacklist_cooldown_secs,
request_timeout_secs: c.request_timeout_secs,
+ force_http1: c.force_http1,
exit_node: &c.exit_node,
}
}
diff --git a/src/config.rs b/src/config.rs
index 1bde7947..7be6aeba 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -220,6 +220,19 @@ pub struct Config {
#[serde(default)]
pub disable_padding: bool,
+ /// Disable HTTP/2 multiplexing on the Apps Script relay leg.
+ /// Default `false` (= h2 enabled): the TLS handshake to the Google
+ /// edge advertises ALPN `["h2", "http/1.1"]`; if the server picks
+ /// h2 we route all relay traffic over a single multiplexed
+ /// connection (~100 concurrent streams) instead of the legacy
+ /// per-request TLS pool of 8-80 sockets. Kills head-of-line
+ /// blocking on slow Apps Script responses (one stalled call no
+ /// longer pins a whole socket). Set to `true` to force the
+ /// pre-v1.9.x HTTP/1.1 path — useful as a kill switch if a specific
+ /// deployment, fronting domain, or middlebox refuses h2.
+ #[serde(default)]
+ pub force_http1: bool,
+
/// Opt-out for the DoH bypass. Default `false` (= bypass active):
/// CONNECTs to well-known DoH hostnames (Cloudflare, Google, Quad9,
/// AdGuard, NextDNS, OpenDNS, browser-pinned variants like
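Given the `serde(default)`, a hand-edited kill switch needs only the one extra field on top of a working config. A minimal config.json sketch, mirroring the minimal-fields test fixture in the hunk below:

    {
      "mode": "apps_script",
      "script_id": "X",
      "auth_key": "secretkey123",
      "force_http1": true
    }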
@@ -867,6 +880,38 @@ mod rt_tests {
let _ = std::fs::remove_file(&tmp);
}
+ #[test]
+ fn force_http1_round_trips_through_config() {
+ let json = r#"{
+ "mode": "apps_script",
+ "google_ip": "216.239.38.120",
+ "front_domain": "www.google.com",
+ "script_id": "X",
+ "auth_key": "secretkey123",
+ "listen_host": "127.0.0.1",
+ "listen_port": 8085,
+ "log_level": "info",
+ "verify_ssl": true,
+ "force_http1": true
+}"#;
+ let cfg: Config = serde_json::from_str(json).unwrap();
+ assert!(cfg.force_http1, "force_http1=true must round-trip");
+ }
+
+ #[test]
+ fn force_http1_defaults_false_when_omitted() {
+ // Existing configs from before v1.9.13 don't have the field.
+ // serde(default) must give false (h2 active) so older configs
+ // continue to work and unchanged users get the optimization.
+ let json = r#"{
+ "mode": "apps_script",
+ "auth_key": "secretkey123",
+ "script_id": "X"
+}"#;
+ let cfg: Config = serde_json::from_str(json).unwrap();
+ assert!(!cfg.force_http1, "default must be false (h2 enabled)");
+ }
+
#[test]
fn round_trip_minimal_fields_only() {
// User saves with defaults for everything optional. This is what the
diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs
index 034798e9..ed8e3554 100644
--- a/src/domain_fronter.rs
+++ b/src/domain_fronter.rs
@@ -6,8 +6,9 @@
//! `/macros/s/{script_id}/exec`. Apps Script performs the actual upstream
//! HTTP fetch server-side and returns a JSON envelope.
//!
-//! TODO: add HTTP/2 multiplexing (`h2` crate) for lower latency.
-//! TODO: add parallel range-based downloads.
+//! Multiplexes over HTTP/2 when the relay edge agrees via ALPN; falls back
+//! to HTTP/1.1 keep-alive when h2 is refused or fails. Range-parallel
+//! downloads are implemented by `relay_parallel_range`.
use std::collections::HashMap;
// AtomicU64 via portable-atomic: native on 64-bit / armv7, spinlock-
@@ -15,12 +16,13 @@ use std::collections::HashMap;
// is identical to std::sync::atomic::AtomicU64 so call sites need
// no other changes.
use portable_atomic::AtomicU64;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine;
+use bytes::Bytes;
use rand::{thread_rng, Rng, RngCore};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -54,6 +56,38 @@ pub enum FronterError {
Timeout,
#[error("json: {0}")]
Json(#[from] serde_json::Error),
+ /// Wraps another error and tells outer retry/fallback layers
+ /// (`do_relay_with_retry`, the exit-node→direct-Apps-Script
+ /// fallback in `relay()`) NOT to replay the request. Used when an
+ /// h2 attempt failed *after* `send_request` succeeded — the
+ /// request may have already reached and been processed by Apps
+ /// Script (or the exit node), and replaying via h1 / direct path
+ /// would duplicate side effects for non-idempotent methods.
+ ///
+ /// `Display` is transparent so error messages look identical to
+ /// the wrapped variant; tests/observability use `is_retryable()`
+ /// and `into_inner()` to introspect.
+ #[error(transparent)]
+ NonRetryable(Box<FronterError>),
+}
+
+impl FronterError {
+ /// True if outer retry/fallback layers may safely re-issue the
+ /// request. False for `NonRetryable(_)` — those errors signal
+ /// "request may have been sent; do not duplicate."
+ pub fn is_retryable(&self) -> bool {
+ !matches!(self, FronterError::NonRetryable(_))
+ }
+
+ /// Strip the `NonRetryable` wrapper, returning the underlying
+ /// error. Useful for surfacing the original message after the
+ /// retry/fallback policy has already done its job.
+ pub fn into_inner(self) -> FronterError {
+ match self {
+ FronterError::NonRetryable(inner) => *inner,
+ other => other,
+ }
+ }
}
type PooledStream = TlsStream<TcpStream>;
@@ -63,6 +97,40 @@ const POOL_REFILL_INTERVAL_SECS: u64 = 5;
const POOL_MAX: usize = 80;
const REQUEST_TIMEOUT_SECS: u64 = 25;
const RANGE_PARALLEL_CHUNK_BYTES: u64 = 256 * 1024;
+/// HTTP/2 connection lifetime before we proactively reopen. Apps Script's
+/// edge has been observed to send GOAWAY at ~10 min anyway, so we cycle
+/// at 9 min to do an orderly reconnect on our schedule rather than
+/// letting an in-flight stream race a server-initiated close.
+const H2_CONN_TTL_SECS: u64 = 540;
+/// Bound on the h2 ready/back-pressure phase only. `SendRequest::ready()`
+/// awaits a free slot under the server's `MAX_CONCURRENT_STREAMS`. A
+/// stall here means the connection is overloaded (or dead at the
+/// muxer level) but no stream has been opened yet — RequestSent::No,
+/// safe to fall back to h1 without duplication risk. Kept short
+/// (5 s) so a saturated conn doesn't burn the caller's whole budget.
+///
+/// The post-send phase (response headers + body drain) uses the
+/// caller-supplied `response_deadline` instead — see
+/// `h2_round_trip`. This way a slow but legitimate Apps Script call
+/// isn't cut off at an arbitrary fixed cap, and Full-mode batches can
+/// honor the user's `request_timeout_secs` setting.
+const H2_READY_TIMEOUT_SECS: u64 = 5;
+/// Default response-phase deadline used by `relay_uncoalesced` callers
+/// (the Apps-Script direct path). Sized to be just under the outer
+/// `REQUEST_TIMEOUT_SECS` (25 s) so an h2 timeout still leaves a few
+/// seconds of outer budget for an h1 fallback round-trip when the
+/// caller chose to retry.
+const H2_RESPONSE_DEADLINE_DEFAULT_SECS: u64 = 20;
+/// Bound on the TCP connect + TLS handshake + h2 handshake phase. A
+/// blackholed `connect_host:443` previously stalled `ensure_h2` until
+/// the outer 25 s timeout fired (returning 504 without ever falling
+/// back). With this bound, a slow open trips after 8 s and the caller
+/// drops to h1 with ~17 s of outer budget to spare.
+const H2_OPEN_TIMEOUT_SECS: u64 = 8;
+/// After an h2 open failure, suppress further open attempts for this
+/// long. Prevents every concurrent caller during an h2 outage from
+/// paying its own full handshake-timeout cost in turn.
+const H2_OPEN_FAILURE_BACKOFF_SECS: u64 = 15;
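The deadline laddering these constants encode can also be pinned at compile time. A sketch (not in the patch) using Rust's const asserts, assuming the constants above are in scope:

    // The h2 response deadline must leave outer budget (25 s) for an
    // h1 fallback round trip, and a failed bounded open plus the ready
    // phase must also fit inside that outer budget.
    const _: () = assert!(H2_RESPONSE_DEADLINE_DEFAULT_SECS < REQUEST_TIMEOUT_SECS);
    const _: () = assert!(H2_OPEN_TIMEOUT_SECS + H2_READY_TIMEOUT_SECS < REQUEST_TIMEOUT_SECS);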
/// Cadence for Apps Script container keepalive pings. Apps Script
/// containers go cold after ~5min idle and cost 1-3s on the first
/// request to wake back up — most painful on YouTube / streaming where
@@ -78,6 +146,69 @@ struct PoolEntry {
created: Instant,
}
+/// Single shared HTTP/2 connection to the Google edge. One TCP/TLS
+/// socket carries up to ~100 concurrent streams (server's
+/// `MAX_CONCURRENT_STREAMS` setting); each relay request takes a clone
+/// of the `SendRequest` handle and opens its own stream. Cheaper than
+/// the legacy per-request socket pool — no head-of-line blocking when
+/// a single Apps Script call stalls.
+///
+/// `generation` is monotonic per fronter and lets `poison_h2_if_gen`
+/// avoid the race where task A's stale failure clears task B's
+/// freshly-reopened healthy cell.
+struct H2Cell {
+ send: h2::client::SendRequest<Bytes>,
+ created: Instant,
+ generation: u64,
+}
+
+/// "Did this request reach Apps Script?" signal carried out of every
+/// h2 failure so callers know whether replaying via h1 is safe.
+///
+/// - `No`: the failure occurred before `send_request` returned. The
+/// stream was never opened on the wire; replaying through h1 is
+/// guaranteed not to duplicate any side effect.
+/// - `Maybe`: `send_request` succeeded (headers queued for sending)
+/// but a later step failed — server may have already received the
+/// request and may already be processing it. Replaying a
+/// non-idempotent op (POST/PUT/DELETE, tunnel write, batch ops)
+/// risks duplicating side effects. Only safe to retry for methods
+/// that are idempotent by HTTP semantics.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+enum RequestSent {
+ No,
+ Maybe,
+}
+
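The gate the call sites apply to this flag distills to a small predicate. A hypothetical sketch — the patch's real check is `is_method_safe_for_fanout`, whose body is not part of this diff:

    /// May this request be replayed on h1, given the failure phase?
    /// Hypothetical distillation of the call-site policy.
    fn may_replay(method: &str, sent: RequestSent) -> bool {
        match sent {
            // Stream never opened on the wire: zero duplication risk.
            RequestSent::No => true,
            // Headers may have reached the server: only HTTP-idempotent
            // methods are safe to re-issue.
            RequestSent::Maybe => matches!(method, "GET" | "HEAD" | "OPTIONS"),
        }
    }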
+/// Typed errors from `open_h2`. Used so `ensure_h2` can recognize the
+/// "peer refused h2 in ALPN" outcome and sticky-disable the fast path
+/// without resorting to string matching across function boundaries.
+#[derive(Debug, thiserror::Error)]
+enum OpenH2Error {
+ #[error("ALPN did not negotiate h2; peer prefers http/1.1")]
+ AlpnRefused,
+ #[error("io: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("tls: {0}")]
+ Tls(#[from] rustls::Error),
+ #[error("dns: {0}")]
+ Dns(#[from] rustls::pki_types::InvalidDnsNameError),
+ #[error("h2 handshake: {0}")]
+ Handshake(String),
+}
+
+impl From<OpenH2Error> for FronterError {
+ fn from(e: OpenH2Error) -> Self {
+ match e {
+ OpenH2Error::Io(e) => FronterError::Io(e),
+ OpenH2Error::Tls(e) => FronterError::Tls(e),
+ OpenH2Error::Dns(e) => FronterError::Dns(e),
+ OpenH2Error::AlpnRefused => FronterError::Relay("alpn refused h2".into()),
+ OpenH2Error::Handshake(m) => FronterError::Relay(format!("h2 handshake: {}", m)),
+ }
+ }
+}
+
pub struct DomainFronter {
connect_host: String,
/// Pool of SNI domains to rotate through per outbound connection. All of
@@ -104,8 +235,45 @@ pub struct DomainFronter {
/// Set once we've emitted the "UnknownIssuer means ISP MITM" hint,
/// so we don't spam it every time a cert-validation error repeats.
cert_hint_shown: std::sync::atomic::AtomicBool,
+ /// Connector used by `open_h2`: advertises ALPN `["h2", "http/1.1"]`
+ /// when the h2 fast path is enabled, else just `["http/1.1"]`. Never
+ /// used by the h1 pool path — see `tls_connector_h1`.
tls_connector: TlsConnector,
+ /// Connector used by `open()` (h1 pool warm/refill/acquire). ALPN
+ /// is forced to `["http/1.1"]` so a Google edge that would have
+ /// preferred h2 still negotiates h1 here. Without this, pooled
+ /// sockets could end up speaking h2 frames after handshake, and
+ /// the `write_all(b"GET / HTTP/1.1\r\n...")` fallback would land
+ /// on a server that has no idea what we're doing.
+ tls_connector_h1: TlsConnector,
pool: Arc<Mutex<Vec<PoolEntry>>>,
+ /// HTTP/2 fast path. `None` until first relay opens it; cleared on
+ /// connection failure or expiry so the next call reopens. Skipped
+ /// entirely when `force_http1` is set or when the peer refused h2
+ /// during ALPN (sticky `h2_disabled`).
+ h2_cell: Arc<Mutex<Option<H2Cell>>>,
+ /// Serializes "open a new h2 connection" attempts so that during
+ /// an outage, only one task pays the handshake cost — concurrent
+ /// callers see the lock contended via `try_lock` and fall through
+ /// to h1 immediately rather than queueing behind a slow handshake.
+ /// Distinct from `h2_cell` so the cell mutex is never held across
+ /// network I/O.
+ h2_open_lock: Arc<Mutex<()>>,
+ /// Wall-clock timestamp of the last failed `open_h2`. While within
+ /// `H2_OPEN_FAILURE_BACKOFF_SECS` of this, `ensure_h2` returns None
+ /// without retrying — prevents thundering-herd handshake attempts
+ /// during transient h2 outages.
+ h2_open_failed_at: Arc<Mutex<Option<Instant>>>,
+ /// Monotonic counter for `H2Cell::generation`. Each successful
+ /// `open_h2` increments and tags the new cell so `poison_h2_if_gen`
+ /// can avoid the race where a stale failure clears a freshly-opened
+ /// cell that another task just installed.
+ h2_generation: Arc<AtomicU64>,
+ /// Set when ALPN negotiates http/1.1 (peer refused h2) or when
+ /// `force_http1` is true. Sticky for the lifetime of the fronter:
+ /// once we know this peer doesn't speak h2, don't keep retrying
+ /// the handshake on every relay call.
+ h2_disabled: Arc<AtomicBool>,
cache: Arc<ResponseCache>,
inflight: Arc>>>>,
coalesced: AtomicU64,
@@ -120,6 +288,29 @@ pub struct DomainFronter {
relay_calls: AtomicU64,
relay_failures: AtomicU64,
bytes_relayed: AtomicU64,
+ /// Relay calls that successfully completed over the h2 fast path,
+ /// across **all** entry points: Apps-Script direct relays,
+ /// exit-node outer calls, full-mode tunnel single ops, and
+ /// full-mode tunnel batches.
+ ///
+ /// **Not** comparable to `relay_calls`: that counter only counts
+ /// the Apps-Script-direct path (incremented in `relay_uncoalesced`).
+ /// The other three paths bypass `relay_uncoalesced` entirely, so in
+ /// full-mode deployments `h2_calls` can exceed `relay_calls` —
+ /// reading their ratio as a "% on h2" gives a wrong number.
+ ///
+ /// To gauge h2 health, compute `h2_calls / (h2_calls + h2_fallbacks)`.
+ /// That's the success ratio across all transports; a healthy
+ /// deployment shows > 95 %.
+ h2_calls: AtomicU64,
+ /// Relay calls that attempted h2 but had to fall back to h1
+ /// (transient handshake failure, mid-stream error, conn poisoned,
+ /// open backoff, or `RequestSent::No` failure that the call site
+ /// chose to retry on h1). Same all-entry-points scope as
+ /// `h2_calls`. A persistently high `h2_fallbacks / (h2_calls +
+ /// h2_fallbacks)` ratio indicates an unhealthy h2 conn or a flaky
+ /// middlebox eating h2 frames; consider `force_http1: true`.
+ h2_fallbacks: AtomicU64,
/// Per-host breakdown of traffic going through this fronter. Keyed by
/// the host of the URL (e.g. "api.x.com"). Read-mostly; only touched
/// on the slow path (once per relayed request), so a plain Mutex is
@@ -275,19 +466,43 @@ impl DomainFronter {
if script_ids.is_empty() {
return Err(FronterError::Relay("no script_id configured".into()));
}
- let tls_config = if config.verify_ssl {
- let mut roots = rustls::RootCertStore::empty();
- roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
- ClientConfig::builder()
- .with_root_certificates(roots)
- .with_no_client_auth()
- } else {
- ClientConfig::builder()
- .dangerous()
- .with_custom_certificate_verifier(Arc::new(NoVerify))
- .with_no_client_auth()
+ // Helper that builds a fresh ClientConfig with the verifier
+ // policy from config. We need two of these so the h2-capable
+ // and h1-only paths can advertise different ALPN sets without
+ // mutating one shared config across calls.
+ let build_tls_config = || {
+ if config.verify_ssl {
+ let mut roots = rustls::RootCertStore::empty();
+ roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
+ ClientConfig::builder()
+ .with_root_certificates(roots)
+ .with_no_client_auth()
+ } else {
+ ClientConfig::builder()
+ .dangerous()
+ .with_custom_certificate_verifier(Arc::new(NoVerify))
+ .with_no_client_auth()
+ }
};
- let tls_connector = TlsConnector::from(Arc::new(tls_config));
+
+ // Connector for `open_h2`: advertises h2 first (or just h1 if
+ // the kill switch is set, in which case both connectors end up
+ // identical — fine, just slightly redundant).
+ let mut tls_h2 = build_tls_config();
+ if !config.force_http1 {
+ tls_h2.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
+ } else {
+ tls_h2.alpn_protocols = vec![b"http/1.1".to_vec()];
+ }
+ let tls_connector = TlsConnector::from(Arc::new(tls_h2));
+
+ // Connector for `open()` (h1 pool path). ALPN is forced to
+ // http/1.1 so a Google edge that would otherwise prefer h2
+ // still negotiates h1 here — pooled sockets always speak the
+ // protocol the fallback path expects.
+ let mut tls_h1 = build_tls_config();
+ tls_h1.alpn_protocols = vec![b"http/1.1".to_vec()];
+ let tls_connector_h1 = TlsConnector::from(Arc::new(tls_h1));
Ok(Self {
connect_host: config.google_ip.clone(),
@@ -304,7 +519,13 @@ impl DomainFronter {
script_ids,
script_idx: AtomicUsize::new(0),
tls_connector,
+ tls_connector_h1,
pool: Arc::new(Mutex::new(Vec::new())),
+ h2_cell: Arc::new(Mutex::new(None)),
+ h2_open_lock: Arc::new(Mutex::new(())),
+ h2_open_failed_at: Arc::new(Mutex::new(None)),
+ h2_generation: Arc::new(AtomicU64::new(0)),
+ h2_disabled: Arc::new(AtomicBool::new(config.force_http1)),
cache: Arc::new(ResponseCache::with_default()),
inflight: Arc::new(Mutex::new(HashMap::new())),
coalesced: AtomicU64::new(0),
@@ -313,6 +534,8 @@ impl DomainFronter {
relay_calls: AtomicU64::new(0),
relay_failures: AtomicU64::new(0),
bytes_relayed: AtomicU64::new(0),
+ h2_calls: AtomicU64::new(0),
+ h2_fallbacks: AtomicU64::new(0),
per_site: Arc::new(std::sync::Mutex::new(HashMap::new())),
today_calls: AtomicU64::new(0),
today_bytes: AtomicU64::new(0),
@@ -461,6 +684,9 @@ impl DomainFronter {
today_bytes: self.today_bytes.load(Ordering::Relaxed),
today_key,
today_reset_secs: seconds_until_pacific_midnight(),
+ h2_calls: self.h2_calls.load(Ordering::Relaxed),
+ h2_fallbacks: self.h2_fallbacks.load(Ordering::Relaxed),
+ h2_disabled: self.h2_disabled.load(Ordering::Relaxed),
}
}
@@ -642,18 +868,40 @@ impl DomainFronter {
let _ = tcp.set_nodelay(true);
let sni = self.next_sni();
let name = ServerName::try_from(sni)?;
- let tls = self.tls_connector.connect(name, tcp).await?;
+ // Always use the h1-only connector here — the pool only holds
+ // sockets that the raw HTTP/1.1 fallback path can write to.
+ // Using the shared connector would let some pooled sockets
+ // negotiate h2, which would then misframe every fallback
+ // request that lands on them.
+ let tls = self.tls_connector_h1.connect(name, tcp).await?;
Ok(tls)
}
- /// Open `n` outbound TLS connections sequentially (500 ms apart) and
- /// park them in the pool. Staggered so we don't burst N TLS handshakes
- /// at Google edge simultaneously, and each connection gets an 8 s
+ /// Open outbound TLS connections eagerly so the first relay request
+ /// doesn't pay a cold handshake.
+ ///
+ /// When h2 is enabled, attempts to open the multiplexed h2 cell
+ /// first. Success there means one TCP/TLS handshake serves all
+ /// future requests, so we only need a tiny fallback h1 pool
+ /// (clamped to 2) instead of the full `n` requested. On h2 failure
+ /// (ALPN refusal, network error), falls back to the legacy
+ /// behavior: warm the full `n` h1 sockets.
+ ///
+ /// Staggered 500 ms apart so we don't burst N TLS handshakes at the
+ /// Google edge simultaneously, and each connection gets an 8 s
/// expiry offset so they roll off gradually instead of all hitting
/// POOL_TTL_SECS at once.
pub async fn warm(self: &Arc<Self>, n: usize) {
+ // Try to bring up the h2 fast path first. If that succeeds,
+ // shrink the h1 pool warm count to the fallback minimum — the
+ // multiplexed h2 conn handles all real traffic, so the h1 pool
+ // only needs to cover the rare case where h2 dies mid-session.
+ let h2_alive = !self.h2_disabled.load(Ordering::Relaxed)
+ && self.ensure_h2().await.is_some();
+ let h1_target = if h2_alive { 2.min(n) } else { n };
+
let mut warmed = 0usize;
- for i in 0..n {
+ for i in 0..h1_target {
if i > 0 {
tokio::time::sleep(Duration::from_millis(500)).await;
}
@@ -674,18 +922,34 @@ impl DomainFronter {
}
}
}
- if warmed > 0 {
+ if h2_alive {
+ tracing::info!(
+ "h2 fast path active; h1 fallback pool pre-warmed with {} connection(s)",
+ warmed
+ );
+ } else if warmed > 0 {
tracing::info!("pool pre-warmed with {} connection(s)", warmed);
}
}
- /// Background loop that keeps at least `POOL_MIN` valid connections
- /// ready. A connection only counts toward the minimum if it has at
- /// least 20 s of TTL remaining — nearly-expired entries don't help.
+ /// Background loop that keeps the h1 fallback pool warm.
+ ///
+ /// Target depends on whether the h2 fast path is active:
+ /// - h2 disabled (or peer refused ALPN h2): keep `POOL_MIN` (8)
+ /// sockets so the per-request acquire never pays a cold handshake
+ /// — the pre-h2 default behavior.
+ /// - h2 active: keep just `POOL_MIN_H2_FALLBACK` (2). All real
+ /// traffic rides the multiplexed h2 connection; the h1 pool only
+ /// exists to cover the case where h2 dies and we need to fall
+ /// back instantly without a cold handshake.
+ ///
+ /// A connection only counts toward the minimum if it has at least
+ /// 20 s of TTL remaining — nearly-expired entries don't help.
/// Checks every `POOL_REFILL_INTERVAL_SECS`, evicts expired entries,
/// and opens replacements one at a time so there's no burst.
pub async fn run_pool_refill(self: Arc<Self>) {
const MIN_REMAINING_SECS: u64 = 20;
+ const POOL_MIN_H2_FALLBACK: usize = 2;
loop {
tokio::time::sleep(Duration::from_secs(POOL_REFILL_INTERVAL_SECS)).await;
@@ -695,6 +959,22 @@ impl DomainFronter {
pool.retain(|e| e.created.elapsed().as_secs() < POOL_TTL_SECS);
}
+ // Decide target. We treat "h2 active right now" as having a
+ // fresh, non-poisoned cell. h2_disabled is the sticky flag
+ // (peer never agreed to h2); a transient cell-poison after
+ // h2 success briefly drops back to the larger target until
+ // ensure_h2 reopens.
+ let target = if self.h2_disabled.load(Ordering::Relaxed) {
+ POOL_MIN
+ } else {
+ let cell = self.h2_cell.lock().await;
+ let h2_alive = cell
+ .as_ref()
+ .map(|c| c.created.elapsed().as_secs() < H2_CONN_TTL_SECS)
+ .unwrap_or(false);
+ if h2_alive { POOL_MIN_H2_FALLBACK } else { POOL_MIN }
+ };
+
// Count only connections with enough life left.
// Refill one at a time to avoid bursting TLS handshakes.
loop {
@@ -707,7 +987,7 @@ impl DomainFronter {
})
.count()
};
- if healthy >= POOL_MIN {
+ if healthy >= target {
break;
}
match self.open().await {
@@ -731,13 +1011,17 @@ impl DomainFronter {
/// Keep the Apps Script container warm with a periodic HEAD ping.
///
- /// `acquire()` keeps the *TCP/TLS pool* warm but does nothing for the
- /// V8 container Apps Script runs in: that goes cold ~5min after the
- /// last UrlFetchApp call and costs 1-3s to spin back up. The symptom
- /// is "first request after a quiet period stalls" — most visible on
- /// YouTube where the player gives up on a 1.5s `googlevideo.com`
+ /// The TCP/TLS pool stays warm via `run_pool_refill`, but the V8
+ /// container Apps Script runs in goes cold ~5min after the last
+ /// `UrlFetchApp` call and costs 1-3s to spin back up. The symptom
+ /// is "first request after a quiet period stalls" — most visible
+ /// on YouTube where the player gives up on a 1.5s `googlevideo.com`
/// chunk that's actually waiting on a cold-start.
///
+ /// Transport-agnostic: the underlying call goes through the same
+ /// `relay_uncoalesced` path everything else uses, so when h2 is
+ /// up the keepalive rides the multiplexed connection too.
+ ///
/// Bypasses the response cache (`cache_key_opt = None`) and the
/// inflight coalescer — otherwise the second iteration would just
/// hit the cached response from the first and never reach Apps
@@ -749,7 +1033,7 @@ impl DomainFronter {
/// quota-exhausted account doesn't spam warnings every 4 minutes.
/// Loops forever — caller is expected to drop the JoinHandle on
/// shutdown (the task lives as long as the process).
- pub async fn run_h1_keepalive(self: Arc<Self>) {
+ pub async fn run_keepalive(self: Arc<Self>) {
loop {
tokio::time::sleep(Duration::from_secs(H1_KEEPALIVE_INTERVAL_SECS)).await;
let t0 = Instant::now();
@@ -762,7 +1046,7 @@ impl DomainFronter {
.relay_uncoalesced("HEAD", "http://example.com/", &[], &[], None)
.await;
tracing::debug!(
- "H1 container keepalive: {}ms",
+ "container keepalive: {}ms",
t0.elapsed().as_millis()
);
}
@@ -801,6 +1085,543 @@ impl DomainFronter {
}
}
+ /// Return a cloned `SendRequest` handle (paired with its cell
+ /// generation) to the active HTTP/2 connection, opening a new one
+ /// if needed. `None` means the h2 fast path is unavailable for
+ /// this call — the caller should fall through to the h1 path.
+ ///
+ /// Reasons we may return `None`:
+ /// - `force_http1` set, or peer previously refused h2 via ALPN
+ /// (sticky `h2_disabled`).
+ /// - We're inside the `H2_OPEN_FAILURE_BACKOFF_SECS` cooldown
+ /// after a recent open failure.
+ /// - Another task is currently opening a connection and we
+ /// don't want to pile on (`try_lock` on `h2_open_lock`).
+ /// - The open we just attempted timed out within
+ /// `H2_OPEN_TIMEOUT_SECS` or otherwise failed.
+ ///
+ /// The lock on `h2_cell` is *never* held across network I/O —
+ /// that's the whole point of `h2_open_lock`. Concurrent first-time
+ /// callers compete for `h2_open_lock` via `try_lock`; the loser
+ /// returns None immediately and uses h1 rather than serializing
+ /// behind a slow handshake.
+ ///
+ /// The returned generation lets the caller later
+ /// `poison_h2_if_gen(gen)` to clear *only* this specific cell on
+ /// per-stream error, avoiding the race where a stale failure
+ /// clobbers a freshly-reopened healthy cell.
+ async fn ensure_h2(&self) -> Option<(h2::client::SendRequest<Bytes>, u64)> {
+ if self.h2_disabled.load(Ordering::Relaxed) {
+ return None;
+ }
+
+ // Fast path: existing cell, within TTL. Clone (Arc bump) and
+ // return without touching the open machinery. We can't peek at
+ // SendRequest liveness directly (h2 0.4 doesn't expose
+ // `is_closed`), so a request against a dead conn fails at
+ // `ready()`/`send_request` and the caller poisons by
+ // generation from there.
+ {
+ let cell = self.h2_cell.lock().await;
+ if let Some(c) = cell.as_ref() {
+ if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS {
+ return Some((c.send.clone(), c.generation));
+ }
+ }
+ }
+
+ // Backoff check — recent open failure means h2 is currently
+ // unhealthy; don't pile on retries until the window expires.
+ {
+ let last = self.h2_open_failed_at.lock().await;
+ if let Some(t) = *last {
+ if t.elapsed().as_secs() < H2_OPEN_FAILURE_BACKOFF_SECS {
+ return None;
+ }
+ }
+ }
+
+ // Open dedup: only one task does the actual handshake at a
+ // time. Concurrent callers see the lock contended and fall
+ // through to h1 immediately — preserves cold-start latency
+ // for the burst that arrives during a slow open.
+ let _open_guard = match self.h2_open_lock.try_lock() {
+ Ok(g) => g,
+ Err(_) => return None,
+ };
+
+ // Re-check the cell under open_lock — another task may have
+ // just stored a fresh connection while we were arbitrating.
+ {
+ let cell = self.h2_cell.lock().await;
+ if let Some(c) = cell.as_ref() {
+ if c.created.elapsed().as_secs() < H2_CONN_TTL_SECS {
+ return Some((c.send.clone(), c.generation));
+ }
+ }
+ }
+
+ // Bounded handshake. A blackholed connect target can stall
+ // for many seconds otherwise, eating the outer budget that
+ // should be reserved for an h1 fallback round-trip.
+ let open_result =
+ tokio::time::timeout(Duration::from_secs(H2_OPEN_TIMEOUT_SECS), self.open_h2())
+ .await;
+
+ let send = match open_result {
+ Ok(Ok(s)) => s,
+ Ok(Err(OpenH2Error::AlpnRefused)) => {
+ // Definitive: this peer doesn't speak h2. Sticky-disable
+ // so we never re-attempt the handshake.
+ self.h2_disabled.store(true, Ordering::Relaxed);
+ tracing::info!(
+ "relay peer refused h2 via ALPN; staying on http/1.1"
+ );
+ *self.h2_cell.lock().await = None;
+ return None;
+ }
+ Ok(Err(e)) => {
+ tracing::debug!("h2 open failed: {} — falling back to h1", e);
+ *self.h2_open_failed_at.lock().await = Some(Instant::now());
+ *self.h2_cell.lock().await = None;
+ return None;
+ }
+ Err(_) => {
+ tracing::debug!(
+ "h2 open timed out after {}s — falling back to h1",
+ H2_OPEN_TIMEOUT_SECS
+ );
+ *self.h2_open_failed_at.lock().await = Some(Instant::now());
+ *self.h2_cell.lock().await = None;
+ return None;
+ }
+ };
+
+ // Open succeeded. Tag with a fresh generation, store, return.
+ // Clear any stale backoff timestamp.
+ let generation = self.h2_generation.fetch_add(1, Ordering::Relaxed) + 1;
+ *self.h2_open_failed_at.lock().await = None;
+ let mut cell = self.h2_cell.lock().await;
+ *cell = Some(H2Cell {
+ send: send.clone(),
+ created: Instant::now(),
+ generation,
+ });
+ Some((send, generation))
+ }
+
+ /// Open one TLS connection and run the h2 handshake. Returns a
+ /// typed `OpenH2Error` so the caller can recognize ALPN refusal
+ /// (sticky disable) without string-matching across boundaries.
+ async fn open_h2(&self) -> Result<h2::client::SendRequest<Bytes>, OpenH2Error> {
+ let tcp = TcpStream::connect((self.connect_host.as_str(), 443u16)).await?;
+ let _ = tcp.set_nodelay(true);
+ let sni = self.next_sni();
+ let name = ServerName::try_from(sni)?;
+ let tls = self.tls_connector.connect(name, tcp).await?;
+ Self::h2_handshake_post_tls(tls).await
+ }
+
+ /// Post-TLS portion of the h2 open path: ALPN check + h2 handshake
+ /// + connection-driver task spawn. Split out from `open_h2` so
+ /// tests can drive it with a TLS stream from any local server,
+ /// bypassing the hard-coded `connect_host:443` target.
+ async fn h2_handshake_post_tls(
+ tls: PooledStream,
+ ) -> Result<h2::client::SendRequest<Bytes>, OpenH2Error> {
+ let alpn_h2 = tls
+ .get_ref()
+ .1
+ .alpn_protocol()
+ .map(|p| p == b"h2")
+ .unwrap_or(false);
+ if !alpn_h2 {
+ return Err(OpenH2Error::AlpnRefused);
+ }
+ // Larger initial windows mean we don't have to call
+ // `release_capacity` on every chunk for typical Apps Script
+ // payloads (usually < 1 MB; range chunks are 256 KB). We still
+ // release capacity in the body-read loop for safety on larger
+ // bodies.
+ let (send, conn) = h2::client::Builder::new()
+ .initial_window_size(4 * 1024 * 1024)
+ .initial_connection_window_size(8 * 1024 * 1024)
+ .handshake(tls)
+ .await
+ .map_err(|e| OpenH2Error::Handshake(e.to_string()))?;
+ // The connection task drives frame I/O independently of any
+ // SendRequest handle. When it ends (GOAWAY, network error, TTL),
+ // existing handles will start failing on `ready()` / `send_request`
+ // and `ensure_h2` will reopen on the next call.
+ tokio::spawn(async move {
+ if let Err(e) = conn.await {
+ tracing::debug!("h2 connection closed: {}", e);
+ }
+ });
+ tracing::info!("h2 connection established to relay edge");
+ Ok(send)
+ }
+
+ /// React to an h2-fronting-incompatibility HTTP response (status
+ /// matched by `is_h2_fronting_refusal_status`) by:
+ /// * sticky-disabling the h2 fast path so subsequent calls go
+ /// straight to h1 without re-paying the handshake / refusal,
+ /// * clearing any current cell so the SendRequest is dropped,
+ /// * rebalancing the h2 stat counters so this request shows
+ /// up as a fallback, not a successful h2 call. (The
+ /// `run_h2_relay_with_send` Ok path bumps `h2_calls` for any
+ /// completed round-trip; for a 421 we want it counted as
+ /// `h2_fallbacks` instead since the request will take the
+ /// h1 path.)
+ /// Logs at info because this is a meaningful state transition for
+ /// the deployment, not a per-request hiccup.
+ async fn sticky_disable_h2_for_fronting_refusal(&self, status: u16, context: &str) {
+ if !self.h2_disabled.swap(true, Ordering::Relaxed) {
+ tracing::info!(
+ "h2 returned HTTP {} for {} — likely :authority/SNI mismatch via \
+ domain fronting. Disabling h2 fast path for this fronter and \
+ falling back to http/1.1.",
+ status,
+ context,
+ );
+ }
+ *self.h2_cell.lock().await = None;
+ // Reclassify: undo the h2_calls increment from
+ // run_h2_relay_with_send and bill this attempt as a fallback.
+ // saturating_sub-style guard: only decrement if non-zero so a
+ // direct caller of this helper from a non-Ok path can't
+ // underflow the counter.
+ let _ = self.h2_calls.fetch_update(
+ Ordering::Relaxed,
+ Ordering::Relaxed,
+ |c| if c > 0 { Some(c - 1) } else { None },
+ );
+ self.h2_fallbacks.fetch_add(1, Ordering::Relaxed);
+ }
+
+ /// Clear the h2 cell *only if* its generation matches the one the
+ /// caller observed. Prevents the race where:
+ /// 1. Task A holds SendRequest from generation N
+ /// 2. Generation N's connection dies; Task B reopens → cell now
+ /// holds generation N+1 (healthy)
+ /// 3. Task A's stale stream errors → unconditionally clearing
+ /// the cell would kill the healthy N+1
+ /// With generation matching, A's poison is a no-op against N+1.
+ async fn poison_h2_if_gen(&self, generation: u64) {
+ let mut cell = self.h2_cell.lock().await;
+ if let Some(c) = cell.as_ref() {
+ if c.generation == generation {
+ *cell = None;
+ }
+ }
+ }
+
+ /// Run one h2 stream and return `(status, headers, body)` — the same
+ /// shape the h1 path's `read_http_response` produces, so callers can
+ /// stay transport-agnostic from this point on. Errors carry a
+ /// `RequestSent` flag so the caller can distinguish "never sent"
+ /// (safe to retry on h1) from "may have been processed by origin"
+ /// (only safe to retry for idempotent methods).
+ ///
+ /// `path` is the HTTP path including the leading slash; `host`
+ /// populates the `:authority` pseudo-header (callers pass
+ /// `self.http_host` for the initial request and the `Location` host
+ /// when following a redirect). `payload` is the body bytes;
+ /// `content_type` is set when non-None (for the JSON envelope). An
+ /// empty body with no content type is the GET shape
+ /// `run_h2_relay_with_send` uses for redirect follow-ups.
+ ///
+ /// Two phases, two timeouts:
+ /// * **Ready (back-pressure):** bounded by `H2_READY_TIMEOUT_SECS`
+ /// (5 s constant). A stall here means the conn is saturated
+ /// under `MAX_CONCURRENT_STREAMS` (or dead at the muxer level)
+ /// but no stream has opened — `RequestSent::No`.
+ /// * **Response (post-send):** bounded by the caller-provided
+ /// `response_deadline`. After `send_request` returns Ok the
+ /// headers are queued; we conservatively treat any later
+ /// failure or timeout as `RequestSent::Maybe`. Caller picks
+ /// the deadline so legitimate slow Apps Script calls and
+ /// Full-mode batches with custom `request_timeout_secs` aren't
+ /// cut off at an arbitrary fixed cap.
+ async fn h2_round_trip(
+ &self,
+ send: h2::client::SendRequest<Bytes>,
+ method: &str,
+ path: &str,
+ host: &str,
+ payload: Bytes,
+ content_type: Option<&str>,
+ response_deadline: Duration,
+ ) -> Result<(u16, Vec<(String, String)>, Vec<u8>), (FronterError, RequestSent)> {
+ // The h2 crate derives the :scheme/:authority/:path pseudo-headers
+ // from the request URI, so build an absolute `https://{host}{path}`
+ // URI instead of setting a separate Host header.
+ let uri = format!("https://{}{}", host, path);
+ let mut builder = http::Request::builder().method(method).uri(uri);
+ // Apps Script accepts gzip on the response; mirror the h1 path so
+ // payloads stay small.
+ builder = builder.header("accept-encoding", "gzip");
+ if let Some(ct) = content_type {
+ builder = builder.header("content-type", ct);
+ }
+ let req = builder.body(()).map_err(|e| {
+ (
+ FronterError::Relay(format!("h2 request build: {}", e)),
+ RequestSent::No,
+ )
+ })?;
+
+ // Phase 1: ready/back-pressure. Bounded short. Timeout here
+ // means saturation, not server-side processing — the stream
+ // hasn't even opened, so `RequestSent::No`.
+ let ready_result = tokio::time::timeout(
+ Duration::from_secs(H2_READY_TIMEOUT_SECS),
+ send.ready(),
+ )
+ .await;
+ let mut send = match ready_result {
+ Ok(Ok(s)) => s,
+ Ok(Err(e)) => {
+ return Err((
+ FronterError::Relay(format!("h2 ready: {}", e)),
+ RequestSent::No,
+ ));
+ }
+ Err(_) => {
+ return Err((FronterError::Timeout, RequestSent::No));
+ }
+ };
+
+ let has_body = !payload.is_empty();
+ // send_request is synchronous; it queues the HEADERS frame.
+ // After this returns Ok we conservatively assume the request
+ // reached the server. An Err here means the stream couldn't
+ // be opened (e.g. connection-level GOAWAY), safe to retry.
+ let (response_fut, mut body_tx) = send.send_request(req, !has_body).map_err(|e| {
+ (
+ FronterError::Relay(format!("h2 send_request: {}", e)),
+ RequestSent::No,
+ )
+ })?;
+
+ if has_body {
+ // body_tx errors here are RequestSent::Maybe — headers were
+ // already queued, so we may have invoked Apps Script's doPost
+ // even if the body never finished.
+ body_tx.send_data(payload, true).map_err(|e| {
+ (
+ FronterError::Relay(format!("h2 send_data: {}", e)),
+ RequestSent::Maybe,
+ )
+ })?;
+ }
+
+ // Phase 2: response headers + body drain. Bounded by the
+ // caller's deadline. Errors and timeout here are
+ // `RequestSent::Maybe` — the request is on the wire and may
+ // already have side effects.
+ let response_phase = async {
+ let response = response_fut.await.map_err(|e| {
+ (
+ FronterError::Relay(format!("h2 response: {}", e)),
+ RequestSent::Maybe,
+ )
+ })?;
+ let (parts, mut body) = response.into_parts();
+ let status = parts.status.as_u16();
+
+ // Convert headers to the (String, String) Vec the rest of
+ // the codebase expects. Multi-valued headers (set-cookie,
+ // etc.) are expanded one entry per value, matching
+ // httparse's emission.
+ let mut headers: Vec<(String, String)> = Vec::with_capacity(parts.headers.len());
+ for (name, value) in parts.headers.iter() {
+ if let Ok(v) = value.to_str() {
+ headers.push((name.as_str().to_string(), v.to_string()));
+ }
+ }
+
+ // Drain body. Release flow-control credit per chunk so
+ // large responses don't stall after the initial 4 MB window.
+ let mut buf: Vec<u8> = Vec::new();
+ while let Some(chunk) = body.data().await {
+ let chunk = chunk.map_err(|e| {
+ (
+ FronterError::Relay(format!("h2 body chunk: {}", e)),
+ RequestSent::Maybe,
+ )
+ })?;
+ let n = chunk.len();
+ buf.extend_from_slice(&chunk);
+ let _ = body.flow_control().release_capacity(n);
+ }
+ Ok::<_, (FronterError, RequestSent)>((status, headers, buf))
+ };
+
+ let (status, headers, mut buf) = match tokio::time::timeout(
+ response_deadline,
+ response_phase,
+ )
+ .await
+ {
+ Ok(Ok(t)) => t,
+ Ok(Err(e)) => return Err(e),
+ Err(_) => return Err((FronterError::Timeout, RequestSent::Maybe)),
+ };
+
+ // Mirror `read_http_response`: if the server gzipped the body
+ // (we asked for it via accept-encoding), decompress before
+ // handing back so downstream JSON / envelope parsers see plain
+ // bytes regardless of transport.
+ if let Some(enc) = header_get(&headers, "content-encoding") {
+ if enc.eq_ignore_ascii_case("gzip") {
+ if let Ok(decoded) = decode_gzip(&buf) {
+ buf = decoded;
+ }
+ }
+ }
+
+ Ok((status, headers, buf))
+ }
+
+ /// Run a full relay round-trip over h2: initial POST + up to 5
+ /// redirect hops. `path` is the Apps Script `/macros/s/{id}/exec`
+ /// path. Returns the same `(status, headers, body)` triple as the
+ /// h1 path on success.
+ ///
+ /// `response_deadline` bounds the post-send phase of each round
+ /// trip (response headers + body drain). The ready/back-pressure
+ /// phase has its own short bound (`H2_READY_TIMEOUT_SECS`).
+ /// Caller picks the deadline based on its own outer budget:
+ /// * Apps-Script direct (`relay_uncoalesced`): a few seconds
+ /// under `REQUEST_TIMEOUT_SECS` (25 s) so an h2 timeout still
+ /// leaves room for an h1 fallback.
+ /// * Full-mode tunnel (`tunnel_request` / `tunnel_batch_request_to`):
+ /// `self.batch_timeout` so the user's
+ /// `request_timeout_secs` setting actually applies.
+ ///
+ /// On error, the second tuple field is `RequestSent::No` if the
+ /// request never reached Apps Script (safe to retry on h1) or
+ /// `RequestSent::Maybe` if it may have been processed (replaying
+ /// risks duplicating side effects for non-idempotent methods).
+ /// `ensure_h2` returning None always reports `RequestSent::No`.
+ ///
+ /// Takes `payload` as `Bytes` so callers can clone (Arc bump,
+ /// not memcpy) when they want to retain a copy for h1 fallback.
+ async fn h2_relay_request(
+ &self,
+ path: &str,
+ payload: Bytes,
+ response_deadline: Duration,
+ ) -> Result<(u16, Vec<(String, String)>, Vec<u8>), (FronterError, RequestSent)> {
+ let (send, generation) = match self.ensure_h2().await {
+ Some(s) => s,
+ None => {
+ // ensure_h2 returning None covers:
+ // 1. force_http1 / sticky-disabled — never tried h2
+ // this call. NOT a fallback, don't count.
+ // 2. open_h2 just failed / timed out / backoff active.
+ // We DID attempt h2 and lost it; count as fallback
+ // so the stat reflects reality. `ensure_h2` itself
+ // sets the backoff timestamp on failure.
+ if !self.h2_disabled.load(Ordering::Relaxed) {
+ self.h2_fallbacks.fetch_add(1, Ordering::Relaxed);
+ }
+ return Err((
+ FronterError::Relay("h2 unavailable".into()),
+ RequestSent::No,
+ ));
+ }
+ };
+
+ self.run_h2_relay_with_send(send, generation, path, payload, response_deadline)
+ .await
+ }
+
+ /// Inner h2 relay loop — split out so tests can inject a
+ /// `SendRequest` (from a local h2c test server) without going
+ /// through `ensure_h2`'s real-network handshake.
+ ///
+ /// Each h2_round_trip uses its own internal phase-split timeouts
+ /// (ready=5s constant, response=`response_deadline`). No outer
+ /// wrap is needed here — the inner timeouts are what poisons the
+ /// cell on stall.
+ async fn run_h2_relay_with_send(
+ &self,
+ send: h2::client::SendRequest<Bytes>,
+ generation: u64,
+ path: &str,
+ payload: Bytes,
+ response_deadline: Duration,
+ ) -> Result<(u16, Vec<(String, String)>, Vec<u8>), (FronterError, RequestSent)> {
+ let mut current_host = self.http_host.to_string();
+ let mut current_path = path.to_string();
+
+ let res = self
+ .h2_round_trip(
+ send.clone(),
+ "POST",
+ &current_path,
+ &current_host,
+ payload,
+ Some("application/json"),
+ response_deadline,
+ )
+ .await;
+ let (mut status, mut hdrs, mut body) = match res {
+ Ok(t) => t,
+ Err((e, sent)) => {
+ self.poison_h2_if_gen(generation).await;
+ self.h2_fallbacks.fetch_add(1, Ordering::Relaxed);
+ return Err((e, sent));
+ }
+ };
+
+ // The initial POST already succeeded — the request reached
+ // Apps Script. From here on, redirect-follow failures are
+ // RequestSent::Maybe regardless of where they land in the
+ // chain, because the *original* Apps Script call may have
+ // already executed.
+ for _ in 0..5 {
+ if !matches!(status, 301 | 302 | 303 | 307 | 308) {
+ break;
+ }
+ let Some(loc) = header_get(&hdrs, "location") else {
+ break;
+ };
+ let (rpath, rhost) = parse_redirect(&loc);
+ current_host = rhost.unwrap_or(current_host);
+ current_path = rpath;
+ let res = self
+ .h2_round_trip(
+ send.clone(),
+ "GET",
+ &current_path,
+ &current_host,
+ Bytes::new(),
+ None,
+ response_deadline,
+ )
+ .await;
+ match res {
+ Ok((s, h, b)) => {
+ status = s;
+ hdrs = h;
+ body = b;
+ }
+ Err((e, _)) => {
+ self.poison_h2_if_gen(generation).await;
+ self.h2_fallbacks.fetch_add(1, Ordering::Relaxed);
+ return Err((e, RequestSent::Maybe));
+ }
+ }
+ }
+
+ self.h2_calls.fetch_add(1, Ordering::Relaxed);
+ Ok((status, hdrs, body))
+ }
+
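The injection seam described above can be exercised without TLS or a real network: pair `h2::client` with an in-process `h2::server` over `tokio::io::duplex`. A self-contained sketch of that transport pairing (illustrative, not a test from this patch; the URI and body values are made up):

    use bytes::Bytes;
    use http::{Request, Response};

    #[tokio::test]
    async fn h2_round_trip_over_duplex() {
        let (client_io, server_io) = tokio::io::duplex(64 * 1024);

        // Server half: accept one stream, answer 200 with a tiny body.
        tokio::spawn(async move {
            let mut conn = h2::server::handshake(server_io).await.unwrap();
            if let Some(Ok((req, mut respond))) = conn.accept().await {
                assert_eq!(req.method(), &http::Method::POST);
                let resp = Response::builder().status(200).body(()).unwrap();
                let mut tx = respond.send_response(resp, false).unwrap();
                tx.send_data(Bytes::from_static(b"ok"), true).unwrap();
            }
        });

        // Client half: handshake, spawn the connection driver (the same
        // role ensure_h2's spawned task plays), then send one POST.
        let (send, conn) = h2::client::handshake(client_io).await.unwrap();
        tokio::spawn(async move {
            let _ = conn.await;
        });

        let req = Request::builder()
            .method("POST")
            .uri("https://example.invalid/macros/s/X/exec")
            .body(())
            .unwrap();
        let mut send = send.ready().await.unwrap();
        let (resp_fut, mut body_tx) = send.send_request(req, false).unwrap();
        body_tx.send_data(Bytes::from_static(b"{}"), true).unwrap();

        let resp = resp_fut.await.unwrap();
        assert_eq!(resp.status(), 200);
        let mut body = resp.into_body();
        let chunk = body.data().await.unwrap().unwrap();
        assert_eq!(&chunk[..], b"ok");
    }

A `SendRequest` obtained this way can drive `run_h2_relay_with_send` directly, which is exactly why it was split from `ensure_h2`.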
/// Relay an HTTP request through Apps Script.
/// Returns a raw HTTP/1.1 response (status line + headers + body) suitable
/// for writing back to the browser over an MITM'd TLS stream.
@@ -847,6 +1668,22 @@ impl DomainFronter {
);
return bytes;
}
+ Err(e) if !e.is_retryable() => {
+ // The exit node may have already processed this
+ // request (h2 post-send failure on a POST etc.).
+ // Don't fall through to the direct path — that
+ // would re-send to the same destination via Apps
+ // Script and duplicate the side effect.
+ tracing::warn!(
+ "exit node failed for {} and request was already sent ({}); not falling back to direct Apps Script",
+ url,
+ e,
+ );
+ self.relay_failures.fetch_add(1, Ordering::Relaxed);
+ let inner = e.into_inner();
+ self.record_site(url, false, 0, t0.elapsed().as_nanos() as u64);
+ return error_response(502, &format!("Relay error: {}", inner));
+ }
Err(e) => {
tracing::warn!(
"exit node failed for {}: {} — falling back to direct Apps Script",
@@ -1195,9 +2032,24 @@ impl DomainFronter {
return self.do_relay_parallel(method, url, headers, body, fan).await;
}
- // Sequential path: one retry on connection failure.
+ // Sequential path: one retry on connection failure, *unless*
+ // the failure is `FronterError::NonRetryable` — that wrapper
+ // says "the request may have already reached the server, do
+ // not duplicate." Without this guard, an h2 post-send failure
+ // on a non-idempotent method (POST/PUT/PATCH/DELETE) that the
+ // h2 layer correctly refused to replay on h1 would be
+ // re-issued here anyway, defeating the safety policy.
match self.do_relay_once(method, url, headers, body).await {
Ok(v) => Ok(v),
+ Err(e) if !e.is_retryable() => {
+ tracing::warn!(
+ "relay attempt 1 failed and is non-retryable ({}); not duplicating {} {}",
+ e,
+ method,
+ url,
+ );
+ Err(e.into_inner())
+ }
Err(e) => {
tracing::debug!("relay attempt 1 failed: {}; retrying", e);
self.do_relay_once(method, url, headers, body).await
@@ -1255,9 +2107,102 @@ impl DomainFronter {
headers: &[(String, String)],
body: &[u8],
) -> Result<Vec<u8>, FronterError> {
- let payload = self.build_payload_json(method, url, headers, body)?;
+ // Build once, wrap in Bytes (zero-copy move). h2 takes a clone
+ // (Arc bump, not memcpy); the h1 fallback borrows the same Bytes
+ // as `&[u8]` via Deref. Saves a full payload allocation+copy per call
+ // — meaningful on range-parallel fan-out where N copies fire
+ // in parallel for one user-facing GET.
+ let payload: Bytes = Bytes::from(self.build_payload_json(method, url, headers, body)?);
let path = format!("/macros/s/{}/exec", script_id);
+ // h2 fast path: one shared TCP/TLS connection multiplexes all
+ // streams.
+ //
+ // The h2 layer reports `RequestSent::No` when it can prove
+ // the request never reached Apps Script (ensure_h2 unavailable,
+ // ready/back-pressure timeout, send_request error). In that
+ // case we fall through to h1 unconditionally — there's no
+ // duplication risk.
+ //
+ // For `RequestSent::Maybe` (anything after send_request
+ // succeeded) we only fall through for HTTP-idempotent methods.
+ // POST / PUT / PATCH / DELETE get wrapped in
+ // `FronterError::NonRetryable` so `do_relay_with_retry`'s
+ // outer retry also skips replay — without that wrap, the
+ // outer retry would re-issue the request anyway and the
+ // safety policy would be illusory.
+ match self
+ .h2_relay_request(
+ &path,
+ payload.clone(),
+ Duration::from_secs(H2_RESPONSE_DEADLINE_DEFAULT_SECS),
+ )
+ .await
+ {
+ Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => {
+ // Edge rejected the fronted h2 request before
+ // forwarding to Apps Script. Sticky-disable h2,
+ // log once, fall through to h1 — this request is
+ // safe to replay because it never reached Apps Script.
+ self.sticky_disable_h2_for_fronting_refusal(
+ status,
+ &format!("relay {} {}", method, url),
+ )
+ .await;
+ // fall through to h1
+ }
+ Ok((status, _hdrs, resp_body)) => {
+ if status != 200 {
+ let body_txt = String::from_utf8_lossy(&resp_body)
+ .chars()
+ .take(200)
+ .collect::<String>();
+ if should_blacklist(status, &body_txt) {
+ self.blacklist_script(&script_id, &format!("HTTP {}", status));
+ }
+ return Err(FronterError::Relay(format!(
+ "Apps Script HTTP {}: {}",
+ status, body_txt
+ )));
+ }
+ return parse_relay_json(&resp_body).map_err(|e| {
+ if let FronterError::Relay(ref msg) = e {
+ if looks_like_quota_error(msg) {
+ self.blacklist_script(&script_id, msg);
+ }
+ }
+ e
+ });
+ }
+ Err((e, RequestSent::No)) => {
+ tracing::debug!("h2 pre-send failure: {} — falling back to h1", e);
+ }
+ Err((e, RequestSent::Maybe)) => {
+ if is_method_safe_for_fanout(method) {
+ tracing::debug!(
+ "h2 post-send failure for safe method {}: {} — falling back to h1",
+ method,
+ e
+ );
+ } else {
+ tracing::warn!(
+ "h2 post-send failure for non-idempotent {} {}: {} — \
+ marking non-retryable to prevent duplicating side effects",
+ method,
+ url,
+ e
+ );
+ // NonRetryable wrapper bubbles all the way through
+ // do_relay_once_with → do_relay_with_retry, where
+ // the retry loop skips its second attempt. Without
+ // this wrap, returning a plain Err would let
+ // do_relay_with_retry re-issue the request via h1
+ // (or a fresh h2 cell), defeating the safety policy.
+ return Err(FronterError::NonRetryable(Box::new(e)));
+ }
+ }
+ }
+
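+ // h1 fallback path: reached only when the h2 attempt above never
+ // reached Apps Script (421 refusal, RequestSent::No) or failed
+ // post-send on a side-effect-free method.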
let mut entry = self.acquire().await?;
let reuse_ok = {
let write_res = async {
@@ -1397,8 +2342,9 @@ impl DomainFronter {
"Content-Type".to_string(),
"application/json".to_string(),
)];
- let outer_payload =
- self.build_payload_json("POST", &exit_url, &outer_headers, &inner_json)?;
+ let outer_payload: Bytes = Bytes::from(
+ self.build_payload_json("POST", &exit_url, &outer_headers, &inner_json)?,
+ );
// Send the outer payload through the relay machinery and get back
// Apps Script's response body (which is exit-node's JSON envelope).
@@ -1465,11 +2411,70 @@ impl DomainFronter {
/// has a different shape from Code.gs's raw-HTTP wrapping.
async fn send_prebuilt_payload_through_relay(
&self,
- payload: Vec<u8>,
+ payload: Bytes,
) -> Result<Vec<u8>, FronterError> {
let script_id = self.next_script_id();
let path = format!("/macros/s/{}/exec", script_id);
+ // h2 fast path. The exit-node outer call is always POST and
+ // carries the inner relay payload — replaying on h1 after the
+ // outer reached Apps Script duplicates the inner request to
+ // the exit node. Only fall back when h2 definitely never sent.
+ // Same default response deadline as the direct path; the
+ // exit-node leg ultimately exits via Apps Script too.
+ match self
+ .h2_relay_request(
+ &path,
+ payload.clone(),
+ Duration::from_secs(H2_RESPONSE_DEADLINE_DEFAULT_SECS),
+ )
+ .await
+ {
+ Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => {
+ // Same fronting-refusal path as the direct relay.
+ // Safe to fall back: 421 means the edge rejected
+ // before invoking the exit node.
+ self.sticky_disable_h2_for_fronting_refusal(
+ status,
+ "exit-node outer call",
+ )
+ .await;
+ // fall through to h1
+ }
+ Ok((status, _hdrs, resp_body)) => {
+ if status != 200 {
+ let body_txt = String::from_utf8_lossy(&resp_body)
+ .chars()
+ .take(200)
+ .collect::<String>();
+ return Err(FronterError::Relay(format!(
+ "Apps Script HTTP {} (exit-node outer call): {}",
+ status, body_txt
+ )));
+ }
+ return Ok(resp_body);
+ }
+ Err((e, RequestSent::No)) => {
+ tracing::debug!(
+ "h2 exit-node outer call pre-send failure: {} — falling back to h1",
+ e
+ );
+ }
+ Err((e, RequestSent::Maybe)) => {
+ tracing::warn!(
+ "h2 exit-node outer call post-send failure: {} — \
+ marking non-retryable to prevent duplicating the inner request",
+ e
+ );
+ // NonRetryable propagates back to relay()'s exit-node
+ // match arm, which will *not* fall through to the
+ // direct Apps Script path (that fall-through would
+ // re-send the outer call and could also re-trigger
+ // the inner request to the destination).
+ return Err(FronterError::NonRetryable(Box::new(e)));
+ }
+ }
+
let mut entry = self.acquire().await?;
let req_head = format!(
"POST {path} HTTP/1.1\r\n\
@@ -1597,10 +2602,55 @@ impl DomainFronter {
sid: Option<&str>,
data: Option,
) -> Result<TunnelResponse, FronterError> {
- let payload = self.build_tunnel_payload(op, host, port, sid, data)?;
+ let payload: Bytes =
+ Bytes::from(self.build_tunnel_payload(op, host, port, sid, data)?);
let script_id = self.next_script_id();
let path = format!("/macros/s/{}/exec", script_id);
+ // h2 fast path. Tunnel ops are stateful — a `connect` may
+ // have opened an upstream socket; a `data` op may have
+ // forwarded bytes. Replaying on h1 after the op reached
+ // Apps Script can corrupt the tunnel session. Only fall back
+ // when h2 definitely never sent.
+ // Use the user-configured batch_timeout so Full-mode tuning
+ // (`request_timeout_secs`) is honored — a fixed cap would make
+ // legitimately slow batches trip the strike counters in
+ // tunnel_client::fire_batch on otherwise-healthy deployments.
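+ //
+ // Minimal sketch of the knob in config.json (field name per the
+ // comment above; the exact wiring lives in src/config.rs):
+ //
+ //   { "request_timeout_secs": 120 }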
+ match self
+ .h2_relay_request(&path, payload.clone(), self.batch_timeout)
+ .await
+ {
+ Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => {
+ // Edge rejected the fronted h2 request. Safe to fall
+ // back to h1 — the tunnel op never executed because
+ // Apps Script never received the request.
+ self.sticky_disable_h2_for_fronting_refusal(
+ status,
+ &format!("tunnel op {}", op),
+ )
+ .await;
+ // fall through to h1
+ }
+ Ok((status, _hdrs, resp_body)) => {
+ return self.finalize_tunnel_response(&script_id, status, resp_body);
+ }
+ Err((e, RequestSent::No)) => {
+ tracing::debug!(
+ "h2 tunnel request pre-send failure: {} — falling back to h1",
+ e
+ );
+ }
+ Err((e, RequestSent::Maybe)) => {
+ tracing::warn!(
+ "h2 tunnel request post-send failure (op={}): {} — \
+ not replaying on h1 to avoid corrupting the tunnel session",
+ op,
+ e
+ );
+ return Err(e);
+ }
+ }
+
let mut entry = self.acquire().await?;
let req_head = format!(
@@ -1648,42 +2698,55 @@ impl DomainFronter {
resp_body = b;
}
+ let resp = self.finalize_tunnel_response(&script_id, status, resp_body)?;
+ self.release(entry).await;
+ Ok(resp)
+ }
+
+ /// Validate a tunnel-protocol response (status check + Apps-Script
+ /// HTML-prefix tolerance + JSON parse). Used by both the h2 and h1
+ /// branches of `tunnel_request` so the parsing logic doesn't drift
+ /// across transports.
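+ ///
+ /// Extraction sketch (hypothetical body; the concrete HTML varies):
+ ///
+ /// ```text
+ /// input:     <!DOCTYPE html><html>…</html>{"ok":true}
+ /// extracted:                              {"ok":true}
+ /// ```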
+ fn finalize_tunnel_response(
+ &self,
+ script_id: &str,
+ status: u16,
+ resp_body: Vec<u8>,
+ ) -> Result<TunnelResponse, FronterError> {
if status != 200 {
let body_txt = String::from_utf8_lossy(&resp_body)
.chars()
.take(200)
.collect::<String>();
if should_blacklist(status, &body_txt) {
- self.blacklist_script(&script_id, &format!("HTTP {}", status));
+ self.blacklist_script(script_id, &format!("HTTP {}", status));
}
return Err(FronterError::Relay(format!(
"tunnel HTTP {}: {}",
status, body_txt
)));
}
-
- // Parse tunnel response JSON
let text = std::str::from_utf8(&resp_body)
.map_err(|_| FronterError::BadResponse("non-utf8 tunnel response".into()))?
.trim();
-
- // Apps Script may prepend HTML; extract first {...}
+ // Apps Script may prepend HTML on cold-start or quota-exceeded
+ // pages; extract the first {...} block tolerantly so we don't
+ // bail on a recoverable warning frame.
let json_str = if text.starts_with('{') {
text
} else {
let start = text.find('{').ok_or_else(|| {
- FronterError::BadResponse(format!("no json in tunnel response: {}", &text[..text.len().min(200)]))
+ FronterError::BadResponse(format!(
+ "no json in tunnel response: {}",
+ &text[..text.len().min(200)]
+ ))
})?;
let end = text.rfind('}').ok_or_else(|| {
FronterError::BadResponse("no json end in tunnel response".into())
})?;
&text[start..=end]
};
-
- let resp: TunnelResponse = serde_json::from_str(json_str)?;
-
- self.release(entry).await;
- Ok(resp)
+ Ok(serde_json::from_str(json_str)?)
}
fn build_tunnel_payload(
@@ -1742,10 +2805,48 @@ impl DomainFronter {
if !self.disable_padding {
add_random_pad(&mut map);
}
- let payload = serde_json::to_vec(&Value::Object(map))?;
+ let payload: Bytes = Bytes::from(serde_json::to_vec(&Value::Object(map))?);
let path = format!("/macros/s/{}/exec", script_id);
+ // h2 fast path. A batch carries N stateful tunnel ops — each
+ // `data`/`udp_data`/`connect` may have already executed
+ // upstream when the response framing failed. Replaying the
+ // whole batch on h1 risks duplicating every op in it. Only
+ // fall back when h2 definitely never sent. Honors
+ // user-configured batch_timeout so a slow but legitimate
+ // batch isn't cut off at an arbitrary fixed cap.
+ match self
+ .h2_relay_request(&path, payload.clone(), self.batch_timeout)
+ .await
+ {
+ Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => {
+ // Edge rejected the batch before forwarding. Safe to
+ // fall back: no batched op reached Apps Script, so
+ // replaying via h1 won't double-fire any of them.
+ self.sticky_disable_h2_for_fronting_refusal(status, "tunnel batch")
+ .await;
+ // fall through to h1
+ }
+ Ok((status, _hdrs, resp_body)) => {
+ return self.finalize_batch_response(script_id, status, resp_body);
+ }
+ Err((e, RequestSent::No)) => {
+ tracing::debug!(
+ "h2 batch request pre-send failure: {} — falling back to h1",
+ e
+ );
+ }
+ Err((e, RequestSent::Maybe)) => {
+ tracing::warn!(
+ "h2 batch request post-send failure: {} — \
+ not replaying on h1 to avoid duplicating batched ops",
+ e
+ );
+ return Err(e);
+ }
+ }
+
let mut entry = self.acquire().await?;
let req_head = format!(
@@ -1782,41 +2883,94 @@ impl DomainFronter {
status = s; resp_headers = h; resp_body = b;
}
+ // Route through the same `finalize_batch_response` helper the
+ // h2 path uses. This keeps the redacted-logging policy in
+ // exactly one place — the previous inline parse here logged
+ // raw payload at debug AND error level, which leaked the
+ // base64-encoded tunneled bytes (TCP/UDP packets, possibly
+ // app data or credentials) into bug-report logs. Both
+ // transports now emit only `status=` + `body_len=`, with the
+ // raw body gated behind RUST_LOG=trace.
+ let resp = self.finalize_batch_response(script_id, status, resp_body)?;
+ self.release(entry).await;
+ Ok(resp)
+ }
+
+ /// Parse a batch-tunnel response body once we already have it in
+ /// hand — used by the h2 fast path in `tunnel_batch_request_to`,
+ /// where the response is read off a multiplexed stream rather than
+ /// drained from a checked-out socket. Mirrors the validate-and-parse
+ /// tail of the h1 path (status check + JSON extraction +
+ /// quota-blacklist book-keeping).
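+ ///
+ /// With the redaction policy the only per-batch log lines at
+ /// debug/error level look like this (values hypothetical):
+ ///
+ /// ```text
+ /// batch response: status=200 body_len=4096
+ /// batch JSON parse error: expected value at line 1 column 1 (body_len=4096)
+ /// ```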
+ fn finalize_batch_response(
+ &self,
+ script_id: &str,
+ status: u16,
+ resp_body: Vec<u8>,
+ ) -> Result<BatchTunnelResponse, FronterError> {
if status != 200 {
- let body_txt = String::from_utf8_lossy(&resp_body).chars().take(200).collect::<String>();
+ let body_txt = String::from_utf8_lossy(&resp_body)
+ .chars()
+ .take(200)
+ .collect::<String>();
if should_blacklist(status, &body_txt) {
- self.blacklist_script(&script_id, &format!("HTTP {}", status));
+ self.blacklist_script(script_id, &format!("HTTP {}", status));
}
- return Err(FronterError::Relay(format!("batch tunnel HTTP {}: {}", status, body_txt)));
+ return Err(FronterError::Relay(format!(
+ "batch tunnel HTTP {}: {}",
+ status, body_txt
+ )));
}
-
let text = std::str::from_utf8(&resp_body)
.map_err(|_| FronterError::BadResponse("non-utf8 batch response".into()))?
.trim();
-
let json_str = if text.starts_with('{') {
text
} else {
let start = text.find('{').ok_or_else(|| {
- FronterError::BadResponse(format!("no json in batch response: {}", &text[..text.len().min(200)]))
+ FronterError::BadResponse(format!(
+ "no json in batch response: {}",
+ &text[..text.len().min(200)]
+ ))
})?;
let end = text.rfind('}').ok_or_else(|| {
FronterError::BadResponse("no json end in batch response".into())
})?;
&text[start..=end]
};
-
- tracing::debug!("batch response body: {}", &json_str[..json_str.len().min(500)]);
-
- let resp: BatchTunnelResponse = match serde_json::from_str(json_str) {
- Ok(v) => v,
+ // Don't log payload content. Batch responses carry base64-encoded
+ // tunneled bytes (TCP/UDP packets, possibly app data, possibly
+ // credentials), and even at debug level a leaked log line ends
+ // up in user-shared bug reports. Status + length are sufficient
+ // for diagnosis; full body is available behind RUST_LOG=trace.
+ tracing::debug!(
+ "batch response: status={} body_len={}",
+ status,
+ json_str.len()
+ );
+ tracing::trace!(
+ "batch response body (trace only): {}",
+ &json_str[..json_str.len().min(500)]
+ );
+ match serde_json::from_str(json_str) {
+ Ok(v) => Ok(v),
Err(e) => {
- tracing::error!("batch JSON parse error: {} — body: {}", e, &json_str[..json_str.len().min(300)]);
- return Err(FronterError::Json(e));
+ // Same redaction policy on the error path. Length and
+ // the serde error message are enough to locate the
+ // parse failure (offset / unexpected-token info comes
+ // from `e` itself); the raw body is trace-only.
+ tracing::error!(
+ "batch JSON parse error: {} (body_len={})",
+ e,
+ json_str.len()
+ );
+ tracing::trace!(
+ "batch parse-error body (trace only): {}",
+ &json_str[..json_str.len().min(300)]
+ );
+ Err(FronterError::Json(e))
}
- };
- self.release(entry).await;
- Ok(resp)
+ }
}
}
@@ -2730,6 +3884,32 @@ fn is_method_safe_for_fanout(method: &str) -> bool {
matches!(method.to_ascii_uppercase().as_str(), "GET" | "HEAD" | "OPTIONS")
}
+/// Recognize HTTP statuses from the h2 path that mean "this edge
+/// won't accept your fronted h2 request, but might accept the same
+/// request over h1." Used to trigger an automatic sticky-disable of
+/// the h2 fast path + h1 fallback.
+///
+/// 421 (Misdirected Request) is the spec signal: per RFC 7540
+/// §9.1.2, the server returns it when the connection's authority is
+/// not appropriate for the request URI. With domain fronting that
+/// means the edge enforced "TLS SNI must match :authority" — true
+/// on h2 (the server sees both pseudo-headers in cleartext) but
+/// historically lenient on h1 (the encrypted Host header is what
+/// the bypass relies on). Treating 421 as h2-fallback rather than
+/// "Apps Script error" prevents h2 default-on from breaking
+/// previously-working h1 deployments.
+///
+/// Other edge-level rejects (403, etc.) are ambiguous — they could
+/// be a real Apps Script geoblock or a genuine upstream error — so
+/// we don't blanket-treat them as h2 refusals.
+///
+/// The h2 layer treats this as a "request not sent upstream"
+/// outcome (the edge rejected before forwarding to Apps Script),
+/// so falling back to h1 is safe with no duplication risk.
+fn is_h2_fronting_refusal_status(status: u16) -> bool {
+ status == 421
+}
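+//
+// Wire-level sketch of the refusal this matches (pseudo-header names
+// per RFC 9113; the concrete hosts are illustrative):
+//
+//   TLS SNI:       www.google.com               (front domain)
+//   h2 stream:     :authority = script.google.com
+//                  :path      = /macros/s/<id>/exec
+//   edge answers:  :status    = 421, nothing forwarded upstream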
+
/// Parse the JSON envelope from Apps Script and build a raw HTTP response.
fn parse_relay_json(body: &[u8]) -> Result, FronterError> {
let text = std::str::from_utf8(body)
@@ -2946,6 +4126,22 @@ pub struct StatsSnapshot {
/// Seconds until the next 00:00 PT rollover. Convenient for the UI
/// to render "Resets in Xh Ym" without importing time libraries.
pub today_reset_secs: u64,
+ /// Calls served by the HTTP/2 multiplexed transport, across all
+ /// entry points (Apps-Script direct, exit-node outer call,
+ /// full-mode tunnel single op, full-mode tunnel batch).
+ ///
+ /// Not comparable to `relay_calls` — that counter only sees the
+ /// Apps-Script-direct path. To gauge h2 health, compute
+ /// `h2_calls / (h2_calls + h2_fallbacks)`.
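+ ///
+ /// E.g. a snapshot with `h2_calls = 90` and `h2_fallbacks = 10`
+ /// (hypothetical values) gives 90 / (90 + 10) = 0.9, i.e. 90% of
+ /// attempts completed on the multiplexed path.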
+ pub h2_calls: u64,
+ /// Calls that attempted h2 but had to fall back to h1 (per-call
+ /// failures, open timeout, backoff, sticky ALPN refusal). Same
+ /// all-entry-points scope as `h2_calls`.
+ pub h2_fallbacks: u64,
+ /// True when h2 is permanently off for this fronter (config kill
+ /// switch set, or peer refused h2 during ALPN). All traffic on the
+ /// h1 path.
+ pub h2_disabled: bool,
}
impl StatsSnapshot {
@@ -2959,8 +4155,27 @@ impl StatsSnapshot {
}
pub fn fmt_line(&self) -> String {
+ // h2 segment is the success ratio across all transports
+ // (h2_calls + h2_fallbacks). Showing "X/Y" against relay_calls
+ // would mislead — relay_calls only counts the Apps-Script
+ // direct path, while h2_calls also includes exit-node and
+ // tunnel paths that bypass relay_uncoalesced.
+ let h2_seg = if self.h2_disabled {
+ " h2=off".to_string()
+ } else {
+ let total = self.h2_calls + self.h2_fallbacks;
+ if total == 0 {
+ String::new()
+ } else {
+ let pct = (self.h2_calls as f64 / total as f64) * 100.0;
+ format!(
+ " h2-success={}/{} ({:.0}%)",
+ self.h2_calls, total, pct
+ )
+ }
+ };
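+ // e.g. " h2-success=90/100 (90%)", " h2=off" when sticky-disabled,
+ // or "" before the first h2 attempt (values hypothetical).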
format!(
- "stats: relay={} ({}KB) failures={} coalesced={} cache={}/{} ({:.0}% hit, {}KB) scripts={}/{} active",
+ "stats: relay={} ({}KB) failures={} coalesced={} cache={}/{} ({:.0}% hit, {}KB) scripts={}/{} active{}",
self.relay_calls,
self.bytes_relayed / 1024,
self.relay_failures,
@@ -2971,6 +4186,7 @@ impl StatsSnapshot {
self.cache_bytes / 1024,
self.total_scripts - self.blacklisted_scripts,
self.total_scripts,
+ h2_seg,
)
}
@@ -2983,7 +4199,7 @@ impl StatsSnapshot {
s.replace('\\', "\\\\").replace('"', "\\\"")
}
format!(
- r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{}}}"#,
+ r#"{{"relay_calls":{},"relay_failures":{},"coalesced":{},"bytes_relayed":{},"cache_hits":{},"cache_misses":{},"cache_bytes":{},"blacklisted_scripts":{},"total_scripts":{},"today_calls":{},"today_bytes":{},"today_key":"{}","today_reset_secs":{},"h2_calls":{},"h2_fallbacks":{},"h2_disabled":{}}}"#,
self.relay_calls,
self.relay_failures,
self.coalesced,
@@ -2997,6 +4213,9 @@ impl StatsSnapshot {
self.today_bytes,
esc(&self.today_key),
self.today_reset_secs,
+ self.h2_calls,
+ self.h2_fallbacks,
+ self.h2_disabled,
)
}
}
@@ -3809,4 +5028,662 @@ hello";
other => panic!("unexpected error: {}", other),
}
}
+
+ // ─── h2 transport ──────────────────────────────────────────────────
+
+ /// Generous response-phase deadline used by transport tests. We
+ /// pick something well above any expected latency on a localhost
+ /// h2c hop so test flakiness can't be confused with a real timeout
+ /// firing. Tests that *want* to observe a timeout pick a small
+ /// value explicitly.
+ const TEST_RESPONSE_DEADLINE: Duration = Duration::from_secs(10);
+
+ /// Build a minimal valid `DomainFronter` for unit tests. The
+ /// `connect_host` is unused unless a test actually opens a socket;
+ /// `verify_ssl=true` and a placeholder `google_ip` are fine because
+ /// `DomainFronter::new` doesn't touch the network.
+ fn fronter_for_test(force_http1: bool) -> DomainFronter {
+ let json = format!(
+ r#"{{
+ "mode": "apps_script",
+ "google_ip": "127.0.0.1",
+ "front_domain": "www.google.com",
+ "script_id": "TEST",
+ "auth_key": "test_auth_key",
+ "listen_host": "127.0.0.1",
+ "listen_port": 8085,
+ "log_level": "info",
+ "verify_ssl": true,
+ "force_http1": {}
+ }}"#,
+ force_http1
+ );
+ let cfg: Config = serde_json::from_str(&json).unwrap();
+ DomainFronter::new(&cfg).expect("test fronter must construct")
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn force_http1_disables_h2_at_construction() {
+ // The kill switch: force_http1=true must mark the fronter as
+ // h2-disabled before the first call so ensure_h2 short-circuits
+ // without ever trying ALPN.
+ let fronter = fronter_for_test(true);
+ assert!(
+ fronter.h2_disabled.load(Ordering::Relaxed),
+ "force_http1=true must set h2_disabled at construction"
+ );
+ assert!(
+ fronter.ensure_h2().await.is_none(),
+ "ensure_h2 must return None when h2 is disabled"
+ );
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn force_http1_false_leaves_h2_enabled() {
+ let fronter = fronter_for_test(false);
+ assert!(
+ !fronter.h2_disabled.load(Ordering::Relaxed),
+ "default must leave h2 enabled"
+ );
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn poison_h2_if_gen_is_noop_when_cell_is_empty() {
+ // Defensive: we call poison on every per-request error; cell
+ // may already be None due to a concurrent poison. Must not
+ // panic or wedge.
+ let fronter = fronter_for_test(false);
+ fronter.poison_h2_if_gen(0).await;
+ let cell = fronter.h2_cell.lock().await;
+ assert!(cell.is_none());
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn poison_h2_if_gen_only_clears_matching_generation() {
+ // Race protection: task A holds gen=1 SendRequest, gen=1 dies,
+ // task B reopens → cell now gen=2 (healthy). Task A's
+ // poison(1) MUST NOT clear gen=2. Without generation matching
+ // the previous code unconditionally cleared the cell, causing
+ // connection churn during recovery.
+ let (addr, server_handle) = spawn_h2c_server(|_req| {
+ let resp = http::Response::builder().status(200).body(()).unwrap();
+ (resp, Vec::new())
+ })
+ .await;
+ let send_v2 = h2c_client(addr).await;
+
+ let fronter = fronter_for_test(false);
+ // Seed the cell with gen=2 (simulating "task B just reopened").
+ {
+ let mut cell = fronter.h2_cell.lock().await;
+ *cell = Some(H2Cell {
+ send: send_v2.clone(),
+ created: Instant::now(),
+ generation: 2,
+ });
+ }
+ // Task A poisons with stale gen=1.
+ fronter.poison_h2_if_gen(1).await;
+ // gen=2 cell must survive.
+ let cell = fronter.h2_cell.lock().await;
+ assert!(
+ cell.is_some(),
+ "poison_h2_if_gen(1) must not clear gen=2 cell"
+ );
+ assert_eq!(cell.as_ref().unwrap().generation, 2);
+ drop(cell);
+
+ // And matching gen=2 actually does clear.
+ fronter.poison_h2_if_gen(2).await;
+ let cell = fronter.h2_cell.lock().await;
+ assert!(cell.is_none(), "poison_h2_if_gen(2) must clear gen=2 cell");
+
+ server_handle.abort();
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn ensure_h2_skips_reopen_during_failure_backoff() {
+ // After an open failure, ensure_h2 must return None for at
+ // least H2_OPEN_FAILURE_BACKOFF_SECS without attempting a
+ // new handshake — otherwise concurrent callers each pay the
+ // full handshake-timeout cost during an outage.
+ let fronter = fronter_for_test(false);
+ // Simulate a recent open failure.
+ *fronter.h2_open_failed_at.lock().await = Some(Instant::now());
+
+ // ensure_h2 must return None immediately, without trying open_h2
+ // (open_h2 would try TCP-connect to 127.0.0.1:443 which would
+ // either fail slowly or succeed against an unrelated service —
+ // either way, this test would observably take longer if backoff
+ // wasn't honored).
+ let t0 = Instant::now();
+ let result = fronter.ensure_h2().await;
+ assert!(result.is_none(), "must return None during backoff");
+ assert!(
+ t0.elapsed() < Duration::from_millis(100),
+ "must return immediately without open attempt; took {:?}",
+ t0.elapsed()
+ );
+ }
+
+ /// Spawn a minimal local h2c server (plaintext h2, no TLS) on a
+ /// random port. The handler closure builds the response from the
+ /// incoming request — used by `h2_round_trip_*` tests below.
+ /// Returns the bound address and the JoinHandle so the test can
+ /// `abort()` the server when done.
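+ ///
+ /// Usage sketch (mirrors the round-trip tests below):
+ ///
+ /// ```ignore
+ /// let (addr, handle) = spawn_h2c_server(|_req| {
+ ///     let resp = http::Response::builder().status(200).body(()).unwrap();
+ ///     (resp, Vec::new())
+ /// })
+ /// .await;
+ /// // ... connect an h2c client to `addr`, run assertions ...
+ /// handle.abort();
+ /// ```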
+ async fn spawn_h2c_server<F>(
+ handler: F,
+ ) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>)
+ where
+ F: Fn(http::Request<h2::RecvStream>) -> (http::Response<()>, Vec<u8>)
+ + Send
+ + Sync
+ + 'static,
+ {
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let handler = Arc::new(handler);
+ let handle = tokio::spawn(async move {
+ // Single-connection server is enough for these tests.
+ let (sock, _) = listener.accept().await.unwrap();
+ let mut connection = h2::server::handshake(sock).await.unwrap();
+ while let Some(result) = connection.accept().await {
+ let (req, mut respond) = match result {
+ Ok(v) => v,
+ Err(_) => break,
+ };
+ let (resp, body) = handler(req);
+ let has_body = !body.is_empty();
+ let mut send = respond
+ .send_response(resp, !has_body)
+ .expect("send_response in test");
+ if has_body {
+ send.send_data(Bytes::from(body), true)
+ .expect("send_data in test");
+ }
+ }
+ });
+ (addr, handle)
+ }
+
+ /// Variant that gives the handler async access to the request body
+ /// before producing the response. Needed to assert what the client
+ /// actually sent (rather than relying on the request's existence).
+ async fn spawn_h2c_echo_server() -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let handle = tokio::spawn(async move {
+ let (sock, _) = listener.accept().await.unwrap();
+ let mut connection = h2::server::handshake(sock).await.unwrap();
+ while let Some(result) = connection.accept().await {
+ let (req, mut respond) = match result {
+ Ok(v) => v,
+ Err(_) => break,
+ };
+ let mut body = req.into_body();
+ let mut received = Vec::new();
+ while let Some(chunk) = body.data().await {
+ let chunk = match chunk {
+ Ok(c) => c,
+ Err(_) => break,
+ };
+ let n = chunk.len();
+ received.extend_from_slice(&chunk);
+ let _ = body.flow_control().release_capacity(n);
+ }
+ let resp = http::Response::builder().status(200).body(()).unwrap();
+ let mut send = respond.send_response(resp, false).unwrap();
+ send.send_data(Bytes::from(received), true).unwrap();
+ }
+ });
+ (addr, handle)
+ }
+
+ /// Open a plaintext h2c connection to `addr` and return a usable
+ /// `SendRequest`. The connection driver is spawned in the
+ /// background and lives for the test's scope.
+ async fn h2c_client(addr: std::net::SocketAddr) -> h2::client::SendRequest<Bytes> {
+ let stream = TcpStream::connect(addr).await.unwrap();
+ let (send, conn) = h2::client::handshake(stream).await.unwrap();
+ tokio::spawn(async move {
+ let _ = conn.await;
+ });
+ send
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn h2_round_trip_actually_transmits_post_body() {
+ // Server reads the request body and echoes it. We assert the
+ // server received the exact bytes we passed — proves the
+ // send_data path works, not just that 200 came back.
+ let (addr, server_handle) = spawn_h2c_echo_server().await;
+
+ let send = h2c_client(addr).await;
+ let fronter = fronter_for_test(false);
+ let req_body = b"the-actual-payload-sent-by-h2_round_trip";
+ let (status, _hdrs, echoed) = fronter
+ .h2_round_trip(
+ send,
+ "POST",
+ "/echo",
+ "127.0.0.1",
+ Bytes::from_static(req_body),
+ Some("application/json"),
+ TEST_RESPONSE_DEADLINE,
+ )
+ .await
+ .expect("h2 round trip should succeed");
+ assert_eq!(status, 200);
+ assert_eq!(
+ echoed, req_body,
+ "server must have received the exact bytes we sent"
+ );
+ server_handle.abort();
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn h2_round_trip_decodes_gzip_responses() {
+ // Mirror the h1 read_http_response behavior: gzip-encoded
+ // bodies must be transparently decompressed before we hand
+ // them back, so downstream JSON parsers see plain bytes
+ // regardless of transport.
+ use flate2::write::GzEncoder;
+ use flate2::Compression;
+ use std::io::Write;
+
+ let plain = b"{\"hello\":\"world\"}";
+ let mut enc = GzEncoder::new(Vec::new(), Compression::default());
+ enc.write_all(plain).unwrap();
+ let gzipped = enc.finish().unwrap();
+ let gzipped_arc = Arc::new(gzipped);
+
+ let g = gzipped_arc.clone();
+ let (addr, server_handle) = spawn_h2c_server(move |_req| {
+ let resp = http::Response::builder()
+ .status(200)
+ .header("content-encoding", "gzip")
+ .body(())
+ .unwrap();
+ (resp, (*g).clone())
+ })
+ .await;
+
+ let send = h2c_client(addr).await;
+ let fronter = fronter_for_test(false);
+ let (status, _hdrs, body) = fronter
+ .h2_round_trip(send, "GET", "/", "127.0.0.1", Bytes::new(), None, TEST_RESPONSE_DEADLINE)
+ .await
+ .unwrap();
+ assert_eq!(status, 200);
+ assert_eq!(body, plain, "gzip body must be decoded transparently");
+ server_handle.abort();
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn run_h2_relay_with_send_follows_redirect_chain() {
+ // Now exercises run_h2_relay_with_send (the testable inner
+ // of h2_relay_request) so the production redirect loop —
+ // including timeout, RequestSent classification, and per-hop
+ // poison-by-gen — is actually under test, not a hand-rolled
+ // duplicate.
+ let counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let c = counter.clone();
+ let (addr, server_handle) = spawn_h2c_server(move |req| {
+ let n = c.fetch_add(1, Ordering::Relaxed);
+ if n == 0 {
+ let resp = http::Response::builder()
+ .status(302)
+ .header("location", "/next")
+ .body(())
+ .unwrap();
+ (resp, Vec::new())
+ } else {
+ assert_eq!(req.uri().path(), "/next", "second hop must follow Location");
+ let resp = http::Response::builder().status(200).body(()).unwrap();
+ (resp, b"final".to_vec())
+ }
+ })
+ .await;
+
+ let send = h2c_client(addr).await;
+ let fronter = fronter_for_test(false);
+
+ let (status, _hdrs, body) = fronter
+ .run_h2_relay_with_send(
+ send,
+ /* generation */ 1,
+ "/start",
+ Bytes::new(),
+ TEST_RESPONSE_DEADLINE,
+ )
+ .await
+ .expect("h2 relay should follow redirect to 200");
+ assert_eq!(status, 200);
+ assert_eq!(body, b"final");
+ // Successful round-trip must increment h2_calls.
+ assert_eq!(fronter.h2_calls.load(Ordering::Relaxed), 1);
+ assert_eq!(fronter.h2_fallbacks.load(Ordering::Relaxed), 0);
+ server_handle.abort();
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn run_h2_relay_with_send_reports_request_sent_no_on_dead_connection() {
+ // Set up an h2c client whose connection is severed before we
+ // call run_h2_relay_with_send. The first `send.ready().await`
+ // inside h2_round_trip should fail — RequestSent::No is the
+ // correct classification (stream never opened on the wire).
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let server_task = tokio::spawn(async move {
+ // Accept the connection, do the h2 handshake, then drop.
+ // After drop the client's SendRequest will fail at ready().
+ let (sock, _) = listener.accept().await.unwrap();
+ let _connection = h2::server::handshake(sock).await.unwrap();
+ // Hold briefly so client can complete handshake, then drop.
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ });
+
+ let send = h2c_client(addr).await;
+ // Wait for server to drop.
+ server_task.await.unwrap();
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ let fronter = fronter_for_test(false);
+ let result = fronter
+ .run_h2_relay_with_send(
+ send,
+ 1,
+ "/x",
+ Bytes::from_static(b"some-body"),
+ TEST_RESPONSE_DEADLINE,
+ )
+ .await;
+ match result {
+ Err((_, RequestSent::No)) => {} // expected
+ Err((e, RequestSent::Maybe)) => {
+ panic!("dead-conn failure classified as Maybe (unsafe to retry): {}", e)
+ }
+ Ok(_) => panic!("expected error against dropped server"),
+ }
+ // Failure must increment h2_fallbacks counter.
+ assert_eq!(fronter.h2_fallbacks.load(Ordering::Relaxed), 1);
+ assert_eq!(fronter.h2_calls.load(Ordering::Relaxed), 0);
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn run_h2_relay_with_send_reports_request_sent_maybe_on_post_send_reset() {
+ // Server accepts headers (so the request reaches it) and then
+ // resets the stream. The client sees a stream error AFTER
+ // send_request returned Ok. RequestSent::Maybe is the only
+ // safe classification — Apps Script may have started executing.
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let server_task = tokio::spawn(async move {
+ let (sock, _) = listener.accept().await.unwrap();
+ let mut connection = h2::server::handshake(sock).await.unwrap();
+ if let Some(Ok((_req, mut respond))) = connection.accept().await {
+ // Reset the stream after receiving headers — simulates
+ // the server starting to process and then bailing
+ // (matches the "Apps Script started UrlFetchApp then
+ // failed" scenario).
+ respond.send_reset(h2::Reason::INTERNAL_ERROR);
+ }
+ // Keep the connection alive briefly so the client sees the
+ // RST_STREAM rather than a connection-level close.
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ });
+
+ let send = h2c_client(addr).await;
+ let fronter = fronter_for_test(false);
+ let result = fronter
+ .run_h2_relay_with_send(
+ send,
+ 1,
+ "/x",
+ Bytes::from_static(b"body"),
+ TEST_RESPONSE_DEADLINE,
+ )
+ .await;
+ match result {
+ Err((_, RequestSent::Maybe)) => {} // expected
+ Err((e, RequestSent::No)) => panic!(
+ "post-send RST classified as No — would let caller \
+ unsafely replay non-idempotent request: {}",
+ e
+ ),
+ Ok(_) => panic!("expected error against RST_STREAM"),
+ }
+
+ server_task.await.unwrap();
+ }
+
+ // ─── NonRetryable wrapper + retry/fallback policy ────────────────────
+
+ #[test]
+ fn nonretryable_wrapper_is_not_retryable_other_variants_are() {
+ // Surfaces the contract that do_relay_with_retry and the
+ // exit-node fallback rely on. If this ever flips, those
+ // sites would silently start re-issuing post-send failures.
+ let plain = FronterError::Relay("transient".into());
+ assert!(plain.is_retryable(), "plain Relay error must be retryable");
+
+ let plain2 = FronterError::Timeout;
+ assert!(plain2.is_retryable(), "Timeout must be retryable");
+
+ let wrapped = FronterError::NonRetryable(Box::new(FronterError::Relay("post-send".into())));
+ assert!(!wrapped.is_retryable(), "NonRetryable must not be retryable");
+
+ // Display must be transparent so log lines look identical.
+ let inner_msg = "h2 response: stream RST".to_string();
+ let inner = FronterError::Relay(inner_msg.clone());
+ let wrapped = FronterError::NonRetryable(Box::new(inner));
+ let displayed = wrapped.to_string();
+ assert!(
+ displayed.contains(&inner_msg),
+ "transparent Display should surface inner: got {}",
+ displayed
+ );
+
+ // into_inner unwraps once.
+ let inner_again = wrapped.into_inner();
+ assert!(matches!(inner_again, FronterError::Relay(_)));
+ assert!(inner_again.is_retryable(), "unwrapped error is retryable");
+ }
+
+ // Note on test coverage gap: we don't have a deterministic test
+ // that the ready/back-pressure phase's timeout reports
+ // `RequestSent::No`. h2 client enforces remote
+ // `MAX_CONCURRENT_STREAMS` at `send_request` time rather than at
+ // `ready` time, so a "saturate the slots, expect ready to block"
+ // setup actually races down the response-phase path instead.
+ // The ready-arm code in `h2_round_trip` is small (single match
+ // arm with `RequestSent::No` literally written next to the
+ // timeout error) and covered by review. Other safety properties
+ // (post-send Maybe via stream RST, pre-send No via dead conn,
+ // NonRetryable wrap propagation) are covered by the tests above
+ // and below.
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn run_h2_relay_with_send_does_not_wrap_pre_send_in_nonretryable() {
+ // Regression guard: the NonRetryable wrap is the *call site's*
+ // job (do_relay_once_with applies it for unsafe methods only).
+ // run_h2_relay_with_send returns the raw RequestSent::No so
+ // the call site can decide. If h2_relay_request started
+ // wrapping unconditionally, even safe-method requests would
+ // become non-retryable on transient pre-send failures.
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let server_task = tokio::spawn(async move {
+ let (sock, _) = listener.accept().await.unwrap();
+ let _connection = h2::server::handshake(sock).await.unwrap();
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ });
+ let send = h2c_client(addr).await;
+ server_task.await.unwrap();
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ let fronter = fronter_for_test(false);
+ let result = fronter
+ .run_h2_relay_with_send(
+ send,
+ 1,
+ "/x",
+ Bytes::from_static(b"x"),
+ TEST_RESPONSE_DEADLINE,
+ )
+ .await;
+ match result {
+ Err((e, RequestSent::No)) => {
+ assert!(
+ e.is_retryable(),
+ "pre-send error must be raw FronterError, not pre-wrapped NonRetryable; got {:?}",
+ e
+ );
+ }
+ other => panic!("expected (Err, RequestSent::No); got {:?}", other),
+ }
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn sticky_disable_h2_for_fronting_refusal_flips_disabled_and_clears_cell() {
+ // Verify the helper that runs from each call site's 421 arm:
+ // sets h2_disabled, clears the cell, rebalances counters
+ // (h2_calls -= 1 since the round-trip already counted it;
+ // h2_fallbacks += 1). Tests the helper directly so we don't
+ // depend on a real h2 server returning 421; the status-match
+ // wiring at the call sites is covered by review rather than by
+ // a dedicated test.
+ let (addr, server_handle) = spawn_h2c_server(|_req| {
+ let resp = http::Response::builder().status(200).body(()).unwrap();
+ (resp, Vec::new())
+ })
+ .await;
+ let send = h2c_client(addr).await;
+ let fronter = fronter_for_test(false);
+ // Seed the cell so we can verify it gets cleared.
+ {
+ let mut cell = fronter.h2_cell.lock().await;
+ *cell = Some(H2Cell {
+ send: send.clone(),
+ created: Instant::now(),
+ generation: 7,
+ });
+ }
+ // Pretend a round-trip just incremented h2_calls (which is
+ // what run_h2_relay_with_send does on Ok before the call site
+ // sees the 421 status).
+ fronter.h2_calls.fetch_add(1, Ordering::Relaxed);
+
+ fronter
+ .sticky_disable_h2_for_fronting_refusal(421, "test context")
+ .await;
+
+ assert!(fronter.h2_disabled.load(Ordering::Relaxed), "must sticky-disable");
+ let cell = fronter.h2_cell.lock().await;
+ assert!(cell.is_none(), "cell must be cleared");
+ assert_eq!(
+ fronter.h2_calls.load(Ordering::Relaxed),
+ 0,
+ "the h2_calls increment from the failed round-trip must be reversed"
+ );
+ assert_eq!(
+ fronter.h2_fallbacks.load(Ordering::Relaxed),
+ 1,
+ "must count as a fallback"
+ );
+ drop(cell);
+
+ // Subsequent ensure_h2 must short-circuit to None without
+ // attempting to open.
+ let t0 = Instant::now();
+ assert!(fronter.ensure_h2().await.is_none());
+ assert!(
+ t0.elapsed() < Duration::from_millis(100),
+ "sticky-disabled ensure_h2 must return immediately"
+ );
+
+ // Calling the helper a second time must not log again or
+ // double-count fallbacks beyond +1 per call.
+ fronter
+ .sticky_disable_h2_for_fronting_refusal(421, "test context")
+ .await;
+ // h2_calls would underflow without the saturating guard; assert
+ // it stays at 0.
+ assert_eq!(fronter.h2_calls.load(Ordering::Relaxed), 0);
+ // h2_fallbacks goes up unconditionally (this is "another
+ // attempt that ended up on h1") — that's fine.
+ assert_eq!(fronter.h2_fallbacks.load(Ordering::Relaxed), 2);
+
+ server_handle.abort();
+ }
+
+ #[test]
+ fn is_h2_fronting_refusal_status_only_matches_421() {
+ // Guard against the helper accidentally matching ambiguous
+ // edge statuses (403 could be a real Apps Script geoblock,
+ // 4xx generally is not a "this is h2's fault" signal).
+ assert!(is_h2_fronting_refusal_status(421));
+ for s in [200, 301, 400, 403, 404, 429, 500, 502, 503] {
+ assert!(
+ !is_h2_fronting_refusal_status(s),
+ "status {} must NOT trigger sticky h2 disable",
+ s
+ );
+ }
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn h2_handshake_post_tls_returns_alpn_refused_when_peer_picks_h1() {
+ // Verify the OpenH2Error::AlpnRefused path: if the TLS layer
+ // negotiated http/1.1 (not h2), the post-TLS helper must
+ // return the typed sentinel that ensure_h2 uses to sticky-
+ // disable. We obtain a genuine client-side TlsStream by
+ // handshaking against a local TLS server that only advertises h1.
+ //
+ // This needs a real TLS handshake (rustls + a self-signed
+ // cert), so we set up the smallest possible test server with
+ // ALPN forced to ["http/1.1"].
+ let cert = rcgen::generate_simple_self_signed(vec!["127.0.0.1".to_string()]).unwrap();
+ let cert_der = rustls::pki_types::CertificateDer::from(cert.cert.der().to_vec());
+ let key_der = rustls::pki_types::PrivateKeyDer::Pkcs8(
+ rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()),
+ );
+
+ let mut server_cfg = rustls::ServerConfig::builder()
+ .with_no_client_auth()
+ .with_single_cert(vec![cert_der], key_der)
+ .unwrap();
+ server_cfg.alpn_protocols = vec![b"http/1.1".to_vec()];
+ let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_cfg));
+
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+ let server = tokio::spawn(async move {
+ let (sock, _) = listener.accept().await.unwrap();
+ // Drive the handshake; the test only needs the negotiation
+ // to complete with ALPN=h1. After that we can drop.
+ let _tls = acceptor.accept(sock).await.unwrap();
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ });
+
+ // Client side: open TLS with ALPN advertising h2 + h1.1; the
+ // server picks h1 → alpn_protocol() returns "http/1.1" not "h2".
+ let mut client_cfg = rustls::ClientConfig::builder()
+ .dangerous()
+ .with_custom_certificate_verifier(Arc::new(NoVerify))
+ .with_no_client_auth();
+ client_cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
+ let connector = tokio_rustls::TlsConnector::from(Arc::new(client_cfg));
+
+ let tcp = TcpStream::connect(addr).await.unwrap();
+ let name = rustls::pki_types::ServerName::try_from("127.0.0.1").unwrap();
+ let tls = connector.connect(name, tcp).await.unwrap();
+
+ let result = DomainFronter::h2_handshake_post_tls(tls).await;
+ match result {
+ Err(OpenH2Error::AlpnRefused) => {} // expected
+ Err(other) => panic!("expected AlpnRefused, got {:?}", other),
+ Ok(_) => panic!("expected AlpnRefused, got Ok"),
+ }
+ server.await.unwrap();
+ }
}
diff --git a/src/proxy_server.rs b/src/proxy_server.rs
index a1be2780..0d29082f 100644
--- a/src/proxy_server.rs
+++ b/src/proxy_server.rs
@@ -587,7 +587,7 @@ impl ProxyServer {
// accept loops.
let keepalive_task = if let Some(keepalive_fronter) = self.fronter.clone() {
tokio::spawn(async move {
- keepalive_fronter.run_h1_keepalive().await;
+ keepalive_fronter.run_keepalive().await;
})
} else {
tokio::spawn(async move { std::future::pending::<()>().await })