diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 9f70a805..75e330f8 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -222,11 +222,16 @@ impl BlockChainServer { /// Build and publish a block for the given slot and validator. fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); + let _block_timing = metrics::time_block_building(); // Build the block with attestation signatures let Ok((block, attestation_signatures, post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) + store::produce_block_with_signatures(&mut self.store, slot, validator_id).inspect_err( + |err| { + metrics::inc_block_building_failures(); + error!(%slot, %validator_id, %err, "Failed to build block"); + }, + ) else { return; }; @@ -278,10 +283,13 @@ impl BlockChainServer { // Process the block locally before publishing if let Err(err) = self.process_block(signed_block.clone()) { + metrics::inc_block_building_failures(); error!(%slot, %validator_id, %err, "Failed to process built block"); return; }; + metrics::inc_block_building_success(); + // Publish to gossip network if let Some(ref p2p) = self.p2p { let _ = p2p diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 20092cd7..71a886ee 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -261,11 +261,55 @@ static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_aggregated_payloads", + "Number of aggregated_payloads in a block", + vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_building_payload_aggregation_time_seconds", + "Time taken to build aggregated_payloads during block building", + vec![0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_building_time_seconds", + "Time taken to build a block", + vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_SUCCESS_TOTAL: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_counter!( + "lean_block_building_success_total", + "Successful block builds" + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_FAILURES_TOTAL: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_counter!("lean_block_building_failures_total", "Failed block builds").unwrap() + }); + static LEAN_FORK_CHOICE_REORG_DEPTH: std::sync::LazyLock = std::sync::LazyLock::new(|| { register_histogram!( @@ -314,6 +358,11 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_SUCCESS_TOTAL); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_FAILURES_TOTAL); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); } @@ -476,3 +525,28 @@ pub fn set_attestation_committee_count(count: u64) { pub fn observe_fork_choice_reorg_depth(depth: u64) { LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64); } + +/// Observe the number of aggregated payloads in a produced block. +pub fn observe_block_aggregated_payloads(count: usize) { + LEAN_BLOCK_AGGREGATED_PAYLOADS.observe(count as f64); +} + +/// Start timing payload aggregation during block building. Records duration when the guard is dropped. +pub fn time_block_building_payload_aggregation() -> TimingGuard { + TimingGuard::new(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS) +} + +/// Start timing block building. Records duration when the guard is dropped. +pub fn time_block_building() -> TimingGuard { + TimingGuard::new(&LEAN_BLOCK_BUILDING_TIME_SECONDS) +} + +/// Increment the successful block builds counter. +pub fn inc_block_building_success() { + LEAN_BLOCK_BUILDING_SUCCESS_TOTAL.inc(); +} + +/// Increment the failed block builds counter. +pub fn inc_block_building_failures() { + LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc(); +} diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index eb8c4fec..4f7add8c 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -823,6 +823,8 @@ pub fn produce_block_with_signatures( &aggregated_payloads, )?; + metrics::observe_block_aggregated_payloads(signatures.len()); + Ok((block, signatures, post_checkpoints)) } @@ -1037,6 +1039,8 @@ fn build_block( let mut accumulated_proof_bytes: usize = 0; if !aggregated_payloads.is_empty() { + let _payload_timing = metrics::time_block_building_payload_aggregation(); + // Genesis edge case: when building on genesis (slot 0), // process_block_header will set latest_justified.root = parent_root. // Derive this upfront so attestation filtering matches. diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 7ab52430..516e0ab3 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -15,7 +15,7 @@ use super::{ attestation_subnet_topic, }, }; -use crate::P2PServer; +use crate::{P2PServer, metrics}; pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let Event::Message { @@ -29,6 +29,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let topic_kind = message.topic.as_str().split("/").nth(3); match topic_kind { Some(BLOCK_TOPIC_KIND) => { + metrics::observe_gossip_block_size(message.data.len()); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) else { @@ -60,6 +61,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(AGGREGATION_TOPIC_KIND) => { + metrics::observe_gossip_aggregation_size(message.data.len()); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) else { @@ -89,6 +91,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { + metrics::observe_gossip_attestation_size(message.data.len()); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 14059d1d..4254628f 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -95,6 +95,54 @@ pub fn notify_peer_connected(peer_id: &Option, direction: &str, result: } } +static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + ethlambda_metrics::register_histogram!( + "lean_gossip_block_size_bytes", + "Bytes size of a gossip block message", + vec![ + 10000.0, 50000.0, 100000.0, 250000.0, 500000.0, 1000000.0, 2000000.0, 5000000.0 + ] + ) + .unwrap() +}); + +static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = + LazyLock::new(|| { + ethlambda_metrics::register_histogram!( + "lean_gossip_attestation_size_bytes", + "Bytes size of a gossip attestation message", + vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0] + ) + .unwrap() + }); + +static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = + LazyLock::new(|| { + ethlambda_metrics::register_histogram!( + "lean_gossip_aggregation_size_bytes", + "Bytes size of a gossip aggregated attestation message", + vec![ + 1024.0, 4096.0, 16384.0, 65536.0, 131072.0, 262144.0, 524288.0, 1048576.0 + ] + ) + .unwrap() + }); + +/// Observe the compressed size of a gossip block message. +pub fn observe_gossip_block_size(bytes: usize) { + LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64); +} + +/// Observe the compressed size of a gossip attestation message. +pub fn observe_gossip_attestation_size(bytes: usize) { + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64); +} + +/// Observe the compressed size of a gossip aggregated attestation message. +pub fn observe_gossip_aggregation_size(bytes: usize) { + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64); +} + /// Notify that a peer disconnected. /// /// Decrements the connected peer count and increments the disconnection event counter.