109 changes: 102 additions & 7 deletions src/domain_fronter.rs
@@ -435,8 +435,32 @@ pub struct TunnelResponse {
/// `e` only when this is `None` and compatibility is needed.
#[serde(default)]
pub code: Option<String>,
/// Per-session sequence number echoed from the request's `seq` (if any).
/// `None` from servers that don't speak the pipelining protocol — the
/// client treats absence as "not pipelining-capable" and uses the
/// single-in-flight loop. Present and matching the request's seq
/// otherwise; the client routes the response into the per-session
/// reorder buffer keyed by this value.
///
/// `u64` (not `u32`): a long-lived TCP session generating ~100 ops/s
/// would hit `u32::MAX` in ~1.4 years, at which point the server's
/// `expected` saturates and refuses every subsequent op. `u64`
/// pushes that horizon past any realistic hardware lifetime.
#[serde(default)]
pub seq: Option<u64>,
/// Capability bitfield, advertised on `connect` / `connect_data`
/// success responses by tunnel-nodes that speak ≥ this protocol
/// version. Bit 0 = `CAPS_PIPELINE_SEQ`. Old tunnel-nodes return
/// `None`; the client opts into pipelining only when the bit is set.
#[serde(default)]
pub caps: Option<u32>,
}
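
// Hedged sketch (not part of this diff): the per-session reorder buffer
// the `seq` doc above describes. Names (`ReorderBuffer`, `next_expected`,
// `pending`) are illustrative; the real buffer lives in the pipeline mux.
use std::collections::BTreeMap;

struct ReorderBuffer {
    /// Next seq to hand to the session, strictly in order.
    next_expected: u64,
    /// Responses that arrived ahead of `next_expected`, keyed by seq.
    pending: BTreeMap<u64, TunnelResponse>,
}

impl ReorderBuffer {
    /// Accept one response; drain every response now deliverable in order.
    fn push(&mut self, seq: u64, resp: TunnelResponse) -> Vec<TunnelResponse> {
        self.pending.insert(seq, resp);
        let mut ready = Vec::new();
        while let Some(r) = self.pending.remove(&self.next_expected) {
            ready.push(r);
            self.next_expected += 1;
        }
        ready
    }
}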

/// Capability bit advertised by a tunnel-node that supports per-session
/// `data`-op sequence numbers (per-session pipelining). Only checked on
/// `connect` / `connect_data` success replies.
pub const CAPS_PIPELINE_SEQ: u32 = 1 << 0;
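
// Hedged sketch: gating pipelining on the advertised capability bit.
// Treating an absent `caps` as "no capabilities" matches the field doc
// above; the helper name is illustrative, not part of this diff.
fn pipelining_supported(resp: &TunnelResponse) -> bool {
    resp.caps.map_or(false, |c| c & CAPS_PIPELINE_SEQ != 0)
}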

/// A single op in a batch tunnel request.
#[derive(Serialize, Clone, Debug)]
pub struct BatchOp {
@@ -449,6 +473,16 @@ pub struct BatchOp {
pub port: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub d: Option<String>,
/// Per-session monotonic sequence number for `data` ops on a
/// pipelining-enabled session. Lets the tunnel-node enforce
/// in-order processing of ops that arrive in different batches
/// (potentially via different deployments and network paths).
/// Skipped from the wire when `None`; old tunnel-nodes that don't
/// recognize the field ignore it (serde default + structural-only
/// JSON dispatch on the server). `u64` to keep long-lived sessions
/// from saturating — see `TunnelResponse::seq` for the math.
#[serde(skip_serializing_if = "Option::is_none")]
pub seq: Option<u64>,
}
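
// Hedged sketch: stamping the per-session monotonic seq onto outgoing
// `data` ops. The per-session `AtomicU64` counter is an assumption for
// illustration; on non-pipelined sessions `seq` stays `None`, so
// `skip_serializing_if` keeps the field off the wire entirely.
use std::sync::atomic::{AtomicU64, Ordering};

fn stamp_data_op(op: &mut BatchOp, counter: &AtomicU64, pipelining: bool) {
    op.seq = if pipelining {
        Some(counter.fetch_add(1, Ordering::Relaxed))
    } else {
        None
    };
}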

/// Batch tunnel response from Apps Script / tunnel node.
@@ -2793,10 +2827,35 @@ impl DomainFronter {
/// Like `tunnel_batch_request` but targets a specific deployment ID.
/// Used by the pipeline mux to pin a batch to a deployment whose
/// per-account concurrency slot has already been acquired.
///
/// Backward-compatible 3-arg signature: forwards to
/// `tunnel_batch_request_with_timeout` using the configured
/// `self.batch_timeout`. New callers that need the longer
/// pipelined-batch budget should call
/// `tunnel_batch_request_with_timeout` directly.
pub async fn tunnel_batch_request_to(
&self,
script_id: &str,
ops: &[BatchOp],
) -> Result<BatchTunnelResponse, FronterError> {
self.tunnel_batch_request_with_timeout(script_id, ops, self.batch_timeout)
.await
}
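
// Hedged usage sketch (illustrative, not part of this diff): a pipelined
// caller widens the budget before pinning the batch to a deployment:
//
//     let budget = self.batch_timeout.max(PIPELINED_BATCH_TIMEOUT_FLOOR);
//     let resp = self
//         .tunnel_batch_request_with_timeout(script_id, &ops, budget)
//         .await?;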

/// Targets a specific deployment with an explicit per-batch
/// timeout, applied to both the h2 fast path (via
/// `h2_relay_request`'s `response_deadline`) and the h1
/// fallback's header read. The caller computes this: legacy
/// batches use `self.batch_timeout`, pipelined batches use
/// `max(self.batch_timeout, PIPELINED_BATCH_TIMEOUT_FLOOR)` so a
/// valid server-side wait of `SEQ_WAIT_TIMEOUT +
/// LONGPOLL_DEADLINE` doesn't fire a spurious transport timeout
/// AND record a deployment timeout strike.
pub async fn tunnel_batch_request_with_timeout(
&self,
script_id: &str,
ops: &[BatchOp],
batch_timeout: Duration,
) -> Result<BatchTunnelResponse, FronterError> {
let mut map = serde_json::Map::new();
map.insert("k".into(), Value::String(self.auth_key.clone()));
@@ -2813,11 +2872,14 @@ impl DomainFronter {
// `data`/`udp_data`/`connect` may have already executed
// upstream when the response framing failed. Replaying the
// whole batch on h1 risks duplicating every op in it. Only
// fall back when h2 definitely never sent. Honors
// user-configured batch_timeout so a slow but legitimate
// batch isn't cut off at an arbitrary fixed cap.
// fall back when h2 definitely never sent. Honors the
// caller-supplied `batch_timeout` (legacy = configured
// `request_timeout_secs`; pipelined = max(configured,
// PIPELINED_BATCH_TIMEOUT_FLOOR)) so a slow but legitimate
// batch isn't cut off at an arbitrary fixed cap, and
// pipelined-batch server-side waits stay inside the budget.
match self
.h2_relay_request(&path, payload.clone(), self.batch_timeout)
.h2_relay_request(&path, payload.clone(), batch_timeout)
.await
{
Ok((status, _hdrs, _resp_body)) if is_h2_fronting_refusal_status(status) => {
@@ -2865,8 +2927,14 @@ impl DomainFronter {
entry.stream.write_all(&payload).await?;
entry.stream.flush().await?;

// Use `batch_timeout` for the per-read deadline on this h1
// fallback. The legacy 10 s default in `read_http_response`
// would fire before a pipelined batch's server-side
// `SEQ_WAIT_TIMEOUT + LONGPOLL_DEADLINE` (~45 s) wait could
// legitimately complete — that fired as "batch timed out"
// client-side AND recorded a deployment timeout strike.
let (mut status, mut resp_headers, mut resp_body) =
read_http_response(&mut entry.stream).await?;
read_http_response_with_timeout(&mut entry.stream, batch_timeout).await?;

// Follow redirect chain
for _ in 0..5 {
@@ -2879,7 +2947,7 @@
);
entry.stream.write_all(req.as_bytes()).await?;
entry.stream.flush().await?;
let (s, h, b) = read_http_response(&mut entry.stream).await?;
let (s, h, b) = read_http_response_with_timeout(&mut entry.stream, batch_timeout).await?;
status = s; resp_headers = h; resp_body = b;
}

@@ -3665,14 +3733,41 @@ fn parse_redirect(location: &str) -> (String, Option<String>) {

/// Default first-byte / per-read deadline for `read_http_response`.
/// Single-tunnel-op and relay paths use this — Apps Script normally
/// streams headers within ~1 s, so 10 s catches stalls without
/// false-firing on jitter. The batched-tunnel path overrides via
/// `read_http_response_with_timeout` because pipelined batches can
/// legitimately wait `SEQ_WAIT_TIMEOUT + LONGPOLL_DEADLINE` before
/// Apps Script sends a single response byte.
const DEFAULT_HTTP_READ_TIMEOUT: Duration = Duration::from_secs(10);

/// Read a single HTTP/1.1 response from the stream. Keep-alive safe: respects
/// Content-Length or chunked transfer-encoding.
async fn read_http_response<S>(stream: &mut S) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
where
S: tokio::io::AsyncRead + Unpin,
{
read_http_response_with_timeout(stream, DEFAULT_HTTP_READ_TIMEOUT).await
}

/// Like [`read_http_response`] but with a caller-controlled per-read
/// deadline. Used by the batched-tunnel h1 path so a pipelined batch
/// whose server-side worst-case wait is `SEQ_WAIT_TIMEOUT (30 s) +
/// LONGPOLL_DEADLINE (15 s)` doesn't get cut off at the legacy 10 s
/// header read — that fired before the response could legitimately
/// arrive, surfacing as a "batch timed out" client-side AND
/// recording a timeout strike against an otherwise-healthy
/// deployment.
async fn read_http_response_with_timeout<S>(
stream: &mut S,
read_timeout: Duration,
) -> Result<(u16, Vec<(String, String)>, Vec<u8>), FronterError>
where
S: tokio::io::AsyncRead + Unpin,
{
let mut buf = Vec::with_capacity(8192);
let mut tmp = [0u8; 8192];
let header_end = loop {
let n = timeout(Duration::from_secs(10), stream.read(&mut tmp)).await
let n = timeout(read_timeout, stream.read(&mut tmp)).await
.map_err(|_| FronterError::Timeout)??;
if n == 0 {
return Err(FronterError::BadResponse("connection closed before headers".into()));