From 78ba842834025187a1802aa2a3116e8478072647 Mon Sep 17 00:00:00 2001 From: Francois Massot Date: Sun, 19 Apr 2026 19:21:14 +0200 Subject: [PATCH 1/3] fix(lambda): rate-limit retry warn log to avoid log storms Co-Authored-By: Claude Sonnet 4.6 --- quickwit/quickwit-lambda-client/src/invoker.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-lambda-client/src/invoker.rs b/quickwit/quickwit-lambda-client/src/invoker.rs index c8ffa0716a0..69f7cb6519d 100644 --- a/quickwit/quickwit-lambda-client/src/invoker.rs +++ b/quickwit/quickwit-lambda-client/src/invoker.rs @@ -27,7 +27,7 @@ use quickwit_common::retry::RetryParams; use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload}; use quickwit_proto::search::{LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest}; use quickwit_search::{LambdaLeafSearchInvoker, SearchError}; -use tracing::{debug, info, instrument, warn}; +use tracing::{debug, info, instrument}; use crate::metrics::LAMBDA_METRICS; @@ -203,7 +203,8 @@ impl AwsLambdaInvoker { LambdaInvokeError::Permanent(_) => return Err(error.into_search_error()), }; - warn!( + quickwit_common::rate_limited_warn!( + limit_per_min = 10, num_attempts = num_attempts, delay_ms = delay.as_millis(), "lambda invocation failed, retrying" From 62036e891bd10c5f819aaf34690307bfc0c60488 Mon Sep 17 00:00:00 2001 From: Francois Massot Date: Wed, 22 Apr 2026 23:18:16 +0200 Subject: [PATCH 2/3] feat(rate-limited-log): report suppressed message count on period reset When the rate limiter opens a new window after suppressing messages, it now emits a preceding log at the same level: "suppressed N similar log messages in the last minute" Closes #6322. Co-Authored-By: Claude Sonnet 4.6 --- .../src/rate_limited_tracing.rs | 69 ++++++++++++------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index 198c2bf8bdd..cf465bd0ca4 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -49,12 +49,15 @@ impl From for u64 { /// Helper function used in [`rate_limited_tracing`] to determine if this line should log, /// and update the related counters. +/// +/// Returns `(should_log, skipped)` where `skipped` is the number of suppressed calls since +/// the last reset. `skipped` is non-zero only on the first log after a rate-limit period ends. pub fn should_log Instant>( count_atomic: &AtomicU64, last_reset_atomic: &AtomicU64, limit: u32, now: F, -) -> bool { +) -> (bool, u32) { // count_atomic is treated as 2 u32: upper bits count "generation", lower bits count number of // calls since LAST_RESET. We assume there won't be 2**32 calls to this log in ~60s. // Generation is free to wrap around. @@ -73,7 +76,7 @@ pub fn should_log Instant>( } = logsite_meta_u64.into(); if call_count < limit { - return true; + return (true, 0); } let current_time = Duration::from_ticks(now().as_ticks()); @@ -83,7 +86,7 @@ pub fn should_log Instant>( if !should_reset { // we are over-limit and not far enough in time to reset: don't log - return false; + return (false, 0); } let mut update_time = false; @@ -123,7 +126,16 @@ pub fn should_log Instant>( // *we* updated generation, so we must update last_reset too last_reset_atomic.store(current_time.as_ticks(), Ordering::Release); } - can_log + + // call_count is the pre-fetch_add value, equal to the number of prior calls in this period. + // Of those, `limit` were allowed through; the rest were suppressed. + // We only report when *we* did the reset (update_time), to avoid double-reporting. + let skipped = if update_time { + call_count.saturating_sub(limit) + } else { + 0 + }; + (can_log, skipped) } #[macro_export] @@ -136,7 +148,11 @@ macro_rules! rate_limited_tracing { // we can't get time from constant context, so we pre-initialize with zero static LAST_RESET: AtomicU64 = AtomicU64::new(0); - if $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) { + let (can_log, skipped) = $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now); + if skipped > 0 { + ::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute"); + } + if can_log { ::tracing::$log_fn!($($args)*); } }}; @@ -207,9 +223,10 @@ mod tests { let mut simulated_time = Instant::now(); let simulation_step = Duration::from_secs(1); - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + (true, 0) + ); assert_eq!(count.load(Ordering::Relaxed), 1); let reset_timestamp = last_reset.load(Ordering::Relaxed); assert_ne!(reset_timestamp, 0); @@ -218,9 +235,10 @@ mod tests { for i in 1..limit { // we log as many time as expected - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + (true, 0) + ); assert_eq!(count.load(Ordering::Relaxed), i + 1); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; @@ -228,9 +246,10 @@ mod tests { for i in limit..(limit * 2) { // we don't log, nor update - assert!(!should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + (false, 0) + ); assert_eq!(count.load(Ordering::Relaxed), i + 1); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; @@ -239,9 +258,11 @@ mod tests { // advance enough to reset counter simulated_time += simulation_step * 60; - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + // the first log after a reset reports how many were suppressed + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + (true, limit as u32) + ); // counter got reset, generation increased assert_eq!(count.load(Ordering::Relaxed), 1 + (1 << 32)); // last reset changed too @@ -250,9 +271,10 @@ mod tests { for i in 1..limit { // we log as many time as expected - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + (true, 0) + ); assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32)); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; @@ -260,9 +282,10 @@ mod tests { for i in limit..(limit * 2) { // we don't log, nor update - assert!(!should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + (false, 0) + ); assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32)); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; From e30ade83b9f9e5e79c52c3f41438747ae76e71a2 Mon Sep 17 00:00:00 2001 From: Francois Massot Date: Thu, 23 Apr 2026 09:48:08 +0200 Subject: [PATCH 3/3] refactor(rate-limited-log): replace (bool, u32) return with ShouldLog enum The three possible outcomes (log normally, log after suppression, suppress) are now distinct enum variants, removing the implicit invariant that the skipped count is only meaningful when the bool is true. Co-Authored-By: Claude Sonnet 4.6 --- .../src/rate_limited_tracing.rs | 68 +++++++++++-------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index cf465bd0ca4..f9e745999d1 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -19,6 +19,16 @@ use std::sync::atomic::{AtomicU64, Ordering}; use coarsetime::{Duration, Instant}; +#[derive(Debug, PartialEq)] +pub enum ShouldLog { + /// Emit the log normally, within the rate limit. + Yes, + /// Emit the log; `N` similar messages were suppressed since the last emission. + YesAfterSuppression(u32), + /// Suppressed — do not emit. + No, +} + /// Metadata for a log site. This is stored inside a single AtomicU64 when not in use. /// /// `call_count` is the number of calls since the last upgrade of generation, it's stored @@ -49,15 +59,12 @@ impl From for u64 { /// Helper function used in [`rate_limited_tracing`] to determine if this line should log, /// and update the related counters. -/// -/// Returns `(should_log, skipped)` where `skipped` is the number of suppressed calls since -/// the last reset. `skipped` is non-zero only on the first log after a rate-limit period ends. pub fn should_log Instant>( count_atomic: &AtomicU64, last_reset_atomic: &AtomicU64, limit: u32, now: F, -) -> (bool, u32) { +) -> ShouldLog { // count_atomic is treated as 2 u32: upper bits count "generation", lower bits count number of // calls since LAST_RESET. We assume there won't be 2**32 calls to this log in ~60s. // Generation is free to wrap around. @@ -76,7 +83,7 @@ pub fn should_log Instant>( } = logsite_meta_u64.into(); if call_count < limit { - return (true, 0); + return ShouldLog::Yes; } let current_time = Duration::from_ticks(now().as_ticks()); @@ -86,7 +93,7 @@ pub fn should_log Instant>( if !should_reset { // we are over-limit and not far enough in time to reset: don't log - return (false, 0); + return ShouldLog::No; } let mut update_time = false; @@ -127,15 +134,19 @@ pub fn should_log Instant>( last_reset_atomic.store(current_time.as_ticks(), Ordering::Release); } + if !can_log { + return ShouldLog::No; + } // call_count is the pre-fetch_add value, equal to the number of prior calls in this period. // Of those, `limit` were allowed through; the rest were suppressed. // We only report when *we* did the reset (update_time), to avoid double-reporting. - let skipped = if update_time { - call_count.saturating_sub(limit) - } else { - 0 - }; - (can_log, skipped) + if update_time { + let skipped = call_count.saturating_sub(limit); + if skipped > 0 { + return ShouldLog::YesAfterSuppression(skipped); + } + } + ShouldLog::Yes } #[macro_export] @@ -148,12 +159,15 @@ macro_rules! rate_limited_tracing { // we can't get time from constant context, so we pre-initialize with zero static LAST_RESET: AtomicU64 = AtomicU64::new(0); - let (can_log, skipped) = $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now); - if skipped > 0 { - ::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute"); - } - if can_log { - ::tracing::$log_fn!($($args)*); + match $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) { + $crate::rate_limited_tracing::ShouldLog::No => {} + $crate::rate_limited_tracing::ShouldLog::Yes => { + ::tracing::$log_fn!($($args)*); + } + $crate::rate_limited_tracing::ShouldLog::YesAfterSuppression(skipped) => { + ::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute"); + ::tracing::$log_fn!($($args)*); + } } }}; } @@ -209,7 +223,7 @@ mod tests { use coarsetime::{Duration, Instant}; - use super::should_log; + use super::{should_log, ShouldLog}; // TODO as this is atomic code, we should test it with multiple threads to verify it behaves // like we'd expect, maybe using something like `loom`? @@ -225,7 +239,7 @@ mod tests { assert_eq!( should_log(&count, &last_reset, limit as _, || simulated_time), - (true, 0) + ShouldLog::Yes ); assert_eq!(count.load(Ordering::Relaxed), 1); let reset_timestamp = last_reset.load(Ordering::Relaxed); @@ -234,10 +248,9 @@ mod tests { simulated_time += simulation_step; for i in 1..limit { - // we log as many time as expected assert_eq!( should_log(&count, &last_reset, limit as _, || simulated_time), - (true, 0) + ShouldLog::Yes ); assert_eq!(count.load(Ordering::Relaxed), i + 1); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); @@ -245,10 +258,9 @@ mod tests { } for i in limit..(limit * 2) { - // we don't log, nor update assert_eq!( should_log(&count, &last_reset, limit as _, || simulated_time), - (false, 0) + ShouldLog::No ); assert_eq!(count.load(Ordering::Relaxed), i + 1); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); @@ -261,7 +273,7 @@ mod tests { // the first log after a reset reports how many were suppressed assert_eq!( should_log(&count, &last_reset, limit as _, || simulated_time), - (true, limit as u32) + ShouldLog::YesAfterSuppression(limit as u32) ); // counter got reset, generation increased assert_eq!(count.load(Ordering::Relaxed), 1 + (1 << 32)); @@ -270,10 +282,9 @@ mod tests { let reset_timestamp = last_reset.load(Ordering::Relaxed); for i in 1..limit { - // we log as many time as expected assert_eq!( should_log(&count, &last_reset, limit as _, || simulated_time), - (true, 0) + ShouldLog::Yes ); assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32)); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); @@ -281,10 +292,9 @@ mod tests { } for i in limit..(limit * 2) { - // we don't log, nor update assert_eq!( should_log(&count, &last_reset, limit as _, || simulated_time), - (false, 0) + ShouldLog::No ); assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32)); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);