diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 682d39d4..f46a7208 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -47,6 +47,7 @@ impl BlockChain { is_aggregator: bool, ) -> BlockChain { metrics::set_is_aggregator(is_aggregator); + metrics::set_node_sync_status(metrics::SyncStatus::Idle); let genesis_time = store.config().genesis_time; let key_manager = key_manager::KeyManager::new(validator_keys); let handle = BlockChainServer { @@ -217,11 +218,14 @@ impl BlockChainServer { fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); + let _timing = metrics::time_block_building(); + // Build the block with attestation signatures let Ok((block, attestation_signatures, _post_checkpoints)) = store::produce_block_with_signatures(&mut self.store, slot, validator_id) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) else { + metrics::inc_block_building_failures(); return; }; @@ -232,6 +236,7 @@ impl BlockChainServer { .sign_block_root(validator_id, slot as u32, &block_root) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root")) else { + metrics::inc_block_building_failures(); return; }; @@ -249,9 +254,12 @@ impl BlockChainServer { // Process the block locally before publishing if let Err(err) = self.process_block(signed_block.clone()) { error!(%slot, %validator_id, %err, "Failed to process built block"); + metrics::inc_block_building_failures(); return; }; + metrics::inc_block_building_success(); + // Publish to gossip network if let Some(ref p2p) = self.p2p { let _ = p2p @@ -264,10 +272,21 @@ impl BlockChainServer { fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> { store::on_block(&mut self.store, signed_block)?; - metrics::update_head_slot(self.store.head_slot()); + let head_slot = self.store.head_slot(); + metrics::update_head_slot(head_slot); metrics::update_latest_justified_slot(self.store.latest_justified().slot); metrics::update_latest_finalized_slot(self.store.latest_finalized().slot); metrics::update_validators_count(self.key_manager.validator_ids().len() as u64); + + // Update sync status based on head slot vs wall clock slot + let current_slot = self.store.time() / INTERVALS_PER_SLOT; + let status = if head_slot >= current_slot { + metrics::SyncStatus::Synced + } else { + metrics::SyncStatus::Syncing + }; + metrics::set_node_sync_status(status); + for table in ALL_TABLES { metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table)); } diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 20092cd7..e59c5b9a 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -261,7 +261,7 @@ static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = .unwrap() }); +// --- Block Production --- + +static LEAN_BLOCK_AGGREGATED_PAYLOADS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_aggregated_payloads", + "Number of aggregated_payloads in a block", + vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_building_payload_aggregation_time_seconds", + "Time taken to build aggregated_payloads during block building", + vec![0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_building_time_seconds", + "Time taken to build a block", + vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_SUCCESS_TOTAL: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_counter!( + "lean_block_building_success_total", + "Successful block builds" + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_FAILURES_TOTAL: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_counter!("lean_block_building_failures_total", "Failed block builds").unwrap() + }); + +// --- Sync Status --- + +/// Node synchronization status. +pub enum SyncStatus { + Idle, + Syncing, + Synced, +} + +impl SyncStatus { + fn as_str(&self) -> &'static str { + match self { + SyncStatus::Idle => "idle", + SyncStatus::Syncing => "syncing", + SyncStatus::Synced => "synced", + } + } + + const ALL: &[&str] = &["idle", "syncing", "synced"]; +} + +static LEAN_NODE_SYNC_STATUS: std::sync::LazyLock = std::sync::LazyLock::new(|| { + register_int_gauge_vec!("lean_node_sync_status", "Node sync status", &["status"]).unwrap() +}); + // --- Initialization --- /// Register all metrics with the Prometheus registry so they appear in `/metrics` from startup. @@ -315,6 +386,14 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); + // Block production + std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_SUCCESS_TOTAL); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_FAILURES_TOTAL); + // Sync status + std::sync::LazyLock::force(&LEAN_NODE_SYNC_STATUS); } // --- Public API --- @@ -476,3 +555,38 @@ pub fn set_attestation_committee_count(count: u64) { pub fn observe_fork_choice_reorg_depth(depth: u64) { LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64); } + +/// Observe the number of aggregated payloads in a built block. +pub fn observe_block_aggregated_payloads(count: usize) { + LEAN_BLOCK_AGGREGATED_PAYLOADS.observe(count as f64); +} + +/// Start timing payload aggregation during block building. Records duration when the guard is dropped. +pub fn time_block_building_payload_aggregation() -> TimingGuard { + TimingGuard::new(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS) +} + +/// Start timing block building. Records duration when the guard is dropped. +pub fn time_block_building() -> TimingGuard { + TimingGuard::new(&LEAN_BLOCK_BUILDING_TIME_SECONDS) +} + +/// Increment the successful block builds counter. +pub fn inc_block_building_success() { + LEAN_BLOCK_BUILDING_SUCCESS_TOTAL.inc(); +} + +/// Increment the failed block builds counter. +pub fn inc_block_building_failures() { + LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc(); +} + +/// Set the node sync status. Sets the given status label to 1 and all others to 0. +pub fn set_node_sync_status(status: SyncStatus) { + let active = status.as_str(); + for label in SyncStatus::ALL { + LEAN_NODE_SYNC_STATUS + .with_label_values(&[label]) + .set(i64::from(*label == active)); + } +} diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 33fd04e3..0f544d6d 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -1015,14 +1015,19 @@ pub fn produce_block_with_signatures( let known_block_roots = store.get_block_roots(); - let (block, signatures, post_checkpoints) = build_block( - &head_state, - slot, - validator_index, - head_root, - &known_block_roots, - &aggregated_payloads, - )?; + let (block, signatures, post_checkpoints) = { + let _timing = metrics::time_block_building_payload_aggregation(); + build_block( + &head_state, + slot, + validator_index, + head_root, + &known_block_roots, + &aggregated_payloads, + )? + }; + + metrics::observe_block_aggregated_payloads(signatures.len()); Ok((block, signatures, post_checkpoints)) } diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index eacaf447..4d418463 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -15,7 +15,7 @@ use super::{ attestation_subnet_topic, }, }; -use crate::P2PServer; +use crate::{P2PServer, metrics}; pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let Event::Message { @@ -34,6 +34,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { else { return; }; + metrics::observe_gossip_block_size(uncompressed_data.len()); let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) @@ -65,6 +66,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { else { return; }; + metrics::observe_gossip_aggregation_size(uncompressed_data.len()); let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) @@ -94,6 +96,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { else { return; }; + metrics::observe_gossip_attestation_size(uncompressed_data.len()); let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 14059d1d..d6d2c29b 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -68,6 +68,68 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock = LazyLock: .unwrap() }); +// --- Gossip Message Size Histograms --- + +static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram!( + "lean_gossip_block_size_bytes", + "Bytes size of a gossip block message", + vec![ + 10_000.0, + 50_000.0, + 100_000.0, + 250_000.0, + 500_000.0, + 1_000_000.0, + 2_000_000.0, + 5_000_000.0 + ] + ) + .unwrap() +}); + +static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram!( + "lean_gossip_attestation_size_bytes", + "Bytes size of a gossip attestation message", + vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0] + ) + .unwrap() +}); + +static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram!( + "lean_gossip_aggregation_size_bytes", + "Bytes size of a gossip aggregated attestation message", + vec![ + 1024.0, + 4096.0, + 16384.0, + 65536.0, + 131_072.0, + 262_144.0, + 524_288.0, + 1_048_576.0 + ] + ) + .unwrap() +}); + +/// Observe the size of a gossip block message. +pub fn observe_gossip_block_size(bytes: usize) { + LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64); +} + +/// Observe the size of a gossip attestation message. +pub fn observe_gossip_attestation_size(bytes: usize) { + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64); +} + +/// Observe the size of a gossip aggregated attestation message. +pub fn observe_gossip_aggregation_size(bytes: usize) { + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64); +} + /// Set the attestation committee subnet gauge. pub fn set_attestation_committee_subnet(subnet_id: u64) { static LEAN_ATTESTATION_COMMITTEE_SUBNET: LazyLock = LazyLock::new(|| {