From caae9009da5a00b177ecf3608b1a05e5413a61bb Mon Sep 17 00:00:00 2001
From: Chris O'Neil
Date: Tue, 12 May 2026 20:55:30 +0100
Subject: [PATCH] feat(storage): emit per-RPC latency log for put/get handlers

Adds a structured `info!` log event per chunk-store PUT and GET RPC with
duration_ms, outcome, and address (and chunk_size on PUT). The target is
`ant_node::storage::rpc_latency` so Elasticsearch / Kibana can build
p50/p95/p99 store-RPC latency histograms from the existing telegraf log
forwarding without any node-side metric pipeline.

Motivated by the DEV-01 post-mortem: aggregate upload throughput halved
over 12h before any node failures, file-size sensitivity points at
CLOSE_GROUP quorum / slowest-peer cost on the client, but the node-side
per-RPC service time is currently unobservable.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 src/storage/handler.rs | 78 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/src/storage/handler.rs b/src/storage/handler.rs
index ab40c39..41527e4 100644
--- a/src/storage/handler.rs
+++ b/src/storage/handler.rs
@@ -164,7 +164,50 @@ impl AntProtocol {
     }
 
     /// Handle a PUT request.
+    ///
+    /// Wraps `handle_put_inner` to emit a single structured tracing event per
+    /// PUT RPC on every return path, including early-return validation paths.
+    /// The event uses `target: "ant_node::storage::rpc_latency"` so that
+    /// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
+    /// histograms from the existing telegraf log forwarding.
+    ///
+    /// Logged fields: `duration_ms` (elapsed time around the inner call,
+    /// truncated to whole milliseconds and saturating at `u64::MAX`),
+    /// `chunk_size` (length of `request.content`), `outcome` (a fixed label
+    /// per response variant) and `addr` (hex-encoded `request.address`).
+    ///
+    /// The event is logged only after the inner handler returns, so nothing
+    /// is emitted if this future is dropped before that point.
     async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
+        let start = std::time::Instant::now();
+        // Capture what the log needs before `request` is handed to the inner handler.
+        let address = request.address;
+        let chunk_size = request.content.len();
+        let response = self.handle_put_inner(request).await;
+        // `as_millis()` yields a `u128`; saturate instead of truncating on overflow.
+        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
+        let outcome: &'static str = match &response {
+            ChunkPutResponse::Success { .. } => "success",
+            ChunkPutResponse::AlreadyExists { .. } => "already_exists",
+            ChunkPutResponse::PaymentRequired { .. } => "payment_required",
+            ChunkPutResponse::Error(_) => "error",
+            _ => "unknown",
+        };
+        info!(
+            target: "ant_node::storage::rpc_latency",
+            duration_ms,
+            chunk_size,
+            outcome,
+            // Hex-encode inside the macro so the string is only built when this
+            // event is enabled.
+            addr = %hex::encode(address),
+            "put_rpc"
+        );
+        response
+    }
+
+    /// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
+    async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
         let address = request.address;
         let addr_hex = hex::encode(address);
         debug!("Handling PUT request for {addr_hex}");
@@ -254,7 +297,42 @@ impl AntProtocol {
     }
 
     /// Handle a GET request.
+    ///
+    /// Wraps `handle_get_inner` to emit a single structured tracing event per
+    /// GET RPC on every return path. See `handle_put` for the rationale.
+    ///
+    /// Logged fields: `duration_ms` (elapsed time around the inner call,
+    /// truncated to whole milliseconds and saturating at `u64::MAX`),
+    /// `outcome` (a fixed label per response variant) and `addr`
+    /// (hex-encoded `request.address`). Nothing is emitted if this future is
+    /// dropped before the inner handler returns.
     async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
+        let start = std::time::Instant::now();
+        // Capture what the log needs before `request` is handed to the inner handler.
+        let address = request.address;
+        let response = self.handle_get_inner(request).await;
+        // `as_millis()` yields a `u128`; saturate instead of truncating on overflow.
+        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
+        let outcome: &'static str = match &response {
+            ChunkGetResponse::Success { .. } => "success",
+            ChunkGetResponse::NotFound { .. } => "not_found",
+            ChunkGetResponse::Error(_) => "error",
+            _ => "unknown",
+        };
+        info!(
+            target: "ant_node::storage::rpc_latency",
+            duration_ms,
+            outcome,
+            // Hex-encode inside the macro so the string is only built when this
+            // event is enabled.
+            addr = %hex::encode(address),
+            "get_rpc"
+        );
+        response
+    }
+
+    /// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
+    async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
         let address = request.address;
         let addr_hex = hex::encode(address);
         debug!("Handling GET request for {addr_hex}");