Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 62 additions & 29 deletions quickwit/quickwit-common/src/rate_limited_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ use std::sync::atomic::{AtomicU64, Ordering};

use coarsetime::{Duration, Instant};

#[derive(Debug, PartialEq)]
pub enum ShouldLog {
/// Emit the log normally, within the rate limit.
Yes,
/// Emit the log; `N` similar messages were suppressed since the last emission.
YesAfterSuppression(u32),
/// Suppressed — do not emit.
No,
}

/// Metadata for a log site. This is stored inside a single AtomicU64 when not in use.
///
/// `call_count` is the number of calls since the last upgrade of generation, it's stored
Expand Down Expand Up @@ -54,7 +64,7 @@ pub fn should_log<F: Fn() -> Instant>(
last_reset_atomic: &AtomicU64,
limit: u32,
now: F,
) -> bool {
) -> ShouldLog {
// count_atomic is treated as 2 u32: upper bits count "generation", lower bits count number of
// calls since LAST_RESET. We assume there won't be 2**32 calls to this log in ~60s.
// Generation is free to wrap around.
Expand All @@ -73,7 +83,7 @@ pub fn should_log<F: Fn() -> Instant>(
} = logsite_meta_u64.into();

if call_count < limit {
return true;
return ShouldLog::Yes;
}

let current_time = Duration::from_ticks(now().as_ticks());
Expand All @@ -83,7 +93,7 @@ pub fn should_log<F: Fn() -> Instant>(

if !should_reset {
// we are over-limit and not far enough in time to reset: don't log
return false;
return ShouldLog::No;
}

let mut update_time = false;
Expand Down Expand Up @@ -123,7 +133,20 @@ pub fn should_log<F: Fn() -> Instant>(
// *we* updated generation, so we must update last_reset too
last_reset_atomic.store(current_time.as_ticks(), Ordering::Release);
}
can_log

if !can_log {
return ShouldLog::No;
}
// call_count is the pre-fetch_add value, equal to the number of prior calls in this period.
// Of those, `limit` were allowed through; the rest were suppressed.
// We only report when *we* did the reset (update_time), to avoid double-reporting.
if update_time {
let skipped = call_count.saturating_sub(limit);
if skipped > 0 {
return ShouldLog::YesAfterSuppression(skipped);
}
}
ShouldLog::Yes
}

#[macro_export]
Expand All @@ -136,8 +159,15 @@ macro_rules! rate_limited_tracing {
// we can't get time from constant context, so we pre-initialize with zero
static LAST_RESET: AtomicU64 = AtomicU64::new(0);

if $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) {
::tracing::$log_fn!($($args)*);
match $crate::rate_limited_tracing::should_log(&COUNT, &LAST_RESET, $limit, CoarsetimeInstant::now) {
$crate::rate_limited_tracing::ShouldLog::No => {}
$crate::rate_limited_tracing::ShouldLog::Yes => {
::tracing::$log_fn!($($args)*);
}
$crate::rate_limited_tracing::ShouldLog::YesAfterSuppression(skipped) => {
::tracing::$log_fn!("suppressed {skipped} similar log messages in the last minute");
::tracing::$log_fn!($($args)*);
}
}
}};
}
Expand Down Expand Up @@ -193,7 +223,7 @@ mod tests {

use coarsetime::{Duration, Instant};

use super::should_log;
use super::{should_log, ShouldLog};

// TODO as this is atomic code, we should test it with multiple threads to verify it behaves
// like we'd expect, maybe using something like `loom`?
Expand All @@ -207,30 +237,31 @@ mod tests {
let mut simulated_time = Instant::now();
let simulation_step = Duration::from_secs(1);

assert!(should_log(&count, &last_reset, limit as _, || {
simulated_time
}));
assert_eq!(
should_log(&count, &last_reset, limit as _, || simulated_time),
ShouldLog::Yes
);
assert_eq!(count.load(Ordering::Relaxed), 1);
let reset_timestamp = last_reset.load(Ordering::Relaxed);
assert_ne!(reset_timestamp, 0);

simulated_time += simulation_step;

for i in 1..limit {
// we log as many time as expected
assert!(should_log(&count, &last_reset, limit as _, || {
simulated_time
}));
assert_eq!(
should_log(&count, &last_reset, limit as _, || simulated_time),
ShouldLog::Yes
);
assert_eq!(count.load(Ordering::Relaxed), i + 1);
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
simulated_time += simulation_step;
}

for i in limit..(limit * 2) {
// we don't log, nor update
assert!(!should_log(&count, &last_reset, limit as _, || {
simulated_time
}));
assert_eq!(
should_log(&count, &last_reset, limit as _, || simulated_time),
ShouldLog::No
);
assert_eq!(count.load(Ordering::Relaxed), i + 1);
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
simulated_time += simulation_step;
Expand All @@ -239,30 +270,32 @@ mod tests {
// advance enough to reset counter
simulated_time += simulation_step * 60;

assert!(should_log(&count, &last_reset, limit as _, || {
simulated_time
}));
// the first log after a reset reports how many were suppressed
assert_eq!(
should_log(&count, &last_reset, limit as _, || simulated_time),
ShouldLog::YesAfterSuppression(limit as u32)
);
// counter got reset, generation increased
assert_eq!(count.load(Ordering::Relaxed), 1 + (1 << 32));
// last reset changed too
assert_ne!(last_reset.load(Ordering::Relaxed), reset_timestamp);
let reset_timestamp = last_reset.load(Ordering::Relaxed);

for i in 1..limit {
// we log as many time as expected
assert!(should_log(&count, &last_reset, limit as _, || {
simulated_time
}));
assert_eq!(
should_log(&count, &last_reset, limit as _, || simulated_time),
ShouldLog::Yes
);
assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32));
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
simulated_time += simulation_step;
}

for i in limit..(limit * 2) {
// we don't log, nor update
assert!(!should_log(&count, &last_reset, limit as _, || {
simulated_time
}));
assert_eq!(
should_log(&count, &last_reset, limit as _, || simulated_time),
ShouldLog::No
);
assert_eq!(count.load(Ordering::Relaxed), i + 1 + (1 << 32));
assert_eq!(last_reset.load(Ordering::Relaxed), reset_timestamp);
simulated_time += simulation_step;
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-lambda-client/src/invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_common::retry::RetryParams;
use quickwit_lambda_server::{LambdaSearchRequestPayload, LambdaSearchResponsePayload};
use quickwit_proto::search::{LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest};
use quickwit_search::{LambdaLeafSearchInvoker, SearchError};
use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument};

use crate::metrics::LAMBDA_METRICS;

Expand Down Expand Up @@ -203,7 +203,8 @@ impl AwsLambdaInvoker {
LambdaInvokeError::Permanent(_) => return Err(error.into_search_error()),
};

warn!(
quickwit_common::rate_limited_warn!(
limit_per_min = 10,
num_attempts = num_attempts,
delay_ms = delay.as_millis(),
"lambda invocation failed, retrying"
Expand Down
Loading