diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index 198c2bf8bdd..f9e745999d1 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -19,6 +19,16 @@ use std::sync::atomic::{AtomicU64, Ordering}; use coarsetime::{Duration, Instant}; +#[derive(Debug, PartialEq)] +pub enum ShouldLog { + /// Emit the log normally, within the rate limit. + Yes, + /// Emit the log; `N` similar messages were suppressed since the last emission. + YesAfterSuppression(u32), + /// Suppressed — do not emit. + No, +} + /// Metadata for a log site. This is stored inside a single AtomicU64 when not in use. /// /// `call_count` is the number of calls since the last upgrade of generation, it's stored @@ -54,7 +64,7 @@ pub fn should_log Instant>( last_reset_atomic: &AtomicU64, limit: u32, now: F, -) -> bool { +) -> ShouldLog { // count_atomic is treated as 2 u32: upper bits count "generation", lower bits count number of // calls since LAST_RESET. We assume there won't be 2**32 calls to this log in ~60s. // Generation is free to wrap around. @@ -73,7 +83,7 @@ pub fn should_log Instant>( } = logsite_meta_u64.into(); if call_count < limit { - return true; + return ShouldLog::Yes; } let current_time = Duration::from_ticks(now().as_ticks()); @@ -83,7 +93,7 @@ pub fn should_log Instant>( if !should_reset { // we are over-limit and not far enough in time to reset: don't log - return false; + return ShouldLog::No; } let mut update_time = false; @@ -123,7 +133,20 @@ pub fn should_log Instant>( // *we* updated generation, so we must update last_reset too last_reset_atomic.store(current_time.as_ticks(), Ordering::Release); } - can_log + + if !can_log { + return ShouldLog::No; + } + // call_count is the pre-fetch_add value, equal to the number of prior calls in this period. + // Of those, `limit` were allowed through; the rest were suppressed. + // We only report when *we* did the reset (update_time), to avoid double-reporting. + if update_time { + let skipped = call_count.saturating_sub(limit); + if skipped > 0 { + return ShouldLog::YesAfterSuppression(skipped); + } + } + ShouldLog::Yes } #[macro_export] @@ -136,8 +159,15 @@ macro_rules! rate_limited_tracing { // we can't get time from constant context, so we pre-initialize with zero static LAST_RESET: AtomicU64 = AtomicU64::new(0); - if $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) { - ::tracing::$log_fn!($($args)*); + match $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) { + $crate::rate_limited_tracing::ShouldLog::No => {} + $crate::rate_limited_tracing::ShouldLog::Yes => { + ::tracing::$log_fn!($($args)*); + } + $crate::rate_limited_tracing::ShouldLog::YesAfterSuppression(skipped) => { + ::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute"); + ::tracing::$log_fn!($($args)*); + } } }}; } @@ -193,7 +223,7 @@ mod tests { use coarsetime::{Duration, Instant}; - use super::should_log; + use super::{should_log, ShouldLog}; // TODO as this is atomic code, we should test it with multiple threads to verify it behaves // like we'd expect, maybe using something like `loom`? @@ -207,9 +237,10 @@ mod tests { let mut simulated_time = Instant::now(); let simulation_step = Duration::from_secs(1); - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + ShouldLog::Yes + ); assert_eq!(count.load(Ordering::Relaxed), 1); let reset_timestamp = last_reset.load(Ordering::Relaxed); assert_ne!(reset_timestamp, 0); @@ -217,20 +248,20 @@ mod tests { simulated_time += simulation_step; for i in 1..limit { - // we log as many time as expected - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + ShouldLog::Yes + ); assert_eq!(count.load(Ordering::Relaxed), i + 1); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; } for i in limit..(limit * 2) { - // we don't log, nor update - assert!(!should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + ShouldLog::No + ); assert_eq!(count.load(Ordering::Relaxed), i + 1); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; @@ -239,9 +270,11 @@ mod tests { // advance enough to reset counter simulated_time += simulation_step * 60; - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + // the first log after a reset reports how many were suppressed + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + ShouldLog::YesAfterSuppression(limit as u32) + ); // counter got reset, generation increased assert_eq!(count.load(Ordering::Relaxed), 1 + (1 << 32)); // last reset changed too @@ -249,20 +282,20 @@ mod tests { let reset_timestamp = last_reset.load(Ordering::Relaxed); for i in 1..limit { - // we log as many time as expected - assert!(should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + ShouldLog::Yes + ); assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32)); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; } for i in limit..(limit * 2) { - // we don't log, nor update - assert!(!should_log(&count, &last_reset, limit as _, || { - simulated_time - })); + assert_eq!( + should_log(&count, &last_reset, limit as _, || simulated_time), + ShouldLog::No + ); assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32)); assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp); simulated_time += simulation_step; diff --git a/quickwit/quickwit-lambda-client/src/invoker.rs b/quickwit/quickwit-lambda-client/src/invoker.rs index c8ffa0716a0..69f7cb6519d 100644 --- a/quickwit/quickwit-lambda-client/src/invoker.rs +++ b/quickwit/quickwit-lambda-client/src/invoker.rs @@ -27,7 +27,7 @@ use quickwit_common::retry::RetryParams; use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload}; use quickwit_proto::search::{LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest}; use quickwit_search::{LambdaLeafSearchInvoker, SearchError}; -use tracing::{debug, info, instrument, warn}; +use tracing::{debug, info, instrument}; use crate::metrics::LAMBDA_METRICS; @@ -203,7 +203,8 @@ impl AwsLambdaInvoker { LambdaInvokeError::Permanent(_) => return Err(error.into_search_error()), }; - warn!( + quickwit_common::rate_limited_warn!( + limit_per_min = 10, num_attempts = num_attempts, delay_ms = delay.as_millis(), "lambda invocation failed, retrying"