From 9243f2e1bafa83eb797aaebbbda3f26d734ced3c Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Fri, 3 Apr 2026 17:00:40 +0200 Subject: [PATCH 1/7] Add OTel metrics dual export alongside Prometheus --- quickwit/Cargo.lock | 2 + quickwit/quickwit-actors/src/mailbox.rs | 6 +- quickwit/quickwit-cli/src/logger.rs | 117 ++- quickwit/quickwit-cli/src/main.rs | 15 +- quickwit/quickwit-common/Cargo.toml | 2 + quickwit/quickwit-common/src/io.rs | 5 +- quickwit/quickwit-common/src/metrics.rs | 982 ++++++++++++++++-- quickwit/quickwit-common/src/runtimes.rs | 3 +- quickwit/quickwit-common/src/stream_utils.rs | 3 +- quickwit/quickwit-common/src/thread_pool.rs | 3 +- .../src/tower/circuit_breaker.rs | 15 +- 11 files changed, 1021 insertions(+), 132 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 89eefe2bed7..1332a7c1edf 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7121,6 +7121,8 @@ dependencies = [ "hyper-util", "itertools 0.14.0", "once_cell", + "opentelemetry", + "opentelemetry_sdk", "pin-project", "pnet", "prometheus", diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 899e289182a..0d5aea15273 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -521,7 +521,7 @@ mod tests { .unwrap(); // At this point the actor was started and even processed a message entirely. let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let wait_duration = Duration::from_millis(1); let processed = mailbox .send_message_with_backpressure_counter( @@ -548,7 +548,7 @@ mod tests { .await .unwrap(); let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let wait_duration = Duration::from_millis(1); mailbox .send_message_with_backpressure_counter( @@ -580,7 +580,7 @@ mod tests { .await .unwrap(); let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let start = Instant::now(); mailbox .ask_with_backpressure_counter(Duration::from_millis(1), None) diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index f16e7db4ce5..f20d2f800a5 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -17,13 +17,15 @@ use std::sync::Arc; use std::{env, fmt}; use anyhow::Context; +use opentelemetry::metrics::MeterProvider; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::{ - LogExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, + LogExporter, MetricExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, }; use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality}; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; use opentelemetry_sdk::{Resource, trace}; @@ -81,6 +83,26 @@ impl OtlpProtocol { } .context("failed to initialize OTLP traces exporter") } + + fn metric_exporter(&self, temporality: Temporality) -> anyhow::Result { + match self { + OtlpProtocol::Grpc => MetricExporter::builder() + .with_tonic() + .with_temporality(temporality) + .build(), + OtlpProtocol::HttpProtobuf => MetricExporter::builder() + .with_http() + .with_temporality(temporality) + .with_protocol(OtlpWireProtocol::HttpBinary) + .build(), + OtlpProtocol::HttpJson => MetricExporter::builder() + .with_http() + .with_temporality(temporality) + .with_protocol(OtlpWireProtocol::HttpJson) + .build(), + } + .context("failed to initialize OTLP metrics exporter") + } } impl FromStr for OtlpProtocol { @@ -104,11 +126,60 @@ impl FromStr for OtlpProtocol { } } +struct OtlpMetricsTemporality(Temporality); + +impl FromStr for OtlpMetricsTemporality { + type Err = anyhow::Error; + + fn from_str(temporality_str: &str) -> anyhow::Result { + const TEMPORALITY_DELTA: &str = "delta"; + const TEMPORALITY_LOWMEMORY: &str = "lowmemory"; + const TEMPORALITY_CUMULATIVE: &str = "cumulative"; + + match temporality_str { + TEMPORALITY_DELTA => Ok(OtlpMetricsTemporality(Temporality::Delta)), + TEMPORALITY_LOWMEMORY => Ok(OtlpMetricsTemporality(Temporality::LowMemory)), + TEMPORALITY_CUMULATIVE => Ok(OtlpMetricsTemporality(Temporality::Cumulative)), + other => anyhow::bail!( + "unsupported OTLP metrics temporality `{other}`, supported values are \ + `{TEMPORALITY_DELTA}`, `{TEMPORALITY_LOWMEMORY}` and `{TEMPORALITY_CUMULATIVE}`" + ), + } + } +} + +impl From for Temporality { + fn from(t: OtlpMetricsTemporality) -> Self { + t.0 + } +} + +pub struct TelemetryProviders { + tracer_provider: SdkTracerProvider, + logger_provider: SdkLoggerProvider, + meter_provider: SdkMeterProvider, +} + +impl TelemetryProviders { + pub fn shutdown(self) -> anyhow::Result<()> { + self.tracer_provider + .shutdown() + .context("failed to shutdown OpenTelemetry tracer provider")?; + self.logger_provider + .shutdown() + .context("failed to shutdown OpenTelemetry logger provider")?; + self.meter_provider + .shutdown() + .context("failed to shutdown OpenTelemetry meter provider")?; + Ok(()) + } +} + #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; /// Load the default logging filter from the environment. The filter can later -/// be updated using the result callback of [setup_logging_and_tracing]. +/// be updated using the result callback of [init_telemetry_providers]. fn startup_env_filter(level: Level) -> anyhow::Result { let env_filter = env::var("RUST_LOG") .map(|_| EnvFilter::from_default_env()) @@ -119,14 +190,11 @@ fn startup_env_filter(level: Level) -> anyhow::Result { type ReloadLayer = tracing_subscriber::reload::Layer; -pub fn setup_logging_and_tracing( +pub fn init_telemetry_providers( level: Level, ansi_colors: bool, build_info: &BuildInfo, -) -> anyhow::Result<( - EnvFilterReloadFn, - Option<(SdkTracerProvider, SdkLoggerProvider)>, -)> { +) -> anyhow::Result<(EnvFilterReloadFn, Option)> { #[cfg(feature = "tokio-console")] { if get_bool_from_env(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY, false) { @@ -204,12 +272,37 @@ pub fn setup_logging_and_tracing( .with_batch_exporter(log_exporter) .build(); - let tracing_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + let metrics_protocol_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", false); + let metrics_protocol = metrics_protocol_opt + .as_deref() + .map(OtlpProtocol::from_str) + .transpose()? + .unwrap_or(global_protocol); + let temporality_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE", false); + let temporality = temporality_opt + .as_deref() + .map(OtlpMetricsTemporality::from_str) + .transpose()? + .map(Temporality::from) + .unwrap_or(Temporality::Cumulative); + let metric_exporter = metrics_protocol.metric_exporter(temporality)?; + + let meter_provider = SdkMeterProvider::builder() + .with_resource(resource.clone()) + .with_periodic_exporter(metric_exporter) + .build(); + + let meter = meter_provider.meter("quickwit"); + quickwit_common::metrics::install_otel_meter(meter); + + let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() .with_span_processor(span_processor) .with_resource(resource) .build(); - let tracer = tracing_provider.tracer("quickwit"); + let tracer = tracer_provider.tracer("quickwit"); let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); // Bridge between tracing logs and otel tracing events @@ -220,7 +313,11 @@ pub fn setup_logging_and_tracing( .with(logs_otel_layer) .try_init() .context("failed to register tracing subscriber")?; - Some((tracing_provider, logger_provider)) + Some(TelemetryProviders { + tracer_provider, + logger_provider, + meter_provider, + }) } else { registry .try_init() diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 4a1f9ce036e..77064f2be63 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -22,7 +22,7 @@ use quickwit_cli::checklist::RED_COLOR; use quickwit_cli::cli::{CliCommand, build_cli}; #[cfg(feature = "jemalloc")] use quickwit_cli::jemalloc::start_jemalloc_metrics_loop; -use quickwit_cli::logger::setup_logging_and_tracing; +use quickwit_cli::logger::init_telemetry_providers; use quickwit_cli::{busy_detector, install_default_crypto_ring_provider}; use quickwit_common::runtimes::scrape_tokio_runtime_metrics; use quickwit_serve::BuildInfo; @@ -98,8 +98,8 @@ async fn main_impl() -> anyhow::Result<()> { start_jemalloc_metrics_loop(); let build_info = BuildInfo::get(); - let (env_filter_reload_fn, tracer_provider_opt) = - setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; + let (env_filter_reload_fn, telemetry_providers) = + init_telemetry_providers(command.default_log_level(), ansi_colors, build_info)?; let return_code: i32 = if let Err(command_error) = command.execute(env_filter_reload_fn).await { error!(error=%command_error, "command failed"); @@ -113,13 +113,8 @@ async fn main_impl() -> anyhow::Result<()> { 0 }; - if let Some((trace_provider, logs_provider)) = tracer_provider_opt { - trace_provider - .shutdown() - .context("failed to shutdown OpenTelemetry tracer provider")?; - logs_provider - .shutdown() - .context("failed to shutdown OpenTelemetry logs provider")?; + if let Some(providers) = telemetry_providers { + providers.shutdown()?; } std::process::exit(return_code) diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index ee5fc303644..a49fed6c5d4 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -29,6 +29,7 @@ hyper = { workspace = true } hyper-util = { workspace = true, optional = true } itertools = { workspace = true } once_cell = { workspace = true } +opentelemetry = { workspace = true } pin-project = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } @@ -63,6 +64,7 @@ jemalloc-profiled = [ [dev-dependencies] hyper-util = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["testing"] } proptest = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs index 69c2091c237..c4e5e5e1d1d 100644 --- a/quickwit/quickwit-common/src/io.rs +++ b/quickwit/quickwit-common/src/io.rs @@ -34,10 +34,9 @@ use async_speed_limit::limiter::Consume; use bytesize::ByteSize; use once_cell::sync::Lazy; use pin_project::pin_project; -use prometheus::IntCounter; use tokio::io::AsyncWrite; -use crate::metrics::{IntCounterVec, new_counter_vec}; +use crate::metrics::{IntCounter, IntCounterVec, new_counter_vec}; use crate::{KillSwitch, Progress, ProtectedZoneGuard}; // Max 1MB at a time. @@ -99,7 +98,7 @@ pub struct IoControls { impl Default for IoControls { fn default() -> Self { let default_bytes_counter = - IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap(); + IntCounter::new("default_write_num_bytes", "Default write counter.", "", &[]); IoControls { throughput_limiter_opt: None, progress: Progress::default(), diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 193def5e01a..19c5d9be3b5 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -13,29 +13,198 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; -use std::sync::{LazyLock, OnceLock}; +use std::sync::{Arc, LazyLock, OnceLock}; +use std::time::Instant; -use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder}; -pub use prometheus::{ - Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter, - IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec, - exponential_buckets, linear_buckets, -}; +use opentelemetry::KeyValue; +use opentelemetry::metrics::Meter; +use prometheus::{HistogramOpts, Opts, TextEncoder}; +pub use prometheus::{exponential_buckets, linear_buckets}; + +static OTEL_METER: OnceLock = OnceLock::new(); +const METRICS_NAMESPACE: &str = "quickwit"; + +pub fn install_otel_meter(meter: Meter) { + OTEL_METER + .set(meter) + .expect("OTel meter should only be installed once"); +} + +fn otel_meter() -> Option<&'static Meter> { + OTEL_METER.get() +} + +fn build_otel_attributes(const_labels: &[(&str, &str)]) -> Vec { + const_labels + .iter() + .map(|(k, v)| KeyValue::new(k.to_string(), v.to_string())) + .collect() +} + +fn build_prometheus_labels(const_labels: &[(&str, &str)]) -> HashMap { + const_labels + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() +} + +struct OtelState { + build_instrument: Box T + Send + Sync>, + instrument: OnceLock, +} + +impl OtelState { + fn new(build_instrument: impl Fn(&Meter) -> T + Send + Sync + 'static) -> Self { + OtelState { + build_instrument: Box::new(build_instrument), + instrument: OnceLock::new(), + } + } + + fn bind(&self, meter: &Meter) -> Option<&T> { + Some( + self.instrument + .get_or_init(|| (self.build_instrument)(meter)), + ) + } + + fn get(&self) -> Option<&T> { + self.instrument.get() + } +} #[derive(Clone)] -pub struct HistogramVec { - underlying: PrometheusHistogramVec, +struct OtelMetric { + state: Option>>, + attributes: Vec, } -impl HistogramVec { - pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { - self.underlying.with_label_values(&label_values) +impl OtelMetric { + fn new(state: Option>>, attributes: Vec) -> Self { + Self { state, attributes } + } + + fn with_attributes(&self, names: &[String], values: [&str; N]) -> Self { + if self.state.is_none() { + return Self::default(); + } + let mut attributes = self.attributes.clone(); + for (name, value) in names.iter().zip(values.iter()) { + attributes.push(KeyValue::new(name.clone(), value.to_string())); + } + Self { + state: self.state.clone(), + attributes, + } + } + + /// Invokes a recording operation (e.g. `counter.add`) on the bound OTel + /// instrument, passing in this metric's attributes. Lazily binds the + /// instrument on first use after the meter becomes available. No-ops when + /// OTel is disabled or the meter has not been installed yet. + fn with_instrument(&self, f: impl FnOnce(&T, &[KeyValue])) { + let Some(instrument) = self.get_or_bind_instrument() else { + return; + }; + f(instrument, &self.attributes); + } + + fn get_or_bind_instrument(&self) -> Option<&T> { + let state = self.state.as_ref()?; + if let Some(instrument) = state.get() { + return Some(instrument); + } + let meter = otel_meter()?; + state.bind(meter) + } +} + +impl Default for OtelMetric { + fn default() -> Self { + Self { + state: None, + attributes: Vec::new(), + } + } +} + +impl OtelMetric> { + fn add(&self, value: u64) { + self.with_instrument(|counter, attributes| counter.add(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: i64) { + self.with_instrument(|gauge, attributes| gauge.record(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: f64) { + self.with_instrument(|gauge, attributes| gauge.record(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: f64) { + self.with_instrument(|histogram, attributes| histogram.record(value, attributes)); + } +} + +#[derive(Clone)] +pub struct IntCounter { + prometheus: prometheus::IntCounter, + otel: OtelMetric>, +} + +impl std::fmt::Debug for IntCounter { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("IntCounter") + .field("value", &self.prometheus.get()) + .finish() + } +} + +impl IntCounter { + pub fn new( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], + ) -> IntCounter { + let counter_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); + IntCounter { + prometheus: prom, + otel: OtelMetric::default(), + } + } + + pub fn inc(&self) { + self.prometheus.inc(); + self.otel.add(1); + } + + pub fn inc_by(&self, v: u64) { + self.prometheus.inc_by(v); + self.otel.add(v); + } + + pub fn get(&self) -> u64 { + self.prometheus.get() } } #[derive(Clone)] pub struct IntCounterVec { - underlying: PrometheusIntCounterVec, + prometheus: prometheus::IntCounterVec, + otel: OtelMetric>, + label_names: Vec, } impl IntCounterVec { @@ -46,62 +215,268 @@ impl IntCounterVec { const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let counter_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounterVec::new(counter_opts, &label_names) .expect("failed to create counter vec"); - IntCounterVec { underlying } + + IntCounterVec { + prometheus: prom, + otel: OtelMetric::new(None, build_otel_attributes(const_labels)), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter { - self.underlying.with_label_values(&label_values) + IntCounter { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } + } +} + +/// For relative operations (`inc`, `dec`, `add`, `sub`), the OTel value is derived by reading the +/// Prometheus gauge after mutation, since OTel gauges do not support relative updates. This is not +/// atomic: under concurrent updates, the OTel side may briefly record a stale value. This is +/// acceptable for now because gauges are inherently point-in-time approximations, and the next +/// update self-corrects. +/// +/// TODO: for strict correctness, manage a single `AtomicI64` as the source of truth and feed its +/// value into both Prometheus and OTel. +#[derive(Clone)] +pub struct IntGauge { + prometheus: prometheus::IntGauge, + otel: OtelMetric>, +} + +impl IntGauge { + pub fn set(&self, v: i64) { + self.prometheus.set(v); + self.otel.record(v); + } + + pub fn inc(&self) { + self.prometheus.inc(); + self.otel.record(self.prometheus.get()); + } + + pub fn dec(&self) { + self.prometheus.dec(); + self.otel.record(self.prometheus.get()); + } + + pub fn add(&self, delta: i64) { + self.prometheus.add(delta); + self.otel.record(self.prometheus.get()); + } + + pub fn sub(&self, delta: i64) { + self.prometheus.sub(delta); + self.otel.record(self.prometheus.get()); + } + + pub fn get(&self) -> i64 { + self.prometheus.get() } } #[derive(Clone)] pub struct IntGaugeVec { - underlying: PrometheusIntGaugeVec, + prometheus: prometheus::IntGaugeVec, + otel: OtelMetric>, + label_names: Vec, } impl IntGaugeVec { pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge { - self.underlying.with_label_values(&label_values) + IntGauge { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } + } +} + +#[derive(Clone)] +pub struct Gauge { + prometheus: prometheus::Gauge, + otel: OtelMetric>, +} + +impl Gauge { + pub fn set(&self, v: f64) { + self.prometheus.set(v); + self.otel.record(v); + } + + pub fn get(&self) -> f64 { + self.prometheus.get() + } +} + +#[derive(Clone)] +pub struct Histogram { + prometheus: prometheus::Histogram, + otel: OtelMetric>, +} + +impl Histogram { + pub fn observe(&self, v: f64) { + self.prometheus.observe(v); + self.otel.record(v); + } + + pub fn start_timer(&self) -> HistogramTimer { + HistogramTimer { + histogram: Some(self.clone()), + start: Instant::now(), + } + } +} + +pub struct HistogramTimer { + histogram: Option, + start: Instant, +} + +impl HistogramTimer { + pub fn observe_duration(mut self) { + if let Some(histogram) = self.histogram.take() { + histogram.observe(self.start.elapsed().as_secs_f64()); + } + } +} + +impl Drop for HistogramTimer { + fn drop(&mut self) { + if let Some(histogram) = self.histogram.take() { + histogram.observe(self.start.elapsed().as_secs_f64()); + } + } +} + +#[derive(Clone)] +pub struct HistogramVec { + prometheus: prometheus::HistogramVec, + otel: OtelMetric>, + label_names: Vec, +} + +impl HistogramVec { + pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { + Histogram { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } } } pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'static str, String>) { - let mut counter_opts = Opts::new(name, help).namespace("quickwit"); + let mut counter_opts = Opts::new(name, help).namespace(METRICS_NAMESPACE); for (k, v) in kvs { counter_opts = counter_opts.const_label(k, v); } - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); + let counter = + prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); counter.inc(); prometheus::register(Box::new(counter)).expect("failed to register counter"); } +fn new_otel_state( + name: &str, + subsystem: &str, + help: &str, + build_instrument: impl Fn(&Meter, &str, &str) -> T + Send + Sync + 'static, +) -> Arc> +where + T: Send + Sync + 'static, +{ + let name = if subsystem.is_empty() { + format!("{METRICS_NAMESPACE}_{name}") + } else { + format!("{METRICS_NAMESPACE}_{subsystem}_{name}") + }; + let description = help.to_string(); + Arc::new(OtelState::new(move |meter| { + build_instrument(meter, &name, &description) + })) +} + +fn new_counter_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .u64_counter(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_int_gauge_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .i64_gauge(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_float_gauge_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .f64_gauge(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_histogram_otel_state( + name: &str, + subsystem: &str, + help: &str, + boundaries: Vec, +) -> Arc>> { + new_otel_state(name, subsystem, help, move |meter, name, description| { + meter + .f64_histogram(name.to_string()) + .with_description(description.to_string()) + .with_boundaries(boundaries.clone()) + .build() + }) +} + pub fn new_counter( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], ) -> IntCounter { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let counter_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); - prometheus::register(Box::new(counter.clone())).expect("failed to register counter"); - counter + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); + prometheus::register(Box::new(prom.clone())).expect("failed to register counter"); + + IntCounter { + prometheus: prom, + otel: OtelMetric::new( + Some(new_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } pub fn new_counter_vec( @@ -111,82 +486,114 @@ pub fn new_counter_vec( const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let int_counter_vec = IntCounterVec::new(name, help, subsystem, const_labels, label_names); - let collector = Box::new(int_counter_vec.underlying.clone()); - prometheus::register(collector).expect("failed to register counter vec"); - int_counter_vec + let counter_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounterVec::new(counter_opts, &label_names) + .expect("failed to create counter vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register counter vec"); + + IntCounterVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } -pub fn new_float_gauge( +pub fn new_gauge( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], -) -> Gauge { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); +) -> IntGauge { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let gauge = Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); - prometheus::register(Box::new(gauge.clone())).expect("failed to register float gauge"); - gauge + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge"); + + IntGauge { + prometheus: prom, + otel: OtelMetric::new( + Some(new_int_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } -pub fn new_gauge( +pub fn new_gauge_vec( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], -) -> IntGauge { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); + label_names: [&str; N], +) -> IntGaugeVec { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); - prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge"); - gauge + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge vec"); + + IntGaugeVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_int_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } -pub fn new_gauge_vec( +pub fn new_float_gauge( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], - label_names: [&str; N], -) -> IntGaugeVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); +) -> Gauge { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = - PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register float gauge"); - let collector = Box::new(underlying.clone()); - prometheus::register(collector).expect("failed to register counter vec"); - - IntGaugeVec { underlying } + Gauge { + prometheus: prom, + otel: OtelMetric::new( + Some(new_float_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } pub fn new_histogram(name: &str, help: &str, subsystem: &str, buckets: Vec) -> Histogram { let histogram_opts = HistogramOpts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .buckets(buckets); - let histogram = Histogram::with_opts(histogram_opts).expect("failed to create histogram"); - prometheus::register(Box::new(histogram.clone())).expect("failed to register histogram"); - histogram + .buckets(buckets.clone()); + let prom = + prometheus::Histogram::with_opts(histogram_opts).expect("failed to create histogram"); + prometheus::register(Box::new(prom.clone())).expect("failed to register histogram"); + + Histogram { + prometheus: prom, + otel: OtelMetric::new( + Some(new_histogram_otel_state( + name, + subsystem, + help, + buckets.clone(), + )), + Vec::new(), + ), + } } pub fn new_histogram_vec( @@ -197,22 +604,28 @@ pub fn new_histogram_vec( label_names: [&str; N], buckets: Vec, ) -> HistogramVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let histogram_opts = HistogramOpts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels) - .buckets(buckets); - let underlying = PrometheusHistogramVec::new(histogram_opts, &label_names) + .const_labels(build_prometheus_labels(const_labels)) + .buckets(buckets.clone()); + let prom = prometheus::HistogramVec::new(histogram_opts, &label_names) .expect("failed to create histogram vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register histogram vec"); - let collector = Box::new(underlying.clone()); - prometheus::register(collector).expect("failed to register histogram vec"); - - HistogramVec { underlying } + HistogramVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_histogram_otel_state( + name, + subsystem, + help, + buckets.clone(), + )), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } } pub struct GaugeGuard<'a> { @@ -291,8 +704,6 @@ impl Drop for OwnedGaugeGuard { pub fn metrics_text_payload() -> Result { let metric_families = prometheus::gather(); - // Arbitrary non-zero size in order to skip a bunch of - // buffer growth-reallocations when encoding metrics. let mut buffer = String::with_capacity(1024); let encoder = TextEncoder::new(); match encoder.encode_utf8(&metric_families, &mut buffer) { @@ -451,3 +862,384 @@ pub fn index_label(index_id: &str) -> &str { } pub static MEMORY_METRICS: LazyLock = LazyLock::new(MemoryMetrics::default); + +#[cfg(test)] +mod tests { + use std::sync::OnceLock; + + use opentelemetry::metrics::MeterProvider; + use opentelemetry_sdk::metrics::data::{ + AggregatedMetrics, HistogramDataPoint, MetricData, ResourceMetrics, + }; + use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider}; + use serial_test::serial; + + use super::*; + + static TEST_OTEL_EXPORTER: OnceLock = OnceLock::new(); + static TEST_OTEL_PROVIDER: OnceLock = OnceLock::new(); + + fn ensure_test_otel_provider() -> (&'static InMemoryMetricExporter, &'static SdkMeterProvider) { + let exporter = TEST_OTEL_EXPORTER.get_or_init(InMemoryMetricExporter::default); + let provider = TEST_OTEL_PROVIDER.get_or_init(|| { + let reader = PeriodicReader::builder(exporter.clone()).build(); + let provider = SdkMeterProvider::builder().with_reader(reader).build(); + install_otel_meter(provider.meter("quickwit-tests")); + provider + }); + (exporter, provider) + } + + fn find_metric_data<'a>( + metrics: &'a [ResourceMetrics], + metric_name: &str, + ) -> Option<&'a AggregatedMetrics> { + metrics + .iter() + .flat_map(|resource_metrics| resource_metrics.scope_metrics()) + .flat_map(|scope_metrics| scope_metrics.metrics()) + .find(|metric| metric.name() == metric_name) + .map(|metric| metric.data()) + } + + fn flush_and_read_metric( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + read: impl FnOnce(&AggregatedMetrics) -> T, + ) -> T { + provider.force_flush().unwrap(); + let exported_metrics = exporter.get_finished_metrics().unwrap(); + let data = find_metric_data(&exported_metrics, metric_name) + .unwrap_or_else(|| panic!("metric '{metric_name}' should be exported")); + read(data) + } + + fn flush_and_get_counter_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> u64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::U64(MetricData::Sum(sum_data)) = data else { + panic!("expected u64 sum metric"); + }; + sum_data + .data_points() + .next() + .expect("should have one data point") + .value() + }) + } + + fn flush_and_get_gauge_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> i64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::I64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected i64 gauge metric"); + }; + gauge_data + .data_points() + .last() + .expect("should have at least one data point") + .value() + }) + } + + fn flush_and_get_histogram_data_point( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> HistogramDataPoint { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = data else { + panic!("expected f64 histogram metric"); + }; + histogram_data + .data_points() + .next() + .expect("should have one data point") + .clone() + }) + } + + fn flush_and_get_float_gauge_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> f64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::F64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected f64 gauge metric"); + }; + gauge_data + .data_points() + .last() + .expect("should have at least one data point") + .value() + }) + } + + #[test] + #[serial] + fn test_counter() { + let (exporter, provider) = ensure_test_otel_provider(); + + // inc + let counter = new_counter("test_ctr_inc", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc(); + assert_eq!(counter.get(), 1); + let otel_value = + flush_and_get_counter_value(exporter, provider, "quickwit_test_test_ctr_inc"); + assert_eq!(otel_value, 1); + + // inc_by + let counter = new_counter("test_ctr_inc_by", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc_by(5); + assert_eq!(counter.get(), 5); + let otel_value = + flush_and_get_counter_value(exporter, provider, "quickwit_test_test_ctr_inc_by"); + assert_eq!(otel_value, 5); + } + + #[test] + #[serial] + fn test_gauge() { + let (exporter, provider) = ensure_test_otel_provider(); + + // set + let gauge = new_gauge("test_gauge_set", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.set(10); + assert_eq!(gauge.get(), 10); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_set"); + assert_eq!(otel_value, 10); + + // inc + let gauge = new_gauge("test_gauge_inc", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.inc(); + assert_eq!(gauge.get(), 1); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_inc"); + assert_eq!(otel_value, 1); + + // dec + let gauge = new_gauge("test_gauge_dec", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.dec(); + assert_eq!(gauge.get(), -1); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_dec"); + assert_eq!(otel_value, -1); + + // add + let gauge = new_gauge("test_gauge_add", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.add(15); + assert_eq!(gauge.get(), 15); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_add"); + assert_eq!(otel_value, 15); + + // sub + let gauge = new_gauge("test_gauge_sub", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.sub(3); + assert_eq!(gauge.get(), -3); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_sub"); + assert_eq!(otel_value, -3); + } + + #[test] + #[serial] + fn test_float_gauge_set() { + let (exporter, provider) = ensure_test_otel_provider(); + let gauge = new_float_gauge("test_float_gauge", "test", "test", &[]); + assert_eq!(gauge.get(), 0.0); + gauge.set(1.23); + assert_eq!(gauge.get(), 1.23); + + let otel_value = + flush_and_get_float_gauge_value(exporter, provider, "quickwit_test_test_float_gauge"); + assert_eq!(otel_value, 1.23); + } + + #[test] + #[serial] + fn test_histogram_observe() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_obs", "test", "test", vec![1.0, 5.0, 10.0]); + histogram.observe(2.5); + histogram.observe(7.0); + + let dp = + flush_and_get_histogram_data_point(exporter, provider, "quickwit_test_test_hist_obs"); + assert_eq!(dp.count(), 2); + assert_eq!(dp.max().unwrap(), 7.0); + assert_eq!(dp.min().unwrap(), 2.5); + assert_eq!(dp.bounds().collect::>(), vec![1.0, 5.0, 10.0]); + } + + #[test] + #[serial] + fn test_histogram_vec_observe() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram_vec = new_histogram_vec( + "test_hist_vec_obs", + "test", + "test", + &[], + ["method"], + vec![0.5, 1.5, 3.0], + ); + histogram_vec.with_label_values(["GET"]).observe(1.0); + + flush_and_read_metric( + exporter, + provider, + "quickwit_test_test_hist_vec_obs", + |data| { + let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = data else { + panic!("expected f64 histogram metric"); + }; + let data_point = histogram_data + .data_points() + .find(|point| { + point + .attributes() + .any(|kv| kv.key.as_str() == "method" && kv.value.as_str() == "GET") + }) + .expect("should contain the labelled data point"); + assert_eq!(data_point.count(), 1); + assert_eq!(data_point.min().unwrap(), 1.0); + }, + ); + } + + #[test] + #[serial] + fn test_histogram_timer_drop_observes() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_timer_drop", "test", "test", vec![1.0, 5.0, 10.0]); + { + let _timer = histogram.start_timer(); + } + + let dp = flush_and_get_histogram_data_point( + exporter, + provider, + "quickwit_test_test_hist_timer_drop", + ); + assert_eq!(dp.count(), 1); + } + + #[test] + #[serial] + fn test_histogram_timer_observe_duration() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_timer_obs", "test", "test", vec![1.0, 5.0, 10.0]); + let timer = histogram.start_timer(); + timer.observe_duration(); + + let dp = flush_and_get_histogram_data_point( + exporter, + provider, + "quickwit_test_test_hist_timer_obs", + ); + assert_eq!(dp.count(), 1); + } + + #[test] + #[serial] + fn test_counter_vec_with_label_values() { + let (exporter, provider) = ensure_test_otel_provider(); + let vec = new_counter_vec("test_cvec", "test", "test", &[], ["method"]); + let post_counter = vec.with_label_values(["POST"]); + post_counter.inc_by(3); + assert_eq!(post_counter.get(), 3); + + flush_and_read_metric(exporter, provider, "quickwit_test_test_cvec", |data| { + let AggregatedMetrics::U64(MetricData::Sum(sum_data)) = data else { + panic!("expected u64 sum metric"); + }; + let post_value = sum_data + .data_points() + .find(|dp| { + dp.attributes() + .any(|kv| kv.key.as_str() == "method" && kv.value.as_str() == "POST") + }) + .expect("should contain POST data point") + .value(); + assert_eq!(post_value, 3); + }); + } + + #[test] + #[serial] + fn test_gauge_vec_with_label_values() { + let (exporter, provider) = ensure_test_otel_provider(); + let vec = new_gauge_vec("test_gvec", "test", "test", &[], ["pool"]); + let indexing = vec.with_label_values(["indexing"]); + indexing.set(10); + assert_eq!(indexing.get(), 10); + + flush_and_read_metric(exporter, provider, "quickwit_test_test_gvec", |data| { + let AggregatedMetrics::I64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected i64 gauge metric"); + }; + let indexing_value = gauge_data + .data_points() + .find(|dp| { + dp.attributes() + .any(|kv| kv.key.as_str() == "pool" && kv.value.as_str() == "indexing") + }) + .expect("should contain pool=indexing data point") + .value(); + assert_eq!(indexing_value, 10); + }); + } + + #[test] + fn test_gauge_guard_add_sub_drop() { + let gauge = new_gauge("test_guard", "test", "test", &[]); + { + let mut guard = GaugeGuard::from_gauge(&gauge); + guard.add(5); + assert_eq!(gauge.get(), 5); + guard.sub(2); + assert_eq!(gauge.get(), 3); + } + // After drop, the delta (3) is subtracted. + assert_eq!(gauge.get(), 0); + } + + #[test] + fn test_owned_gauge_guard_add_sub_drop() { + let gauge = new_gauge("test_owned_guard", "test", "test", &[]); + { + let mut guard = OwnedGaugeGuard::from_gauge(gauge.clone()); + guard.add(5); + assert_eq!(gauge.get(), 5); + guard.sub(2); + assert_eq!(gauge.get(), 3); + } + assert_eq!(gauge.get(), 0); + } + + #[test] + fn test_metrics_text_payload_contains_registered_metrics() { + let counter = new_counter("test_payload_ctr", "test", "test", &[]); + counter.inc_by(42); + let payload = metrics_text_payload().unwrap(); + assert!(payload.contains("quickwit_test_test_payload_ctr")); + assert!(payload.contains("42")); + } +} diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index ee93ab7aba9..b34c3b477c7 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -17,11 +17,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use once_cell::sync::OnceCell; -use prometheus::{Gauge, IntCounter, IntGauge}; use tokio::runtime::Runtime; use tokio_metrics::{RuntimeMetrics, RuntimeMonitor}; -use crate::metrics::{new_counter, new_float_gauge, new_gauge}; +use crate::metrics::{Gauge, IntCounter, IntGauge, new_counter, new_float_gauge, new_gauge}; static RUNTIMES: OnceCell> = OnceCell::new(); diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index e0fc126b465..3740a7f0527 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -18,12 +18,11 @@ use std::pin::Pin; use bytesize::ByteSize; use futures::{Stream, StreamExt, TryStreamExt, stream}; -use prometheus::IntGauge; use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream}; use tracing::warn; -use crate::metrics::GaugeGuard; +use crate::metrics::{GaugeGuard, IntGauge}; use crate::tower::RpcName; pub type BoxStream = Pin + Send + Unpin + 'static>>; diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index 18201196cf9..17fcfc3415d 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -17,11 +17,10 @@ use std::sync::Arc; use futures::{Future, TryFutureExt}; use once_cell::sync::Lazy; -use prometheus::IntGauge; use tokio::sync::oneshot; use tracing::error; -use crate::metrics::{GaugeGuard, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; +use crate::metrics::{GaugeGuard, IntGauge, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; /// An executor backed by a thread pool to run CPU-intensive tasks. /// diff --git a/quickwit/quickwit-common/src/tower/circuit_breaker.rs b/quickwit/quickwit-common/src/tower/circuit_breaker.rs index 09ada07e187..3d7f2d272cf 100644 --- a/quickwit/quickwit-common/src/tower/circuit_breaker.rs +++ b/quickwit/quickwit-common/src/tower/circuit_breaker.rs @@ -19,10 +19,11 @@ use std::task::{Context, Poll}; use std::time::Duration; use pin_project::pin_project; -use prometheus::IntCounter; use tokio::time::Instant; use tower::{Layer, Service}; +use crate::metrics::IntCounter; + /// The circuit breaker layer implements the [circuit breaker pattern](https://martinfowler.com/bliki/CircuitBreaker.html). /// /// It counts the errors emitted by the inner service, and if the number of errors exceeds a certain @@ -49,7 +50,7 @@ pub struct CircuitBreakerLayer { time_window: Duration, timeout: Duration, evaluator: Evaluator, - circuit_break_total: prometheus::IntCounter, + circuit_break_total: IntCounter, } pub trait CircuitBreakerEvaluator: Clone { @@ -61,7 +62,7 @@ pub trait CircuitBreakerEvaluator: Clone { self, max_num_errors_per_secs: u32, timeout: Duration, - circuit_break_total: prometheus::IntCounter, + circuit_break_total: IntCounter, ) -> CircuitBreakerLayer { CircuitBreakerLayer { max_error_count_per_time_window: max_num_errors_per_secs, @@ -301,8 +302,12 @@ mod tests { const TIMEOUT: Duration = Duration::from_millis(500); - let int_counter: prometheus::IntCounter = - IntCounter::new("circuit_break_total_test", "test circuit breaker counter").unwrap(); + let int_counter = crate::metrics::new_counter( + "circuit_break_total_test", + "test circuit breaker counter", + "test", + &[], + ); let mut service = ServiceBuilder::new() .layer(TestCircuitBreakerEvaluator.make_layer(10, TIMEOUT, int_counter)) .service_fn(|_| async { From 685c636fd676ba94cc441acc1fa7c041913b8820 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Fri, 10 Apr 2026 11:34:09 +0200 Subject: [PATCH 2/7] Split IntGauge into IntGauge (set-only) and IntUpDownCounter (relative ops) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OTel Gauge does not support relative operations (inc/dec/add/sub), which forced the dual-write layer to read back from Prometheus after mutation. This splits the type into two semantically correct variants: - IntGauge: absolute state via set()/get(), backed by OTel Gauge - IntUpDownCounter: relative deltas via inc()/dec()/add()/sub(), backed by OTel UpDownCounter Both still use prometheus::IntGauge on the Prometheus side. Also renames GaugeGuard → UpDownCounterGuard and OwnedGaugeGuard → OwnedUpDownCounterGuard to match the new semantics. --- quickwit/quickwit-actors/src/mailbox.rs | 18 +- quickwit/quickwit-common/src/metrics.rs | 363 ++++++++++++------ quickwit/quickwit-common/src/stream_utils.rs | 45 ++- quickwit/quickwit-common/src/thread_pool.rs | 23 +- quickwit/quickwit-common/src/tower/metrics.rs | 11 +- .../quickwit-indexing/src/actors/indexer.rs | 10 +- .../src/actors/indexing_pipeline.rs | 7 +- quickwit/quickwit-indexing/src/metrics.rs | 17 +- .../src/models/indexed_split.rs | 6 +- .../src/models/processed_doc.rs | 7 +- .../src/models/processed_parquet_batch.rs | 7 +- .../src/models/raw_doc_batch.rs | 9 +- quickwit/quickwit-indexing/src/source/mod.rs | 6 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 5 +- .../quickwit-ingest/src/ingest_v2/metrics.rs | 9 +- .../src/ingest_v2/replication.rs | 5 +- .../quickwit-ingest/src/ingest_v2/router.rs | 5 +- .../src/metastore/postgres/metrics.rs | 6 +- .../src/metastore/postgres/pool.rs | 5 +- quickwit/quickwit-search/src/metrics.rs | 16 +- .../quickwit-search/src/scroll_context.rs | 9 +- .../src/search_permit_provider.rs | 6 +- quickwit/quickwit-serve/src/decompression.rs | 7 +- quickwit/quickwit-serve/src/load_shield.rs | 12 +- quickwit/quickwit-serve/src/metrics.rs | 12 +- quickwit/quickwit-storage/src/metrics.rs | 33 +- 26 files changed, 402 insertions(+), 257 deletions(-) diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 0d5aea15273..06c6044ee72 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock, Weak}; use std::time::Instant; -use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge}; +use quickwit_common::metrics::{IntCounter, IntUpDownCounter, UpDownCounterGuard}; use tokio::sync::oneshot; use crate::channel_with_priority::{Receiver, Sender, TrySendError}; @@ -308,7 +308,7 @@ impl Mailbox { struct InboxInner { rx: Receiver>, - _inboxes_count_gauge_guard: GaugeGuard<'static>, + _inboxes_count_gauge_guard: UpDownCounterGuard<'static>, } pub struct Inbox { @@ -385,19 +385,19 @@ impl Inbox { } } -fn get_actor_inboxes_count_gauge_guard() -> GaugeGuard<'static> { - static INBOX_GAUGE: std::sync::OnceLock = OnceLock::new(); - let gauge = INBOX_GAUGE.get_or_init(|| { - quickwit_common::metrics::new_gauge( +fn get_actor_inboxes_count_gauge_guard() -> UpDownCounterGuard<'static> { + static INBOX_COUNTER: std::sync::OnceLock = OnceLock::new(); + let counter = INBOX_COUNTER.get_or_init(|| { + quickwit_common::metrics::new_up_down_counter( "inboxes_count", "overall count of actors", "actor", &[], ) }); - let mut gauge_guard = GaugeGuard::from_gauge(gauge); - gauge_guard.add(1); - gauge_guard + let mut guard = UpDownCounterGuard::from_counter(counter); + guard.add(1); + guard } pub(crate) fn create_mailbox( diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 19c5d9be3b5..4d84d0617e6 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -152,6 +152,12 @@ impl OtelMetric> { } } +impl OtelMetric> { + fn add(&self, value: i64) { + self.with_instrument(|counter, attributes| counter.add(value, attributes)); + } +} + #[derive(Clone)] pub struct IntCounter { prometheus: prometheus::IntCounter, @@ -237,14 +243,6 @@ impl IntCounterVec { } } -/// For relative operations (`inc`, `dec`, `add`, `sub`), the OTel value is derived by reading the -/// Prometheus gauge after mutation, since OTel gauges do not support relative updates. This is not -/// atomic: under concurrent updates, the OTel side may briefly record a stale value. This is -/// acceptable for now because gauges are inherently point-in-time approximations, and the next -/// update self-corrects. -/// -/// TODO: for strict correctness, manage a single `AtomicI64` as the source of truth and feed its -/// value into both Prometheus and OTel. #[derive(Clone)] pub struct IntGauge { prometheus: prometheus::IntGauge, @@ -257,24 +255,56 @@ impl IntGauge { self.otel.record(v); } + pub fn get(&self) -> i64 { + self.prometheus.get() + } +} + +#[derive(Clone)] +pub struct IntGaugeVec { + prometheus: prometheus::IntGaugeVec, + otel: OtelMetric>, + label_names: Vec, +} + +impl IntGaugeVec { + pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge { + IntGauge { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + } + } +} + +#[derive(Clone)] +pub struct IntUpDownCounter { + prometheus: prometheus::IntGauge, + otel: OtelMetric>, +} + +impl IntUpDownCounter { + /// No-op dummy. Exists only so call sites that mix `set` with relative ops + /// still compile after migrating from `IntGauge`. + pub fn set(&self, _v: i64) {} + pub fn inc(&self) { self.prometheus.inc(); - self.otel.record(self.prometheus.get()); + self.otel.add(1); } pub fn dec(&self) { self.prometheus.dec(); - self.otel.record(self.prometheus.get()); + self.otel.add(-1); } pub fn add(&self, delta: i64) { self.prometheus.add(delta); - self.otel.record(self.prometheus.get()); + self.otel.add(delta); } pub fn sub(&self, delta: i64) { self.prometheus.sub(delta); - self.otel.record(self.prometheus.get()); + self.otel.add(-delta); } pub fn get(&self) -> i64 { @@ -283,15 +313,15 @@ impl IntGauge { } #[derive(Clone)] -pub struct IntGaugeVec { +pub struct IntUpDownCounterVec { prometheus: prometheus::IntGaugeVec, - otel: OtelMetric>, + otel: OtelMetric>, label_names: Vec, } -impl IntGaugeVec { - pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge { - IntGauge { +impl IntUpDownCounterVec { + pub fn with_label_values(&self, label_values: [&str; N]) -> IntUpDownCounter { + IntUpDownCounter { prometheus: self.prometheus.with_label_values(&label_values), otel: self.otel.with_attributes(&self.label_names, label_values), } @@ -442,6 +472,19 @@ fn new_float_gauge_otel_state( }) } +fn new_i64_up_down_counter_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .i64_up_down_counter(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + fn new_histogram_otel_state( name: &str, subsystem: &str, @@ -551,6 +594,53 @@ pub fn new_gauge_vec( } } +pub fn new_up_down_counter( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], +) -> IntUpDownCounter { + let gauge_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge"); + + IntUpDownCounter { + prometheus: prom, + otel: OtelMetric::new( + Some(new_i64_up_down_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } +} + +pub fn new_up_down_counter_vec( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], + label_names: [&str; N], +) -> IntUpDownCounterVec { + let gauge_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge vec"); + + IntUpDownCounterVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_i64_up_down_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: label_names.iter().map(|s| s.to_string()).collect(), + } +} + pub fn new_float_gauge( name: &str, help: &str, @@ -628,20 +718,23 @@ pub fn new_histogram_vec( } } -pub struct GaugeGuard<'a> { - gauge: &'a IntGauge, +pub struct UpDownCounterGuard<'a> { + counter: &'a IntUpDownCounter, delta: i64, } -impl std::fmt::Debug for GaugeGuard<'_> { +impl std::fmt::Debug for UpDownCounterGuard<'_> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.delta.fmt(f) } } -impl<'a> GaugeGuard<'a> { - pub fn from_gauge(gauge: &'a IntGauge) -> Self { - Self { gauge, delta: 0i64 } +impl<'a> UpDownCounterGuard<'a> { + pub fn from_counter(counter: &'a IntUpDownCounter) -> Self { + Self { + counter, + delta: 0i64, + } } pub fn get(&self) -> i64 { @@ -649,36 +742,39 @@ impl<'a> GaugeGuard<'a> { } pub fn add(&mut self, delta: i64) { - self.gauge.add(delta); + self.counter.add(delta); self.delta += delta; } pub fn sub(&mut self, delta: i64) { - self.gauge.sub(delta); + self.counter.sub(delta); self.delta -= delta; } } -impl Drop for GaugeGuard<'_> { +impl Drop for UpDownCounterGuard<'_> { fn drop(&mut self) { - self.gauge.sub(self.delta) + self.counter.sub(self.delta) } } -pub struct OwnedGaugeGuard { - gauge: IntGauge, +pub struct OwnedUpDownCounterGuard { + counter: IntUpDownCounter, delta: i64, } -impl std::fmt::Debug for OwnedGaugeGuard { +impl std::fmt::Debug for OwnedUpDownCounterGuard { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.delta.fmt(f) } } -impl OwnedGaugeGuard { - pub fn from_gauge(gauge: IntGauge) -> Self { - Self { gauge, delta: 0i64 } +impl OwnedUpDownCounterGuard { + pub fn from_counter(counter: IntUpDownCounter) -> Self { + Self { + counter, + delta: 0i64, + } } pub fn get(&self) -> i64 { @@ -686,19 +782,19 @@ impl OwnedGaugeGuard { } pub fn add(&mut self, delta: i64) { - self.gauge.add(delta); + self.counter.add(delta); self.delta += delta; } pub fn sub(&mut self, delta: i64) { - self.gauge.sub(delta); + self.counter.sub(delta); self.delta -= delta; } } -impl Drop for OwnedGaugeGuard { +impl Drop for OwnedUpDownCounterGuard { fn drop(&mut self) { - self.gauge.sub(self.delta) + self.counter.sub(self.delta) } } @@ -751,22 +847,22 @@ impl Default for MemoryMetrics { #[derive(Clone)] pub struct InFlightDataGauges { - pub rest_server: IntGauge, - pub ingest_router: IntGauge, - pub ingester_persist: IntGauge, - pub ingester_replicate: IntGauge, - pub wal: IntGauge, - pub fetch_stream: IntGauge, - pub multi_fetch_stream: IntGauge, - pub doc_processor_mailbox: IntGauge, - pub indexer_mailbox: IntGauge, - pub index_writer: IntGauge, - in_flight_gauge_vec: IntGaugeVec<1>, + pub rest_server: IntUpDownCounter, + pub ingest_router: IntUpDownCounter, + pub ingester_persist: IntUpDownCounter, + pub ingester_replicate: IntUpDownCounter, + pub wal: IntUpDownCounter, + pub fetch_stream: IntUpDownCounter, + pub multi_fetch_stream: IntUpDownCounter, + pub doc_processor_mailbox: IntUpDownCounter, + pub indexer_mailbox: IntUpDownCounter, + pub index_writer: IntUpDownCounter, + in_flight_counter_vec: IntUpDownCounterVec<1>, } impl Default for InFlightDataGauges { fn default() -> Self { - let in_flight_gauge_vec = new_gauge_vec( + let in_flight_counter_vec = new_up_down_counter_vec( "in_flight_data_bytes", "Amount of data in-flight in various buffers in bytes.", "memory", @@ -774,75 +870,82 @@ impl Default for InFlightDataGauges { ["component"], ); Self { - rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]), - ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]), - ingester_persist: in_flight_gauge_vec.with_label_values(["ingester_persist"]), - ingester_replicate: in_flight_gauge_vec.with_label_values(["ingester_replicate"]), - wal: in_flight_gauge_vec.with_label_values(["wal"]), - fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]), - multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]), - doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), - indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), - index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), - in_flight_gauge_vec: in_flight_gauge_vec.clone(), + rest_server: in_flight_counter_vec.with_label_values(["rest_server"]), + ingest_router: in_flight_counter_vec.with_label_values(["ingest_router"]), + ingester_persist: in_flight_counter_vec.with_label_values(["ingester_persist"]), + ingester_replicate: in_flight_counter_vec.with_label_values(["ingester_replicate"]), + wal: in_flight_counter_vec.with_label_values(["wal"]), + fetch_stream: in_flight_counter_vec.with_label_values(["fetch_stream"]), + multi_fetch_stream: in_flight_counter_vec.with_label_values(["multi_fetch_stream"]), + doc_processor_mailbox: in_flight_counter_vec + .with_label_values(["doc_processor_mailbox"]), + indexer_mailbox: in_flight_counter_vec.with_label_values(["indexer_mailbox"]), + index_writer: in_flight_counter_vec.with_label_values(["index_writer"]), + in_flight_counter_vec: in_flight_counter_vec.clone(), } } } impl InFlightDataGauges { #[inline] - pub fn file(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["file_source"])) + pub fn file(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec + .with_label_values(["file_source"]) + }) } #[inline] - pub fn ingest(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn ingest(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["ingest_source"]) }) } #[inline] - pub fn kafka(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["kafka_source"])) + pub fn kafka(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec + .with_label_values(["kafka_source"]) + }) } #[inline] - pub fn kinesis(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn kinesis(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["kinesis_source"]) }) } #[inline] - pub fn pubsub(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn pubsub(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["pubsub_source"]) }) } #[inline] - pub fn pulsar(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn pulsar(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["pulsar_source"]) }) } #[inline] - pub fn other(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn other(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["pulsar_source"]) }) } @@ -1012,7 +1115,6 @@ mod tests { fn test_gauge() { let (exporter, provider) = ensure_test_otel_provider(); - // set let gauge = new_gauge("test_gauge_set", "test", "test", &[]); assert_eq!(gauge.get(), 0); gauge.set(10); @@ -1020,41 +1122,64 @@ mod tests { let otel_value = flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_set"); assert_eq!(otel_value, 10); + } + + fn flush_and_get_up_down_counter_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> i64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::I64(MetricData::Sum(sum_data)) = data else { + panic!("expected i64 sum metric"); + }; + sum_data + .data_points() + .next() + .expect("should have one data point") + .value() + }) + } + + #[test] + #[serial] + fn test_up_down_counter() { + let (exporter, provider) = ensure_test_otel_provider(); // inc - let gauge = new_gauge("test_gauge_inc", "test", "test", &[]); - assert_eq!(gauge.get(), 0); - gauge.inc(); - assert_eq!(gauge.get(), 1); + let counter = new_up_down_counter("test_udc_inc", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc(); + assert_eq!(counter.get(), 1); let otel_value = - flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_inc"); + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_inc"); assert_eq!(otel_value, 1); // dec - let gauge = new_gauge("test_gauge_dec", "test", "test", &[]); - assert_eq!(gauge.get(), 0); - gauge.dec(); - assert_eq!(gauge.get(), -1); + let counter = new_up_down_counter("test_udc_dec", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.dec(); + assert_eq!(counter.get(), -1); let otel_value = - flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_dec"); + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_dec"); assert_eq!(otel_value, -1); // add - let gauge = new_gauge("test_gauge_add", "test", "test", &[]); - assert_eq!(gauge.get(), 0); - gauge.add(15); - assert_eq!(gauge.get(), 15); + let counter = new_up_down_counter("test_udc_add", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.add(15); + assert_eq!(counter.get(), 15); let otel_value = - flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_add"); + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_add"); assert_eq!(otel_value, 15); // sub - let gauge = new_gauge("test_gauge_sub", "test", "test", &[]); - assert_eq!(gauge.get(), 0); - gauge.sub(3); - assert_eq!(gauge.get(), -3); + let counter = new_up_down_counter("test_udc_sub", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.sub(3); + assert_eq!(counter.get(), -3); let otel_value = - flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_sub"); + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_sub"); assert_eq!(otel_value, -3); } @@ -1208,30 +1333,30 @@ mod tests { } #[test] - fn test_gauge_guard_add_sub_drop() { - let gauge = new_gauge("test_guard", "test", "test", &[]); + fn test_up_down_counter_guard_add_sub_drop() { + let counter = new_up_down_counter("test_guard", "test", "test", &[]); { - let mut guard = GaugeGuard::from_gauge(&gauge); + let mut guard = UpDownCounterGuard::from_counter(&counter); guard.add(5); - assert_eq!(gauge.get(), 5); + assert_eq!(counter.get(), 5); guard.sub(2); - assert_eq!(gauge.get(), 3); + assert_eq!(counter.get(), 3); } // After drop, the delta (3) is subtracted. - assert_eq!(gauge.get(), 0); + assert_eq!(counter.get(), 0); } #[test] - fn test_owned_gauge_guard_add_sub_drop() { - let gauge = new_gauge("test_owned_guard", "test", "test", &[]); + fn test_owned_up_down_counter_guard_add_sub_drop() { + let counter = new_up_down_counter("test_owned_guard", "test", "test", &[]); { - let mut guard = OwnedGaugeGuard::from_gauge(gauge.clone()); + let mut guard = OwnedUpDownCounterGuard::from_counter(counter.clone()); guard.add(5); - assert_eq!(gauge.get(), 5); + assert_eq!(counter.get(), 5); guard.sub(2); - assert_eq!(gauge.get(), 3); + assert_eq!(counter.get(), 3); } - assert_eq!(gauge.get(), 0); + assert_eq!(counter.get(), 0); } #[test] diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index 3740a7f0527..b744d2fc3ac 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -22,7 +22,7 @@ use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream}; use tracing::warn; -use crate::metrics::{GaugeGuard, IntGauge}; +use crate::metrics::{IntUpDownCounter, UpDownCounterGuard}; use crate::tower::RpcName; pub type BoxStream = Pin + Send + Unpin + 'static>>; @@ -76,7 +76,7 @@ where T: Send + 'static pub fn new_bounded_with_gauge( capacity: usize, - gauge: &'static IntGauge, + gauge: &'static IntUpDownCounter, ) -> (TrackedSender, Self) { let (sender, receiver) = mpsc::channel(capacity); let tracked_sender = TrackedSender { sender, gauge }; @@ -93,7 +93,9 @@ where T: Send + 'static (sender, receiver.into()) } - pub fn new_unbounded_with_gauge(gauge: &'static IntGauge) -> (TrackedUnboundedSender, Self) { + pub fn new_unbounded_with_gauge( + gauge: &'static IntUpDownCounter, + ) -> (TrackedUnboundedSender, Self) { let (sender, receiver) = mpsc::unbounded_channel(); let tracked_sender = TrackedUnboundedSender { sender, gauge }; let receiver_stream = UnboundedReceiverStream::new(receiver) @@ -227,7 +229,7 @@ where T: RpcName } } -pub struct InFlightValue(T, #[allow(dead_code)] GaugeGuard<'static>); +pub struct InFlightValue(T, #[allow(dead_code)] UpDownCounterGuard<'static>); impl fmt::Debug for InFlightValue where T: fmt::Debug @@ -238,8 +240,8 @@ where T: fmt::Debug } impl InFlightValue { - pub fn new(value: T, value_size: ByteSize, gauge: &'static IntGauge) -> Self { - let mut gauge_guard = GaugeGuard::from_gauge(gauge); + pub fn new(value: T, value_size: ByteSize, gauge: &'static IntUpDownCounter) -> Self { + let mut gauge_guard = UpDownCounterGuard::from_counter(gauge); gauge_guard.add(value_size.as_u64() as i64); Self(value, gauge_guard) @@ -252,7 +254,7 @@ impl InFlightValue { pub struct TrackedSender { sender: mpsc::Sender>, - gauge: &'static IntGauge, + gauge: &'static IntUpDownCounter, } impl TrackedSender { @@ -270,7 +272,7 @@ impl TrackedSender { pub struct TrackedUnboundedSender { sender: mpsc::UnboundedSender>, - gauge: &'static IntGauge, + gauge: &'static IntUpDownCounter, } impl TrackedUnboundedSender { @@ -286,7 +288,7 @@ mod tests { use once_cell::sync::Lazy; use super::*; - use crate::metrics::new_gauge; + use crate::metrics::new_up_down_counter; #[tokio::test] async fn test_service_stream_map() { @@ -299,27 +301,28 @@ mod tests { #[tokio::test] async fn test_tracked_service_stream_bounded() { - static TEST_GAUGE: Lazy = - Lazy::new(|| new_gauge("common", "help", "test_tracked_service_stream_bounded", &[])); + static TEST_COUNTER: Lazy = Lazy::new(|| { + new_up_down_counter("common", "help", "test_tracked_service_stream_bounded", &[]) + }); let (service_stream_tx, mut service_stream) = - ServiceStream::new_bounded_with_gauge(3, &TEST_GAUGE); + ServiceStream::new_bounded_with_gauge(3, &TEST_COUNTER); service_stream_tx.send(1, ByteSize(42)).await.unwrap(); - assert_eq!(TEST_GAUGE.get(), 42); + assert_eq!(TEST_COUNTER.get(), 42); service_stream_tx.send(2, ByteSize(1337)).await.unwrap(); - assert_eq!(TEST_GAUGE.get(), 1379); + assert_eq!(TEST_COUNTER.get(), 1379); let value = service_stream.next().await.unwrap(); assert_eq!(value, 1); - assert_eq!(TEST_GAUGE.get(), 1337); + assert_eq!(TEST_COUNTER.get(), 1337); } #[tokio::test] async fn test_tracked_service_stream_unbounded() { - static TEST_GAUGE: Lazy = Lazy::new(|| { - new_gauge( + static TEST_COUNTER: Lazy = Lazy::new(|| { + new_up_down_counter( "common", "help", "test_tracked_service_stream_unbounded", @@ -328,16 +331,16 @@ mod tests { }); let (service_stream_tx, mut service_stream) = - ServiceStream::new_unbounded_with_gauge(&TEST_GAUGE); + ServiceStream::new_unbounded_with_gauge(&TEST_COUNTER); service_stream_tx.send(1, ByteSize(42)).unwrap(); - assert_eq!(TEST_GAUGE.get(), 42); + assert_eq!(TEST_COUNTER.get(), 42); service_stream_tx.send(2, ByteSize(1337)).unwrap(); - assert_eq!(TEST_GAUGE.get(), 1379); + assert_eq!(TEST_COUNTER.get(), 1379); let value = service_stream.next().await.unwrap(); assert_eq!(value, 1); - assert_eq!(TEST_GAUGE.get(), 1337); + assert_eq!(TEST_COUNTER.get(), 1337); } } diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index 17fcfc3415d..0b2365c2571 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -20,7 +20,10 @@ use once_cell::sync::Lazy; use tokio::sync::oneshot; use tracing::error; -use crate::metrics::{GaugeGuard, IntGauge, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; +use crate::metrics::{ + IntUpDownCounter, IntUpDownCounterVec, OwnedUpDownCounterGuard, UpDownCounterGuard, + new_up_down_counter_vec, +}; /// An executor backed by a thread pool to run CPU-intensive tasks. /// @@ -29,8 +32,8 @@ use crate::metrics::{GaugeGuard, IntGauge, IntGaugeVec, OwnedGaugeGuard, new_gau #[derive(Clone)] pub struct ThreadPool { thread_pool: Arc, - ongoing_tasks: IntGauge, - pending_tasks: IntGauge, + ongoing_tasks: IntUpDownCounter, + pending_tasks: IntUpDownCounter, } impl ThreadPool { @@ -84,8 +87,8 @@ impl ThreadPool { { let span = tracing::Span::current(); let ongoing_tasks = self.ongoing_tasks.clone(); - let mut pending_tasks_guard: OwnedGaugeGuard = - OwnedGaugeGuard::from_gauge(self.pending_tasks.clone()); + let mut pending_tasks_guard: OwnedUpDownCounterGuard = + OwnedUpDownCounterGuard::from_counter(self.pending_tasks.clone()); pending_tasks_guard.add(1i64); let (tx, rx) = oneshot::channel(); self.thread_pool.spawn(move || { @@ -94,7 +97,7 @@ impl ThreadPool { return; } let _guard = span.enter(); - let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks); + let mut ongoing_task_guard = UpDownCounterGuard::from_counter(&ongoing_tasks); ongoing_task_guard.add(1i64); let result = cpu_intensive_fn(); let _ = tx.send(result); @@ -137,21 +140,21 @@ impl fmt::Display for Panicked { impl std::error::Error for Panicked {} struct ThreadPoolMetrics { - ongoing_tasks: IntGaugeVec<1>, - pending_tasks: IntGaugeVec<1>, + ongoing_tasks: IntUpDownCounterVec<1>, + pending_tasks: IntUpDownCounterVec<1>, } impl Default for ThreadPoolMetrics { fn default() -> Self { ThreadPoolMetrics { - ongoing_tasks: new_gauge_vec( + ongoing_tasks: new_up_down_counter_vec( "ongoing_tasks", "number of tasks being currently processed by threads in the thread pool", "thread_pool", &[], ["pool"], ), - pending_tasks: new_gauge_vec( + pending_tasks: new_up_down_counter_vec( "pending_tasks", "number of tasks waiting in the queue before being processed by the thread pool", "thread_pool", diff --git a/quickwit/quickwit-common/src/tower/metrics.rs b/quickwit/quickwit-common/src/tower/metrics.rs index b2d093adbe3..cdda9f18068 100644 --- a/quickwit/quickwit-common/src/tower/metrics.rs +++ b/quickwit/quickwit-common/src/tower/metrics.rs @@ -22,7 +22,8 @@ use prometheus::exponential_buckets; use tower::{Layer, Service}; use crate::metrics::{ - HistogramVec, IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec, new_histogram_vec, + HistogramVec, IntCounterVec, IntUpDownCounterVec, new_counter_vec, new_histogram_vec, + new_up_down_counter_vec, }; pub trait RpcName { @@ -33,7 +34,7 @@ pub trait RpcName { pub struct GrpcMetrics { inner: S, requests_total: IntCounterVec<2>, - requests_in_flight: IntGaugeVec<1>, + requests_in_flight: IntUpDownCounterVec<1>, request_duration_seconds: HistogramVec<2>, } @@ -72,7 +73,7 @@ where #[derive(Clone)] pub struct GrpcMetricsLayer { requests_total: IntCounterVec<2>, - requests_in_flight: IntGaugeVec<1>, + requests_in_flight: IntUpDownCounterVec<1>, request_duration_seconds: HistogramVec<2>, } @@ -86,7 +87,7 @@ impl GrpcMetricsLayer { &[("kind", kind)], ["rpc", "status"], ), - requests_in_flight: new_gauge_vec( + requests_in_flight: new_up_down_counter_vec( "grpc_requests_in_flight", "Number of gRPC requests in-flight.", subsystem, @@ -127,7 +128,7 @@ pub struct ResponseFuture { rpc_name: &'static str, status: &'static str, requests_total: IntCounterVec<2>, - requests_in_flight: IntGaugeVec<1>, + requests_in_flight: IntUpDownCounterVec<1>, request_duration_seconds: HistogramVec<2>, } diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 84ba3987f4a..c0f66d686e5 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -27,7 +27,7 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, Command, Handler, Mailbox, QueueCapacity, }; use quickwit_common::io::IoControls; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_common::runtimes::RuntimeType; use quickwit_common::temp_dir::TempDirectory; use quickwit_config::IndexingSettings; @@ -220,7 +220,7 @@ impl IndexerState { let publish_token_opt = self.publish_token_opt.clone(); let mut split_builders_guard = - GaugeGuard::from_gauge(&crate::metrics::INDEXER_METRICS.split_builders); + UpDownCounterGuard::from_counter(&crate::metrics::INDEXER_METRICS.split_builders); split_builders_guard.add(1); let workbench = IndexingWorkbench { @@ -233,7 +233,7 @@ impl IndexerState { publish_lock, publish_token_opt, last_delete_opstamp, - memory_usage: GaugeGuard::from_gauge( + memory_usage: UpDownCounterGuard::from_counter( &quickwit_common::metrics::MEMORY_METRICS .in_flight .index_writer, @@ -358,8 +358,8 @@ struct IndexingWorkbench { // We use this value to set the `delete_opstamp` of the workbench splits. last_delete_opstamp: u64, // Number of bytes declared as used by tantivy. - memory_usage: GaugeGuard<'static>, - split_builders_guard: GaugeGuard<'static>, + memory_usage: UpDownCounterGuard<'static>, + split_builders_guard: UpDownCounterGuard<'static>, cooperative_indexing_period: Option, } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 16daf18102f..401be1ca29a 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -23,7 +23,7 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, HEARTBEAT, Handler, Health, Mailbox, QueueCapacity, Supervisable, }; -use quickwit_common::metrics::OwnedGaugeGuard; +use quickwit_common::metrics::OwnedUpDownCounterGuard; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::{KillSwitch, is_metrics_index}; @@ -163,7 +163,7 @@ pub struct IndexingPipeline { // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. shard_ids: BTreeSet, - _indexing_pipelines_gauge_guard: OwnedGaugeGuard, + _indexing_pipelines_gauge_guard: OwnedUpDownCounterGuard, } #[async_trait] @@ -201,7 +201,8 @@ impl IndexingPipeline { let indexing_pipelines_gauge = crate::metrics::INDEXER_METRICS .indexing_pipelines .with_label_values([¶ms.pipeline_id.index_uid.index_id]); - let indexing_pipelines_gauge_guard = OwnedGaugeGuard::from_gauge(indexing_pipelines_gauge); + let indexing_pipelines_gauge_guard = + OwnedUpDownCounterGuard::from_counter(indexing_pipelines_gauge); let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index d3186616252..f75860f6426 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -14,18 +14,19 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - IntCounter, IntCounterVec, IntGauge, IntGaugeVec, new_counter, new_counter_vec, new_gauge, - new_gauge_vec, + IntCounter, IntCounterVec, IntGauge, IntGaugeVec, IntUpDownCounter, IntUpDownCounterVec, + new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_up_down_counter, + new_up_down_counter_vec, }; pub struct IndexerMetrics { pub processed_docs_total: IntCounterVec<2>, pub processed_bytes: IntCounterVec<2>, - pub indexing_pipelines: IntGaugeVec<1>, + pub indexing_pipelines: IntUpDownCounterVec<1>, pub backpressure_micros: IntCounterVec<1>, pub available_concurrent_upload_permits: IntGaugeVec<1>, - pub split_builders: IntGauge, - pub ongoing_merge_operations: IntGauge, + pub split_builders: IntUpDownCounter, + pub ongoing_merge_operations: IntUpDownCounter, pub pending_merge_operations: IntGauge, pub pending_merge_bytes: IntGauge, // We use a lazy counter, as most users do not use Kafka. @@ -52,7 +53,7 @@ impl Default for IndexerMetrics { &[], ["index", "docs_processed_status"], ), - indexing_pipelines: new_gauge_vec( + indexing_pipelines: new_up_down_counter_vec( "indexing_pipelines", "Number of running indexing pipelines", "indexing", @@ -74,13 +75,13 @@ impl Default for IndexerMetrics { &[], ["component"], ), - split_builders: new_gauge( + split_builders: new_up_down_counter( "split_builders", "Number of existing index writer instances.", "indexing", &[], ), - ongoing_merge_operations: new_gauge( + ongoing_merge_operations: new_up_down_counter( "ongoing_merge_operations", "Number of ongoing merge operations", "indexing", diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index cd272bdc34c..6dd989acc3a 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -16,7 +16,7 @@ use std::fmt; use std::path::Path; use quickwit_common::io::IoControls; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_proto::indexing::IndexingPipelineId; @@ -182,8 +182,8 @@ pub struct IndexedSplitBatchBuilder { pub publish_token_opt: Option, pub commit_trigger: CommitTrigger, pub batch_parent_span: Span, - pub memory_usage: GaugeGuard<'static>, - pub _split_builders_guard: GaugeGuard<'static>, + pub memory_usage: UpDownCounterGuard<'static>, + pub _split_builders_guard: UpDownCounterGuard<'static>, } /// Sends notifications to the Publisher that the last batch of splits was empty. diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index bed695aa1d4..9f9814670cf 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -14,7 +14,7 @@ use std::fmt; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; use tantivy::{DateTime, TantivyDocument}; @@ -41,7 +41,7 @@ pub struct ProcessedDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard<'static>, + _gauge_guard: UpDownCounterGuard<'static>, } impl ProcessedDocBatch { @@ -51,7 +51,8 @@ impl ProcessedDocBatch { force_commit: bool, ) -> Self { let delta = docs.iter().map(|doc| doc.num_bytes as i64).sum::(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.indexer_mailbox); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.indexer_mailbox); gauge_guard.add(delta); Self { docs, diff --git a/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs b/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs index 0db83abcc02..8d57ff99fc2 100644 --- a/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs +++ b/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs @@ -20,7 +20,7 @@ use std::fmt; use arrow::record_batch::RecordBatch; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; /// Batch of parquet data as Arrow RecordBatch for the parquet indexing pipeline. @@ -35,7 +35,7 @@ pub struct ProcessedParquetBatch { /// Force commit flag - when true, accumulator should flush immediately. pub force_commit: bool, /// Memory tracking gauge guard. - _gauge_guard: GaugeGuard<'static>, + _gauge_guard: UpDownCounterGuard<'static>, } impl ProcessedParquetBatch { @@ -57,7 +57,8 @@ impl ProcessedParquetBatch { .map(|col| col.get_array_memory_size() as i64) .sum(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.indexer_mailbox); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.indexer_mailbox); gauge_guard.add(memory_size); Self { diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs index f88d9fcac2b..01202aff069 100644 --- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs +++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs @@ -15,7 +15,7 @@ use std::fmt; use bytes::Bytes; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; pub struct RawDocBatch { @@ -24,7 +24,7 @@ pub struct RawDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard<'static>, + _gauge_guard: UpDownCounterGuard<'static>, } impl RawDocBatch { @@ -35,7 +35,7 @@ impl RawDocBatch { ) -> Self { let delta = docs.iter().map(|doc| doc.len() as i64).sum::(); let mut gauge_guard = - GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.doc_processor_mailbox); gauge_guard.add(delta); Self { @@ -67,7 +67,8 @@ impl fmt::Debug for RawDocBatch { impl Default for RawDocBatch { fn default() -> Self { - let _gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); + let _gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.doc_processor_mailbox); Self { docs: Vec::new(), checkpoint_delta: SourceCheckpointDelta::default(), diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 86d40e5288f..afc2b715025 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -91,7 +91,7 @@ pub use pulsar_source::{PulsarSource, PulsarSourceFactory}; #[cfg(feature = "sqs")] pub use queue_sources::sqs_queue; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox}; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{ @@ -542,7 +542,7 @@ pub(super) struct BatchBuilder { num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, - gauge_guard: GaugeGuard<'static>, + gauge_guard: UpDownCounterGuard<'static>, } impl BatchBuilder { @@ -560,7 +560,7 @@ impl BatchBuilder { SourceType::Pulsar => MEMORY_METRICS.in_flight.pulsar(), _ => MEMORY_METRICS.in_flight.other(), }; - let gauge_guard = GaugeGuard::from_gauge(gauge); + let gauge_guard = UpDownCounterGuard::from_counter(gauge); Self { docs: Vec::with_capacity(capacity), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 5091784b885..9fa6da4d79b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -26,7 +26,7 @@ use futures::stream::FuturesUnordered; use mrecordlog::error::CreateQueueError; use once_cell::sync::OnceCell; use quickwit_cluster::Cluster; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; @@ -1112,7 +1112,8 @@ impl IngesterService for Ingester { _ => None, }) .sum::(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.ingester_persist); gauge_guard.add(request_size_bytes as i64); self.persist_inner(persist_request).await diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 87975a3c462..f1a7903db50 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -15,8 +15,9 @@ use mrecordlog::ResourceUsage; use once_cell::sync::Lazy; use quickwit_common::metrics::{ - Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, exponential_buckets, - linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntUpDownCounterVec, + exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_histogram, + new_histogram_vec, new_up_down_counter_vec, }; // Counter vec counting the different outcomes of ingest requests as @@ -77,7 +78,7 @@ pub(super) struct IngestV2Metrics { pub closed_shards: IntGauge, pub shard_lt_throughput_mib: Histogram, pub shard_st_throughput_mib: Histogram, - pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>, + pub wal_acquire_lock_requests_in_flight: IntUpDownCounterVec<2>, pub wal_acquire_lock_request_duration_secs: HistogramVec<2>, pub wal_disk_used_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, @@ -127,7 +128,7 @@ impl Default for IngestV2Metrics { "ingest", linear_buckets(0.0f64, 1.0f64, 15).unwrap(), ), - wal_acquire_lock_requests_in_flight: new_gauge_vec( + wal_acquire_lock_requests_in_flight: new_up_down_counter_vec( "wal_acquire_lock_requests_in_flight", "Number of acquire lock requests in-flight.", "ingest", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index bbf0cd037c5..0947e43aa3e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -18,7 +18,7 @@ use std::time::{Duration, Instant}; use bytesize::ByteSize; use futures::{Future, StreamExt}; use mrecordlog::error::CreateQueueError; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::{ServiceStream, rate_limited_warn}; use quickwit_proto::ingest::ingester::{ AckReplicationMessage, IngesterStatus, InitReplicaRequest, InitReplicaResponse, @@ -504,7 +504,8 @@ impl ReplicationTask { ))); } let request_size_bytes = replicate_request.num_bytes(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.ingester_replicate); gauge_guard.add(request_size_bytes as i64); self.current_replication_seqno += 1; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index d27c5ebfb77..31474d52782 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -20,7 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::{rate_limited_error, rate_limited_warn}; use quickwit_proto::control_plane::{ @@ -578,7 +578,8 @@ impl IngestRouterService for IngestRouter { async fn ingest(&self, ingest_request: IngestRequestV2) -> IngestV2Result { let request_size_bytes = ingest_request.num_bytes(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.ingest_router); gauge_guard.add(request_size_bytes as i64); let num_subrequests = ingest_request.subrequests.len(); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs index 3807fcf6829..2e7493c2431 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -13,11 +13,11 @@ // limitations under the License. use once_cell::sync::Lazy; -use quickwit_common::metrics::{IntGauge, new_gauge}; +use quickwit_common::metrics::{IntGauge, IntUpDownCounter, new_gauge, new_up_down_counter}; #[derive(Clone)] pub(super) struct PostgresMetrics { - pub acquire_connections: IntGauge, + pub acquire_connections: IntUpDownCounter, pub active_connections: IntGauge, pub idle_connections: IntGauge, } @@ -25,7 +25,7 @@ pub(super) struct PostgresMetrics { impl Default for PostgresMetrics { fn default() -> Self { Self { - acquire_connections: new_gauge( + acquire_connections: new_up_down_counter( "acquire_connections", "Number of connections being acquired.", "metastore", diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs index a4c1e790e5b..c5dddeca99d 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs @@ -14,7 +14,7 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use sqlx::pool::PoolConnection; use sqlx::pool::maybe::MaybePoolConnection; use sqlx::{ @@ -58,7 +58,8 @@ impl<'a, DB: Database> Acquire<'a> for &TrackedPool { .set(self.inner_pool.num_idle() as i64); Box::pin(async move { - let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&POSTGRES_METRICS.acquire_connections); gauge_guard.add(1); let conn = acquire_conn_fut.await?; diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 9ed11d7ecbb..8c6053928ce 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -19,9 +19,9 @@ use std::fmt; use bytesize::ByteSize; use once_cell::sync::Lazy; use quickwit_common::metrics::{ - Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, exponential_buckets, - linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, - new_histogram_vec, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntUpDownCounter, exponential_buckets, + linear_buckets, new_counter, new_counter_vec, new_histogram, new_histogram_vec, + new_up_down_counter, new_up_down_counter_vec, }; fn print_if_not_null( @@ -115,10 +115,10 @@ pub struct SearchMetrics { pub split_search_outcome_total: SplitSearchOutcomeCounters, pub leaf_search_split_duration_secs: Histogram, pub job_assigned_total: IntCounterVec<1>, - pub leaf_search_single_split_tasks_pending: IntGauge, - pub leaf_search_single_split_tasks_ongoing: IntGauge, + pub leaf_search_single_split_tasks_pending: IntUpDownCounter, + pub leaf_search_single_split_tasks_ongoing: IntUpDownCounter, pub leaf_search_single_split_warmup_num_bytes: Histogram, - pub searcher_local_kv_store_size_bytes: IntGauge, + pub searcher_local_kv_store_size_bytes: IntUpDownCounter, } /// From 0.008s to 131.072s @@ -151,7 +151,7 @@ impl Default for SearchMetrics { ByteSize::gb(5).as_u64() as f64, ]; - let leaf_search_single_split_tasks = new_gauge_vec::<1>( + let leaf_search_single_split_tasks = new_up_down_counter_vec::<1>( "leaf_search_single_split_tasks", "Number of single split search tasks pending or ongoing", "search", @@ -239,7 +239,7 @@ impl Default for SearchMetrics { &[], ["affinity"], ), - searcher_local_kv_store_size_bytes: new_gauge( + searcher_local_kv_store_size_bytes: new_up_down_counter( "searcher_local_kv_store_size_bytes", "Size of the searcher kv store in bytes. This store is used to cache scroll \ contexts.", diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index a4a31a856b5..35a4e145321 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -22,7 +22,7 @@ use std::time::Duration; use anyhow::Context; use base64::Engine; use base64::prelude::BASE64_STANDARD; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_common::shared_consts::SCROLL_BATCH_LEN; use quickwit_metastore::SplitMetadata; use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError}; @@ -123,7 +123,7 @@ impl ScrollContext { struct TrackedValue { content: Vec, - _total_size_metric_guard: GaugeGuard<'static>, + _total_size_metric_guard: UpDownCounterGuard<'static>, } /// In memory key value store with TTL and limited size. @@ -148,8 +148,9 @@ impl Default for MiniKV { impl MiniKV { pub async fn put(&self, key: Vec, payload: Vec, ttl: Duration) { - let mut metric_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes); + let mut metric_guard = UpDownCounterGuard::from_counter( + &crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes, + ); metric_guard.add(payload.len() as i64); let mut cache_lock = self.ttl_with_cache.write().await; cache_lock.insert( diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 4cd5e99918f..2a33e1aa6a0 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use bytesize::ByteSize; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_proto::search::SplitIdAndFooterOffsets; use tokio::sync::{mpsc, oneshot}; @@ -332,7 +332,7 @@ impl SearchPermitActor { fn assign_available_permits(&mut self) { while let Some(permit_request) = self.pop_next_request_if_serviceable() { - let mut ongoing_gauge_guard = GaugeGuard::from_gauge( + let mut ongoing_gauge_guard = UpDownCounterGuard::from_counter( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); @@ -357,7 +357,7 @@ impl SearchPermitActor { } pub struct SearchPermit { - _ongoing_gauge_guard: GaugeGuard<'static>, + _ongoing_gauge_guard: UpDownCounterGuard<'static>, msg_sender: mpsc::WeakUnboundedSender, memory_allocation: u64, warmup_slot_freed: bool, diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index d65df7d3bea..54398e9a402 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -17,7 +17,7 @@ use std::sync::OnceLock; use bytes::Bytes; use flate2::read::{MultiGzDecoder, ZlibDecoder}; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::thread_pool::run_cpu_intensive; use thiserror::Error; use warp::Filter; @@ -108,13 +108,14 @@ pub(crate) fn get_body_bytes() -> impl Filter, + _gauge_guard: UpDownCounterGuard<'static>, _permit: LoadShieldPermit, } impl Body { pub fn new(content: Bytes, load_shield_permit: LoadShieldPermit) -> Body { - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.rest_server); gauge_guard.add(content.len() as i64); Body { content, diff --git a/quickwit/quickwit-serve/src/load_shield.rs b/quickwit/quickwit-serve/src/load_shield.rs index 568ffabd53b..4b5979ed43c 100644 --- a/quickwit/quickwit-serve/src/load_shield.rs +++ b/quickwit/quickwit-serve/src/load_shield.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use quickwit_common::metrics::{GaugeGuard, IntGauge}; +use quickwit_common::metrics::{IntUpDownCounter, UpDownCounterGuard}; use tokio::sync::{Semaphore, SemaphorePermit}; use crate::rest::TooManyRequests; @@ -22,14 +22,14 @@ use crate::rest::TooManyRequests; pub struct LoadShield { in_flight_semaphore_opt: Option, // This one is doing the load shedding. concurrency_semaphore_opt: Option, - ongoing_gauge: IntGauge, - pending_gauge: IntGauge, + ongoing_gauge: IntUpDownCounter, + pending_gauge: IntUpDownCounter, } pub struct LoadShieldPermit { _concurrency_permit_opt: Option>, _in_flight_permit_opt: Option>, - _ongoing_gauge_guard: GaugeGuard<'static>, + _ongoing_gauge_guard: UpDownCounterGuard<'static>, } impl LoadShield { @@ -78,12 +78,12 @@ impl LoadShield { } pub async fn acquire_permit(&'static self) -> Result { - let mut pending_gauge_guard = GaugeGuard::from_gauge(&self.pending_gauge); + let mut pending_gauge_guard = UpDownCounterGuard::from_counter(&self.pending_gauge); pending_gauge_guard.add(1); let in_flight_permit_opt = self.acquire_in_flight_permit().await?; let concurrency_permit_opt = self.acquire_concurrency_permit().await; drop(pending_gauge_guard); - let mut ongoing_gauge_guard = GaugeGuard::from_gauge(&self.ongoing_gauge); + let mut ongoing_gauge_guard = UpDownCounterGuard::from_counter(&self.ongoing_gauge); ongoing_gauge_guard.add(1); Ok(LoadShieldPermit { _in_flight_permit_opt: in_flight_permit_opt, diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index c1e4fa24d93..fb6a7bcf0a0 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -14,15 +14,15 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, new_counter, new_counter_vec, - new_gauge_vec, new_histogram_vec, + HistogramVec, IntCounter, IntCounterVec, IntUpDownCounterVec, new_counter, new_counter_vec, + new_histogram_vec, new_up_down_counter_vec, }; pub struct ServeMetrics { pub http_requests_total: IntCounterVec<2>, pub request_duration_secs: HistogramVec<2>, - pub ongoing_requests: IntGaugeVec<1>, - pub pending_requests: IntGaugeVec<1>, + pub ongoing_requests: IntUpDownCounterVec<1>, + pub pending_requests: IntUpDownCounterVec<1>, pub circuit_break_total: IntCounter, } @@ -51,14 +51,14 @@ impl Default for ServeMetrics { // last bucket is 163.84s quickwit_common::metrics::exponential_buckets(0.02, 2.0, 14).unwrap(), ), - ongoing_requests: new_gauge_vec( + ongoing_requests: new_up_down_counter_vec( "ongoing_requests", "Number of ongoing requests.", "", &[], ["endpoint_group"], ), - pending_requests: new_gauge_vec( + pending_requests: new_up_down_counter_vec( "pending_requests", "Number of pending requests.", "", diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 8b439bfaeaa..74042544c8d 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -19,8 +19,8 @@ use std::sync::RwLock; use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, - new_gauge, new_histogram_vec, + Histogram, IntCounter, IntCounterVec, IntUpDownCounter, UpDownCounterGuard, new_counter, + new_counter_vec, new_histogram_vec, new_up_down_counter, }; use quickwit_config::CacheConfig; @@ -37,8 +37,8 @@ pub struct StorageMetrics { pub get_slice_timeout_all_timeouts: IntCounter, pub object_storage_get_total: IntCounter, pub object_storage_get_errors_total: IntCounterVec<1>, - pub object_storage_get_slice_in_flight_count: IntGauge, - pub object_storage_get_slice_in_flight_num_bytes: IntGauge, + pub object_storage_get_slice_in_flight_count: IntUpDownCounter, + pub object_storage_get_slice_in_flight_num_bytes: IntUpDownCounter, pub object_storage_put_total: IntCounter, pub object_storage_put_parts: IntCounter, pub object_storage_download_num_bytes: IntCounter, @@ -117,14 +117,14 @@ impl Default for StorageMetrics { &[], ["code"], ), - object_storage_get_slice_in_flight_count: new_gauge( + object_storage_get_slice_in_flight_count: new_up_down_counter( "object_storage_get_slice_in_flight_count", "Number of GetObject for which the memory was allocated but the download is still \ in progress.", "storage", &[], ), - object_storage_get_slice_in_flight_num_bytes: new_gauge( + object_storage_get_slice_in_flight_num_bytes: new_up_down_counter( "object_storage_get_slice_in_flight_num_bytes", "Memory allocated for GetObject requests that are still in progress.", "storage", @@ -172,8 +172,8 @@ pub struct CacheMetrics { #[derive(Clone)] pub struct SingleCacheMetrics { - pub in_cache_count: IntGauge, - pub in_cache_num_bytes: IntGauge, + pub in_cache_count: IntUpDownCounter, + pub in_cache_num_bytes: IntUpDownCounter, pub hits_num_items: IntCounter, pub hits_num_bytes: IntCounter, pub misses_num_items: IntCounter, @@ -188,13 +188,13 @@ impl CacheMetrics { CacheMetrics { component_name: component_name.to_string(), cache_metrics: SingleCacheMetrics { - in_cache_count: new_gauge( + in_cache_count: new_up_down_counter( "in_cache_count", "Count of in cache by component", CACHE_METRICS_NAMESPACE, &labels, ), - in_cache_num_bytes: new_gauge( + in_cache_num_bytes: new_up_down_counter( "in_cache_num_bytes", "Number of bytes in cache by component", CACHE_METRICS_NAMESPACE, @@ -250,13 +250,13 @@ impl CacheMetrics { ("policy", &policy), ]; let new_virtual_cache_metrics = SingleCacheMetrics { - in_cache_count: new_gauge( + in_cache_count: new_up_down_counter( "virtual_in_cache_count", "Count of in cache by component", CACHE_METRICS_NAMESPACE, &labels, ), - in_cache_num_bytes: new_gauge( + in_cache_num_bytes: new_up_down_counter( "virtual_in_cache_num_bytes", "Number of bytes in cache by component", CACHE_METRICS_NAMESPACE, @@ -313,13 +313,14 @@ pub static CACHE_METRICS_FOR_TESTS: Lazy = pub fn object_storage_get_slice_in_flight_guards( get_request_size: usize, -) -> (GaugeGuard<'static>, GaugeGuard<'static>) { - let mut bytes_guard = GaugeGuard::from_gauge( +) -> (UpDownCounterGuard<'static>, UpDownCounterGuard<'static>) { + let mut bytes_guard = UpDownCounterGuard::from_counter( &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_num_bytes, ); bytes_guard.add(get_request_size as i64); - let mut count_guard = - GaugeGuard::from_gauge(&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count); + let mut count_guard = UpDownCounterGuard::from_counter( + &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count, + ); count_guard.add(1); (bytes_guard, count_guard) } From bf9c31b68a0e054a142c6cf077f87d5074727d73 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Fri, 10 Apr 2026 12:52:23 +0200 Subject: [PATCH 3/7] Remove IntUpDownCounter::set() no-op and fix affected call sites The no-op set() was a temporary shim after the IntGauge/IntUpDownCounter split. This removes it and fixes the three call sites that relied on it: - WAL: remove wal field from InFlightDataGauges, add wal_memory_allocated_bytes IntGauge to INGEST_V2_METRICS - Search: split pending (IntGauge) and ongoing (IntUpDownCounter) into standalone metrics instead of sharing a vec - Storage: replace fd_cache set() calls with inc/dec based on push/pop return values - Indexing: change ongoing_merge_operations to IntGauge --- quickwit/quickwit-common/src/metrics.rs | 6 ---- quickwit/quickwit-indexing/src/metrics.rs | 4 +-- .../quickwit-ingest/src/ingest_v2/metrics.rs | 12 +++++-- quickwit/quickwit-search/src/metrics.rs | 32 +++++++++---------- .../src/file_descriptor_cache.rs | 18 ++++------- 5 files changed, 34 insertions(+), 38 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 4d84d0617e6..1b314c712d4 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -283,10 +283,6 @@ pub struct IntUpDownCounter { } impl IntUpDownCounter { - /// No-op dummy. Exists only so call sites that mix `set` with relative ops - /// still compile after migrating from `IntGauge`. - pub fn set(&self, _v: i64) {} - pub fn inc(&self) { self.prometheus.inc(); self.otel.add(1); @@ -851,7 +847,6 @@ pub struct InFlightDataGauges { pub ingest_router: IntUpDownCounter, pub ingester_persist: IntUpDownCounter, pub ingester_replicate: IntUpDownCounter, - pub wal: IntUpDownCounter, pub fetch_stream: IntUpDownCounter, pub multi_fetch_stream: IntUpDownCounter, pub doc_processor_mailbox: IntUpDownCounter, @@ -874,7 +869,6 @@ impl Default for InFlightDataGauges { ingest_router: in_flight_counter_vec.with_label_values(["ingest_router"]), ingester_persist: in_flight_counter_vec.with_label_values(["ingester_persist"]), ingester_replicate: in_flight_counter_vec.with_label_values(["ingester_replicate"]), - wal: in_flight_counter_vec.with_label_values(["wal"]), fetch_stream: in_flight_counter_vec.with_label_values(["fetch_stream"]), multi_fetch_stream: in_flight_counter_vec.with_label_values(["multi_fetch_stream"]), doc_processor_mailbox: in_flight_counter_vec diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index f75860f6426..14d4c801f45 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -26,7 +26,7 @@ pub struct IndexerMetrics { pub backpressure_micros: IntCounterVec<1>, pub available_concurrent_upload_permits: IntGaugeVec<1>, pub split_builders: IntUpDownCounter, - pub ongoing_merge_operations: IntUpDownCounter, + pub ongoing_merge_operations: IntGauge, pub pending_merge_operations: IntGauge, pub pending_merge_bytes: IntGauge, // We use a lazy counter, as most users do not use Kafka. @@ -81,7 +81,7 @@ impl Default for IndexerMetrics { "indexing", &[], ), - ongoing_merge_operations: new_up_down_counter( + ongoing_merge_operations: new_gauge( "ongoing_merge_operations", "Number of ongoing merge operations", "indexing", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index f1a7903db50..ee450453126 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -81,6 +81,7 @@ pub(super) struct IngestV2Metrics { pub wal_acquire_lock_requests_in_flight: IntUpDownCounterVec<2>, pub wal_acquire_lock_request_duration_secs: HistogramVec<2>, pub wal_disk_used_bytes: IntGauge, + pub wal_memory_allocated_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, pub ingest_results: IngestResultMetrics, pub ingest_attempts: IntCounterVec<1>, @@ -149,6 +150,12 @@ impl Default for IngestV2Metrics { "ingest", &[], ), + wal_memory_allocated_bytes: new_gauge( + "wal_memory_allocated_bytes", + "WAL memory allocated in bytes.", + "ingest", + &[], + ), wal_memory_used_bytes: new_gauge( "wal_memory_used_bytes", "WAL memory used in bytes.", @@ -163,9 +170,8 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) { INGEST_V2_METRICS .wal_disk_used_bytes .set(wal_usage.disk_used_bytes as i64); - quickwit_common::metrics::MEMORY_METRICS - .in_flight - .wal + INGEST_V2_METRICS + .wal_memory_allocated_bytes .set(wal_usage.memory_allocated_bytes as i64); INGEST_V2_METRICS .wal_memory_used_bytes diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 8c6053928ce..de432a2c847 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -19,9 +19,9 @@ use std::fmt; use bytesize::ByteSize; use once_cell::sync::Lazy; use quickwit_common::metrics::{ - Histogram, HistogramVec, IntCounter, IntCounterVec, IntUpDownCounter, exponential_buckets, - linear_buckets, new_counter, new_counter_vec, new_histogram, new_histogram_vec, - new_up_down_counter, new_up_down_counter_vec, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntUpDownCounter, + exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge, new_histogram, + new_histogram_vec, new_up_down_counter, }; fn print_if_not_null( @@ -115,7 +115,7 @@ pub struct SearchMetrics { pub split_search_outcome_total: SplitSearchOutcomeCounters, pub leaf_search_split_duration_secs: Histogram, pub job_assigned_total: IntCounterVec<1>, - pub leaf_search_single_split_tasks_pending: IntUpDownCounter, + pub leaf_search_single_split_tasks_pending: IntGauge, pub leaf_search_single_split_tasks_ongoing: IntUpDownCounter, pub leaf_search_single_split_warmup_num_bytes: Histogram, pub searcher_local_kv_store_size_bytes: IntUpDownCounter, @@ -151,14 +151,6 @@ impl Default for SearchMetrics { ByteSize::gb(5).as_u64() as f64, ]; - let leaf_search_single_split_tasks = new_up_down_counter_vec::<1>( - "leaf_search_single_split_tasks", - "Number of single split search tasks pending or ongoing", - "search", - &[], - ["status"], // takes values "ongoing" or "pending" - ); - SearchMetrics { root_search_requests_total: new_counter_vec( "root_search_requests_total", @@ -222,10 +214,18 @@ impl Default for SearchMetrics { "search", duration_buckets(), ), - leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks - .with_label_values(["ongoing"]), - leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks - .with_label_values(["pending"]), + leaf_search_single_split_tasks_ongoing: new_up_down_counter( + "leaf_search_single_split_tasks_ongoing", + "Number of single split search tasks ongoing.", + "search", + &[], + ), + leaf_search_single_split_tasks_pending: new_gauge( + "leaf_search_single_split_tasks_pending", + "Number of single split search tasks pending.", + "search", + &[], + ), leaf_search_single_split_warmup_num_bytes: new_histogram( "leaf_search_single_split_warmup_num_bytes", "Size of the short lived cache for a single split once the warmup is done.", diff --git a/quickwit/quickwit-storage/src/file_descriptor_cache.rs b/quickwit/quickwit-storage/src/file_descriptor_cache.rs index 4626e71bee9..b8367416df0 100644 --- a/quickwit/quickwit-storage/src/file_descriptor_cache.rs +++ b/quickwit/quickwit-storage/src/file_descriptor_cache.rs @@ -101,10 +101,9 @@ impl FileDescriptorCache { fn put_split_file(&self, split_id: Ulid, split_file: SplitFile) { let mut fd_cache_lock = self.fd_cache.lock().unwrap(); - fd_cache_lock.push(split_id, split_file); - self.fd_cache_metrics - .in_cache_count - .set(fd_cache_lock.len() as i64); + if fd_cache_lock.push(split_id, split_file).is_none() { + self.fd_cache_metrics.in_cache_count.inc(); + } } /// Evicts the given list of split ids from the file descriptor cache. @@ -112,14 +111,11 @@ impl FileDescriptorCache { pub fn evict_split_files(&self, split_ids: &[Ulid]) { let mut fd_cache_lock = self.fd_cache.lock().unwrap(); for split_id in split_ids { - fd_cache_lock.pop(split_id); + if fd_cache_lock.pop(split_id).is_some() { + self.fd_cache_metrics.in_cache_count.dec(); + self.fd_cache_metrics.evict_num_items.inc(); + } } - self.fd_cache_metrics - .in_cache_count - .set(fd_cache_lock.len() as i64); - self.fd_cache_metrics - .evict_num_items - .inc_by(split_ids.len() as u64); } pub async fn get_or_open_split_file( From 205715e6813cae6a1ea233dab7df66e03240f03a Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Tue, 14 Apr 2026 09:59:06 +0200 Subject: [PATCH 4/7] Use Arc<[T]> for OTel metric attributes and label names Replace Vec with Arc<[KeyValue]> for metric attributes and Vec with Arc<[Key]> for label names to reduce heap allocations on the hot path when recording metrics with labels. --- quickwit/quickwit-common/src/metrics.rs | 54 +++++++++++++++---------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 1b314c712d4..9ab5ee63a4d 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -16,8 +16,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, LazyLock, OnceLock}; use std::time::Instant; -use opentelemetry::KeyValue; use opentelemetry::metrics::Meter; +use opentelemetry::{Key, KeyValue}; use prometheus::{HistogramOpts, Opts, TextEncoder}; pub use prometheus::{exponential_buckets, linear_buckets}; @@ -34,11 +34,20 @@ fn otel_meter() -> Option<&'static Meter> { OTEL_METER.get() } -fn build_otel_attributes(const_labels: &[(&str, &str)]) -> Vec { +fn build_otel_attributes(const_labels: &[(&str, &str)]) -> Arc<[KeyValue]> { const_labels .iter() - .map(|(k, v)| KeyValue::new(k.to_string(), v.to_string())) - .collect() + .map(|(k, v)| KeyValue::new(Arc::::from(*k), Arc::::from(*v))) + .collect::>() + .into() +} + +fn build_otel_label_names(label_names: [&str; N]) -> Arc<[Key]> { + label_names + .into_iter() + .map(|label_name| Key::new(Arc::::from(label_name))) + .collect::>() + .into() } fn build_prometheus_labels(const_labels: &[(&str, &str)]) -> HashMap { @@ -76,25 +85,26 @@ impl OtelState { #[derive(Clone)] struct OtelMetric { state: Option>>, - attributes: Vec, + attributes: Arc<[KeyValue]>, } impl OtelMetric { - fn new(state: Option>>, attributes: Vec) -> Self { + fn new(state: Option>>, attributes: Arc<[KeyValue]>) -> Self { Self { state, attributes } } - fn with_attributes(&self, names: &[String], values: [&str; N]) -> Self { + fn with_attributes(&self, names: &[Key], values: [&str; N]) -> Self { if self.state.is_none() { return Self::default(); } - let mut attributes = self.attributes.clone(); + let mut attributes = Vec::with_capacity(self.attributes.len() + N); + attributes.extend(self.attributes.iter().cloned()); for (name, value) in names.iter().zip(values.iter()) { - attributes.push(KeyValue::new(name.clone(), value.to_string())); + attributes.push(KeyValue::new(name.clone(), Arc::::from(*value))); } Self { state: self.state.clone(), - attributes, + attributes: attributes.into(), } } @@ -106,7 +116,7 @@ impl OtelMetric { let Some(instrument) = self.get_or_bind_instrument() else { return; }; - f(instrument, &self.attributes); + f(instrument, self.attributes.as_ref()); } fn get_or_bind_instrument(&self) -> Option<&T> { @@ -123,7 +133,7 @@ impl Default for OtelMetric { fn default() -> Self { Self { state: None, - attributes: Vec::new(), + attributes: Vec::new().into(), } } } @@ -210,7 +220,7 @@ impl IntCounter { pub struct IntCounterVec { prometheus: prometheus::IntCounterVec, otel: OtelMetric>, - label_names: Vec, + label_names: Arc<[Key]>, } impl IntCounterVec { @@ -231,7 +241,7 @@ impl IntCounterVec { IntCounterVec { prometheus: prom, otel: OtelMetric::new(None, build_otel_attributes(const_labels)), - label_names: label_names.iter().map(|s| s.to_string()).collect(), + label_names: build_otel_label_names(label_names), } } @@ -264,7 +274,7 @@ impl IntGauge { pub struct IntGaugeVec { prometheus: prometheus::IntGaugeVec, otel: OtelMetric>, - label_names: Vec, + label_names: Arc<[Key]>, } impl IntGaugeVec { @@ -312,7 +322,7 @@ impl IntUpDownCounter { pub struct IntUpDownCounterVec { prometheus: prometheus::IntGaugeVec, otel: OtelMetric>, - label_names: Vec, + label_names: Arc<[Key]>, } impl IntUpDownCounterVec { @@ -386,7 +396,7 @@ impl Drop for HistogramTimer { pub struct HistogramVec { prometheus: prometheus::HistogramVec, otel: OtelMetric>, - label_names: Vec, + label_names: Arc<[Key]>, } impl HistogramVec { @@ -539,7 +549,7 @@ pub fn new_counter_vec( Some(new_counter_otel_state(name, subsystem, help)), build_otel_attributes(const_labels), ), - label_names: label_names.iter().map(|s| s.to_string()).collect(), + label_names: build_otel_label_names(label_names), } } @@ -586,7 +596,7 @@ pub fn new_gauge_vec( Some(new_int_gauge_otel_state(name, subsystem, help)), build_otel_attributes(const_labels), ), - label_names: label_names.iter().map(|s| s.to_string()).collect(), + label_names: build_otel_label_names(label_names), } } @@ -633,7 +643,7 @@ pub fn new_up_down_counter_vec( Some(new_i64_up_down_counter_otel_state(name, subsystem, help)), build_otel_attributes(const_labels), ), - label_names: label_names.iter().map(|s| s.to_string()).collect(), + label_names: build_otel_label_names(label_names), } } @@ -677,7 +687,7 @@ pub fn new_histogram(name: &str, help: &str, subsystem: &str, buckets: Vec) help, buckets.clone(), )), - Vec::new(), + Vec::new().into(), ), } } @@ -710,7 +720,7 @@ pub fn new_histogram_vec( )), build_otel_attributes(const_labels), ), - label_names: label_names.iter().map(|s| s.to_string()).collect(), + label_names: build_otel_label_names(label_names), } } From ed6cb2dd4ee90b4efc21f5c2f38920882a89cb8d Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Tue, 14 Apr 2026 13:39:19 +0200 Subject: [PATCH 5/7] Add missing LazyLock import in postgres metrics --- quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs index 0cdf5fe58d2..5c8eabd7db0 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::LazyLock; + use quickwit_common::metrics::{IntGauge, IntUpDownCounter, new_gauge, new_up_down_counter}; #[derive(Clone)] From 25fa76590c9fdf0a2accce5b7490052be0db7dd0 Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 15 Apr 2026 11:04:42 +0200 Subject: [PATCH 6/7] Cache metric instances in *Vec::with_label_values Add FNV-hash-keyed caches to IntCounterVec, IntGaugeVec, IntUpDownCounterVec, and HistogramVec, following the same strategy used by the prometheus crate internally. On cache hit, with_label_values now returns a clone of the cached metric (Arc refcount bumps only) instead of re-allocating OTel attributes on every call. --- quickwit/quickwit-common/src/metrics.rs | 111 ++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 9ab5ee63a4d..6f801b3abb2 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -13,9 +13,11 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; -use std::sync::{Arc, LazyLock, OnceLock}; +use std::hash::{BuildHasherDefault, Hasher}; +use std::sync::{Arc, LazyLock, OnceLock, RwLock}; use std::time::Instant; +use fnv::FnvHasher; use opentelemetry::metrics::Meter; use opentelemetry::{Key, KeyValue}; use prometheus::{HistogramOpts, Opts, TextEncoder}; @@ -57,6 +59,68 @@ fn build_prometheus_labels(const_labels: &[(&str, &str)]) -> HashMap u64 { + self.0 + } + + fn write(&mut self, _bytes: &[u8]) { + unreachable!("NoHashHasher only supports write_u64"); + } + + fn write_u64(&mut self, i: u64) { + self.0 = i; + } +} + +type BuildNoHashHasher = BuildHasherDefault; + +fn hash_label_values(values: &[&str]) -> u64 { + let mut hasher = FnvHasher::default(); + for value in values { + // Length prefix prevents collisions when label boundaries shift, + // e.g. ["ab", "c"] vs ["a", "bc"] produce the same byte stream + // without this. + hasher.write_u64(value.len() as u64); + hasher.write(value.as_bytes()); + } + hasher.finish() +} + +fn new_metric_cache() -> Arc>> { + Arc::new(RwLock::new(HashMap::with_hasher( + BuildNoHashHasher::default(), + ))) +} + +fn get_or_insert_cached( + cache: &RwLock>, + label_values: &[&str], + build: impl FnOnce() -> T, +) -> T { + let hash = hash_label_values(label_values); + { + let cache = cache.read().expect("metric cache lock poisoned"); + if let Some(metric) = cache.get(&hash) { + return metric.clone(); + } + } + let metric = build(); + let mut cache = cache.write().expect("metric cache lock poisoned"); + // Another thread may have inserted between the read lock and the write lock acquisition. + if let Some(cached) = cache.get(&hash) { + return cached.clone(); + } + cache.insert(hash, metric.clone()); + metric +} + struct OtelState { build_instrument: Box T + Send + Sync>, instrument: OnceLock, @@ -221,6 +285,7 @@ pub struct IntCounterVec { prometheus: prometheus::IntCounterVec, otel: OtelMetric>, label_names: Arc<[Key]>, + cache: Arc>>, } impl IntCounterVec { @@ -242,14 +307,15 @@ impl IntCounterVec { prometheus: prom, otel: OtelMetric::new(None, build_otel_attributes(const_labels)), label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), } } pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter { - IntCounter { + get_or_insert_cached(&self.cache, &label_values, || IntCounter { prometheus: self.prometheus.with_label_values(&label_values), otel: self.otel.with_attributes(&self.label_names, label_values), - } + }) } } @@ -275,14 +341,15 @@ pub struct IntGaugeVec { prometheus: prometheus::IntGaugeVec, otel: OtelMetric>, label_names: Arc<[Key]>, + cache: Arc>>, } impl IntGaugeVec { pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge { - IntGauge { + get_or_insert_cached(&self.cache, &label_values, || IntGauge { prometheus: self.prometheus.with_label_values(&label_values), otel: self.otel.with_attributes(&self.label_names, label_values), - } + }) } } @@ -323,14 +390,15 @@ pub struct IntUpDownCounterVec { prometheus: prometheus::IntGaugeVec, otel: OtelMetric>, label_names: Arc<[Key]>, + cache: Arc>>, } impl IntUpDownCounterVec { pub fn with_label_values(&self, label_values: [&str; N]) -> IntUpDownCounter { - IntUpDownCounter { + get_or_insert_cached(&self.cache, &label_values, || IntUpDownCounter { prometheus: self.prometheus.with_label_values(&label_values), otel: self.otel.with_attributes(&self.label_names, label_values), - } + }) } } @@ -397,14 +465,15 @@ pub struct HistogramVec { prometheus: prometheus::HistogramVec, otel: OtelMetric>, label_names: Arc<[Key]>, + cache: Arc>>, } impl HistogramVec { pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { - Histogram { + get_or_insert_cached(&self.cache, &label_values, || Histogram { prometheus: self.prometheus.with_label_values(&label_values), otel: self.otel.with_attributes(&self.label_names, label_values), - } + }) } } @@ -550,6 +619,7 @@ pub fn new_counter_vec( build_otel_attributes(const_labels), ), label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), } } @@ -597,6 +667,7 @@ pub fn new_gauge_vec( build_otel_attributes(const_labels), ), label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), } } @@ -644,6 +715,7 @@ pub fn new_up_down_counter_vec( build_otel_attributes(const_labels), ), label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), } } @@ -721,6 +793,7 @@ pub fn new_histogram_vec( build_otel_attributes(const_labels), ), label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), } } @@ -1371,4 +1444,24 @@ mod tests { assert!(payload.contains("quickwit_test_test_payload_ctr")); assert!(payload.contains("42")); } + + #[test] + fn test_hash_label_values() { + // Same values produce the same hash. + assert_eq!( + hash_label_values(&["foo", "bar"]), + hash_label_values(&["foo", "bar"]), + ); + // Different values produce different hashes. + assert_ne!( + hash_label_values(&["foo", "bar"]), + hash_label_values(&["foo", "baz"]), + ); + // Shifted label boundaries produce different hashes + // (length prefix prevents "ab"+"c" from colliding with "a"+"bc"). + assert_ne!( + hash_label_values(&["ab", "c"]), + hash_label_values(&["a", "bc"]), + ); + } } From 166059f69cba33ba8ccb50ab1d532bcd682193ed Mon Sep 17 00:00:00 2001 From: Shuhei Kitagawa Date: Wed, 15 Apr 2026 15:12:07 +0200 Subject: [PATCH 7/7] Replace RwLock with DashMap for metric cache Reduces lock contention in with_label_values by switching from a single RwLock guarding the entire HashMap to DashMap's sharded concurrent map. --- quickwit/Cargo.lock | 1 + quickwit/Cargo.toml | 1 + quickwit/quickwit-common/Cargo.toml | 1 + quickwit/quickwit-common/src/metrics.rs | 34 ++++++++----------------- 4 files changed, 13 insertions(+), 24 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 2ba9dd277da..2488464b41e 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7207,6 +7207,7 @@ dependencies = [ "backtrace", "bytesize", "coarsetime", + "dashmap 6.1.0", "dyn-clone", "env_logger", "fnv", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 95cfbb5eac2..2efb92ea714 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -110,6 +110,7 @@ colored = "3.0" console-subscriber = "0.5" criterion = { version = "0.8", features = ["async_tokio"] } cron = "0.15" +dashmap = "6" dialoguer = { version = "0.12", default-features = false } dotenvy = "0.15" dyn-clone = "1.0" diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 7092fc28707..da656f3e13b 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -17,6 +17,7 @@ async-trait = { workspace = true } backtrace = { workspace = true, optional = true } bytesize = { workspace = true } coarsetime = { workspace = true } +dashmap = { workspace = true } dyn-clone = { workspace = true } env_logger = { workspace = true } fnv = { workspace = true } diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 6f801b3abb2..239d0c7db4f 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -14,9 +14,10 @@ use std::collections::{BTreeMap, HashMap}; use std::hash::{BuildHasherDefault, Hasher}; -use std::sync::{Arc, LazyLock, OnceLock, RwLock}; +use std::sync::{Arc, LazyLock, OnceLock}; use std::time::Instant; +use dashmap::DashMap; use fnv::FnvHasher; use opentelemetry::metrics::Meter; use opentelemetry::{Key, KeyValue}; @@ -93,32 +94,17 @@ fn hash_label_values(values: &[&str]) -> u64 { hasher.finish() } -fn new_metric_cache() -> Arc>> { - Arc::new(RwLock::new(HashMap::with_hasher( - BuildNoHashHasher::default(), - ))) +fn new_metric_cache() -> Arc> { + Arc::new(DashMap::with_hasher(BuildNoHashHasher::default())) } fn get_or_insert_cached( - cache: &RwLock>, + cache: &DashMap, label_values: &[&str], build: impl FnOnce() -> T, ) -> T { let hash = hash_label_values(label_values); - { - let cache = cache.read().expect("metric cache lock poisoned"); - if let Some(metric) = cache.get(&hash) { - return metric.clone(); - } - } - let metric = build(); - let mut cache = cache.write().expect("metric cache lock poisoned"); - // Another thread may have inserted between the read lock and the write lock acquisition. - if let Some(cached) = cache.get(&hash) { - return cached.clone(); - } - cache.insert(hash, metric.clone()); - metric + cache.entry(hash).or_insert_with(build).value().clone() } struct OtelState { @@ -285,7 +271,7 @@ pub struct IntCounterVec { prometheus: prometheus::IntCounterVec, otel: OtelMetric>, label_names: Arc<[Key]>, - cache: Arc>>, + cache: Arc>, } impl IntCounterVec { @@ -341,7 +327,7 @@ pub struct IntGaugeVec { prometheus: prometheus::IntGaugeVec, otel: OtelMetric>, label_names: Arc<[Key]>, - cache: Arc>>, + cache: Arc>, } impl IntGaugeVec { @@ -390,7 +376,7 @@ pub struct IntUpDownCounterVec { prometheus: prometheus::IntGaugeVec, otel: OtelMetric>, label_names: Arc<[Key]>, - cache: Arc>>, + cache: Arc>, } impl IntUpDownCounterVec { @@ -465,7 +451,7 @@ pub struct HistogramVec { prometheus: prometheus::HistogramVec, otel: OtelMetric>, label_names: Arc<[Key]>, - cache: Arc>>, + cache: Arc>, } impl HistogramVec {