diff --git a/ant-core/src/data/client/adaptive.rs b/ant-core/src/data/client/adaptive.rs
index dac6895..f48a74c 100644
--- a/ant-core/src/data/client/adaptive.rs
+++ b/ant-core/src/data/client/adaptive.rs
@@ -82,8 +82,23 @@ pub struct ChannelMax {
     pub quote: usize,
     pub store: usize,
     pub fetch: usize,
+    /// Per-chunk peer fan-out for close-group replication. Bounded by
+    /// `CLOSE_GROUP_MAJORITY` (currently 4) — sending PUTs to more
+    /// peers than majority is wasted work since the chunk is
+    /// considered stored once majority acks. The floor is 1, which
+    /// means strictly sequential per-peer replication for a single
+    /// chunk's PUT (slow connections that saturate on parallel
+    /// streams).
+    pub replication: usize,
 }
 
+/// Hard ceiling for the replication channel. Equals
+/// `ant_protocol::CLOSE_GROUP_MAJORITY`. Defined here as a constant to
+/// avoid the `adaptive` module taking a dependency on `ant_protocol`
+/// — the wiring site (`chunk.rs`) is responsible for clamping the
+/// effective fan-out at the protocol-defined majority.
+const REPLICATION_MAX: usize = 4;
+
 impl Default for ChannelMax {
     fn default() -> Self {
         // Generous ceilings that give the controller real headroom to
@@ -94,6 +109,7 @@ impl Default for ChannelMax {
             quote: 128,
             store: 64,
             fetch: 256,
+            replication: REPLICATION_MAX,
         }
     }
 }
@@ -160,6 +176,14 @@ impl AdaptiveConfig {
         self.max.quote = self.max.quote.max(self.min_concurrency);
         self.max.store = self.max.store.max(self.min_concurrency);
         self.max.fetch = self.max.fetch.max(self.min_concurrency);
+        // Replication ceiling cannot exceed CLOSE_GROUP_MAJORITY
+        // regardless of what the user / a hand-edited config supplies
+        // — sending PUTs to more peers than majority is meaningless.
+        self.max.replication = self
+            .max
+            .replication
+            .max(self.min_concurrency)
+            .min(REPLICATION_MAX);
     }
 }
 
@@ -190,11 +214,16 @@ impl Default for AdaptiveConfig {
 /// small (handful of chunks); occasional larger ones are capped at
 /// `max_concurrency`. Start at 64 to keep small/medium downloads
 /// indistinguishable from the prior unbounded behavior.
+/// - replication (per-chunk peer fan-out) starts at the ceiling
+///   (`CLOSE_GROUP_MAJORITY`) — fast connections see the prior
+///   "all majority peers in parallel" behavior. Slow uplinks halve
+///   it down via the saturation classifier.
 #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct ChannelStart {
     pub quote: usize,
     pub store: usize,
     pub fetch: usize,
+    pub replication: usize,
 }
 
 impl Default for ChannelStart {
@@ -203,6 +232,7 @@ impl Default for ChannelStart {
             quote: 32,
             store: 8,
             fetch: 64,
+            replication: REPLICATION_MAX,
         }
     }
 }
@@ -383,6 +413,57 @@ impl Limiter {
     pub fn snapshot(&self) -> usize {
         lock(&self.inner).current
     }
+
+    /// p95 latency of recent **successful** observations in the
+    /// sliding window. Returns `None` if no successful samples are in
+    /// the window yet (e.g. cold start, or a window dominated by
+    /// timeouts/errors).
+    ///
+    /// Used by the adaptive store-timeout in `chunk.rs`: the timeout
+    /// is sized as `latency_inflation_factor × p95`, so a slow uplink
+    /// that observed 20-25 s successful PUTs gets a 40-50 s timeout
+    /// instead of the default 30 s.
+    #[must_use]
+    pub fn latency_p95(&self) -> Option<Duration> {
+        let g = lock(&self.inner);
+        let mut latencies: Vec<Duration> = g
+            .window
+            .iter()
+            .filter(|s| matches!(s.outcome, Outcome::Success))
+            .map(|s| s.latency)
+            .collect();
+        p95_of(&mut latencies)
+    }
+
+    /// Read the configured `latency_inflation_factor`. Used by the
+    /// adaptive store-timeout sizing in `chunk.rs`.
+    #[must_use]
+    pub fn latency_inflation_factor(&self) -> f64 {
+        self.config.latency_inflation_factor
+    }
+
+    /// Force an immediate multiplicative decrease, bypassing the
+    /// usual `min_window_ops` decrease gate. Used by the eager
+    /// saturation classifier in `chunk.rs` when an unambiguous
+    /// stress signal is observed on a single chunk PUT (e.g. all
+    /// attempted peers timed out simultaneously — the uplink is
+    /// saturated and we already know this without needing a window
+    /// of evidence). Floors at `min_concurrency`. No-op when the
+    /// controller is disabled.
+    pub fn force_decrease(&self) {
+        if !self.config.enabled {
+            return;
+        }
+        let mut g = lock(&self.inner);
+        g.left_slow_start = true;
+        let next = (g.current / 2).max(self.config.min_concurrency);
+        if next != g.current {
+            debug!(from = g.current, to = next, "adaptive: force_decrease");
+        }
+        g.current = next;
+        g.samples_since_increase = 0;
+        g.samples_since_decrease = 0;
+    }
 }
 
 /// Outcome of evaluating one window.
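> `p95_of` is called by `latency_p95` above but defined outside this hunk. For orientation, a nearest-rank sketch that would satisfy the call site; this is an assumption, not the crate's actual implementation:

```rust
use std::time::Duration;

/// Nearest-rank p95: sort ascending, take the element at
/// ceil(0.95 * n) - 1. `None` on an empty slice, which the caller
/// treats as "no baseline yet" (fall back to the cold-start timeout).
fn p95_of(latencies: &mut [Duration]) -> Option<Duration> {
    if latencies.is_empty() {
        return None;
    }
    latencies.sort_unstable();
    let rank = ((latencies.len() as f64) * 0.95).ceil() as usize;
    Some(latencies[rank.saturating_sub(1)])
}

fn main() {
    let mut xs: Vec<Duration> = (1..=20).map(Duration::from_secs).collect();
    assert_eq!(p95_of(&mut xs), Some(Duration::from_secs(19)));
}
```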
@@ -541,6 +622,12 @@ pub struct AdaptiveController {
     pub quote: Limiter,
     pub store: Limiter,
     pub fetch: Limiter,
+    /// Per-chunk peer fan-out for close-group replication. Bounded
+    /// `[1, CLOSE_GROUP_MAJORITY]`. Driven by the saturation
+    /// classifier in `chunk.rs` (correlated peer-timeouts within a
+    /// single chunk's fan-out → halve), not by the per-peer store
+    /// channel observations.
+    pub replication: Limiter,
     /// `pub(crate)` so external callers cannot mutate this
     /// post-construction. Each `Limiter` snapshots its own
     /// `Arc` at construction time, so external
@@ -548,19 +635,15 @@
     /// `enabled` check from the limiters' frozen copies. Read via
     /// `config()`.
     pub(crate) config: AdaptiveConfig,
-    /// Per-instance cold-start values. `warm_start` floors snapshot
-    /// values against THIS, not the global `ChannelStart::default()`,
-    /// so a controller built with custom (e.g. low) starts stays
-    /// faithful to its construction parameters. Constructed-once,
-    /// never mutated.
-    cold_start: ChannelStart,
 }
 
 impl AdaptiveController {
     /// Create a controller with cold-start values per channel.
     /// Sanitizes the config (NaN guards, floor/ceiling enforcement)
-    /// before constructing limiters. The supplied `start` is captured
-    /// as the per-instance cold-start floor for `warm_start`.
+    /// before constructing limiters. Cold-start values seed the
+    /// limiters; once `warm_start` applies a persisted snapshot, the
+    /// snapshot is the source of truth (snapshot-as-truth — see
+    /// `warm_start` doc).
     #[must_use]
     pub fn new(start: ChannelStart, config: AdaptiveConfig) -> Self {
         let mut config = config;
@@ -568,12 +651,13 @@
         let quote_cfg = LimiterConfig::from_adaptive(&config, config.max.quote);
         let store_cfg = LimiterConfig::from_adaptive(&config, config.max.store);
         let fetch_cfg = LimiterConfig::from_adaptive(&config, config.max.fetch);
+        let replication_cfg = LimiterConfig::from_adaptive(&config, config.max.replication);
         Self {
             quote: Limiter::new(start.quote, quote_cfg),
             store: Limiter::new(start.store, store_cfg),
             fetch: Limiter::new(start.fetch, fetch_cfg),
+            replication: Limiter::new(start.replication, replication_cfg),
             config,
-            cold_start: start,
         }
     }
 
@@ -584,6 +668,7 @@
             quote: self.quote.snapshot(),
             store: self.store.snapshot(),
             fetch: self.fetch.snapshot(),
+            replication: self.replication.snapshot(),
         }
     }
 
@@ -599,14 +684,20 @@
     /// Apply a previously-saved snapshot as the warm-start cap.
     ///
-    /// The effective warm value per channel is
-    /// `max(snapshot, self.cold_start)` — flooring at the
-    /// per-instance cold-start (NOT the global default) so:
-    /// 1. A prior bad run that pinned cap=1 doesn't pessimize this
-    ///    run forever.
-    /// 2. A controller built with custom (e.g. low) cold starts for
-    ///    benchmarking is not silently jumped above its construction
-    ///    parameters.
+    /// Snapshot-as-truth: the effective warm value per channel is the
+    /// snapshot value, clamped only to the channel's hard
+    /// `[min_concurrency, max_concurrency]` bounds. The cold-start is
+    /// **not** used as a runtime floor — it only takes effect when no
+    /// snapshot exists at all.
+    ///
+    /// Rationale: a slow-uplink user whose previous run halved the
+    /// cap to 2 must persist that across runs. Flooring at cold-start
+    /// would force every fresh process to re-experience the same
+    /// failures before halving back down. The hard `min_concurrency`
+    /// bound (default 1) and the AIMD's additive-increase recovery
+    /// keep us safe from a stale or pathological snapshot — a
+    /// previously-bad uplink that's since improved climbs back up
+    /// over a few healthy windows.
     ///
     /// Does not clear sliding windows. When `enabled = false`, this
     /// is a no-op — fixed-concurrency mode means fixed-concurrency.
@@ -614,12 +705,10 @@
         if !self.config.enabled {
             return;
         }
-        self.quote
-            .warm_start(snapshot.quote.max(self.cold_start.quote));
-        self.store
-            .warm_start(snapshot.store.max(self.cold_start.store));
-        self.fetch
-            .warm_start(snapshot.fetch.max(self.cold_start.fetch));
+        self.quote.warm_start(snapshot.quote);
+        self.store.warm_start(snapshot.store);
+        self.fetch.warm_start(snapshot.fetch);
+        self.replication.warm_start(snapshot.replication);
     }
 }
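> A usage sketch of the snapshot-as-truth contract described above. Values are hypothetical; `current()` is the same accessor the regression tests below use:

```rust
// Sketch: assumes this runs inside ant-core, where the adaptive
// module's types are visible.
use crate::data::client::adaptive::{AdaptiveConfig, AdaptiveController, ChannelStart};

fn restore_example() {
    let c = AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
    // A prior run on a saturated uplink persisted store = 1.
    let snapshot = ChannelStart { quote: 32, store: 1, fetch: 64, replication: 2 };
    c.warm_start(snapshot);
    // The snapshot is authoritative; no cold-start floor is applied.
    assert_eq!(c.store.current(), 1);
    // From here, additive increase over healthy windows raises the
    // cap again if the uplink has actually improved.
}
```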
@@ -844,7 +933,12 @@
 struct PersistedState {
     channels: ChannelStart,
 }
 
-const PERSIST_SCHEMA: u32 = 1;
+/// Schema version for the on-disk snapshot file. Bumped to 2 with the
+/// addition of the `replication` channel — schema-1 snapshots
+/// (pre-replication-channel) are silently ignored on load and the
+/// controller falls back to cold-start values, then writes a schema-2
+/// snapshot at process exit.
+const PERSIST_SCHEMA: u32 = 2;
 const PERSIST_FILENAME: &str = "client_adaptive.json";
 
 /// Default persistence path: `<data_dir>/client_adaptive.json`. Falls
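> How the schema gate plays out on load. A sketch only: `PersistedState` is assumed to carry a `schema` field next to `channels` (this hunk shows only `channels`), and `load_snapshot` itself is not part of the diff:

```rust
use std::fs;
use std::path::Path;

fn load_snapshot_sketch(path: &Path) -> Option<ChannelStart> {
    let bytes = fs::read(path).ok()?;
    let state: PersistedState = serde_json::from_slice(&bytes).ok()?;
    // Schema-1 files predate the replication channel; dropping them
    // here is what "silently ignored on load" means. The controller
    // then boots from cold start and rewrites a schema-2 file at exit.
    (state.schema == PERSIST_SCHEMA).then_some(state.channels)
}
```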
@@ -1005,6 +1099,7 @@ mod tests {
                 quote: l.max_concurrency,
                 store: l.max_concurrency,
                 fetch: l.max_concurrency,
+                replication: REPLICATION_MAX,
             },
             window_ops: l.window_ops,
             min_window_ops: l.min_window_ops,
@@ -1171,6 +1266,7 @@
                 quote: 64,
                 store: 16,
                 fetch: 64,
+                replication: REPLICATION_MAX,
             },
             adaptive_cfg_for_tests(),
         );
@@ -1206,6 +1302,7 @@
             quote: 24,
             store: 6,
             fetch: 12,
+            replication: REPLICATION_MAX,
         };
         save_snapshot(&path, snap);
         let loaded = load_snapshot(&path).unwrap();
@@ -1615,30 +1712,29 @@
         );
     }
 
-    /// Persisted higher values from a prior run must beat low cold-start
-    /// defaults. Otherwise warm-start would silently pessimize throughput.
-    /// (Values BELOW cold-start are floored — see
-    /// `warm_start_floors_at_cold_defaults`.)
+    /// Persisted values from a prior run are the source of truth on
+    /// warm-start, regardless of the controller's cold-start values
+    /// (see `warm_start_uses_snapshot_below_cold_start` for the
+    /// below-cold-start case).
     #[test]
-    fn persisted_snapshot_warm_starts_above_cold_floor() {
+    fn persisted_snapshot_is_warm_start_source_of_truth() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("client_adaptive.json");
-        // All snapshot values ABOVE the production cold-start defaults
-        // so the warm_start floor doesn't kick in.
        let saved = ChannelStart {
            quote: 64,
            store: 32,
            fetch: 128,
+            replication: REPLICATION_MAX,
        };
        save_snapshot(&path, saved);
        let loaded = load_snapshot(&path).unwrap();
-        // Build a controller with intentionally low cold-start values
-        // — these get overridden by warm_start.
+        // Cold-start values are intentionally below the snapshot.
        let low = ChannelStart {
            quote: 2,
            store: 2,
            fetch: 2,
+            replication: REPLICATION_MAX,
        };
        let c = AdaptiveController::new(low, AdaptiveConfig::default());
        c.warm_start(loaded);
@@ -1661,11 +1757,13 @@
             quote: 10,
             store: 10,
             fetch: 10,
+            replication: REPLICATION_MAX,
         };
         let snap_b = ChannelStart {
             quote: 99,
             store: 99,
             fetch: 99,
+            replication: REPLICATION_MAX,
         };
         let h_a = thread::spawn(move || {
             for _ in 0..50 {
@@ -1701,6 +1799,7 @@
             quote: 1,
             store: 1,
             fetch: 1,
+            replication: REPLICATION_MAX,
         };
         // No panic = pass. Function returns unit, errors are logged.
         save_snapshot(&path, snap);
@@ -1755,6 +1854,7 @@
                 quote: 0, // sub-min; sanitize raises to min
                 store: 0,
                 fetch: 0,
+                replication: REPLICATION_MAX,
             },
             window_ops: 10,
             min_window_ops: 50, // > window_ops; sanitize clamps
@@ -1909,6 +2009,7 @@
                 quote: 4,    // tightly capped
                 store: 8,    // moderate
                 fetch: 1024, // very high
+                replication: REPLICATION_MAX,
             },
             ..AdaptiveConfig::default()
         };
@@ -1917,6 +2018,7 @@
                 quote: 4,
                 store: 8,
                 fetch: 64,
+                replication: REPLICATION_MAX,
             },
             cfg,
         );
@@ -2028,37 +2130,38 @@
 
     // ---- Codex review (round 3) regression tests ----
 
-    /// Codex CRITICAL: warm_start was blindly restoring caps below the
-    /// cold-start floor. A prior bad run that drove store=1 would
-    /// pessimize every subsequent run forever. The fix floors warm
-    /// values at `ChannelStart::default()` per channel.
+    /// Snapshot-as-truth: a prior run that halved store=1 must persist
+    /// across runs. Cold-start is no longer used as a runtime floor on
+    /// warm-start — the snapshot is authoritative, bounded only by the
+    /// hard `min_concurrency` (default 1) and the per-channel max.
+    /// This is essential for slow-uplink users whose AIMD has learned
+    /// they need low fan-out: every fresh process must boot pre-tuned.
     #[test]
-    fn warm_start_floors_at_cold_defaults() {
         let c = AdaptiveController::default();
-        let cold = ChannelStart::default();
-        // Snapshot from a "bad prior run" — every channel pinned to 1.
-        let bad_snap = ChannelStart {
+    fn warm_start_uses_snapshot_below_cold_start() {
+        // Snapshot from a "previously-saturated" run — every channel
+        // halved repeatedly down to the floor.
+        let low_snap = ChannelStart {
             quote: 1,
             store: 1,
             fetch: 1,
+            replication: REPLICATION_MAX,
         };
-        c.warm_start(bad_snap);
-        // After warm_start, each channel should be AT LEAST the
-        // cold-start value, not the persisted 1.
+        c.warm_start(low_snap);
         assert_eq!(
             c.quote.current(),
-            cold.quote,
-            "quote warm_start did not floor at cold default"
+            1,
+            "quote warm_start must trust low snapshot, not floor at cold"
         );
         assert_eq!(
             c.store.current(),
-            cold.store,
-            "store warm_start did not floor at cold default"
+            1,
+            "store warm_start must trust low snapshot, not floor at cold"
         );
         assert_eq!(
             c.fetch.current(),
-            cold.fetch,
-            "fetch warm_start did not floor at cold default"
+            1,
+            "fetch warm_start must trust low snapshot, not floor at cold"
         );
     }
 
@@ -2072,6 +2175,7 @@
             quote: cold.quote * 2,
             store: cold.store * 4,
             fetch: cold.fetch * 2,
+            replication: REPLICATION_MAX,
         };
         c.warm_start(snap);
         assert_eq!(c.quote.current(), snap.quote);
@@ -2204,6 +2308,7 @@
             quote: 100,
             store: 50,
             fetch: 200,
+            replication: REPLICATION_MAX,
         };
         save_snapshot(&path, snap);
         // The file must exist immediately after save_snapshot returns.
@@ -2249,32 +2354,37 @@
         );
     }
 
-    /// Codex CR-3 fix: warm_start floors against the per-instance
-    /// cold-start, NOT the global ChannelStart::default. A controller
-    /// built with custom low starts must stay faithful to its
-    /// construction parameters even after warm_start.
+    /// Snapshot-as-truth: `warm_start` no longer floors against the
+    /// per-instance cold-start. The snapshot is authoritative,
+    /// bounded only by `[min_concurrency, max_concurrency]` (handled
+    /// inside `Limiter::warm_start`). A controller's construction
+    /// cold-start values are used only when no snapshot exists at
+    /// all.
     #[test]
-    fn controller_warm_start_floors_at_per_instance_cold_start() {
+    fn controller_warm_start_uses_snapshot_below_cold_start() {
         let custom_cold = ChannelStart {
             quote: 2,
-            store: 1,
+            store: 8,
             fetch: 4,
+            replication: REPLICATION_MAX,
         };
         let c = AdaptiveController::new(custom_cold, AdaptiveConfig::default());
-        // Snapshot below the per-instance cold-start floors at custom values.
+        // Snapshot below the per-instance cold-start IS HONORED now.
         c.warm_start(ChannelStart {
             quote: 1,
             store: 1,
             fetch: 1,
+            replication: REPLICATION_MAX,
         });
-        assert_eq!(c.quote.current(), 2);
+        assert_eq!(c.quote.current(), 1);
         assert_eq!(c.store.current(), 1);
-        assert_eq!(c.fetch.current(), 4);
-        // Snapshot above the per-instance cold-start uses the snapshot.
+        assert_eq!(c.fetch.current(), 1);
+        // Snapshot above the per-instance cold-start also honored.
         c.warm_start(ChannelStart {
             quote: 10,
             store: 10,
             fetch: 10,
+            replication: REPLICATION_MAX,
         });
         assert_eq!(c.quote.current(), 10);
         assert_eq!(c.store.current(), 10);
@@ -2294,12 +2404,14 @@
             quote: 5,
             store: 5,
             fetch: 5,
+            replication: REPLICATION_MAX,
         };
         let c = AdaptiveController::new(custom_cold, cfg);
         c.warm_start(ChannelStart {
             quote: 100,
             store: 100,
             fetch: 100,
+            replication: REPLICATION_MAX,
         });
         assert_eq!(c.quote.current(), 5, "warm_start moved cap when disabled");
         assert_eq!(c.store.current(), 5, "warm_start moved cap when disabled");
@@ -2440,6 +2552,7 @@
                     quote: i + 1,
                     store: i + 1,
                     fetch: i + 1,
+                    replication: REPLICATION_MAX,
                 },
             );
         }
@@ -2745,6 +2858,7 @@
             quote: 7,
             store: 13,
             fetch: 200,
+            replication: REPLICATION_MAX,
         };
         let json = serde_json::to_string(&m).unwrap();
         let back: ChannelMax = serde_json::from_str(&json).unwrap();
@@ -2759,6 +2873,7 @@
             quote: 11,
             store: 22,
             fetch: 33,
+            replication: REPLICATION_MAX,
         };
         let json = serde_json::to_string(&s).unwrap();
         let back: ChannelStart = serde_json::from_str(&json).unwrap();
@@ -2893,6 +3008,7 @@
                 quote: 1,
                 store: 1,
                 fetch: 1,
+                replication: REPLICATION_MAX,
             },
         );
         let stop = std::sync::Arc::new(AtomicBool::new(false));
@@ -2907,6 +3023,7 @@
                     quote: i,
                     store: i,
                     fetch: i,
+                    replication: REPLICATION_MAX,
                 },
             );
             i = i.wrapping_add(1).max(1);
@@ -2949,6 +3066,7 @@
             quote: 1,
             store: 1,
             fetch: 1,
+            replication: REPLICATION_MAX,
         };
         let started = Instant::now();
         save_snapshot_with_timeout(path, snap, Duration::from_secs(5));
@@ -2984,6 +3102,7 @@
             quote: 1,
             store: 1,
             fetch: 1,
+            replication: REPLICATION_MAX,
         };
         let started = Instant::now();
         // Deadline so short that on most machines the writer is
diff --git a/ant-core/src/data/client/batch.rs b/ant-core/src/data/client/batch.rs
index d1ff614..068fe0a 100644
--- a/ant-core/src/data/client/batch.rs
+++ b/ant-core/src/data/client/batch.rs
@@ -562,24 +562,22 @@ impl Client {
         let store_limiter = self.controller().store.clone();
         let store_concurrency = store_limiter.current().min(to_retry.len().max(1));
 
+        // Per-peer observation lives in `spawn_chunk_put`. The
+        // chunk-level wrap was too coarse for small files — a
+        // single Outcome per chunk could not cross
+        // `min_window_ops` in time to react before retries were
+        // exhausted.
         let mut upload_stream = stream::iter(to_retry)
             .map(|chunk| {
                 let chunk_clone = chunk.clone();
-                let limiter = store_limiter.clone();
                 async move {
-                    let result = observe_op(
-                        &limiter,
-                        || async move {
-                            self.chunk_put_to_close_group(
-                                chunk.content,
-                                chunk.proof_bytes,
-                                &chunk.quoted_peers,
-                            )
-                            .await
-                        },
-                        classify_error,
-                    )
-                    .await;
+                    let result = self
+                        .chunk_put_to_close_group(
+                            chunk.content,
+                            chunk.proof_bytes,
+                            &chunk.quoted_peers,
+                        )
+                        .await;
                     (chunk_clone, result)
                 }
             })
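> The "too coarse for small files" arithmetic, worked out with the numbers the doc comments in this diff cite (3 chunks, fan-out 3, `min_window_ops` 8):

```rust
fn main() {
    let chunks = 3;         // a small file
    let fan_out = 3;        // peers attempted per chunk in this example
    let min_window_ops = 8; // the AIMD decrease gate's default

    // Chunk-level observation: one sample per chunk per attempt.
    assert!(chunks < min_window_ops); // 3 samples, gate never crossed

    // Per-peer observation: one sample per peer PUT.
    assert!(chunks * fan_out >= min_window_ops); // 9 samples, gate crossed
}
```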
diff --git a/ant-core/src/data/client/chunk.rs b/ant-core/src/data/client/chunk.rs
index dab11a8..4b9fb96 100644
--- a/ant-core/src/data/client/chunk.rs
+++ b/ant-core/src/data/client/chunk.rs
@@ -3,6 +3,8 @@
 //! Chunks are immutable, content-addressed data blocks where the address
 //! is the BLAKE3 hash of the content.
 
+use crate::data::client::adaptive::{observe_op, Outcome};
+use crate::data::client::classify_error;
 use crate::data::client::peer_cache::record_peer_outcome;
 use crate::data::client::Client;
 use crate::data::error::{Error, Result};
@@ -21,16 +23,68 @@
 use tracing::{debug, warn};
 
 /// Data type identifier for chunks (used in quote requests).
 const CHUNK_DATA_TYPE: u32 = 0;
 
-/// Store-response timeout for non-merkle chunk PUTs.
-const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(30);
-
-fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
+/// Cold-start timeout for non-merkle chunk PUTs when the store
+/// channel has no recent successful observations to derive from. Once
+/// the AIMD's sliding window contains successful PUTs, the timeout
+/// is derived from observed p95 latency × `latency_inflation_factor`
+/// — see `adaptive_store_timeout`. Matches the prior static value so
+/// the cold path is identical to historical behavior.
+const COLD_START_STORE_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// Hard ceiling on the adaptive store timeout. Prevents indefinitely
+/// growing timeouts on a hopelessly broken peer set. 10 minutes is
+/// well past any plausible legitimate single-peer PUT — at typical
+/// chunk sizes a 100 kbps uplink (~320 s for a 4 MB chunk) is the
+/// outer envelope of "still trying".
+const MAX_STORE_TIMEOUT: Duration = Duration::from_secs(600);
+
+/// Pick the store-response timeout for one peer PUT.
+///
+/// **Merkle path** uses the configured `store_timeout_secs` directly
+/// — its proof-of-payment construction has a separate latency
+/// envelope and that knob is the user-facing tunable for it.
+///
+/// **Single-payment path** is adaptive: we derive a timeout from the
+/// store channel's observed p95 latency, multiplied by
+/// `latency_inflation_factor` (default 2.0). The configured
+/// `store_timeout_secs` is honored as a **floor**: raising it lifts
+/// the minimum (useful on slow uplinks), but lowering it cannot pull
+/// the effective timeout below the derived value, nor below
+/// `COLD_START_STORE_TIMEOUT` (30 s), which is used when the store
+/// channel has no successful samples yet, preserving the historic
+/// cold-path behavior.
+fn store_response_timeout_for_proof(
+    proof: &[u8],
+    config_store_timeout_secs: u64,
+    store_limiter: &crate::data::client::adaptive::Limiter,
+) -> Duration {
     match detect_proof_type(proof) {
-        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
-        _ => STORE_RESPONSE_TIMEOUT,
+        Some(ProofType::Merkle) => Duration::from_secs(config_store_timeout_secs),
+        _ => adaptive_store_timeout(config_store_timeout_secs, store_limiter),
     }
 }
 
+fn adaptive_store_timeout(
+    config_floor_secs: u64,
+    store_limiter: &crate::data::client::adaptive::Limiter,
+) -> Duration {
+    let floor = Duration::from_secs(config_floor_secs).max(Duration::from_secs(1));
+    let derived = match store_limiter.latency_p95() {
+        Some(p95) => {
+            let factor = store_limiter.latency_inflation_factor();
+            // mul_f64 with non-finite/negative factors would panic;
+            // the limiter's `sanitize` guards float fields, but be
+            // explicit here too in case of future config drift.
+            if factor.is_finite() && factor > 0.0 {
+                p95.mul_f64(factor)
+            } else {
+                COLD_START_STORE_TIMEOUT
+            }
+        }
+        None => COLD_START_STORE_TIMEOUT,
+    };
+    derived.max(floor).min(MAX_STORE_TIMEOUT)
+}
+
 impl Client {
     /// Store a chunk on the Autonomi network with payment.
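> Worked numbers for `adaptive_store_timeout`, with the limiter reduced to its two inputs (p95 and factor). This mirrors the tests at the end of this file:

```rust
use std::time::Duration;

fn derive(p95: Option<Duration>, factor: f64, floor_secs: u64) -> Duration {
    let floor = Duration::from_secs(floor_secs).max(Duration::from_secs(1));
    let cold = Duration::from_secs(30); // COLD_START_STORE_TIMEOUT
    let cap = Duration::from_secs(600); // MAX_STORE_TIMEOUT
    let derived = match p95 {
        Some(p) if factor.is_finite() && factor > 0.0 => p.mul_f64(factor),
        _ => cold,
    };
    derived.max(floor).min(cap)
}

fn main() {
    // Slow uplink: p95 = 20 s × 2.0 → 40 s, above the old static 30 s.
    assert_eq!(derive(Some(Duration::from_secs(20)), 2.0, 10), Duration::from_secs(40));
    // Healthy uplink with a pinned 120 s floor: floor beats the 4 s derived value.
    assert_eq!(derive(Some(Duration::from_secs(2)), 2.0, 120), Duration::from_secs(120));
    // Pathological p95 = 1000 s: capped at the 600 s ceiling.
    assert_eq!(derive(Some(Duration::from_secs(1000)), 2.0, 10), Duration::from_secs(600));
    // Cold start: no successful samples yet → 30 s.
    assert_eq!(derive(None, 2.0, 10), Duration::from_secs(30));
}
```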
@@ -65,10 +119,23 @@
     /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
     ///
-    /// Initially sends the PUT concurrently to the first
-    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
-    /// remaining peers in the quoted set until majority is reached or
-    /// all peers are exhausted.
+    /// Maintains up to `parallelism` peer PUTs in flight at all times,
+    /// where `parallelism` comes from the adaptive controller's
+    /// `replication` channel and is bounded by
+    /// `[1, min(peers.len(), CLOSE_GROUP_MAJORITY)]`. Each completion
+    /// (success or failure) tops up the in-flight set with the next
+    /// available peer until majority succeeds or peers are exhausted.
+    ///
+    /// On slow uplinks the controller drops `parallelism` toward 1
+    /// (sequential per-peer replication) so a single chunk doesn't
+    /// saturate the uplink with `MAJORITY × ~4 MB` simultaneous
+    /// streams. On fast connections it stays at the ceiling
+    /// (`CLOSE_GROUP_MAJORITY`) for minimum wall-clock latency.
+    ///
+    /// At completion, observes the chunk-level outcome on the
+    /// `replication` channel and force-decreases on the saturation
+    /// signature (≥ ⅔ of attempted peers timed out) — see
+    /// `classify_replication_outcome`.
     ///
     /// # Errors
     ///
@@ -82,20 +149,39 @@
     ) -> Result<XorName> {
         let address = compute_address(&content);
 
-        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
-        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
+        let replication_limiter = self.controller().replication.clone();
+        let parallelism = replication_limiter
+            .current()
+            .min(peers.len())
+            .clamp(1, CLOSE_GROUP_MAJORITY);
+        let started = Instant::now();
+        let mut peers_iter = peers.iter();
         let mut put_futures = FuturesUnordered::new();
-        for (peer_id, addrs) in initial_peers {
-            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
+
+        // Seed up to `parallelism` peers. The remainder are pulled in
+        // by the top-up logic in the loop, one per completion.
+        for _ in 0..parallelism {
+            if let Some((peer_id, addrs)) = peers_iter.next() {
+                put_futures.push(self.spawn_chunk_put(
+                    content.clone(),
+                    proof.clone(),
+                    peer_id,
+                    addrs,
+                ));
+            } else {
+                break;
+            }
         }
 
         let mut success_count = 0usize;
+        let mut attempted = 0usize;
+        let mut timeout_count = 0usize;
         let mut failures: Vec<String> = Vec::new();
-        let mut fallback_iter = fallback_peers.iter();
 
         while let Some((peer_id, result)) = put_futures.next().await {
-            match result {
+            attempted += 1;
+            match &result {
                 Ok(_) => {
                     success_count += 1;
                     if success_count >= CLOSE_GROUP_MAJORITY {
@@ -103,29 +189,56 @@
                             "Chunk {} stored on {success_count} peers (majority reached)",
                             hex::encode(address)
                         );
+                        observe_replication(
+                            &replication_limiter,
+                            Outcome::Success,
+                            started.elapsed(),
+                            attempted,
+                            timeout_count,
+                        );
                         return Ok(address);
                     }
                 }
                 Err(e) => {
+                    if matches!(e, Error::Timeout(_)) {
+                        timeout_count += 1;
+                    }
                     warn!("Failed to store chunk on {peer_id}: {e}");
                     failures.push(format!("{peer_id}: {e}"));
-
-                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
-                        debug!(
-                            "Falling back to peer {fb_peer} for chunk {}",
-                            hex::encode(address)
-                        );
-                        put_futures.push(self.spawn_chunk_put(
-                            content.clone(),
-                            proof.clone(),
-                            fb_peer,
-                            fb_addrs,
-                        ));
-                    }
                 }
             }
+
+            // Top up the in-flight set with the next peer (regardless
+            // of whether the just-completed put succeeded or failed).
+            // This pattern is what makes `parallelism = 1` work as
+            // strict per-peer sequential replication: one slot stays
+            // refilled until peers are exhausted or majority succeeds.
+            if let Some((next_peer, next_addrs)) = peers_iter.next() {
+                debug!(
+                    "Seeding next peer {next_peer} for chunk {}",
+                    hex::encode(address)
+                );
+                put_futures.push(self.spawn_chunk_put(
+                    content.clone(),
+                    proof.clone(),
+                    next_peer,
+                    next_addrs,
+                ));
+            }
         }
 
+        // Chunk failed: classify the failure mode for the replication
+        // channel. ≥ ⅔ of attempted peers timing out is the saturation
+        // signature (see clip15 in PROD-LOCAL-UL-01: 11+ timeouts in
+        // the same millisecond) — feed it as Outcome::Timeout AND
+        // force_decrease so the next chunk runs at lower fan-out.
+        observe_replication(
+            &replication_limiter,
+            classify_replication_outcome(attempted, timeout_count),
+            started.elapsed(),
+            attempted,
+            timeout_count,
+        );
+
         Err(Error::InsufficientPeers(format!(
             "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
             failures.join("; ")
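> The loop above instantiates a general bounded-fan-out pattern over `FuturesUnordered`. A standalone sketch with toy futures (the `futures` crate only; not the production code):

```rust
use futures::stream::{FuturesUnordered, StreamExt};

/// Run jobs with at most `parallelism` in flight until `needed`
/// succeed. Completion of any job (pass or fail) frees a slot,
/// which degenerates to strictly sequential at parallelism = 1.
async fn bounded_fan_out<F>(jobs: Vec<F>, parallelism: usize, needed: usize) -> usize
where
    F: std::future::Future<Output = bool>,
{
    let mut jobs = jobs.into_iter();
    let mut in_flight: FuturesUnordered<F> =
        jobs.by_ref().take(parallelism.max(1)).collect();
    let mut successes = 0;
    while let Some(ok) = in_flight.next().await {
        successes += usize::from(ok);
        if successes >= needed {
            break; // majority reached; remaining futures are dropped
        }
        if let Some(job) = jobs.next() {
            in_flight.push(job); // top up regardless of outcome
        }
    }
    successes
}

fn main() {
    let jobs: Vec<_> = (0..8).map(|i| futures::future::ready(i % 2 == 0)).collect();
    assert_eq!(futures::executor::block_on(bounded_fan_out(jobs, 3, 4)), 4);
}
```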
@@ -133,6 +246,16 @@
     }
 
     /// Spawn a chunk PUT future for a single peer.
+    ///
+    /// Each peer PUT is observed individually on the **store** channel
+    /// of the adaptive controller. Per-peer (rather than per-chunk)
+    /// granularity matters on small uploads: a 3-chunk file with
+    /// majority-3 fan-out yields 9 samples per attempt, which crosses
+    /// `min_window_ops` (default 8) within a single attempt, so the
+    /// AIMD can react before the upload exhausts retries. Cancellation
+    /// of in-flight peer futures (early return on majority) is
+    /// silent — `ObserveGuard::Drop` only commits if the inner future
+    /// resolves.
     fn spawn_chunk_put<'a>(
         &'a self,
         content: Bytes,
         proof: Bytes,
@@ -141,10 +264,17 @@
         addrs: &'a [MultiAddr],
     ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
         let peer_id_owned = *peer_id;
+        let store_limiter = self.controller().store.clone();
         async move {
-            let result = self
-                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
-                .await;
+            let result = observe_op(
+                &store_limiter,
+                || async move {
+                    self.chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
+                        .await
+                },
+                classify_error,
+            )
+            .await;
             (peer_id_owned, result)
         }
     }
@@ -166,7 +296,11 @@
     ) -> Result<XorName> {
         let address = compute_address(&content);
         let node = self.network().node();
-        let timeout = store_response_timeout_for_proof(&proof, self.config().store_timeout_secs);
+        let timeout = store_response_timeout_for_proof(
+            &proof,
+            self.config().store_timeout_secs,
+            &self.controller().store,
+        );
         let timeout_secs = timeout.as_secs();
 
         let request_id = self.next_request_id();
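> The cancellation contract the doc above relies on (commit only when the inner future resolves, silence on drop), in miniature. Names here are illustrative, not the crate's actual `observe_op` internals:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

static SAMPLES: AtomicUsize = AtomicUsize::new(0);

struct ObserveGuardSketch {
    started: Instant,
    committed: bool,
}

impl ObserveGuardSketch {
    fn start() -> Self {
        Self { started: Instant::now(), committed: false }
    }
    /// Mark the wrapped operation as resolved; Drop then records it.
    fn resolved(mut self) {
        self.committed = true;
    }
}

impl Drop for ObserveGuardSketch {
    fn drop(&mut self) {
        if self.committed {
            let _latency = self.started.elapsed(); // would feed the window
            SAMPLES.fetch_add(1, Ordering::Relaxed);
        }
        // else: dropped mid-flight (e.g. the close-group loop returned
        // early on majority). Nothing is recorded, so cancelled peer
        // PUTs never skew the sliding window.
    }
}

fn main() {
    ObserveGuardSketch::start().resolved(); // resolved: one sample
    drop(ObserveGuardSketch::start());      // cancelled: silent
    assert_eq!(SAMPLES.load(Ordering::Relaxed), 1);
}
```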
@@ -363,9 +497,62 @@
     }
 }
 
+/// Fraction of attempted peers that must time out for a failed
+/// close-group put to be classified as bandwidth saturation rather
+/// than individual peer trouble. ⅔ is conservative: one or two slow
+/// peers shouldn't shrink the fan-out — but most peers timing out
+/// together points squarely at the uplink.
+const SATURATION_TIMEOUT_RATIO: (usize, usize) = (2, 3);
+
+/// Classify a chunk-level close-group PUT failure for the replication
+/// channel.
+///
+/// - **Saturation**: ≥ ⅔ of attempted peers timed out → `Timeout` (a
+///   capacity signal that drives the AIMD down).
+/// - **Other failure**: peers exhausted, mixed network errors, etc.
+///   → `ApplicationError` (no capacity signal — the chunk failed for
+///   reasons unrelated to local fan-out concurrency).
+///
+/// `attempted == 0` (no peers to try at all) returns
+/// `ApplicationError`: not a fan-out problem, an upstream error.
+fn classify_replication_outcome(attempted: usize, timeouts: usize) -> Outcome {
+    if attempted == 0 {
+        return Outcome::ApplicationError;
+    }
+    let (num, den) = SATURATION_TIMEOUT_RATIO;
+    if timeouts * den >= attempted * num {
+        Outcome::Timeout
+    } else {
+        Outcome::ApplicationError
+    }
+}
+
+/// Record one chunk-level outcome on the replication channel and, on
+/// the saturation signature, also force an immediate halve. Eager
+/// halving bypasses the AIMD `min_window_ops` decrease gate because
+/// the saturation signature is unambiguous on its own — most of the
+/// fan-out timing out together means the uplink couldn't keep up, and
+/// we already know that without needing a window of evidence.
+fn observe_replication(
+    limiter: &crate::data::client::adaptive::Limiter,
+    outcome: Outcome,
+    latency: std::time::Duration,
+    attempted: usize,
+    timeouts: usize,
+) {
+    limiter.observe(outcome, latency);
+    let (num, den) = SATURATION_TIMEOUT_RATIO;
+    if attempted > 0 && timeouts * den >= attempted * num {
+        limiter.force_decrease();
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::data::client::adaptive::{
+        AdaptiveConfig, AdaptiveController, ChannelStart, Limiter,
+    };
     use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};
 
     /// Arbitrary configured Merkle store timeout used by the timeout-selection tests.
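> The ⅔ gate in both functions above is integer cross-multiplication, which avoids float division entirely. The edge cases, worked out:

```rust
fn saturated(attempted: usize, timeouts: usize) -> bool {
    // timeouts/attempted >= 2/3  ⇔  timeouts * 3 >= attempted * 2
    attempted > 0 && timeouts * 3 >= attempted * 2
}

fn main() {
    assert!(saturated(4, 3));  // 9 >= 8: 75 % timed out
    assert!(!saturated(4, 2)); // 6 <  8: 50 % is peer trouble, not saturation
    assert!(saturated(3, 2));  // 6 >= 6: exactly ⅔ counts
    assert!(!saturated(0, 0)); // nothing attempted: upstream error, no signal
}
```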
@@ -373,27 +560,133 @@
     /// Sentinel byte used to represent an unknown/unrecognized proof tag.
     const UNKNOWN_PROOF_TAG: u8 = 0xff;
 
-    #[test]
-    fn single_node_proof_uses_store_response_timeout() {
-        let timeout =
-            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS);
-
-        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
+    /// Build an isolated store `Limiter` for testing the
+    /// timeout-derivation logic without spinning up a full client.
+    /// Goes through `AdaptiveController::new` so the limiter sees the
+    /// production-shape window/ewma config.
+    fn store_limiter_for_tests() -> Limiter {
+        let controller =
+            AdaptiveController::new(ChannelStart::default(), AdaptiveConfig::default());
+        controller.store.clone()
     }
 
     #[test]
-    fn unknown_proof_uses_store_response_timeout() {
-        let timeout =
-            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS);
+    fn single_node_proof_uses_cold_start_timeout_until_baseline_exists() {
+        let limiter = store_limiter_for_tests();
+        let timeout = store_response_timeout_for_proof(
+            &[PROOF_TAG_SINGLE_NODE],
+            10, // config floor below cold-start
+            &limiter,
+        );
+        // No successful samples → cold-start timeout.
+        assert_eq!(timeout, COLD_START_STORE_TIMEOUT);
+    }
 
-        assert_eq!(timeout, STORE_RESPONSE_TIMEOUT);
+    #[test]
+    fn unknown_proof_uses_cold_start_timeout_until_baseline_exists() {
+        let limiter = store_limiter_for_tests();
+        let timeout = store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], 10, &limiter);
+        assert_eq!(timeout, COLD_START_STORE_TIMEOUT);
     }
 
     #[test]
     fn merkle_proof_uses_configured_store_timeout() {
-        let timeout =
-            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS);
+        let limiter = store_limiter_for_tests();
+        let timeout = store_response_timeout_for_proof(
+            &[PROOF_TAG_MERKLE],
+            TEST_MERKLE_TIMEOUT_SECS,
+            &limiter,
+        );
 
         assert_eq!(timeout, Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS));
     }
+
+    /// When the store channel has accumulated successful PUT
+    /// observations, the single-payment timeout grows with observed
+    /// p95 × inflation factor — this is the slow-uplink rescue.
+    #[test]
+    fn adaptive_timeout_grows_with_observed_p95_latency() {
+        let limiter = store_limiter_for_tests();
+        // Feed enough successful samples for the window to populate.
+        // The default config uses min_window_ops=8 / window_ops=32.
+        for _ in 0..16 {
+            limiter.observe(Outcome::Success, Duration::from_secs(20));
+        }
+        let timeout = store_response_timeout_for_proof(
+            &[PROOF_TAG_SINGLE_NODE],
+            10, // config floor
+            &limiter,
+        );
+        // p95 ≈ 20 s, factor = 2.0 → ~40 s. Should be well above
+        // both the floor (10 s) and the cold-start default (30 s).
+        assert!(
+            timeout > COLD_START_STORE_TIMEOUT,
+            "expected adaptive timeout > {COLD_START_STORE_TIMEOUT:?}, got {timeout:?}",
+        );
+        assert!(
+            timeout >= Duration::from_secs(35),
+            "expected ≥35 s based on p95×2, got {timeout:?}",
+        );
+    }
+
+    /// `store_timeout_secs` is honored as a floor: a user pinning a
+    /// high value via `--store-timeout` always raises the minimum,
+    /// even when observed p95 would suggest a lower derived timeout.
+    #[test]
+    fn config_store_timeout_floor_raises_minimum() {
+        let limiter = store_limiter_for_tests();
+        for _ in 0..16 {
+            limiter.observe(Outcome::Success, Duration::from_secs(2));
+        }
+        let timeout = store_response_timeout_for_proof(
+            &[PROOF_TAG_SINGLE_NODE],
+            120, // pinned floor of 2 minutes
+            &limiter,
+        );
+        // p95 ≈ 2 s × 2.0 = 4 s, but the config floor is 120 s.
+        assert_eq!(timeout, Duration::from_secs(120));
+    }
+
+    /// The adaptive timeout is capped at `MAX_STORE_TIMEOUT` so a
+    /// pathologically slow peer set cannot drive it to infinity.
+    #[test]
+    fn adaptive_timeout_caps_at_max_ceiling() {
+        let limiter = store_limiter_for_tests();
+        for _ in 0..16 {
+            limiter.observe(Outcome::Success, Duration::from_secs(1000));
+        }
+        let timeout = store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], 10, &limiter);
+        assert_eq!(timeout, MAX_STORE_TIMEOUT);
+    }
+
+    /// `classify_replication_outcome` flags ≥⅔ timeouts as the
+    /// saturation signature, otherwise falls through to non-capacity
+    /// signal.
+    #[test]
+    fn saturation_classifier_recognizes_majority_timeout_as_capacity_signal() {
+        // 4 attempts, 3 timeouts (75% > 66%) → Timeout
+        assert_eq!(
+            classify_replication_outcome(4, 3),
+            Outcome::Timeout,
+            "75% timeouts must classify as saturation"
+        );
+        // 4 attempts, 2 timeouts (50% < 66%) → ApplicationError
+        assert_eq!(
+            classify_replication_outcome(4, 2),
+            Outcome::ApplicationError,
+            "50% timeouts is not the saturation signature"
+        );
+        // 3 attempts, 2 timeouts (66.6% ≥ 66%) → Timeout
+        assert_eq!(
+            classify_replication_outcome(3, 2),
+            Outcome::Timeout,
+            "exactly ⅔ timeouts must trigger saturation"
+        );
+        // No attempts → ApplicationError (upstream error, not fan-out).
+        assert_eq!(
+            classify_replication_outcome(0, 0),
+            Outcome::ApplicationError,
+            "no attempts is not a fan-out signal"
+        );
+    }
 }
diff --git a/ant-core/src/data/client/file.rs b/ant-core/src/data/client/file.rs
index cfedec6..fb73338 100644
--- a/ant-core/src/data/client/file.rs
+++ b/ant-core/src/data/client/file.rs
@@ -1438,7 +1438,6 @@ impl Client {
         let store_concurrency = store_limiter.current().min(wave.len().max(1));
         let mut upload_stream = stream::iter(wave.into_iter().map(|(content, addr)| {
             let proof_bytes = batch_result.proofs.get(&addr).cloned();
-            let limiter = store_limiter.clone();
             async move {
                 let proof = proof_bytes.ok_or_else(|| {
                     (
@@ -1450,16 +1449,11 @@ impl Client {
                     )
                 })?;
                 let peers = self.close_group_peers(&addr).await.map_err(|e| (addr, e))?;
-                observe_op(
-                    &limiter,
-                    || async move {
-                        self.chunk_put_to_close_group(content, proof, &peers).await
-                    },
-                    classify_error,
-                )
-                .await
-                .map(|_| addr)
-                .map_err(|e| (addr, e))
+                // Per-peer observation lives in `spawn_chunk_put`.
+                self.chunk_put_to_close_group(content, proof, &peers)
+                    .await
+                    .map(|_| addr)
+                    .map_err(|e| (addr, e))
             }
         }))
         .buffer_unordered(store_concurrency);
diff --git a/ant-core/src/data/client/merkle.rs b/ant-core/src/data/client/merkle.rs
index ac2e9a1..9d00071 100644
--- a/ant-core/src/data/client/merkle.rs
+++ b/ant-core/src/data/client/merkle.rs
@@ -4,8 +4,6 @@
 //! by paying for the entire batch in a single on-chain transaction instead
 //! of one transaction per chunk.
 
-use crate::data::client::adaptive::observe_op;
-use crate::data::client::classify_error;
 use crate::data::client::file::UploadEvent;
 use crate::data::client::Client;
 use crate::data::error::{Error, Result};
@@ -544,7 +542,6 @@ impl Client {
         let mut upload_stream = stream::iter(chunk_contents.into_iter().zip(addresses).map(
             |(content, addr)| {
                 let proof_bytes = batch_result.proofs.get(&addr).cloned();
-                let limiter = store_limiter.clone();
                 async move {
                     let proof = proof_bytes.ok_or_else(|| {
                         Error::Payment(format!(
@@ -553,14 +550,8 @@ impl Client {
                         ))
                     })?;
                     let peers = self.close_group_peers(&addr).await?;
-                    observe_op(
-                        &limiter,
-                        || async move {
-                            self.chunk_put_to_close_group(content, proof, &peers).await
-                        },
-                        classify_error,
-                    )
-                    .await
+                    // Per-peer observation lives in `spawn_chunk_put`.
+                    self.chunk_put_to_close_group(content, proof, &peers).await
                 }
             },
         ))