diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index af1edc75a25..2488464b41e 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7207,6 +7207,7 @@ dependencies = [ "backtrace", "bytesize", "coarsetime", + "dashmap 6.1.0", "dyn-clone", "env_logger", "fnv", @@ -7218,6 +7219,8 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "itertools 0.14.0", + "opentelemetry", + "opentelemetry_sdk", "pin-project", "pnet", "prometheus", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 95cfbb5eac2..2efb92ea714 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -110,6 +110,7 @@ colored = "3.0" console-subscriber = "0.5" criterion = { version = "0.8", features = ["async_tokio"] } cron = "0.15" +dashmap = "6" dialoguer = { version = "0.12", default-features = false } dotenvy = "0.15" dyn-clone = "1.0" diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index f222294e4c2..0046aa9274e 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock, Weak}; use std::time::Instant; -use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge}; +use quickwit_common::metrics::{IntCounter, IntUpDownCounter, UpDownCounterGuard}; use tokio::sync::oneshot; use crate::channel_with_priority::{Receiver, Sender, TrySendError}; @@ -308,7 +308,7 @@ impl Mailbox { struct InboxInner { rx: Receiver>, - _inboxes_count_gauge_guard: GaugeGuard<'static>, + _inboxes_count_gauge_guard: UpDownCounterGuard<'static>, } pub struct Inbox { @@ -385,18 +385,18 @@ impl Inbox { } } -fn get_actor_inboxes_count_gauge_guard() -> GaugeGuard<'static> { - static INBOX_GAUGE: LazyLock = LazyLock::new(|| { - quickwit_common::metrics::new_gauge( +fn get_actor_inboxes_count_gauge_guard() -> UpDownCounterGuard<'static> { + static INBOX_COUNTER: LazyLock = LazyLock::new(|| { + quickwit_common::metrics::new_up_down_counter( "inboxes_count", "overall count of actors", "actor", &[], ) }); - let mut gauge_guard = GaugeGuard::from_gauge(&INBOX_GAUGE); - gauge_guard.add(1); - gauge_guard + let mut guard = UpDownCounterGuard::from_counter(&INBOX_COUNTER); + guard.add(1); + guard } pub(crate) fn create_mailbox( @@ -520,7 +520,7 @@ mod tests { .unwrap(); // At this point the actor was started and even processed a message entirely. let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let wait_duration = Duration::from_millis(1); let processed = mailbox .send_message_with_backpressure_counter( @@ -547,7 +547,7 @@ mod tests { .await .unwrap(); let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let wait_duration = Duration::from_millis(1); mailbox .send_message_with_backpressure_counter( @@ -579,7 +579,7 @@ mod tests { .await .unwrap(); let backpressure_micros_counter = - IntCounter::new("test_counter", "help for test_counter").unwrap(); + IntCounter::new("test_counter", "help for test_counter", "", &[]); let start = Instant::now(); mailbox .ask_with_backpressure_counter(Duration::from_millis(1), None) diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index d1c994e893b..2a1db6ec07c 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -17,13 +17,15 @@ use std::sync::Arc; use std::{env, fmt}; use anyhow::Context; +use opentelemetry::metrics::MeterProvider; use opentelemetry::trace::TracerProvider; use opentelemetry::{KeyValue, global}; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::{ - LogExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, + LogExporter, MetricExporter, Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, }; use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::metrics::{SdkMeterProvider, Temporality}; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; use opentelemetry_sdk::{Resource, trace}; @@ -81,6 +83,26 @@ impl OtlpProtocol { } .context("failed to initialize OTLP traces exporter") } + + fn metric_exporter(&self, temporality: Temporality) -> anyhow::Result { + match self { + OtlpProtocol::Grpc => MetricExporter::builder() + .with_tonic() + .with_temporality(temporality) + .build(), + OtlpProtocol::HttpProtobuf => MetricExporter::builder() + .with_http() + .with_temporality(temporality) + .with_protocol(OtlpWireProtocol::HttpBinary) + .build(), + OtlpProtocol::HttpJson => MetricExporter::builder() + .with_http() + .with_temporality(temporality) + .with_protocol(OtlpWireProtocol::HttpJson) + .build(), + } + .context("failed to initialize OTLP metrics exporter") + } } impl FromStr for OtlpProtocol { @@ -104,11 +126,60 @@ impl FromStr for OtlpProtocol { } } +struct OtlpMetricsTemporality(Temporality); + +impl FromStr for OtlpMetricsTemporality { + type Err = anyhow::Error; + + fn from_str(temporality_str: &str) -> anyhow::Result { + const TEMPORALITY_DELTA: &str = "delta"; + const TEMPORALITY_LOWMEMORY: &str = "lowmemory"; + const TEMPORALITY_CUMULATIVE: &str = "cumulative"; + + match temporality_str { + TEMPORALITY_DELTA => Ok(OtlpMetricsTemporality(Temporality::Delta)), + TEMPORALITY_LOWMEMORY => Ok(OtlpMetricsTemporality(Temporality::LowMemory)), + TEMPORALITY_CUMULATIVE => Ok(OtlpMetricsTemporality(Temporality::Cumulative)), + other => anyhow::bail!( + "unsupported OTLP metrics temporality `{other}`, supported values are \ + `{TEMPORALITY_DELTA}`, `{TEMPORALITY_LOWMEMORY}` and `{TEMPORALITY_CUMULATIVE}`" + ), + } + } +} + +impl From for Temporality { + fn from(t: OtlpMetricsTemporality) -> Self { + t.0 + } +} + +pub struct TelemetryProviders { + tracer_provider: SdkTracerProvider, + logger_provider: SdkLoggerProvider, + meter_provider: SdkMeterProvider, +} + +impl TelemetryProviders { + pub fn shutdown(self) -> anyhow::Result<()> { + self.tracer_provider + .shutdown() + .context("failed to shutdown OpenTelemetry tracer provider")?; + self.logger_provider + .shutdown() + .context("failed to shutdown OpenTelemetry logger provider")?; + self.meter_provider + .shutdown() + .context("failed to shutdown OpenTelemetry meter provider")?; + Ok(()) + } +} + #[cfg(feature = "tokio-console")] use crate::QW_ENABLE_TOKIO_CONSOLE_ENV_KEY; /// Load the default logging filter from the environment. The filter can later -/// be updated using the result callback of [setup_logging_and_tracing]. +/// be updated using the result callback of [init_telemetry_providers]. fn startup_env_filter(level: Level) -> anyhow::Result { let env_filter = env::var("RUST_LOG") .map(|_| EnvFilter::from_default_env()) @@ -119,14 +190,11 @@ fn startup_env_filter(level: Level) -> anyhow::Result { type ReloadLayer = tracing_subscriber::reload::Layer; -pub fn setup_logging_and_tracing( +pub fn init_telemetry_providers( level: Level, ansi_colors: bool, build_info: &BuildInfo, -) -> anyhow::Result<( - EnvFilterReloadFn, - Option<(SdkTracerProvider, SdkLoggerProvider)>, -)> { +) -> anyhow::Result<(EnvFilterReloadFn, Option)> { #[cfg(feature = "tokio-console")] { if get_bool_from_env(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY, false) { @@ -204,12 +272,37 @@ pub fn setup_logging_and_tracing( .with_batch_exporter(log_exporter) .build(); - let tracing_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + let metrics_protocol_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", false); + let metrics_protocol = metrics_protocol_opt + .as_deref() + .map(OtlpProtocol::from_str) + .transpose()? + .unwrap_or(global_protocol); + let temporality_opt = + get_from_env_opt::("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE", false); + let temporality = temporality_opt + .as_deref() + .map(OtlpMetricsTemporality::from_str) + .transpose()? + .map(Temporality::from) + .unwrap_or(Temporality::Cumulative); + let metric_exporter = metrics_protocol.metric_exporter(temporality)?; + + let meter_provider = SdkMeterProvider::builder() + .with_resource(resource.clone()) + .with_periodic_exporter(metric_exporter) + .build(); + + let meter = meter_provider.meter("quickwit"); + quickwit_common::metrics::install_otel_meter(meter); + + let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() .with_span_processor(span_processor) .with_resource(resource) .build(); - let tracer = tracing_provider.tracer("quickwit"); + let tracer = tracer_provider.tracer("quickwit"); let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); // Bridge between tracing logs and otel tracing events @@ -220,7 +313,11 @@ pub fn setup_logging_and_tracing( .with(logs_otel_layer) .try_init() .context("failed to register tracing subscriber")?; - Some((tracing_provider, logger_provider)) + Some(TelemetryProviders { + tracer_provider, + logger_provider, + meter_provider, + }) } else { registry .try_init() diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 8828398edba..c1d9fc89704 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -22,7 +22,7 @@ use quickwit_cli::checklist::RED_COLOR; use quickwit_cli::cli::{CliCommand, build_cli}; #[cfg(feature = "jemalloc")] use quickwit_cli::jemalloc::start_jemalloc_metrics_loop; -use quickwit_cli::logger::setup_logging_and_tracing; +use quickwit_cli::logger::init_telemetry_providers; use quickwit_cli::{busy_detector, install_default_crypto_ring_provider}; use quickwit_common::runtimes::scrape_tokio_runtime_metrics; use quickwit_serve::BuildInfo; @@ -98,8 +98,8 @@ async fn main_impl() -> anyhow::Result<()> { start_jemalloc_metrics_loop(); let build_info = BuildInfo::get(); - let (env_filter_reload_fn, tracer_provider_opt) = - setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; + let (env_filter_reload_fn, telemetry_providers) = + init_telemetry_providers(command.default_log_level(), ansi_colors, build_info)?; #[cfg(not(test))] quickwit_cli::logger::setup_metrics(build_info)?; @@ -116,13 +116,8 @@ async fn main_impl() -> anyhow::Result<()> { 0 }; - if let Some((trace_provider, logs_provider)) = tracer_provider_opt { - trace_provider - .shutdown() - .context("failed to shutdown OpenTelemetry tracer provider")?; - logs_provider - .shutdown() - .context("failed to shutdown OpenTelemetry logs provider")?; + if let Some(providers) = telemetry_providers { + providers.shutdown()?; } std::process::exit(return_code) diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml index 14c05e19c5e..da656f3e13b 100644 --- a/quickwit/quickwit-common/Cargo.toml +++ b/quickwit/quickwit-common/Cargo.toml @@ -17,6 +17,7 @@ async-trait = { workspace = true } backtrace = { workspace = true, optional = true } bytesize = { workspace = true } coarsetime = { workspace = true } +dashmap = { workspace = true } dyn-clone = { workspace = true } env_logger = { workspace = true } fnv = { workspace = true } @@ -28,6 +29,7 @@ http = { workspace = true } hyper = { workspace = true } hyper-util = { workspace = true, optional = true } itertools = { workspace = true } +opentelemetry = { workspace = true } pin-project = { workspace = true } pnet = { workspace = true } prometheus = { workspace = true } @@ -62,6 +64,7 @@ jemalloc-profiled = [ [dev-dependencies] hyper-util = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["testing"] } proptest = { workspace = true } serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs index e1d9ad796f1..5f2cf22cadf 100644 --- a/quickwit/quickwit-common/src/io.rs +++ b/quickwit/quickwit-common/src/io.rs @@ -34,10 +34,9 @@ use async_speed_limit::clock::StandardClock; use async_speed_limit::limiter::Consume; use bytesize::ByteSize; use pin_project::pin_project; -use prometheus::IntCounter; use tokio::io::AsyncWrite; -use crate::metrics::{IntCounterVec, new_counter_vec}; +use crate::metrics::{IntCounter, IntCounterVec, new_counter_vec}; use crate::{KillSwitch, Progress, ProtectedZoneGuard}; // Max 1MB at a time. @@ -99,7 +98,7 @@ pub struct IoControls { impl Default for IoControls { fn default() -> Self { let default_bytes_counter = - IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap(); + IntCounter::new("default_write_num_bytes", "Default write counter.", "", &[]); IoControls { throughput_limiter_opt: None, progress: Progress::default(), diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 193def5e01a..239d0c7db4f 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -13,29 +13,265 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap}; -use std::sync::{LazyLock, OnceLock}; +use std::hash::{BuildHasherDefault, Hasher}; +use std::sync::{Arc, LazyLock, OnceLock}; +use std::time::Instant; -use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder}; -pub use prometheus::{ - Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter, - IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec, - exponential_buckets, linear_buckets, -}; +use dashmap::DashMap; +use fnv::FnvHasher; +use opentelemetry::metrics::Meter; +use opentelemetry::{Key, KeyValue}; +use prometheus::{HistogramOpts, Opts, TextEncoder}; +pub use prometheus::{exponential_buckets, linear_buckets}; + +static OTEL_METER: OnceLock = OnceLock::new(); +const METRICS_NAMESPACE: &str = "quickwit"; + +pub fn install_otel_meter(meter: Meter) { + OTEL_METER + .set(meter) + .expect("OTel meter should only be installed once"); +} + +fn otel_meter() -> Option<&'static Meter> { + OTEL_METER.get() +} + +fn build_otel_attributes(const_labels: &[(&str, &str)]) -> Arc<[KeyValue]> { + const_labels + .iter() + .map(|(k, v)| KeyValue::new(Arc::::from(*k), Arc::::from(*v))) + .collect::>() + .into() +} + +fn build_otel_label_names(label_names: [&str; N]) -> Arc<[Key]> { + label_names + .into_iter() + .map(|label_name| Key::new(Arc::::from(label_name))) + .collect::>() + .into() +} + +fn build_prometheus_labels(const_labels: &[(&str, &str)]) -> HashMap { + const_labels + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() +} + +/// Identity hasher for pre-hashed `u64` keys. The metric cache keys are +/// already FNV-hashed, so re-hashing them inside the HashMap (default SipHash) +/// would be redundant. +#[derive(Default)] +struct NoHashHasher(u64); + +impl Hasher for NoHashHasher { + fn finish(&self) -> u64 { + self.0 + } + + fn write(&mut self, _bytes: &[u8]) { + unreachable!("NoHashHasher only supports write_u64"); + } + + fn write_u64(&mut self, i: u64) { + self.0 = i; + } +} + +type BuildNoHashHasher = BuildHasherDefault; + +fn hash_label_values(values: &[&str]) -> u64 { + let mut hasher = FnvHasher::default(); + for value in values { + // Length prefix prevents collisions when label boundaries shift, + // e.g. ["ab", "c"] vs ["a", "bc"] produce the same byte stream + // without this. + hasher.write_u64(value.len() as u64); + hasher.write(value.as_bytes()); + } + hasher.finish() +} + +fn new_metric_cache() -> Arc> { + Arc::new(DashMap::with_hasher(BuildNoHashHasher::default())) +} + +fn get_or_insert_cached( + cache: &DashMap, + label_values: &[&str], + build: impl FnOnce() -> T, +) -> T { + let hash = hash_label_values(label_values); + cache.entry(hash).or_insert_with(build).value().clone() +} + +struct OtelState { + build_instrument: Box T + Send + Sync>, + instrument: OnceLock, +} + +impl OtelState { + fn new(build_instrument: impl Fn(&Meter) -> T + Send + Sync + 'static) -> Self { + OtelState { + build_instrument: Box::new(build_instrument), + instrument: OnceLock::new(), + } + } + + fn bind(&self, meter: &Meter) -> Option<&T> { + Some( + self.instrument + .get_or_init(|| (self.build_instrument)(meter)), + ) + } + + fn get(&self) -> Option<&T> { + self.instrument.get() + } +} #[derive(Clone)] -pub struct HistogramVec { - underlying: PrometheusHistogramVec, +struct OtelMetric { + state: Option>>, + attributes: Arc<[KeyValue]>, } -impl HistogramVec { - pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { - self.underlying.with_label_values(&label_values) +impl OtelMetric { + fn new(state: Option>>, attributes: Arc<[KeyValue]>) -> Self { + Self { state, attributes } + } + + fn with_attributes(&self, names: &[Key], values: [&str; N]) -> Self { + if self.state.is_none() { + return Self::default(); + } + let mut attributes = Vec::with_capacity(self.attributes.len() + N); + attributes.extend(self.attributes.iter().cloned()); + for (name, value) in names.iter().zip(values.iter()) { + attributes.push(KeyValue::new(name.clone(), Arc::::from(*value))); + } + Self { + state: self.state.clone(), + attributes: attributes.into(), + } + } + + /// Invokes a recording operation (e.g. `counter.add`) on the bound OTel + /// instrument, passing in this metric's attributes. Lazily binds the + /// instrument on first use after the meter becomes available. No-ops when + /// OTel is disabled or the meter has not been installed yet. + fn with_instrument(&self, f: impl FnOnce(&T, &[KeyValue])) { + let Some(instrument) = self.get_or_bind_instrument() else { + return; + }; + f(instrument, self.attributes.as_ref()); + } + + fn get_or_bind_instrument(&self) -> Option<&T> { + let state = self.state.as_ref()?; + if let Some(instrument) = state.get() { + return Some(instrument); + } + let meter = otel_meter()?; + state.bind(meter) + } +} + +impl Default for OtelMetric { + fn default() -> Self { + Self { + state: None, + attributes: Vec::new().into(), + } + } +} + +impl OtelMetric> { + fn add(&self, value: u64) { + self.with_instrument(|counter, attributes| counter.add(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: i64) { + self.with_instrument(|gauge, attributes| gauge.record(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: f64) { + self.with_instrument(|gauge, attributes| gauge.record(value, attributes)); + } +} + +impl OtelMetric> { + fn record(&self, value: f64) { + self.with_instrument(|histogram, attributes| histogram.record(value, attributes)); + } +} + +impl OtelMetric> { + fn add(&self, value: i64) { + self.with_instrument(|counter, attributes| counter.add(value, attributes)); + } +} + +#[derive(Clone)] +pub struct IntCounter { + prometheus: prometheus::IntCounter, + otel: OtelMetric>, +} + +impl std::fmt::Debug for IntCounter { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("IntCounter") + .field("value", &self.prometheus.get()) + .finish() + } +} + +impl IntCounter { + pub fn new( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], + ) -> IntCounter { + let counter_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); + IntCounter { + prometheus: prom, + otel: OtelMetric::default(), + } + } + + pub fn inc(&self) { + self.prometheus.inc(); + self.otel.add(1); + } + + pub fn inc_by(&self, v: u64) { + self.prometheus.inc_by(v); + self.otel.add(v); + } + + pub fn get(&self) -> u64 { + self.prometheus.get() } } #[derive(Clone)] pub struct IntCounterVec { - underlying: PrometheusIntCounterVec, + prometheus: prometheus::IntCounterVec, + otel: OtelMetric>, + label_names: Arc<[Key]>, + cache: Arc>, } impl IntCounterVec { @@ -46,62 +282,305 @@ impl IntCounterVec { const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let counter_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounterVec::new(counter_opts, &label_names) .expect("failed to create counter vec"); - IntCounterVec { underlying } + + IntCounterVec { + prometheus: prom, + otel: OtelMetric::new(None, build_otel_attributes(const_labels)), + label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), + } } pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter { - self.underlying.with_label_values(&label_values) + get_or_insert_cached(&self.cache, &label_values, || IntCounter { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + }) + } +} + +#[derive(Clone)] +pub struct IntGauge { + prometheus: prometheus::IntGauge, + otel: OtelMetric>, +} + +impl IntGauge { + pub fn set(&self, v: i64) { + self.prometheus.set(v); + self.otel.record(v); + } + + pub fn get(&self) -> i64 { + self.prometheus.get() } } #[derive(Clone)] pub struct IntGaugeVec { - underlying: PrometheusIntGaugeVec, + prometheus: prometheus::IntGaugeVec, + otel: OtelMetric>, + label_names: Arc<[Key]>, + cache: Arc>, } impl IntGaugeVec { pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge { - self.underlying.with_label_values(&label_values) + get_or_insert_cached(&self.cache, &label_values, || IntGauge { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + }) + } +} + +#[derive(Clone)] +pub struct IntUpDownCounter { + prometheus: prometheus::IntGauge, + otel: OtelMetric>, +} + +impl IntUpDownCounter { + pub fn inc(&self) { + self.prometheus.inc(); + self.otel.add(1); + } + + pub fn dec(&self) { + self.prometheus.dec(); + self.otel.add(-1); + } + + pub fn add(&self, delta: i64) { + self.prometheus.add(delta); + self.otel.add(delta); + } + + pub fn sub(&self, delta: i64) { + self.prometheus.sub(delta); + self.otel.add(-delta); + } + + pub fn get(&self) -> i64 { + self.prometheus.get() + } +} + +#[derive(Clone)] +pub struct IntUpDownCounterVec { + prometheus: prometheus::IntGaugeVec, + otel: OtelMetric>, + label_names: Arc<[Key]>, + cache: Arc>, +} + +impl IntUpDownCounterVec { + pub fn with_label_values(&self, label_values: [&str; N]) -> IntUpDownCounter { + get_or_insert_cached(&self.cache, &label_values, || IntUpDownCounter { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + }) + } +} + +#[derive(Clone)] +pub struct Gauge { + prometheus: prometheus::Gauge, + otel: OtelMetric>, +} + +impl Gauge { + pub fn set(&self, v: f64) { + self.prometheus.set(v); + self.otel.record(v); + } + + pub fn get(&self) -> f64 { + self.prometheus.get() + } +} + +#[derive(Clone)] +pub struct Histogram { + prometheus: prometheus::Histogram, + otel: OtelMetric>, +} + +impl Histogram { + pub fn observe(&self, v: f64) { + self.prometheus.observe(v); + self.otel.record(v); + } + + pub fn start_timer(&self) -> HistogramTimer { + HistogramTimer { + histogram: Some(self.clone()), + start: Instant::now(), + } + } +} + +pub struct HistogramTimer { + histogram: Option, + start: Instant, +} + +impl HistogramTimer { + pub fn observe_duration(mut self) { + if let Some(histogram) = self.histogram.take() { + histogram.observe(self.start.elapsed().as_secs_f64()); + } + } +} + +impl Drop for HistogramTimer { + fn drop(&mut self) { + if let Some(histogram) = self.histogram.take() { + histogram.observe(self.start.elapsed().as_secs_f64()); + } + } +} + +#[derive(Clone)] +pub struct HistogramVec { + prometheus: prometheus::HistogramVec, + otel: OtelMetric>, + label_names: Arc<[Key]>, + cache: Arc>, +} + +impl HistogramVec { + pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram { + get_or_insert_cached(&self.cache, &label_values, || Histogram { + prometheus: self.prometheus.with_label_values(&label_values), + otel: self.otel.with_attributes(&self.label_names, label_values), + }) } } pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'static str, String>) { - let mut counter_opts = Opts::new(name, help).namespace("quickwit"); + let mut counter_opts = Opts::new(name, help).namespace(METRICS_NAMESPACE); for (k, v) in kvs { counter_opts = counter_opts.const_label(k, v); } - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); + let counter = + prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); counter.inc(); prometheus::register(Box::new(counter)).expect("failed to register counter"); } +fn new_otel_state( + name: &str, + subsystem: &str, + help: &str, + build_instrument: impl Fn(&Meter, &str, &str) -> T + Send + Sync + 'static, +) -> Arc> +where + T: Send + Sync + 'static, +{ + let name = if subsystem.is_empty() { + format!("{METRICS_NAMESPACE}_{name}") + } else { + format!("{METRICS_NAMESPACE}_{subsystem}_{name}") + }; + let description = help.to_string(); + Arc::new(OtelState::new(move |meter| { + build_instrument(meter, &name, &description) + })) +} + +fn new_counter_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .u64_counter(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_int_gauge_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .i64_gauge(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_float_gauge_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .f64_gauge(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_i64_up_down_counter_otel_state( + name: &str, + subsystem: &str, + help: &str, +) -> Arc>> { + new_otel_state(name, subsystem, help, |meter, name, description| { + meter + .i64_up_down_counter(name.to_string()) + .with_description(description.to_string()) + .build() + }) +} + +fn new_histogram_otel_state( + name: &str, + subsystem: &str, + help: &str, + boundaries: Vec, +) -> Arc>> { + new_otel_state(name, subsystem, help, move |meter, name, description| { + meter + .f64_histogram(name.to_string()) + .with_description(description.to_string()) + .with_boundaries(boundaries.clone()) + .build() + }) +} + pub fn new_counter( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], ) -> IntCounter { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let counter_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); - prometheus::register(Box::new(counter.clone())).expect("failed to register counter"); - counter + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounter::with_opts(counter_opts).expect("failed to create counter"); + prometheus::register(Box::new(prom.clone())).expect("failed to register counter"); + + IntCounter { + prometheus: prom, + otel: OtelMetric::new( + Some(new_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } pub fn new_counter_vec( @@ -111,82 +590,164 @@ pub fn new_counter_vec( const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let int_counter_vec = IntCounterVec::new(name, help, subsystem, const_labels, label_names); - let collector = Box::new(int_counter_vec.underlying.clone()); - prometheus::register(collector).expect("failed to register counter vec"); - int_counter_vec + let counter_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntCounterVec::new(counter_opts, &label_names) + .expect("failed to create counter vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register counter vec"); + + IntCounterVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), + } } -pub fn new_float_gauge( +pub fn new_gauge( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], -) -> Gauge { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); +) -> IntGauge { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let gauge = Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); - prometheus::register(Box::new(gauge.clone())).expect("failed to register float gauge"); - gauge + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge"); + + IntGauge { + prometheus: prom, + otel: OtelMetric::new( + Some(new_int_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } -pub fn new_gauge( +pub fn new_gauge_vec( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], -) -> IntGauge { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); + label_names: [&str; N], +) -> IntGaugeVec { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); - prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge"); - gauge + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge vec"); + + IntGaugeVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_int_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), + } } -pub fn new_gauge_vec( +pub fn new_up_down_counter( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], +) -> IntUpDownCounter { + let gauge_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge"); + + IntUpDownCounter { + prometheus: prom, + otel: OtelMetric::new( + Some(new_i64_up_down_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } +} + +pub fn new_up_down_counter_vec( name: &str, help: &str, subsystem: &str, const_labels: &[(&str, &str)], label_names: [&str; N], -) -> IntGaugeVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); +) -> IntUpDownCounterVec { let gauge_opts = Opts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = - PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + .const_labels(build_prometheus_labels(const_labels)); + let prom = + prometheus::IntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register gauge vec"); - let collector = Box::new(underlying.clone()); - prometheus::register(collector).expect("failed to register counter vec"); + IntUpDownCounterVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_i64_up_down_counter_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), + } +} - IntGaugeVec { underlying } +pub fn new_float_gauge( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], +) -> Gauge { + let gauge_opts = Opts::new(name, help) + .namespace(METRICS_NAMESPACE) + .subsystem(subsystem) + .const_labels(build_prometheus_labels(const_labels)); + let prom = prometheus::Gauge::with_opts(gauge_opts).expect("failed to create float gauge"); + prometheus::register(Box::new(prom.clone())).expect("failed to register float gauge"); + + Gauge { + prometheus: prom, + otel: OtelMetric::new( + Some(new_float_gauge_otel_state(name, subsystem, help)), + build_otel_attributes(const_labels), + ), + } } pub fn new_histogram(name: &str, help: &str, subsystem: &str, buckets: Vec) -> Histogram { let histogram_opts = HistogramOpts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .buckets(buckets); - let histogram = Histogram::with_opts(histogram_opts).expect("failed to create histogram"); - prometheus::register(Box::new(histogram.clone())).expect("failed to register histogram"); - histogram + .buckets(buckets.clone()); + let prom = + prometheus::Histogram::with_opts(histogram_opts).expect("failed to create histogram"); + prometheus::register(Box::new(prom.clone())).expect("failed to register histogram"); + + Histogram { + prometheus: prom, + otel: OtelMetric::new( + Some(new_histogram_otel_state( + name, + subsystem, + help, + buckets.clone(), + )), + Vec::new().into(), + ), + } } pub fn new_histogram_vec( @@ -197,38 +758,48 @@ pub fn new_histogram_vec( label_names: [&str; N], buckets: Vec, ) -> HistogramVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) - .collect(); let histogram_opts = HistogramOpts::new(name, help) - .namespace("quickwit") + .namespace(METRICS_NAMESPACE) .subsystem(subsystem) - .const_labels(owned_const_labels) - .buckets(buckets); - let underlying = PrometheusHistogramVec::new(histogram_opts, &label_names) + .const_labels(build_prometheus_labels(const_labels)) + .buckets(buckets.clone()); + let prom = prometheus::HistogramVec::new(histogram_opts, &label_names) .expect("failed to create histogram vec"); + prometheus::register(Box::new(prom.clone())).expect("failed to register histogram vec"); - let collector = Box::new(underlying.clone()); - prometheus::register(collector).expect("failed to register histogram vec"); - - HistogramVec { underlying } + HistogramVec { + prometheus: prom, + otel: OtelMetric::new( + Some(new_histogram_otel_state( + name, + subsystem, + help, + buckets.clone(), + )), + build_otel_attributes(const_labels), + ), + label_names: build_otel_label_names(label_names), + cache: new_metric_cache(), + } } -pub struct GaugeGuard<'a> { - gauge: &'a IntGauge, +pub struct UpDownCounterGuard<'a> { + counter: &'a IntUpDownCounter, delta: i64, } -impl std::fmt::Debug for GaugeGuard<'_> { +impl std::fmt::Debug for UpDownCounterGuard<'_> { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.delta.fmt(f) } } -impl<'a> GaugeGuard<'a> { - pub fn from_gauge(gauge: &'a IntGauge) -> Self { - Self { gauge, delta: 0i64 } +impl<'a> UpDownCounterGuard<'a> { + pub fn from_counter(counter: &'a IntUpDownCounter) -> Self { + Self { + counter, + delta: 0i64, + } } pub fn get(&self) -> i64 { @@ -236,36 +807,39 @@ impl<'a> GaugeGuard<'a> { } pub fn add(&mut self, delta: i64) { - self.gauge.add(delta); + self.counter.add(delta); self.delta += delta; } pub fn sub(&mut self, delta: i64) { - self.gauge.sub(delta); + self.counter.sub(delta); self.delta -= delta; } } -impl Drop for GaugeGuard<'_> { +impl Drop for UpDownCounterGuard<'_> { fn drop(&mut self) { - self.gauge.sub(self.delta) + self.counter.sub(self.delta) } } -pub struct OwnedGaugeGuard { - gauge: IntGauge, +pub struct OwnedUpDownCounterGuard { + counter: IntUpDownCounter, delta: i64, } -impl std::fmt::Debug for OwnedGaugeGuard { +impl std::fmt::Debug for OwnedUpDownCounterGuard { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { self.delta.fmt(f) } } -impl OwnedGaugeGuard { - pub fn from_gauge(gauge: IntGauge) -> Self { - Self { gauge, delta: 0i64 } +impl OwnedUpDownCounterGuard { + pub fn from_counter(counter: IntUpDownCounter) -> Self { + Self { + counter, + delta: 0i64, + } } pub fn get(&self) -> i64 { @@ -273,26 +847,24 @@ impl OwnedGaugeGuard { } pub fn add(&mut self, delta: i64) { - self.gauge.add(delta); + self.counter.add(delta); self.delta += delta; } pub fn sub(&mut self, delta: i64) { - self.gauge.sub(delta); + self.counter.sub(delta); self.delta -= delta; } } -impl Drop for OwnedGaugeGuard { +impl Drop for OwnedUpDownCounterGuard { fn drop(&mut self) { - self.gauge.sub(self.delta) + self.counter.sub(self.delta) } } pub fn metrics_text_payload() -> Result { let metric_families = prometheus::gather(); - // Arbitrary non-zero size in order to skip a bunch of - // buffer growth-reallocations when encoding metrics. let mut buffer = String::with_capacity(1024); let encoder = TextEncoder::new(); match encoder.encode_utf8(&metric_families, &mut buffer) { @@ -340,22 +912,21 @@ impl Default for MemoryMetrics { #[derive(Clone)] pub struct InFlightDataGauges { - pub rest_server: IntGauge, - pub ingest_router: IntGauge, - pub ingester_persist: IntGauge, - pub ingester_replicate: IntGauge, - pub wal: IntGauge, - pub fetch_stream: IntGauge, - pub multi_fetch_stream: IntGauge, - pub doc_processor_mailbox: IntGauge, - pub indexer_mailbox: IntGauge, - pub index_writer: IntGauge, - in_flight_gauge_vec: IntGaugeVec<1>, + pub rest_server: IntUpDownCounter, + pub ingest_router: IntUpDownCounter, + pub ingester_persist: IntUpDownCounter, + pub ingester_replicate: IntUpDownCounter, + pub fetch_stream: IntUpDownCounter, + pub multi_fetch_stream: IntUpDownCounter, + pub doc_processor_mailbox: IntUpDownCounter, + pub indexer_mailbox: IntUpDownCounter, + pub index_writer: IntUpDownCounter, + in_flight_counter_vec: IntUpDownCounterVec<1>, } impl Default for InFlightDataGauges { fn default() -> Self { - let in_flight_gauge_vec = new_gauge_vec( + let in_flight_counter_vec = new_up_down_counter_vec( "in_flight_data_bytes", "Amount of data in-flight in various buffers in bytes.", "memory", @@ -363,75 +934,81 @@ impl Default for InFlightDataGauges { ["component"], ); Self { - rest_server: in_flight_gauge_vec.with_label_values(["rest_server"]), - ingest_router: in_flight_gauge_vec.with_label_values(["ingest_router"]), - ingester_persist: in_flight_gauge_vec.with_label_values(["ingester_persist"]), - ingester_replicate: in_flight_gauge_vec.with_label_values(["ingester_replicate"]), - wal: in_flight_gauge_vec.with_label_values(["wal"]), - fetch_stream: in_flight_gauge_vec.with_label_values(["fetch_stream"]), - multi_fetch_stream: in_flight_gauge_vec.with_label_values(["multi_fetch_stream"]), - doc_processor_mailbox: in_flight_gauge_vec.with_label_values(["doc_processor_mailbox"]), - indexer_mailbox: in_flight_gauge_vec.with_label_values(["indexer_mailbox"]), - index_writer: in_flight_gauge_vec.with_label_values(["index_writer"]), - in_flight_gauge_vec: in_flight_gauge_vec.clone(), + rest_server: in_flight_counter_vec.with_label_values(["rest_server"]), + ingest_router: in_flight_counter_vec.with_label_values(["ingest_router"]), + ingester_persist: in_flight_counter_vec.with_label_values(["ingester_persist"]), + ingester_replicate: in_flight_counter_vec.with_label_values(["ingester_replicate"]), + fetch_stream: in_flight_counter_vec.with_label_values(["fetch_stream"]), + multi_fetch_stream: in_flight_counter_vec.with_label_values(["multi_fetch_stream"]), + doc_processor_mailbox: in_flight_counter_vec + .with_label_values(["doc_processor_mailbox"]), + indexer_mailbox: in_flight_counter_vec.with_label_values(["indexer_mailbox"]), + index_writer: in_flight_counter_vec.with_label_values(["index_writer"]), + in_flight_counter_vec: in_flight_counter_vec.clone(), } } } impl InFlightDataGauges { #[inline] - pub fn file(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["file_source"])) + pub fn file(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec + .with_label_values(["file_source"]) + }) } #[inline] - pub fn ingest(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn ingest(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["ingest_source"]) }) } #[inline] - pub fn kafka(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| self.in_flight_gauge_vec.with_label_values(["kafka_source"])) + pub fn kafka(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec + .with_label_values(["kafka_source"]) + }) } #[inline] - pub fn kinesis(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn kinesis(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["kinesis_source"]) }) } #[inline] - pub fn pubsub(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn pubsub(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["pubsub_source"]) }) } #[inline] - pub fn pulsar(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn pulsar(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["pulsar_source"]) }) } #[inline] - pub fn other(&self) -> &IntGauge { - static GAUGE: OnceLock = OnceLock::new(); - GAUGE.get_or_init(|| { - self.in_flight_gauge_vec + pub fn other(&self) -> &IntUpDownCounter { + static COUNTER: OnceLock = OnceLock::new(); + COUNTER.get_or_init(|| { + self.in_flight_counter_vec .with_label_values(["pulsar_source"]) }) } @@ -451,3 +1028,426 @@ pub fn index_label(index_id: &str) -> &str { } pub static MEMORY_METRICS: LazyLock = LazyLock::new(MemoryMetrics::default); + +#[cfg(test)] +mod tests { + use std::sync::OnceLock; + + use opentelemetry::metrics::MeterProvider; + use opentelemetry_sdk::metrics::data::{ + AggregatedMetrics, HistogramDataPoint, MetricData, ResourceMetrics, + }; + use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader, SdkMeterProvider}; + use serial_test::serial; + + use super::*; + + static TEST_OTEL_EXPORTER: OnceLock = OnceLock::new(); + static TEST_OTEL_PROVIDER: OnceLock = OnceLock::new(); + + fn ensure_test_otel_provider() -> (&'static InMemoryMetricExporter, &'static SdkMeterProvider) { + let exporter = TEST_OTEL_EXPORTER.get_or_init(InMemoryMetricExporter::default); + let provider = TEST_OTEL_PROVIDER.get_or_init(|| { + let reader = PeriodicReader::builder(exporter.clone()).build(); + let provider = SdkMeterProvider::builder().with_reader(reader).build(); + install_otel_meter(provider.meter("quickwit-tests")); + provider + }); + (exporter, provider) + } + + fn find_metric_data<'a>( + metrics: &'a [ResourceMetrics], + metric_name: &str, + ) -> Option<&'a AggregatedMetrics> { + metrics + .iter() + .flat_map(|resource_metrics| resource_metrics.scope_metrics()) + .flat_map(|scope_metrics| scope_metrics.metrics()) + .find(|metric| metric.name() == metric_name) + .map(|metric| metric.data()) + } + + fn flush_and_read_metric( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + read: impl FnOnce(&AggregatedMetrics) -> T, + ) -> T { + provider.force_flush().unwrap(); + let exported_metrics = exporter.get_finished_metrics().unwrap(); + let data = find_metric_data(&exported_metrics, metric_name) + .unwrap_or_else(|| panic!("metric '{metric_name}' should be exported")); + read(data) + } + + fn flush_and_get_counter_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> u64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::U64(MetricData::Sum(sum_data)) = data else { + panic!("expected u64 sum metric"); + }; + sum_data + .data_points() + .next() + .expect("should have one data point") + .value() + }) + } + + fn flush_and_get_gauge_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> i64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::I64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected i64 gauge metric"); + }; + gauge_data + .data_points() + .last() + .expect("should have at least one data point") + .value() + }) + } + + fn flush_and_get_histogram_data_point( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> HistogramDataPoint { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = data else { + panic!("expected f64 histogram metric"); + }; + histogram_data + .data_points() + .next() + .expect("should have one data point") + .clone() + }) + } + + fn flush_and_get_float_gauge_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> f64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::F64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected f64 gauge metric"); + }; + gauge_data + .data_points() + .last() + .expect("should have at least one data point") + .value() + }) + } + + #[test] + #[serial] + fn test_counter() { + let (exporter, provider) = ensure_test_otel_provider(); + + // inc + let counter = new_counter("test_ctr_inc", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc(); + assert_eq!(counter.get(), 1); + let otel_value = + flush_and_get_counter_value(exporter, provider, "quickwit_test_test_ctr_inc"); + assert_eq!(otel_value, 1); + + // inc_by + let counter = new_counter("test_ctr_inc_by", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc_by(5); + assert_eq!(counter.get(), 5); + let otel_value = + flush_and_get_counter_value(exporter, provider, "quickwit_test_test_ctr_inc_by"); + assert_eq!(otel_value, 5); + } + + #[test] + #[serial] + fn test_gauge() { + let (exporter, provider) = ensure_test_otel_provider(); + + let gauge = new_gauge("test_gauge_set", "test", "test", &[]); + assert_eq!(gauge.get(), 0); + gauge.set(10); + assert_eq!(gauge.get(), 10); + let otel_value = + flush_and_get_gauge_value(exporter, provider, "quickwit_test_test_gauge_set"); + assert_eq!(otel_value, 10); + } + + fn flush_and_get_up_down_counter_value( + exporter: &InMemoryMetricExporter, + provider: &SdkMeterProvider, + metric_name: &str, + ) -> i64 { + flush_and_read_metric(exporter, provider, metric_name, |data| { + let AggregatedMetrics::I64(MetricData::Sum(sum_data)) = data else { + panic!("expected i64 sum metric"); + }; + sum_data + .data_points() + .next() + .expect("should have one data point") + .value() + }) + } + + #[test] + #[serial] + fn test_up_down_counter() { + let (exporter, provider) = ensure_test_otel_provider(); + + // inc + let counter = new_up_down_counter("test_udc_inc", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.inc(); + assert_eq!(counter.get(), 1); + let otel_value = + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_inc"); + assert_eq!(otel_value, 1); + + // dec + let counter = new_up_down_counter("test_udc_dec", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.dec(); + assert_eq!(counter.get(), -1); + let otel_value = + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_dec"); + assert_eq!(otel_value, -1); + + // add + let counter = new_up_down_counter("test_udc_add", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.add(15); + assert_eq!(counter.get(), 15); + let otel_value = + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_add"); + assert_eq!(otel_value, 15); + + // sub + let counter = new_up_down_counter("test_udc_sub", "test", "test", &[]); + assert_eq!(counter.get(), 0); + counter.sub(3); + assert_eq!(counter.get(), -3); + let otel_value = + flush_and_get_up_down_counter_value(exporter, provider, "quickwit_test_test_udc_sub"); + assert_eq!(otel_value, -3); + } + + #[test] + #[serial] + fn test_float_gauge_set() { + let (exporter, provider) = ensure_test_otel_provider(); + let gauge = new_float_gauge("test_float_gauge", "test", "test", &[]); + assert_eq!(gauge.get(), 0.0); + gauge.set(1.23); + assert_eq!(gauge.get(), 1.23); + + let otel_value = + flush_and_get_float_gauge_value(exporter, provider, "quickwit_test_test_float_gauge"); + assert_eq!(otel_value, 1.23); + } + + #[test] + #[serial] + fn test_histogram_observe() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_obs", "test", "test", vec![1.0, 5.0, 10.0]); + histogram.observe(2.5); + histogram.observe(7.0); + + let dp = + flush_and_get_histogram_data_point(exporter, provider, "quickwit_test_test_hist_obs"); + assert_eq!(dp.count(), 2); + assert_eq!(dp.max().unwrap(), 7.0); + assert_eq!(dp.min().unwrap(), 2.5); + assert_eq!(dp.bounds().collect::>(), vec![1.0, 5.0, 10.0]); + } + + #[test] + #[serial] + fn test_histogram_vec_observe() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram_vec = new_histogram_vec( + "test_hist_vec_obs", + "test", + "test", + &[], + ["method"], + vec![0.5, 1.5, 3.0], + ); + histogram_vec.with_label_values(["GET"]).observe(1.0); + + flush_and_read_metric( + exporter, + provider, + "quickwit_test_test_hist_vec_obs", + |data| { + let AggregatedMetrics::F64(MetricData::Histogram(histogram_data)) = data else { + panic!("expected f64 histogram metric"); + }; + let data_point = histogram_data + .data_points() + .find(|point| { + point + .attributes() + .any(|kv| kv.key.as_str() == "method" && kv.value.as_str() == "GET") + }) + .expect("should contain the labelled data point"); + assert_eq!(data_point.count(), 1); + assert_eq!(data_point.min().unwrap(), 1.0); + }, + ); + } + + #[test] + #[serial] + fn test_histogram_timer_drop_observes() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_timer_drop", "test", "test", vec![1.0, 5.0, 10.0]); + { + let _timer = histogram.start_timer(); + } + + let dp = flush_and_get_histogram_data_point( + exporter, + provider, + "quickwit_test_test_hist_timer_drop", + ); + assert_eq!(dp.count(), 1); + } + + #[test] + #[serial] + fn test_histogram_timer_observe_duration() { + let (exporter, provider) = ensure_test_otel_provider(); + let histogram = new_histogram("test_hist_timer_obs", "test", "test", vec![1.0, 5.0, 10.0]); + let timer = histogram.start_timer(); + timer.observe_duration(); + + let dp = flush_and_get_histogram_data_point( + exporter, + provider, + "quickwit_test_test_hist_timer_obs", + ); + assert_eq!(dp.count(), 1); + } + + #[test] + #[serial] + fn test_counter_vec_with_label_values() { + let (exporter, provider) = ensure_test_otel_provider(); + let vec = new_counter_vec("test_cvec", "test", "test", &[], ["method"]); + let post_counter = vec.with_label_values(["POST"]); + post_counter.inc_by(3); + assert_eq!(post_counter.get(), 3); + + flush_and_read_metric(exporter, provider, "quickwit_test_test_cvec", |data| { + let AggregatedMetrics::U64(MetricData::Sum(sum_data)) = data else { + panic!("expected u64 sum metric"); + }; + let post_value = sum_data + .data_points() + .find(|dp| { + dp.attributes() + .any(|kv| kv.key.as_str() == "method" && kv.value.as_str() == "POST") + }) + .expect("should contain POST data point") + .value(); + assert_eq!(post_value, 3); + }); + } + + #[test] + #[serial] + fn test_gauge_vec_with_label_values() { + let (exporter, provider) = ensure_test_otel_provider(); + let vec = new_gauge_vec("test_gvec", "test", "test", &[], ["pool"]); + let indexing = vec.with_label_values(["indexing"]); + indexing.set(10); + assert_eq!(indexing.get(), 10); + + flush_and_read_metric(exporter, provider, "quickwit_test_test_gvec", |data| { + let AggregatedMetrics::I64(MetricData::Gauge(gauge_data)) = data else { + panic!("expected i64 gauge metric"); + }; + let indexing_value = gauge_data + .data_points() + .find(|dp| { + dp.attributes() + .any(|kv| kv.key.as_str() == "pool" && kv.value.as_str() == "indexing") + }) + .expect("should contain pool=indexing data point") + .value(); + assert_eq!(indexing_value, 10); + }); + } + + #[test] + fn test_up_down_counter_guard_add_sub_drop() { + let counter = new_up_down_counter("test_guard", "test", "test", &[]); + { + let mut guard = UpDownCounterGuard::from_counter(&counter); + guard.add(5); + assert_eq!(counter.get(), 5); + guard.sub(2); + assert_eq!(counter.get(), 3); + } + // After drop, the delta (3) is subtracted. + assert_eq!(counter.get(), 0); + } + + #[test] + fn test_owned_up_down_counter_guard_add_sub_drop() { + let counter = new_up_down_counter("test_owned_guard", "test", "test", &[]); + { + let mut guard = OwnedUpDownCounterGuard::from_counter(counter.clone()); + guard.add(5); + assert_eq!(counter.get(), 5); + guard.sub(2); + assert_eq!(counter.get(), 3); + } + assert_eq!(counter.get(), 0); + } + + #[test] + fn test_metrics_text_payload_contains_registered_metrics() { + let counter = new_counter("test_payload_ctr", "test", "test", &[]); + counter.inc_by(42); + let payload = metrics_text_payload().unwrap(); + assert!(payload.contains("quickwit_test_test_payload_ctr")); + assert!(payload.contains("42")); + } + + #[test] + fn test_hash_label_values() { + // Same values produce the same hash. + assert_eq!( + hash_label_values(&["foo", "bar"]), + hash_label_values(&["foo", "bar"]), + ); + // Different values produce different hashes. + assert_ne!( + hash_label_values(&["foo", "bar"]), + hash_label_values(&["foo", "baz"]), + ); + // Shifted label boundaries produce different hashes + // (length prefix prevents "ab"+"c" from colliding with "a"+"bc"). + assert_ne!( + hash_label_values(&["ab", "c"]), + hash_label_values(&["a", "bc"]), + ); + } +} diff --git a/quickwit/quickwit-common/src/runtimes.rs b/quickwit/quickwit-common/src/runtimes.rs index 79ac2611bd9..3357ba0b24c 100644 --- a/quickwit/quickwit-common/src/runtimes.rs +++ b/quickwit/quickwit-common/src/runtimes.rs @@ -17,11 +17,10 @@ use std::sync::OnceLock; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; -use prometheus::{Gauge, IntCounter, IntGauge}; use tokio::runtime::Runtime; use tokio_metrics::{RuntimeMetrics, RuntimeMonitor}; -use crate::metrics::{new_counter, new_float_gauge, new_gauge}; +use crate::metrics::{Gauge, IntCounter, IntGauge, new_counter, new_float_gauge, new_gauge}; static RUNTIMES: OnceLock> = OnceLock::new(); diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index 00b40ee4b43..59d73e42cfe 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -18,12 +18,11 @@ use std::pin::Pin; use bytesize::ByteSize; use futures::{Stream, StreamExt, TryStreamExt, stream}; -use prometheus::IntGauge; use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream}; use tracing::warn; -use crate::metrics::GaugeGuard; +use crate::metrics::{IntUpDownCounter, UpDownCounterGuard}; use crate::tower::RpcName; pub type BoxStream = Pin + Send + Unpin + 'static>>; @@ -77,7 +76,7 @@ where T: Send + 'static pub fn new_bounded_with_gauge( capacity: usize, - gauge: &'static IntGauge, + gauge: &'static IntUpDownCounter, ) -> (TrackedSender, Self) { let (sender, receiver) = mpsc::channel(capacity); let tracked_sender = TrackedSender { sender, gauge }; @@ -94,7 +93,9 @@ where T: Send + 'static (sender, receiver.into()) } - pub fn new_unbounded_with_gauge(gauge: &'static IntGauge) -> (TrackedUnboundedSender, Self) { + pub fn new_unbounded_with_gauge( + gauge: &'static IntUpDownCounter, + ) -> (TrackedUnboundedSender, Self) { let (sender, receiver) = mpsc::unbounded_channel(); let tracked_sender = TrackedUnboundedSender { sender, gauge }; let receiver_stream = UnboundedReceiverStream::new(receiver) @@ -228,7 +229,7 @@ where T: RpcName } } -pub struct InFlightValue(T, #[allow(dead_code)] GaugeGuard<'static>); +pub struct InFlightValue(T, #[allow(dead_code)] UpDownCounterGuard<'static>); impl fmt::Debug for InFlightValue where T: fmt::Debug @@ -239,8 +240,8 @@ where T: fmt::Debug } impl InFlightValue { - pub fn new(value: T, value_size: ByteSize, gauge: &'static IntGauge) -> Self { - let mut gauge_guard = GaugeGuard::from_gauge(gauge); + pub fn new(value: T, value_size: ByteSize, gauge: &'static IntUpDownCounter) -> Self { + let mut gauge_guard = UpDownCounterGuard::from_counter(gauge); gauge_guard.add(value_size.as_u64() as i64); Self(value, gauge_guard) @@ -253,7 +254,7 @@ impl InFlightValue { pub struct TrackedSender { sender: mpsc::Sender>, - gauge: &'static IntGauge, + gauge: &'static IntUpDownCounter, } impl TrackedSender { @@ -271,7 +272,7 @@ impl TrackedSender { pub struct TrackedUnboundedSender { sender: mpsc::UnboundedSender>, - gauge: &'static IntGauge, + gauge: &'static IntUpDownCounter, } impl TrackedUnboundedSender { @@ -287,7 +288,7 @@ mod tests { use std::sync::LazyLock; use super::*; - use crate::metrics::new_gauge; + use crate::metrics::new_up_down_counter; #[tokio::test] async fn test_service_stream_map() { @@ -300,28 +301,28 @@ mod tests { #[tokio::test] async fn test_tracked_service_stream_bounded() { - static TEST_GAUGE: LazyLock = LazyLock::new(|| { - new_gauge("common", "help", "test_tracked_service_stream_bounded", &[]) + static TEST_COUNTER: LazyLock = LazyLock::new(|| { + new_up_down_counter("common", "help", "test_tracked_service_stream_bounded", &[]) }); let (service_stream_tx, mut service_stream) = - ServiceStream::new_bounded_with_gauge(3, &TEST_GAUGE); + ServiceStream::new_bounded_with_gauge(3, &TEST_COUNTER); service_stream_tx.send(1, ByteSize(42)).await.unwrap(); - assert_eq!(TEST_GAUGE.get(), 42); + assert_eq!(TEST_COUNTER.get(), 42); service_stream_tx.send(2, ByteSize(1337)).await.unwrap(); - assert_eq!(TEST_GAUGE.get(), 1379); + assert_eq!(TEST_COUNTER.get(), 1379); let value = service_stream.next().await.unwrap(); assert_eq!(value, 1); - assert_eq!(TEST_GAUGE.get(), 1337); + assert_eq!(TEST_COUNTER.get(), 1337); } #[tokio::test] async fn test_tracked_service_stream_unbounded() { - static TEST_GAUGE: LazyLock = LazyLock::new(|| { - new_gauge( + static TEST_COUNTER: LazyLock = LazyLock::new(|| { + new_up_down_counter( "common", "help", "test_tracked_service_stream_unbounded", @@ -330,16 +331,16 @@ mod tests { }); let (service_stream_tx, mut service_stream) = - ServiceStream::new_unbounded_with_gauge(&TEST_GAUGE); + ServiceStream::new_unbounded_with_gauge(&TEST_COUNTER); service_stream_tx.send(1, ByteSize(42)).unwrap(); - assert_eq!(TEST_GAUGE.get(), 42); + assert_eq!(TEST_COUNTER.get(), 42); service_stream_tx.send(2, ByteSize(1337)).unwrap(); - assert_eq!(TEST_GAUGE.get(), 1379); + assert_eq!(TEST_COUNTER.get(), 1379); let value = service_stream.next().await.unwrap(); assert_eq!(value, 1); - assert_eq!(TEST_GAUGE.get(), 1337); + assert_eq!(TEST_COUNTER.get(), 1337); } } diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index f4b738ef2c0..71e26ff4f3f 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -16,11 +16,13 @@ use std::fmt; use std::sync::{Arc, LazyLock}; use futures::{Future, TryFutureExt}; -use prometheus::IntGauge; use tokio::sync::oneshot; use tracing::error; -use crate::metrics::{GaugeGuard, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; +use crate::metrics::{ + IntUpDownCounter, IntUpDownCounterVec, OwnedUpDownCounterGuard, UpDownCounterGuard, + new_up_down_counter_vec, +}; /// An executor backed by a thread pool to run CPU-intensive tasks. /// @@ -29,8 +31,8 @@ use crate::metrics::{GaugeGuard, IntGaugeVec, OwnedGaugeGuard, new_gauge_vec}; #[derive(Clone)] pub struct ThreadPool { thread_pool: Arc, - ongoing_tasks: IntGauge, - pending_tasks: IntGauge, + ongoing_tasks: IntUpDownCounter, + pending_tasks: IntUpDownCounter, } impl ThreadPool { @@ -84,8 +86,8 @@ impl ThreadPool { { let span = tracing::Span::current(); let ongoing_tasks = self.ongoing_tasks.clone(); - let mut pending_tasks_guard: OwnedGaugeGuard = - OwnedGaugeGuard::from_gauge(self.pending_tasks.clone()); + let mut pending_tasks_guard: OwnedUpDownCounterGuard = + OwnedUpDownCounterGuard::from_counter(self.pending_tasks.clone()); pending_tasks_guard.add(1i64); let (tx, rx) = oneshot::channel(); self.thread_pool.spawn(move || { @@ -94,7 +96,7 @@ impl ThreadPool { return; } let _guard = span.enter(); - let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks); + let mut ongoing_task_guard = UpDownCounterGuard::from_counter(&ongoing_tasks); ongoing_task_guard.add(1i64); let result = cpu_intensive_fn(); let _ = tx.send(result); @@ -135,21 +137,21 @@ impl fmt::Display for Panicked { impl std::error::Error for Panicked {} struct ThreadPoolMetrics { - ongoing_tasks: IntGaugeVec<1>, - pending_tasks: IntGaugeVec<1>, + ongoing_tasks: IntUpDownCounterVec<1>, + pending_tasks: IntUpDownCounterVec<1>, } impl Default for ThreadPoolMetrics { fn default() -> Self { ThreadPoolMetrics { - ongoing_tasks: new_gauge_vec( + ongoing_tasks: new_up_down_counter_vec( "ongoing_tasks", "number of tasks being currently processed by threads in the thread pool", "thread_pool", &[], ["pool"], ), - pending_tasks: new_gauge_vec( + pending_tasks: new_up_down_counter_vec( "pending_tasks", "number of tasks waiting in the queue before being processed by the thread pool", "thread_pool", diff --git a/quickwit/quickwit-common/src/tower/circuit_breaker.rs b/quickwit/quickwit-common/src/tower/circuit_breaker.rs index 09ada07e187..3d7f2d272cf 100644 --- a/quickwit/quickwit-common/src/tower/circuit_breaker.rs +++ b/quickwit/quickwit-common/src/tower/circuit_breaker.rs @@ -19,10 +19,11 @@ use std::task::{Context, Poll}; use std::time::Duration; use pin_project::pin_project; -use prometheus::IntCounter; use tokio::time::Instant; use tower::{Layer, Service}; +use crate::metrics::IntCounter; + /// The circuit breaker layer implements the [circuit breaker pattern](https://martinfowler.com/bliki/CircuitBreaker.html). /// /// It counts the errors emitted by the inner service, and if the number of errors exceeds a certain @@ -49,7 +50,7 @@ pub struct CircuitBreakerLayer { time_window: Duration, timeout: Duration, evaluator: Evaluator, - circuit_break_total: prometheus::IntCounter, + circuit_break_total: IntCounter, } pub trait CircuitBreakerEvaluator: Clone { @@ -61,7 +62,7 @@ pub trait CircuitBreakerEvaluator: Clone { self, max_num_errors_per_secs: u32, timeout: Duration, - circuit_break_total: prometheus::IntCounter, + circuit_break_total: IntCounter, ) -> CircuitBreakerLayer { CircuitBreakerLayer { max_error_count_per_time_window: max_num_errors_per_secs, @@ -301,8 +302,12 @@ mod tests { const TIMEOUT: Duration = Duration::from_millis(500); - let int_counter: prometheus::IntCounter = - IntCounter::new("circuit_break_total_test", "test circuit breaker counter").unwrap(); + let int_counter = crate::metrics::new_counter( + "circuit_break_total_test", + "test circuit breaker counter", + "test", + &[], + ); let mut service = ServiceBuilder::new() .layer(TestCircuitBreakerEvaluator.make_layer(10, TIMEOUT, int_counter)) .service_fn(|_| async { diff --git a/quickwit/quickwit-common/src/tower/metrics.rs b/quickwit/quickwit-common/src/tower/metrics.rs index b2d093adbe3..cdda9f18068 100644 --- a/quickwit/quickwit-common/src/tower/metrics.rs +++ b/quickwit/quickwit-common/src/tower/metrics.rs @@ -22,7 +22,8 @@ use prometheus::exponential_buckets; use tower::{Layer, Service}; use crate::metrics::{ - HistogramVec, IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec, new_histogram_vec, + HistogramVec, IntCounterVec, IntUpDownCounterVec, new_counter_vec, new_histogram_vec, + new_up_down_counter_vec, }; pub trait RpcName { @@ -33,7 +34,7 @@ pub trait RpcName { pub struct GrpcMetrics { inner: S, requests_total: IntCounterVec<2>, - requests_in_flight: IntGaugeVec<1>, + requests_in_flight: IntUpDownCounterVec<1>, request_duration_seconds: HistogramVec<2>, } @@ -72,7 +73,7 @@ where #[derive(Clone)] pub struct GrpcMetricsLayer { requests_total: IntCounterVec<2>, - requests_in_flight: IntGaugeVec<1>, + requests_in_flight: IntUpDownCounterVec<1>, request_duration_seconds: HistogramVec<2>, } @@ -86,7 +87,7 @@ impl GrpcMetricsLayer { &[("kind", kind)], ["rpc", "status"], ), - requests_in_flight: new_gauge_vec( + requests_in_flight: new_up_down_counter_vec( "grpc_requests_in_flight", "Number of gRPC requests in-flight.", subsystem, @@ -127,7 +128,7 @@ pub struct ResponseFuture { rpc_name: &'static str, status: &'static str, requests_total: IntCounterVec<2>, - requests_in_flight: IntGaugeVec<1>, + requests_in_flight: IntUpDownCounterVec<1>, request_duration_seconds: HistogramVec<2>, } diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 84ba3987f4a..c0f66d686e5 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -27,7 +27,7 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, Command, Handler, Mailbox, QueueCapacity, }; use quickwit_common::io::IoControls; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_common::runtimes::RuntimeType; use quickwit_common::temp_dir::TempDirectory; use quickwit_config::IndexingSettings; @@ -220,7 +220,7 @@ impl IndexerState { let publish_token_opt = self.publish_token_opt.clone(); let mut split_builders_guard = - GaugeGuard::from_gauge(&crate::metrics::INDEXER_METRICS.split_builders); + UpDownCounterGuard::from_counter(&crate::metrics::INDEXER_METRICS.split_builders); split_builders_guard.add(1); let workbench = IndexingWorkbench { @@ -233,7 +233,7 @@ impl IndexerState { publish_lock, publish_token_opt, last_delete_opstamp, - memory_usage: GaugeGuard::from_gauge( + memory_usage: UpDownCounterGuard::from_counter( &quickwit_common::metrics::MEMORY_METRICS .in_flight .index_writer, @@ -358,8 +358,8 @@ struct IndexingWorkbench { // We use this value to set the `delete_opstamp` of the workbench splits. last_delete_opstamp: u64, // Number of bytes declared as used by tantivy. - memory_usage: GaugeGuard<'static>, - split_builders_guard: GaugeGuard<'static>, + memory_usage: UpDownCounterGuard<'static>, + split_builders_guard: UpDownCounterGuard<'static>, cooperative_indexing_period: Option, } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index a102d0b4f0c..055ea959e2e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -23,7 +23,7 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, HEARTBEAT, Handler, Health, Mailbox, QueueCapacity, Supervisable, }; -use quickwit_common::metrics::OwnedGaugeGuard; +use quickwit_common::metrics::OwnedUpDownCounterGuard; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::{KillSwitch, is_metrics_index}; @@ -163,7 +163,7 @@ pub struct IndexingPipeline { // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. shard_ids: BTreeSet, - _indexing_pipelines_gauge_guard: OwnedGaugeGuard, + _indexing_pipelines_gauge_guard: OwnedUpDownCounterGuard, } #[async_trait] @@ -201,7 +201,8 @@ impl IndexingPipeline { let indexing_pipelines_gauge = crate::metrics::INDEXER_METRICS .indexing_pipelines .with_label_values([¶ms.pipeline_id.index_uid.index_id]); - let indexing_pipelines_gauge_guard = OwnedGaugeGuard::from_gauge(indexing_pipelines_gauge); + let indexing_pipelines_gauge_guard = + OwnedUpDownCounterGuard::from_counter(indexing_pipelines_gauge); let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index 98ca19636a2..f1ba051ee1e 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -15,17 +15,18 @@ use std::sync::LazyLock; use quickwit_common::metrics::{ - IntCounter, IntCounterVec, IntGauge, IntGaugeVec, new_counter, new_counter_vec, new_gauge, - new_gauge_vec, + IntCounter, IntCounterVec, IntGauge, IntGaugeVec, IntUpDownCounter, IntUpDownCounterVec, + new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_up_down_counter, + new_up_down_counter_vec, }; pub struct IndexerMetrics { pub processed_docs_total: IntCounterVec<2>, pub processed_bytes: IntCounterVec<2>, - pub indexing_pipelines: IntGaugeVec<1>, + pub indexing_pipelines: IntUpDownCounterVec<1>, pub backpressure_micros: IntCounterVec<1>, pub available_concurrent_upload_permits: IntGaugeVec<1>, - pub split_builders: IntGauge, + pub split_builders: IntUpDownCounter, pub ongoing_merge_operations: IntGauge, pub pending_merge_operations: IntGauge, pub pending_merge_bytes: IntGauge, @@ -53,7 +54,7 @@ impl Default for IndexerMetrics { &[], ["index", "docs_processed_status"], ), - indexing_pipelines: new_gauge_vec( + indexing_pipelines: new_up_down_counter_vec( "indexing_pipelines", "Number of running indexing pipelines", "indexing", @@ -75,7 +76,7 @@ impl Default for IndexerMetrics { &[], ["component"], ), - split_builders: new_gauge( + split_builders: new_up_down_counter( "split_builders", "Number of existing index writer instances.", "indexing", diff --git a/quickwit/quickwit-indexing/src/models/indexed_split.rs b/quickwit/quickwit-indexing/src/models/indexed_split.rs index cd272bdc34c..6dd989acc3a 100644 --- a/quickwit/quickwit-indexing/src/models/indexed_split.rs +++ b/quickwit/quickwit-indexing/src/models/indexed_split.rs @@ -16,7 +16,7 @@ use std::fmt; use std::path::Path; use quickwit_common::io::IoControls; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_proto::indexing::IndexingPipelineId; @@ -182,8 +182,8 @@ pub struct IndexedSplitBatchBuilder { pub publish_token_opt: Option, pub commit_trigger: CommitTrigger, pub batch_parent_span: Span, - pub memory_usage: GaugeGuard<'static>, - pub _split_builders_guard: GaugeGuard<'static>, + pub memory_usage: UpDownCounterGuard<'static>, + pub _split_builders_guard: UpDownCounterGuard<'static>, } /// Sends notifications to the Publisher that the last batch of splits was empty. diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index bed695aa1d4..9f9814670cf 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -14,7 +14,7 @@ use std::fmt; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; use tantivy::{DateTime, TantivyDocument}; @@ -41,7 +41,7 @@ pub struct ProcessedDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard<'static>, + _gauge_guard: UpDownCounterGuard<'static>, } impl ProcessedDocBatch { @@ -51,7 +51,8 @@ impl ProcessedDocBatch { force_commit: bool, ) -> Self { let delta = docs.iter().map(|doc| doc.num_bytes as i64).sum::(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.indexer_mailbox); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.indexer_mailbox); gauge_guard.add(delta); Self { docs, diff --git a/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs b/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs index c70afe35976..ece5962005a 100644 --- a/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs +++ b/quickwit/quickwit-indexing/src/models/processed_parquet_batch.rs @@ -20,7 +20,7 @@ use std::fmt; use arrow::record_batch::RecordBatch; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; /// Batch of parquet data as Arrow RecordBatch for the parquet indexing pipeline. @@ -35,7 +35,7 @@ pub struct ProcessedParquetBatch { /// Force commit flag - when true, accumulator should flush immediately. pub force_commit: bool, /// Memory tracking gauge guard. - _gauge_guard: GaugeGuard<'static>, + _gauge_guard: UpDownCounterGuard<'static>, } impl ProcessedParquetBatch { @@ -57,7 +57,8 @@ impl ProcessedParquetBatch { .map(|col| col.get_array_memory_size() as i64) .sum(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.indexer_mailbox); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.indexer_mailbox); gauge_guard.add(memory_size); Self { diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs index f88d9fcac2b..01202aff069 100644 --- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs +++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs @@ -15,7 +15,7 @@ use std::fmt; use bytes::Bytes; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_metastore::checkpoint::SourceCheckpointDelta; pub struct RawDocBatch { @@ -24,7 +24,7 @@ pub struct RawDocBatch { pub docs: Vec, pub checkpoint_delta: SourceCheckpointDelta, pub force_commit: bool, - _gauge_guard: GaugeGuard<'static>, + _gauge_guard: UpDownCounterGuard<'static>, } impl RawDocBatch { @@ -35,7 +35,7 @@ impl RawDocBatch { ) -> Self { let delta = docs.iter().map(|doc| doc.len() as i64).sum::(); let mut gauge_guard = - GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.doc_processor_mailbox); gauge_guard.add(delta); Self { @@ -67,7 +67,8 @@ impl fmt::Debug for RawDocBatch { impl Default for RawDocBatch { fn default() -> Self { - let _gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); + let _gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.doc_processor_mailbox); Self { docs: Vec::new(), checkpoint_delta: SourceCheckpointDelta::default(), diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index a43d305a9ea..7aa3b7a63b5 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -91,7 +91,7 @@ pub use pulsar_source::{PulsarSource, PulsarSourceFactory}; #[cfg(feature = "sqs")] pub use queue_sources::sqs_queue; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox}; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{ @@ -542,7 +542,7 @@ pub(super) struct BatchBuilder { num_bytes: u64, checkpoint_delta: SourceCheckpointDelta, force_commit: bool, - gauge_guard: GaugeGuard<'static>, + gauge_guard: UpDownCounterGuard<'static>, } impl BatchBuilder { @@ -560,7 +560,7 @@ impl BatchBuilder { SourceType::Pulsar => MEMORY_METRICS.in_flight.pulsar(), _ => MEMORY_METRICS.in_flight.other(), }; - let gauge_guard = GaugeGuard::from_gauge(gauge); + let gauge_guard = UpDownCounterGuard::from_counter(gauge); Self { docs: Vec::with_capacity(capacity), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index c176f3d9313..05da25137ee 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -25,7 +25,7 @@ use futures::StreamExt; use futures::stream::FuturesUnordered; use mrecordlog::error::CreateQueueError; use quickwit_cluster::Cluster; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; @@ -1111,7 +1111,8 @@ impl IngesterService for Ingester { _ => None, }) .sum::(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.ingester_persist); gauge_guard.add(request_size_bytes as i64); self.persist_inner(persist_request).await diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index 5e034f1bd36..ed0847c30b0 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -16,8 +16,9 @@ use std::sync::LazyLock; use mrecordlog::ResourceUsage; use quickwit_common::metrics::{ - Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, exponential_buckets, - linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, new_histogram_vec, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntUpDownCounterVec, + exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_histogram, + new_histogram_vec, new_up_down_counter_vec, }; // Counter vec counting the different outcomes of ingest requests as @@ -78,9 +79,10 @@ pub(super) struct IngestV2Metrics { pub closed_shards: IntGauge, pub shard_lt_throughput_mib: Histogram, pub shard_st_throughput_mib: Histogram, - pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>, + pub wal_acquire_lock_requests_in_flight: IntUpDownCounterVec<2>, pub wal_acquire_lock_request_duration_secs: HistogramVec<2>, pub wal_disk_used_bytes: IntGauge, + pub wal_memory_allocated_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, pub ingest_results: IngestResultMetrics, pub ingest_attempts: IntCounterVec<1>, @@ -128,7 +130,7 @@ impl Default for IngestV2Metrics { "ingest", linear_buckets(0.0f64, 1.0f64, 15).unwrap(), ), - wal_acquire_lock_requests_in_flight: new_gauge_vec( + wal_acquire_lock_requests_in_flight: new_up_down_counter_vec( "wal_acquire_lock_requests_in_flight", "Number of acquire lock requests in-flight.", "ingest", @@ -149,6 +151,12 @@ impl Default for IngestV2Metrics { "ingest", &[], ), + wal_memory_allocated_bytes: new_gauge( + "wal_memory_allocated_bytes", + "WAL memory allocated in bytes.", + "ingest", + &[], + ), wal_memory_used_bytes: new_gauge( "wal_memory_used_bytes", "WAL memory used in bytes.", @@ -163,9 +171,8 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) { INGEST_V2_METRICS .wal_disk_used_bytes .set(wal_usage.disk_used_bytes as i64); - quickwit_common::metrics::MEMORY_METRICS - .in_flight - .wal + INGEST_V2_METRICS + .wal_memory_allocated_bytes .set(wal_usage.memory_allocated_bytes as i64); INGEST_V2_METRICS .wal_memory_used_bytes diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index bbf0cd037c5..0947e43aa3e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -18,7 +18,7 @@ use std::time::{Duration, Instant}; use bytesize::ByteSize; use futures::{Future, StreamExt}; use mrecordlog::error::CreateQueueError; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::{ServiceStream, rate_limited_warn}; use quickwit_proto::ingest::ingester::{ AckReplicationMessage, IngesterStatus, InitReplicaRequest, InitReplicaResponse, @@ -504,7 +504,8 @@ impl ReplicationTask { ))); } let request_size_bytes = replicate_request.num_bytes(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.ingester_replicate); gauge_guard.add(request_size_bytes as i64); self.current_replication_seqno += 1; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index e249dd1e0fe..94c2a2280d4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -20,7 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::{rate_limited_error, rate_limited_warn}; use quickwit_proto::control_plane::{ @@ -578,7 +578,8 @@ impl IngestRouterService for IngestRouter { async fn ingest(&self, ingest_request: IngestRequestV2) -> IngestV2Result { let request_size_bytes = ingest_request.num_bytes(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.ingest_router); gauge_guard.add(request_size_bytes as i64); let num_subrequests = ingest_request.subrequests.len(); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs index 59cea1db805..5c8eabd7db0 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metrics.rs @@ -14,11 +14,11 @@ use std::sync::LazyLock; -use quickwit_common::metrics::{IntGauge, new_gauge}; +use quickwit_common::metrics::{IntGauge, IntUpDownCounter, new_gauge, new_up_down_counter}; #[derive(Clone)] pub(super) struct PostgresMetrics { - pub acquire_connections: IntGauge, + pub acquire_connections: IntUpDownCounter, pub active_connections: IntGauge, pub idle_connections: IntGauge, } @@ -26,7 +26,7 @@ pub(super) struct PostgresMetrics { impl Default for PostgresMetrics { fn default() -> Self { Self { - acquire_connections: new_gauge( + acquire_connections: new_up_down_counter( "acquire_connections", "Number of connections being acquired.", "metastore", diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs index a4c1e790e5b..c5dddeca99d 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs @@ -14,7 +14,7 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use sqlx::pool::PoolConnection; use sqlx::pool::maybe::MaybePoolConnection; use sqlx::{ @@ -58,7 +58,8 @@ impl<'a, DB: Database> Acquire<'a> for &TrackedPool { .set(self.inner_pool.num_idle() as i64); Box::pin(async move { - let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&POSTGRES_METRICS.acquire_connections); gauge_guard.add(1); let conn = acquire_conn_fut.await?; diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 3e430d7b24b..6a53af08755 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -19,9 +19,9 @@ use std::sync::LazyLock; use bytesize::ByteSize; use quickwit_common::metrics::{ - Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, exponential_buckets, - linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, - new_histogram_vec, + Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntUpDownCounter, + exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge, new_histogram, + new_histogram_vec, new_up_down_counter, }; fn print_if_not_null( @@ -116,9 +116,9 @@ pub struct SearchMetrics { pub leaf_search_split_duration_secs: Histogram, pub job_assigned_total: IntCounterVec<1>, pub leaf_search_single_split_tasks_pending: IntGauge, - pub leaf_search_single_split_tasks_ongoing: IntGauge, + pub leaf_search_single_split_tasks_ongoing: IntUpDownCounter, pub leaf_search_single_split_warmup_num_bytes: Histogram, - pub searcher_local_kv_store_size_bytes: IntGauge, + pub searcher_local_kv_store_size_bytes: IntUpDownCounter, } /// From 0.008s to 131.072s @@ -151,14 +151,6 @@ impl Default for SearchMetrics { ByteSize::gb(5).as_u64() as f64, ]; - let leaf_search_single_split_tasks = new_gauge_vec::<1>( - "leaf_search_single_split_tasks", - "Number of single split search tasks pending or ongoing", - "search", - &[], - ["status"], // takes values "ongoing" or "pending" - ); - SearchMetrics { root_search_requests_total: new_counter_vec( "root_search_requests_total", @@ -222,10 +214,18 @@ impl Default for SearchMetrics { "search", duration_buckets(), ), - leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks - .with_label_values(["ongoing"]), - leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks - .with_label_values(["pending"]), + leaf_search_single_split_tasks_ongoing: new_up_down_counter( + "leaf_search_single_split_tasks_ongoing", + "Number of single split search tasks ongoing.", + "search", + &[], + ), + leaf_search_single_split_tasks_pending: new_gauge( + "leaf_search_single_split_tasks_pending", + "Number of single split search tasks pending.", + "search", + &[], + ), leaf_search_single_split_warmup_num_bytes: new_histogram( "leaf_search_single_split_warmup_num_bytes", "Size of the short lived cache for a single split once the warmup is done.", @@ -239,7 +239,7 @@ impl Default for SearchMetrics { &[], ["affinity"], ), - searcher_local_kv_store_size_bytes: new_gauge( + searcher_local_kv_store_size_bytes: new_up_down_counter( "searcher_local_kv_store_size_bytes", "Size of the searcher kv store in bytes. This store is used to cache scroll \ contexts.", diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index a4a31a856b5..35a4e145321 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -22,7 +22,7 @@ use std::time::Duration; use anyhow::Context; use base64::Engine; use base64::prelude::BASE64_STANDARD; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_common::shared_consts::SCROLL_BATCH_LEN; use quickwit_metastore::SplitMetadata; use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError}; @@ -123,7 +123,7 @@ impl ScrollContext { struct TrackedValue { content: Vec, - _total_size_metric_guard: GaugeGuard<'static>, + _total_size_metric_guard: UpDownCounterGuard<'static>, } /// In memory key value store with TTL and limited size. @@ -148,8 +148,9 @@ impl Default for MiniKV { impl MiniKV { pub async fn put(&self, key: Vec, payload: Vec, ttl: Duration) { - let mut metric_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes); + let mut metric_guard = UpDownCounterGuard::from_counter( + &crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes, + ); metric_guard.add(payload.len() as i64); let mut cache_lock = self.ttl_with_cache.write().await; cache_lock.insert( diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 4cd5e99918f..2a33e1aa6a0 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use bytesize::ByteSize; -use quickwit_common::metrics::GaugeGuard; +use quickwit_common::metrics::UpDownCounterGuard; use quickwit_proto::search::SplitIdAndFooterOffsets; use tokio::sync::{mpsc, oneshot}; @@ -332,7 +332,7 @@ impl SearchPermitActor { fn assign_available_permits(&mut self) { while let Some(permit_request) = self.pop_next_request_if_serviceable() { - let mut ongoing_gauge_guard = GaugeGuard::from_gauge( + let mut ongoing_gauge_guard = UpDownCounterGuard::from_counter( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, ); ongoing_gauge_guard.add(1); @@ -357,7 +357,7 @@ impl SearchPermitActor { } pub struct SearchPermit { - _ongoing_gauge_guard: GaugeGuard<'static>, + _ongoing_gauge_guard: UpDownCounterGuard<'static>, msg_sender: mpsc::WeakUnboundedSender, memory_allocation: u64, warmup_slot_freed: bool, diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index 9e2ddd28197..0c0cdd36c0d 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -17,7 +17,7 @@ use std::sync::LazyLock; use bytes::Bytes; use flate2::read::{MultiGzDecoder, ZlibDecoder}; -use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS}; +use quickwit_common::metrics::{MEMORY_METRICS, UpDownCounterGuard}; use quickwit_common::thread_pool::run_cpu_intensive; use thiserror::Error; use warp::Filter; @@ -108,13 +108,14 @@ pub(crate) fn get_body_bytes() -> impl Filter, + _gauge_guard: UpDownCounterGuard<'static>, _permit: LoadShieldPermit, } impl Body { pub fn new(content: Bytes, load_shield_permit: LoadShieldPermit) -> Body { - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server); + let mut gauge_guard = + UpDownCounterGuard::from_counter(&MEMORY_METRICS.in_flight.rest_server); gauge_guard.add(content.len() as i64); Body { content, diff --git a/quickwit/quickwit-serve/src/load_shield.rs b/quickwit/quickwit-serve/src/load_shield.rs index 568ffabd53b..4b5979ed43c 100644 --- a/quickwit/quickwit-serve/src/load_shield.rs +++ b/quickwit/quickwit-serve/src/load_shield.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use quickwit_common::metrics::{GaugeGuard, IntGauge}; +use quickwit_common::metrics::{IntUpDownCounter, UpDownCounterGuard}; use tokio::sync::{Semaphore, SemaphorePermit}; use crate::rest::TooManyRequests; @@ -22,14 +22,14 @@ use crate::rest::TooManyRequests; pub struct LoadShield { in_flight_semaphore_opt: Option, // This one is doing the load shedding. concurrency_semaphore_opt: Option, - ongoing_gauge: IntGauge, - pending_gauge: IntGauge, + ongoing_gauge: IntUpDownCounter, + pending_gauge: IntUpDownCounter, } pub struct LoadShieldPermit { _concurrency_permit_opt: Option>, _in_flight_permit_opt: Option>, - _ongoing_gauge_guard: GaugeGuard<'static>, + _ongoing_gauge_guard: UpDownCounterGuard<'static>, } impl LoadShield { @@ -78,12 +78,12 @@ impl LoadShield { } pub async fn acquire_permit(&'static self) -> Result { - let mut pending_gauge_guard = GaugeGuard::from_gauge(&self.pending_gauge); + let mut pending_gauge_guard = UpDownCounterGuard::from_counter(&self.pending_gauge); pending_gauge_guard.add(1); let in_flight_permit_opt = self.acquire_in_flight_permit().await?; let concurrency_permit_opt = self.acquire_concurrency_permit().await; drop(pending_gauge_guard); - let mut ongoing_gauge_guard = GaugeGuard::from_gauge(&self.ongoing_gauge); + let mut ongoing_gauge_guard = UpDownCounterGuard::from_counter(&self.ongoing_gauge); ongoing_gauge_guard.add(1); Ok(LoadShieldPermit { _in_flight_permit_opt: in_flight_permit_opt, diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index 7d922c7fb36..55801e697c9 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -15,15 +15,15 @@ use std::sync::LazyLock; use quickwit_common::metrics::{ - HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, new_counter, new_counter_vec, - new_gauge_vec, new_histogram_vec, + HistogramVec, IntCounter, IntCounterVec, IntUpDownCounterVec, new_counter, new_counter_vec, + new_histogram_vec, new_up_down_counter_vec, }; pub struct ServeMetrics { pub http_requests_total: IntCounterVec<2>, pub request_duration_secs: HistogramVec<2>, - pub ongoing_requests: IntGaugeVec<1>, - pub pending_requests: IntGaugeVec<1>, + pub ongoing_requests: IntUpDownCounterVec<1>, + pub pending_requests: IntUpDownCounterVec<1>, pub circuit_break_total: IntCounter, } @@ -52,14 +52,14 @@ impl Default for ServeMetrics { // last bucket is 163.84s quickwit_common::metrics::exponential_buckets(0.02, 2.0, 14).unwrap(), ), - ongoing_requests: new_gauge_vec( + ongoing_requests: new_up_down_counter_vec( "ongoing_requests", "Number of ongoing requests.", "", &[], ["endpoint_group"], ), - pending_requests: new_gauge_vec( + pending_requests: new_up_down_counter_vec( "pending_requests", "Number of pending requests.", "", diff --git a/quickwit/quickwit-storage/src/file_descriptor_cache.rs b/quickwit/quickwit-storage/src/file_descriptor_cache.rs index 4626e71bee9..b8367416df0 100644 --- a/quickwit/quickwit-storage/src/file_descriptor_cache.rs +++ b/quickwit/quickwit-storage/src/file_descriptor_cache.rs @@ -101,10 +101,9 @@ impl FileDescriptorCache { fn put_split_file(&self, split_id: Ulid, split_file: SplitFile) { let mut fd_cache_lock = self.fd_cache.lock().unwrap(); - fd_cache_lock.push(split_id, split_file); - self.fd_cache_metrics - .in_cache_count - .set(fd_cache_lock.len() as i64); + if fd_cache_lock.push(split_id, split_file).is_none() { + self.fd_cache_metrics.in_cache_count.inc(); + } } /// Evicts the given list of split ids from the file descriptor cache. @@ -112,14 +111,11 @@ impl FileDescriptorCache { pub fn evict_split_files(&self, split_ids: &[Ulid]) { let mut fd_cache_lock = self.fd_cache.lock().unwrap(); for split_id in split_ids { - fd_cache_lock.pop(split_id); + if fd_cache_lock.pop(split_id).is_some() { + self.fd_cache_metrics.in_cache_count.dec(); + self.fd_cache_metrics.evict_num_items.inc(); + } } - self.fd_cache_metrics - .in_cache_count - .set(fd_cache_lock.len() as i64); - self.fd_cache_metrics - .evict_num_items - .inc_by(split_ids.len() as u64); } pub async fn get_or_open_split_file( diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 888d137cc18..9b9df8d5009 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -18,8 +18,8 @@ use std::collections::HashMap; use std::sync::{LazyLock, RwLock}; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, - new_gauge, new_histogram_vec, + Histogram, IntCounter, IntCounterVec, IntUpDownCounter, UpDownCounterGuard, new_counter, + new_counter_vec, new_histogram_vec, new_up_down_counter, }; use quickwit_config::CacheConfig; @@ -36,8 +36,8 @@ pub struct StorageMetrics { pub get_slice_timeout_all_timeouts: IntCounter, pub object_storage_get_total: IntCounter, pub object_storage_get_errors_total: IntCounterVec<1>, - pub object_storage_get_slice_in_flight_count: IntGauge, - pub object_storage_get_slice_in_flight_num_bytes: IntGauge, + pub object_storage_get_slice_in_flight_count: IntUpDownCounter, + pub object_storage_get_slice_in_flight_num_bytes: IntUpDownCounter, pub object_storage_put_total: IntCounter, pub object_storage_put_parts: IntCounter, pub object_storage_download_num_bytes: IntCounter, @@ -116,14 +116,14 @@ impl Default for StorageMetrics { &[], ["code"], ), - object_storage_get_slice_in_flight_count: new_gauge( + object_storage_get_slice_in_flight_count: new_up_down_counter( "object_storage_get_slice_in_flight_count", "Number of GetObject for which the memory was allocated but the download is still \ in progress.", "storage", &[], ), - object_storage_get_slice_in_flight_num_bytes: new_gauge( + object_storage_get_slice_in_flight_num_bytes: new_up_down_counter( "object_storage_get_slice_in_flight_num_bytes", "Memory allocated for GetObject requests that are still in progress.", "storage", @@ -171,8 +171,8 @@ pub struct CacheMetrics { #[derive(Clone)] pub struct SingleCacheMetrics { - pub in_cache_count: IntGauge, - pub in_cache_num_bytes: IntGauge, + pub in_cache_count: IntUpDownCounter, + pub in_cache_num_bytes: IntUpDownCounter, pub hits_num_items: IntCounter, pub hits_num_bytes: IntCounter, pub misses_num_items: IntCounter, @@ -187,13 +187,13 @@ impl CacheMetrics { CacheMetrics { component_name: component_name.to_string(), cache_metrics: SingleCacheMetrics { - in_cache_count: new_gauge( + in_cache_count: new_up_down_counter( "in_cache_count", "Count of in cache by component", CACHE_METRICS_NAMESPACE, &labels, ), - in_cache_num_bytes: new_gauge( + in_cache_num_bytes: new_up_down_counter( "in_cache_num_bytes", "Number of bytes in cache by component", CACHE_METRICS_NAMESPACE, @@ -249,13 +249,13 @@ impl CacheMetrics { ("policy", &policy), ]; let new_virtual_cache_metrics = SingleCacheMetrics { - in_cache_count: new_gauge( + in_cache_count: new_up_down_counter( "virtual_in_cache_count", "Count of in cache by component", CACHE_METRICS_NAMESPACE, &labels, ), - in_cache_num_bytes: new_gauge( + in_cache_num_bytes: new_up_down_counter( "virtual_in_cache_num_bytes", "Number of bytes in cache by component", CACHE_METRICS_NAMESPACE, @@ -312,13 +312,14 @@ pub static CACHE_METRICS_FOR_TESTS: LazyLock = pub fn object_storage_get_slice_in_flight_guards( get_request_size: usize, -) -> (GaugeGuard<'static>, GaugeGuard<'static>) { - let mut bytes_guard = GaugeGuard::from_gauge( +) -> (UpDownCounterGuard<'static>, UpDownCounterGuard<'static>) { + let mut bytes_guard = UpDownCounterGuard::from_counter( &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_num_bytes, ); bytes_guard.add(get_request_size as i64); - let mut count_guard = - GaugeGuard::from_gauge(&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count); + let mut count_guard = UpDownCounterGuard::from_counter( + &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count, + ); count_guard.add(1); (bytes_guard, count_guard) }