From 51c6c933642846b1c6526042547316f880a54bc2 Mon Sep 17 00:00:00 2001 From: jr-kenny Date: Sat, 13 Jun 2026 15:46:26 +0100 Subject: [PATCH] fix(streaming): reject Fin with out-of-range sequence instead of overflowing A Fin proposal-part carries a peer-controlled sequence, and StreamState::insert turned it straight into expected_messages with `msg.sequence as usize + 1`. The comment there claimed that could not overflow because of MAX_MESSAGES_PER_STREAM, but expected_messages comes from the Fin sequence, not the message cap, so a Fin at u64::MAX overflowed and panicked in debug builds (and wedged the stream in release until the age sweep). A complete stream can never hold more than MAX_MESSAGES_PER_STREAM messages, so a Fin whose sequence is >= MAX_MESSAGES_PER_STREAM describes a stream that can never complete. Reject and evict it up front like the other malformed-stream paths, which also removes the overflow. Corrects the misleading comment and adds a regression test for the u64::MAX and boundary cases. --- crates/malachite-app/src/streaming.rs | 63 ++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/crates/malachite-app/src/streaming.rs b/crates/malachite-app/src/streaming.rs index 56f1bf5..13f2f19 100644 --- a/crates/malachite-app/src/streaming.rs +++ b/crates/malachite-app/src/streaming.rs @@ -226,6 +226,7 @@ enum StreamInsertResult { Incomplete(Option), ExceededMaxMessages, ExceededMaxChunkSize(usize), + FinSequenceOutOfRange(u64), Complete(Vec), } @@ -275,13 +276,23 @@ impl StreamState { // This is the `Fin` message. if msg.is_fin() { + // `fin.sequence` is the index of the stream's last message, so a complete + // stream holds `fin.sequence + 1` messages. A stream can never hold more + // than `MAX_MESSAGES_PER_STREAM` messages (enforced above), so a Fin whose + // sequence is `>= MAX_MESSAGES_PER_STREAM` describes a stream that can never + // be completed. Reject it instead of recording an `expected_messages` that + // can never be reached. This also prevents the `msg.sequence as usize + 1` + // below from overflowing `usize` for sequences near `u64::MAX` (the + // sequence is a peer-controlled wire field and is otherwise unbounded). + if msg.sequence >= MAX_MESSAGES_PER_STREAM as u64 { + return StreamInsertResult::FinSequenceOutOfRange(msg.sequence); + } + self.fin_received = true; - // If we have received the fin message, we can determine when we will be done. - // We are done if we have already received all messages from 0 to fin.sequence, - // included. That is to say, if we have received `fin.sequence + 1` messages. - // Sequence is a u64 protocol field; on 64-bit targets usize == u64. - // The +1 cannot overflow because MAX_MESSAGES_PER_STREAM << u64::MAX. + // We are done once we have received all messages from 0 to fin.sequence, + // included, i.e. `fin.sequence + 1` messages. Bounded by + // MAX_MESSAGES_PER_STREAM above, so the cast and `+ 1` cannot overflow. #[allow(clippy::cast_possible_truncation, clippy::arithmetic_side_effects)] { self.expected_messages = msg.sequence as usize + 1; @@ -516,6 +527,19 @@ impl PartStreamsMap { return InsertResult::Pending; } + StreamInsertResult::FinSequenceOutOfRange(sequence) => { + warn!( + %peer_id, + %stream_id, + sequence, + max = MAX_MESSAGES_PER_STREAM, + "Stream sent Fin with out-of-range sequence, evicting" + ); + + self.evict(&key); + return InsertResult::Pending; + } + StreamInsertResult::Complete(parts) => { self.streams.remove(&key); parts @@ -734,6 +758,35 @@ mod tests { // --- Unit Tests --- + #[test] + fn test_fin_with_out_of_range_sequence_is_rejected_without_overflow() { + // A Fin whose sequence implies more than MAX_MESSAGES_PER_STREAM messages + // describes a stream that can never complete (the message count is capped at + // MAX_MESSAGES_PER_STREAM). It must be rejected cleanly rather than recording + // an unreachable `expected_messages`, and for sequences near u64::MAX rather + // than overflowing `msg.sequence as usize + 1` (which previously panicked in + // debug builds). The sequence is a peer-controlled wire field, so this is + // reachable from a single gossiped proposal-part message. + let peer = PeerId::random(); + let mut map = PartStreamsMap::new(Height::new(1), NUM_VALIDATORS); + + // Extreme case: a lone Fin at u64::MAX (would overflow before the fix). + let stream = new_stream_id(Height::new(1), Round::new(0), 0); + let result = map.insert(peer, make_fin_message(&stream, u64::MAX)); + assert!(matches!(result, InsertResult::Pending)); + assert!( + map.streams.is_empty(), + "out-of-range Fin must not leave a tracked stream" + ); + + // Boundary: MAX_MESSAGES_PER_STREAM is one past the largest index a complete + // stream can have (valid indices are 0..MAX_MESSAGES_PER_STREAM). + let stream = new_stream_id(Height::new(1), Round::new(0), 1); + let result = map.insert(peer, make_fin_message(&stream, MAX_MESSAGES_PER_STREAM as u64)); + assert!(matches!(result, InsertResult::Pending)); + assert!(map.streams.is_empty()); + } + #[test] fn test_insert_single_message_stream_not_complete() { let peer_1 = PeerId::random();