diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 41c8baf8139..54d2eeebd23 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -8438,6 +8438,7 @@ dependencies = [ "datafusion-substrait", "futures", "object_store", + "parquet", "prost 0.14.3", "quickwit-common", "quickwit-config", diff --git a/quickwit/quickwit-common/src/metrics_specific.rs b/quickwit/quickwit-common/src/metrics_specific.rs index fac1be94fe4..b6ead3beb55 100644 --- a/quickwit/quickwit-common/src/metrics_specific.rs +++ b/quickwit/quickwit-common/src/metrics_specific.rs @@ -15,9 +15,12 @@ /// Returns whether the given index ID corresponds to a metrics index. /// /// Metrics indexes use the Parquet/DataFusion pipeline instead of the Tantivy pipeline. -/// An index is considered a metrics index if it starts with "otel-metrics" or "metrics-". +/// An index is considered a metrics index if it uses one of the BYOC or OSS +/// parquet metrics prefixes. pub fn is_metrics_index(index_id: &str) -> bool { - index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-") + ["datadog-metrics", "metrics-", "otel-metrics"] + .iter() + .any(|prefix| index_id.starts_with(prefix)) } /// Returns whether the given index ID corresponds to a sketches index. @@ -25,7 +28,9 @@ pub fn is_metrics_index(index_id: &str) -> bool { /// Sketches indexes use the Parquet/DataFusion pipeline with sketch-specific /// processors and writers. pub fn is_sketches_index(index_id: &str) -> bool { - index_id.starts_with("sketches-") + ["datadog-sketches", "sketches-"] + .iter() + .any(|prefix| index_id.starts_with(prefix)) } /// Returns whether the given index ID uses the Parquet/DataFusion pipeline. @@ -44,6 +49,10 @@ mod tests { assert!(is_metrics_index("otel-metrics")); assert!(is_metrics_index("otel-metrics-custom")); + // BYOC metrics indexes + assert!(is_metrics_index("datadog-metrics")); + assert!(is_metrics_index("datadog-metrics-v2")); + // Generic metrics indexes assert!(is_metrics_index("metrics-default")); assert!(is_metrics_index("metrics-")); @@ -60,13 +69,18 @@ mod tests { #[test] fn test_is_sketches_index() { + assert!(is_sketches_index("datadog-sketches")); + assert!(is_sketches_index("datadog-sketches-v2")); assert!(is_sketches_index("sketches-default")); + assert!(!is_sketches_index("datadog-metrics")); assert!(!is_sketches_index("otel-metrics")); assert!(!is_sketches_index("my-index")); } #[test] fn test_is_parquet_pipeline_index() { + assert!(is_parquet_pipeline_index("datadog-metrics")); + assert!(is_parquet_pipeline_index("datadog-sketches")); assert!(is_parquet_pipeline_index("otel-metrics")); assert!(is_parquet_pipeline_index("sketches-default")); assert!(!is_parquet_pipeline_index("otel-logs-v0_7")); diff --git a/quickwit/quickwit-datafusion/Cargo.toml b/quickwit/quickwit-datafusion/Cargo.toml index 928fb68ab71..553d1cf5b52 100644 --- a/quickwit/quickwit-datafusion/Cargo.toml +++ b/quickwit/quickwit-datafusion/Cargo.toml @@ -43,6 +43,7 @@ object_store = "0.13" bytesize = { workspace = true } datafusion = "53" datafusion-substrait = "53" +parquet = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/factory.rs b/quickwit/quickwit-datafusion/src/sources/metrics/factory.rs index 38ceb178051..b95c9cbe8e0 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/factory.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/factory.rs @@ -24,6 +24,18 @@ //! service VARCHAR, //! env VARCHAR //! ) STORED AS metrics LOCATION 'my-metrics'; +//! +//! CREATE EXTERNAL TABLE "my-sketches" ( +//! metric_name VARCHAR NOT NULL, +//! timestamp_secs BIGINT UNSIGNED NOT NULL, +//! count BIGINT UNSIGNED NOT NULL, +//! sum DOUBLE NOT NULL, +//! min DOUBLE NOT NULL, +//! max DOUBLE NOT NULL, +//! flags INT UNSIGNED NOT NULL, +//! keys ARRAY NOT NULL, +//! counts ARRAY NOT NULL +//! ) STORED AS sketches LOCATION 'my-sketches'; //! ``` use std::sync::Arc; @@ -34,22 +46,32 @@ use datafusion::arrow; use datafusion::catalog::{Session, TableProviderFactory}; use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::logical_expr::CreateExternalTable; +use quickwit_parquet_engine::split::ParquetSplitKind; use super::index_resolver::MetricsIndexResolver; use super::table_provider::MetricsTableProvider; /// The file type string used in `STORED AS metrics`. pub const METRICS_FILE_TYPE: &str = "metrics"; +/// The file type string used in `STORED AS sketches`. +pub const SKETCHES_FILE_TYPE: &str = "sketches"; /// Creates `MetricsTableProvider` instances from `CREATE EXTERNAL TABLE` DDL. #[derive(Debug)] pub struct MetricsTableProviderFactory { index_resolver: Arc, + split_kind: ParquetSplitKind, } impl MetricsTableProviderFactory { - pub fn new(index_resolver: Arc) -> Self { - Self { index_resolver } + pub fn new( + index_resolver: Arc, + split_kind: ParquetSplitKind, + ) -> Self { + Self { + index_resolver, + split_kind, + } } } @@ -66,7 +88,10 @@ impl TableProviderFactory for MetricsTableProviderFactory { cmd.location.clone() }; - let (split_provider, index_uri) = self.index_resolver.resolve(&index_name).await?; + let (split_provider, index_uri) = self + .index_resolver + .resolve(&index_name, self.split_kind) + .await?; let arrow_schema: SchemaRef = Arc::new(cmd.schema.as_arrow().clone()); diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs b/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs index 2da2db0e196..a8f96fec60b 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use datafusion::error::Result as DFResult; use quickwit_common::uri::Uri; use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt}; +use quickwit_parquet_engine::split::ParquetSplitKind; use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, }; @@ -40,7 +41,11 @@ pub trait MetricsIndexResolver: Send + Sync + std::fmt::Debug { /// Returns the split provider and storage URI for `index_name`. The /// `ObjectStore` for that URI is built on demand by the registry the /// first time DataFusion reads from it. - async fn resolve(&self, index_name: &str) -> DFResult<(Arc, Uri)>; + async fn resolve( + &self, + index_name: &str, + split_kind: ParquetSplitKind, + ) -> DFResult<(Arc, Uri)>; async fn list_index_names(&self) -> DFResult>; } @@ -71,8 +76,12 @@ impl std::fmt::Debug for MetastoreIndexResolver { #[async_trait] impl MetricsIndexResolver for MetastoreIndexResolver { - async fn resolve(&self, index_name: &str) -> DFResult<(Arc, Uri)> { - debug!(index_name, "resolving metrics index"); + async fn resolve( + &self, + index_name: &str, + split_kind: ParquetSplitKind, + ) -> DFResult<(Arc, Uri)> { + debug!(index_name, ?split_kind, "resolving parquet index"); let response = self .metastore @@ -93,6 +102,7 @@ impl MetricsIndexResolver for MetastoreIndexResolver { let split_provider: Arc = Arc::new(MetastoreSplitProvider::new( self.metastore.clone(), index_uid, + split_kind, )); Ok((split_provider, index_uri)) diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs index 3ce08cd02b7..e046c08af0a 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs @@ -21,9 +21,9 @@ use datafusion::error::Result as DFResult; use quickwit_metastore::{ ListParquetSplitsQuery, ListParquetSplitsRequestExt, ListParquetSplitsResponseExt, }; -use quickwit_parquet_engine::split::ParquetSplitMetadata; +use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata}; use quickwit_proto::metastore::{ - ListMetricsSplitsRequest, MetastoreService, MetastoreServiceClient, + ListMetricsSplitsRequest, ListSketchSplitsRequest, MetastoreService, MetastoreServiceClient, }; use quickwit_proto::types::IndexUid; use tracing::{debug, instrument}; @@ -36,13 +36,19 @@ use super::table_provider::MetricsSplitProvider; pub struct MetastoreSplitProvider { metastore: MetastoreServiceClient, index_uid: IndexUid, + split_kind: ParquetSplitKind, } impl MetastoreSplitProvider { - pub fn new(metastore: MetastoreServiceClient, index_uid: IndexUid) -> Self { + pub fn new( + metastore: MetastoreServiceClient, + index_uid: IndexUid, + split_kind: ParquetSplitKind, + ) -> Self { Self { metastore, index_uid, + split_kind, } } } @@ -56,26 +62,45 @@ impl MetricsSplitProvider for MetastoreSplitProvider { metric_names = ?query.metric_names, time_range_start = ?query.time_range_start, time_range_end = ?query.time_range_end, + split_kind = ?self.split_kind, num_splits, ) )] async fn list_splits(&self, query: &MetricsSplitQuery) -> DFResult> { let metastore_query = to_metastore_query(&self.index_uid, query); - let request = - ListMetricsSplitsRequest::try_from_query(self.index_uid.clone(), &metastore_query) + let records = match self.split_kind { + ParquetSplitKind::Metrics => { + let request = ListMetricsSplitsRequest::try_from_query( + self.index_uid.clone(), + &metastore_query, + ) .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; - let response = self - .metastore - .clone() - .list_metrics_splits(request) - .await - .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; + self.metastore + .clone() + .list_metrics_splits(request) + .await + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))? + .deserialize_splits() + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))? + } + ParquetSplitKind::Sketches => { + let request = ListSketchSplitsRequest::try_from_query( + self.index_uid.clone(), + &metastore_query, + ) + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; - let records = response - .deserialize_splits() - .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; + self.metastore + .clone() + .list_sketch_splits(request) + .await + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))? + .deserialize_splits() + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))? + } + }; // The metastore guarantees only Published splits are returned because // `to_metastore_query` sets `split_states = vec![Published]`. No diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs index bd4f794df86..2844c101e96 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs @@ -24,6 +24,7 @@ pub(crate) mod factory; pub(crate) mod index_resolver; pub(crate) mod metastore_provider; pub(crate) mod predicate; +pub(crate) mod sketch_udf; pub(crate) mod table_provider; #[cfg(any(test, feature = "testsuite"))] @@ -38,14 +39,16 @@ use datafusion::arrow; use datafusion::catalog::{MemorySchemaProvider, SchemaProvider, TableProviderFactory}; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; -use quickwit_common::is_metrics_index; +use quickwit_common::{is_metrics_index, is_parquet_pipeline_index, is_sketches_index}; use quickwit_df_core::{ QuickwitRuntimePlugin, QuickwitRuntimeRegistration, QuickwitSubstraitConsumerExt, }; +use quickwit_parquet_engine::split::ParquetSplitKind; use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient}; -use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory}; +use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory, SKETCHES_FILE_TYPE}; use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver}; +use self::sketch_udf::{create_dd_quantile_udf, create_dd_sketch_udaf}; use self::table_provider::MetricsTableProvider; /// Returns `true` when `err` wraps a [`MetastoreError::NotFound`]. @@ -99,15 +102,16 @@ async fn resolve_metrics_table_provider( index_resolver: &dyn MetricsIndexResolver, index_name: &str, schema: SchemaRef, + split_kind: ParquetSplitKind, ) -> DFResult>> { - // Only claim indexes backed by the metrics (parquet) pipeline. This + // Only claim indexes backed by the parquet pipeline. This // remains a naming-prefix check today so sibling sources can coexist // without racing to claim every index. - if !is_metrics_index(index_name) { + if !is_parquet_pipeline_index(index_name) { return Ok(None); } - match index_resolver.resolve(index_name).await { + match index_resolver.resolve(index_name, split_kind).await { Ok((split_provider, index_uri)) => { let provider = MetricsTableProvider::new(schema, split_provider, index_uri)?; Ok(Some(Arc::new(provider))) @@ -122,6 +126,16 @@ async fn resolve_metrics_table_provider( } } +fn split_kind_from_index_name(index_name: &str) -> Option { + if is_metrics_index(index_name) { + Some(ParquetSplitKind::Metrics) + } else if is_sketches_index(index_name) { + Some(ParquetSplitKind::Sketches) + } else { + None + } +} + /// Minimal 4-column schema — always present in every OSS metrics parquet file. fn minimal_base_schema() -> SchemaRef { let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); @@ -133,6 +147,23 @@ fn minimal_base_schema() -> SchemaRef { ])) } +/// Minimal sketch schema — always present in every DDSketch parquet file. +fn minimal_sketch_schema() -> SchemaRef { + Arc::new(ArrowSchema::new( + quickwit_parquet_engine::schema::sketch_fields::SketchParquetField::all() + .iter() + .map(|field| field.to_arrow_field()) + .collect::>(), + )) +} + +fn minimal_schema_for_kind(split_kind: ParquetSplitKind) -> SchemaRef { + match split_kind { + ParquetSplitKind::Metrics => minimal_base_schema(), + ParquetSplitKind::Sketches => minimal_sketch_schema(), + } +} + /// Native OSS `SchemaProvider` for metrics indexes. pub struct MetricsSchemaProvider { index_resolver: Arc, @@ -168,7 +199,7 @@ impl SchemaProvider for MetricsSchemaProvider { tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { if let Ok(mut resolved_names) = resolver.list_index_names().await { - resolved_names.retain(|id| is_metrics_index(id)); + resolved_names.retain(|id| is_parquet_pipeline_index(id)); names.append(&mut resolved_names); } }) @@ -183,8 +214,16 @@ impl SchemaProvider for MetricsSchemaProvider { return Ok(Some(provider)); } - resolve_metrics_table_provider(self.index_resolver.as_ref(), name, minimal_base_schema()) - .await + let Some(split_kind) = split_kind_from_index_name(name) else { + return Ok(None); + }; + resolve_metrics_table_provider( + self.index_resolver.as_ref(), + name, + minimal_schema_for_kind(split_kind), + split_kind, + ) + .await } fn table_exist(&self, name: &str) -> bool { @@ -209,8 +248,18 @@ impl QuickwitRuntimePlugin for MetricsDataSource { fn registration(&self) -> QuickwitRuntimeRegistration { let factory: Arc = Arc::new(MetricsTableProviderFactory::new( Arc::clone(&self.index_resolver), + ParquetSplitKind::Metrics, )); - QuickwitRuntimeRegistration::default().with_table_factory(METRICS_FILE_TYPE, factory) + let sketches_factory: Arc = + Arc::new(MetricsTableProviderFactory::new( + Arc::clone(&self.index_resolver), + ParquetSplitKind::Sketches, + )); + QuickwitRuntimeRegistration::default() + .with_table_factory(METRICS_FILE_TYPE, factory) + .with_table_factory(SKETCHES_FILE_TYPE, sketches_factory) + .with_udaf(Arc::new(create_dd_sketch_udaf())) + .with_udf(Arc::new(create_dd_quantile_udf())) } } @@ -260,11 +309,20 @@ impl QuickwitSubstraitConsumerExt for MetricsDataSource { }; let index_name = index_name.as_str(); - // Use the producer-declared schema if available; fall back to minimal base schema. - let schema = schema_hint.unwrap_or_else(minimal_base_schema); - let provider = - resolve_metrics_table_provider(self.index_resolver.as_ref(), index_name, schema) - .await?; + let Some(split_kind) = split_kind_from_index_name(index_name) else { + return Ok(None); + }; + + // Use the producer-declared schema if available; fall back to the + // minimal schema for the index family. + let schema = schema_hint.unwrap_or_else(|| minimal_schema_for_kind(split_kind)); + let provider = resolve_metrics_table_provider( + self.index_resolver.as_ref(), + index_name, + schema, + split_kind, + ) + .await?; Ok(provider.map(|provider| (index_name.to_string(), provider))) } } diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/sketch_udf.rs b/quickwit/quickwit-datafusion/src/sources/metrics/sketch_udf.rs new file mode 100644 index 00000000000..fbfa20e683b --- /dev/null +++ b/quickwit/quickwit-datafusion/src/sources/metrics/sketch_udf.rs @@ -0,0 +1,783 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! DataFusion UDFs for querying DDSketch parquet rows. +//! +//! `dd_sketch(keys, counts, count, min, max, flags)` is the decomposable aggregate: +//! it merges sparse DDSketch bucket arrays and scalar bounds into a single +//! struct. `dd_quantile(sketch, q)` is the final scalar projection over that +//! merged sketch. + +use std::any::Any; +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::sync::Arc; + +use arrow::array::{ + Array, ArrayRef, AsArray, Float64Array, Float64Builder, Int16Array, ListArray, StructArray, + UInt32Array, UInt64Array, +}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field, Fields, Float64Type, Int16Type, UInt32Type, UInt64Type}; +use datafusion::common::{DataFusionError, Result as DFResult, ScalarValue}; +use datafusion::logical_expr::utils::format_state_name; +use datafusion::logical_expr::{ + Accumulator, AggregateUDF, AggregateUDFImpl, ColumnarValue, ScalarFunctionArgs, ScalarUDF, + ScalarUDFImpl, Signature, TypeSignature, Volatility, +}; + +const SUPPORTED_SKETCH_FLAGS: u32 = 0; + +#[derive(Debug, Clone)] +pub(crate) struct SketchConfig { + gamma: f64, + bias: f64, +} + +impl SketchConfig { + fn quantile_for_key(&self, key: i16) -> f64 { + if key == 0 { + return 0.0; + } + let abs_key = key.unsigned_abs() as f64; + let value = self.gamma.powf(abs_key - self.bias); + if key > 0 { value } else { -value } + } +} + +impl Default for SketchConfig { + fn default() -> Self { + const EPSILON: f64 = 1.0 / 128.0; + const MIN_VALUE: f64 = 1e-9; + let gamma = 1.0 + 2.0 * EPSILON; + let gamma_ln = gamma.ln(); + let emin = (MIN_VALUE.ln() / gamma_ln).floor() as i64; + let bias = (-emin + 1) as f64; + Self { gamma, bias } + } +} + +pub(crate) fn merged_sketch_type() -> DataType { + DataType::Struct(Fields::from(vec![ + Field::new( + "keys", + DataType::List(Arc::new(Field::new("item", DataType::Int16, false))), + false, + ), + Field::new( + "counts", + DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))), + false, + ), + Field::new("total_count", DataType::UInt64, false), + Field::new("global_min", DataType::Float64, false), + Field::new("global_max", DataType::Float64, false), + Field::new("flags", DataType::UInt32, false), + ])) +} + +fn validate_flags(context: &str, flags: u32) -> DFResult<()> { + if flags == SUPPORTED_SKETCH_FLAGS { + Ok(()) + } else { + Err(DataFusionError::Execution(format!( + "{context}: unsupported sketch flags {flags}" + ))) + } +} + +fn validate_bounds(context: &str, min: f64, max: f64) -> DFResult<()> { + if min.is_finite() && max.is_finite() && min <= max { + Ok(()) + } else { + Err(DataFusionError::Execution(format!( + "{context}: invalid sketch bounds: min={min}, max={max}" + ))) + } +} + +fn quantile_for_row( + config: &SketchConfig, + keys: &[i16], + counts: &[u64], + total_count: u64, + global_min: f64, + global_max: f64, + flags: u32, + quantile: f64, +) -> DFResult> { + if !(0.0..=1.0).contains(&quantile) { + return Err(DataFusionError::Execution(format!( + "dd_quantile: quantile must be between 0.0 and 1.0, got {quantile}" + ))); + } + validate_flags("dd_quantile", flags)?; + + if keys.len() != counts.len() { + return Err(DataFusionError::Execution(format!( + "dd_quantile: keys/counts length mismatch: keys has {} elements but counts has {}", + keys.len(), + counts.len() + ))); + } + let mut bucket_total = 0u64; + for count in counts { + bucket_total = bucket_total.checked_add(*count).ok_or_else(|| { + DataFusionError::Execution("dd_quantile: total bucket count overflow".to_string()) + })?; + } + if bucket_total != total_count { + return Err(DataFusionError::Execution(format!( + "dd_quantile: total_count {total_count} does not match sum(counts) {bucket_total}" + ))); + } + + if total_count == 0 { + return Ok(None); + } + validate_bounds("dd_quantile", global_min, global_max)?; + + if quantile == 0.0 { + return Ok(Some(global_min)); + } + if quantile == 1.0 { + return Ok(Some(global_max)); + } + + let rank = (quantile * (total_count as f64 - 1.0)).floor() as u64 + 1; + let mut cumulative = 0u64; + for (key, count) in keys.iter().zip(counts.iter()) { + cumulative = cumulative.checked_add(*count).ok_or_else(|| { + DataFusionError::Execution("dd_quantile: cumulative count overflow".to_string()) + })?; + if cumulative >= rank { + return Ok(Some( + config.quantile_for_key(*key).clamp(global_min, global_max), + )); + } + } + + Err(DataFusionError::Execution(format!( + "dd_quantile: bucket counts do not reach rank {rank}" + ))) +} + +pub(crate) struct DdSketchAccumulator { + merged_buckets: BTreeMap, + total_count: u64, + global_min: f64, + global_max: f64, + flags: u32, +} + +impl Debug for DdSketchAccumulator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DdSketchAccumulator") + .field("total_count", &self.total_count) + .field("global_min", &self.global_min) + .field("global_max", &self.global_max) + .field("buckets_len", &self.merged_buckets.len()) + .finish() + } +} + +impl Default for DdSketchAccumulator { + fn default() -> Self { + Self::new() + } +} + +impl DdSketchAccumulator { + fn new() -> Self { + Self { + merged_buckets: BTreeMap::new(), + total_count: 0, + global_min: f64::INFINITY, + global_max: f64::NEG_INFINITY, + flags: SUPPORTED_SKETCH_FLAGS, + } + } + + fn update_single( + &mut self, + keys: &[i16], + counts: &[u64], + count: u64, + min: f64, + max: f64, + flags: u32, + ) -> DFResult<()> { + if keys.len() != counts.len() { + return Err(DataFusionError::Execution(format!( + "dd_sketch: keys/counts length mismatch: keys has {} elements but counts has {}", + keys.len(), + counts.len() + ))); + } + + let mut bucket_total = 0u64; + for cnt in counts { + bucket_total = bucket_total.checked_add(*cnt).ok_or_else(|| { + DataFusionError::Execution( + "dd_sketch: bucket count overflow while validating row".to_string(), + ) + })?; + } + + if bucket_total != count { + return Err(DataFusionError::Execution(format!( + "dd_sketch: row count {count} does not match sum(counts) {bucket_total}" + ))); + } + + validate_flags("dd_sketch", flags)?; + if count == 0 { + return Ok(()); + } + validate_bounds("dd_sketch", min, max)?; + + for (key, cnt) in keys.iter().zip(counts.iter()) { + let current = self.merged_buckets.entry(*key).or_insert(0); + *current = current.checked_add(*cnt).ok_or_else(|| { + DataFusionError::Execution(format!( + "dd_sketch: bucket count overflow while merging key {key}" + )) + })?; + } + + self.total_count = self.total_count.checked_add(count).ok_or_else(|| { + DataFusionError::Execution( + "dd_sketch: total count overflow while merging rows".to_string(), + ) + })?; + self.global_min = self.global_min.min(min); + self.global_max = self.global_max.max(max); + self.flags = flags; + Ok(()) + } + + fn state_arrays(&self) -> (ArrayRef, ArrayRef, ArrayRef, ArrayRef, ArrayRef, ArrayRef) { + let mut keys = Vec::with_capacity(self.merged_buckets.len()); + let mut counts = Vec::with_capacity(self.merged_buckets.len()); + for (key, count) in &self.merged_buckets { + keys.push(*key); + counts.push(*count); + } + + let keys_array = Arc::new(Int16Array::from(keys)) as ArrayRef; + let counts_array = Arc::new(UInt64Array::from(counts)) as ArrayRef; + + let keys_list = Arc::new(ListArray::new( + Arc::new(Field::new("item", DataType::Int16, false)), + OffsetBuffer::from_lengths([keys_array.len()]), + keys_array, + None, + )) as ArrayRef; + let counts_list = Arc::new(ListArray::new( + Arc::new(Field::new("item", DataType::UInt64, false)), + OffsetBuffer::from_lengths([counts_array.len()]), + counts_array, + None, + )) as ArrayRef; + + ( + keys_list, + counts_list, + Arc::new(UInt64Array::from(vec![self.total_count])) as ArrayRef, + Arc::new(Float64Array::from(vec![self.global_min])) as ArrayRef, + Arc::new(Float64Array::from(vec![self.global_max])) as ArrayRef, + Arc::new(UInt32Array::from(vec![self.flags])) as ArrayRef, + ) + } +} + +impl Accumulator for DdSketchAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> { + let keys_list = values[0].as_list::(); + let counts_list = values[1].as_list::(); + let count_array = values[2].as_primitive::(); + let min_array = values[3].as_primitive::(); + let max_array = values[4].as_primitive::(); + let flags_array = values[5].as_primitive::(); + + let key_offsets = keys_list.value_offsets(); + let key_values = keys_list.values().as_primitive::().values(); + let count_offsets = counts_list.value_offsets(); + let count_values = counts_list.values().as_primitive::().values(); + + for row_idx in 0..keys_list.len() { + if keys_list.is_null(row_idx) + || counts_list.is_null(row_idx) + || count_array.is_null(row_idx) + || min_array.is_null(row_idx) + || max_array.is_null(row_idx) + || flags_array.is_null(row_idx) + { + continue; + } + + let key_start = key_offsets[row_idx] as usize; + let key_end = key_offsets[row_idx + 1] as usize; + let count_start = count_offsets[row_idx] as usize; + let count_end = count_offsets[row_idx + 1] as usize; + + self.update_single( + &key_values[key_start..key_end], + &count_values[count_start..count_end], + count_array.value(row_idx), + min_array.value(row_idx), + max_array.value(row_idx), + flags_array.value(row_idx), + )?; + } + + Ok(()) + } + + fn evaluate(&mut self) -> DFResult { + let (keys_list, counts_list, total_count_arr, min_arr, max_arr, flags_arr) = + self.state_arrays(); + let fields = Fields::from(vec![ + Field::new("keys", keys_list.data_type().clone(), false), + Field::new("counts", counts_list.data_type().clone(), false), + Field::new("total_count", DataType::UInt64, false), + Field::new("global_min", DataType::Float64, false), + Field::new("global_max", DataType::Float64, false), + Field::new("flags", DataType::UInt32, false), + ]); + + Ok(ScalarValue::Struct(Arc::new(StructArray::try_new( + fields, + vec![ + keys_list, + counts_list, + total_count_arr, + min_arr, + max_arr, + flags_arr, + ], + None, + )?))) + } + + fn state(&mut self) -> DFResult> { + let (keys_list, counts_list, total_count_arr, min_arr, max_arr, flags_arr) = + self.state_arrays(); + Ok(vec![ + ScalarValue::List(Arc::new(keys_list.as_list::().clone())), + ScalarValue::List(Arc::new(counts_list.as_list::().clone())), + ScalarValue::UInt64(Some(total_count_arr.as_primitive::().value(0))), + ScalarValue::Float64(Some(min_arr.as_primitive::().value(0))), + ScalarValue::Float64(Some(max_arr.as_primitive::().value(0))), + ScalarValue::UInt32(Some(flags_arr.as_primitive::().value(0))), + ]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> { + self.update_batch(states) + } + + fn size(&self) -> usize { + std::mem::size_of::() + self.merged_buckets.len() * 58 + } +} + +#[derive(Debug)] +struct DdSketchUdaf { + signature: Signature, +} + +impl PartialEq for DdSketchUdaf { + fn eq(&self, _other: &Self) -> bool { + true + } +} + +impl Eq for DdSketchUdaf {} + +impl std::hash::Hash for DdSketchUdaf { + fn hash(&self, state: &mut H) { + "dd_sketch".hash(state); + } +} + +impl DdSketchUdaf { + fn new() -> Self { + Self { + signature: Signature::exact( + vec![ + DataType::List(Arc::new(Field::new("item", DataType::Int16, false))), + DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))), + DataType::UInt64, + DataType::Float64, + DataType::Float64, + DataType::UInt32, + ], + Volatility::Immutable, + ), + } + } +} + +impl AggregateUDFImpl for DdSketchUdaf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "dd_sketch" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> DFResult { + Ok(merged_sketch_type()) + } + + fn accumulator( + &self, + _arg: datafusion::logical_expr::function::AccumulatorArgs, + ) -> DFResult> { + Ok(Box::new(DdSketchAccumulator::new())) + } + + fn state_fields( + &self, + args: datafusion::logical_expr::function::StateFieldsArgs, + ) -> DFResult>> { + Ok(vec![ + Arc::new(Field::new( + format_state_name(args.name, "merged_keys"), + DataType::List(Arc::new(Field::new("item", DataType::Int16, false))), + true, + )), + Arc::new(Field::new( + format_state_name(args.name, "merged_counts"), + DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))), + true, + )), + Arc::new(Field::new( + format_state_name(args.name, "total_count"), + DataType::UInt64, + true, + )), + Arc::new(Field::new( + format_state_name(args.name, "global_min"), + DataType::Float64, + true, + )), + Arc::new(Field::new( + format_state_name(args.name, "global_max"), + DataType::Float64, + true, + )), + Arc::new(Field::new( + format_state_name(args.name, "flags"), + DataType::UInt32, + true, + )), + ]) + } +} + +#[derive(Debug)] +struct DdQuantileUdf { + signature: Signature, + config: SketchConfig, +} + +impl PartialEq for DdQuantileUdf { + fn eq(&self, other: &Self) -> bool { + self.config.gamma.to_bits() == other.config.gamma.to_bits() + && self.config.bias.to_bits() == other.config.bias.to_bits() + } +} + +impl Eq for DdQuantileUdf {} + +impl std::hash::Hash for DdQuantileUdf { + fn hash(&self, state: &mut H) { + self.config.gamma.to_bits().hash(state); + self.config.bias.to_bits().hash(state); + } +} + +impl DdQuantileUdf { + fn new(config: SketchConfig) -> Self { + Self { + signature: Signature::new( + TypeSignature::Exact(vec![merged_sketch_type(), DataType::Float64]), + Volatility::Immutable, + ), + config, + } + } +} + +impl ScalarUDFImpl for DdQuantileUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "dd_quantile" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> DFResult { + Ok(DataType::Float64) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> DFResult { + let sketch_array = args.args[0].to_array(args.number_rows)?; + let quantile_array = args.args[1].to_array(args.number_rows)?; + let quantiles = quantile_array.as_primitive::(); + + let sketches = sketch_array.as_struct(); + let keys_col = sketches.column(0).as_list::(); + let counts_col = sketches.column(1).as_list::(); + let total_count_col = sketches.column(2).as_primitive::(); + let min_col = sketches.column(3).as_primitive::(); + let max_col = sketches.column(4).as_primitive::(); + let flags_col = sketches.column(5).as_primitive::(); + + let key_offsets = keys_col.value_offsets(); + let key_values = keys_col.values().as_primitive::().values(); + let count_offsets = counts_col.value_offsets(); + let count_values = counts_col.values().as_primitive::().values(); + + let mut results = Float64Builder::with_capacity(args.number_rows); + for row in 0..args.number_rows { + if sketches.is_null(row) + || keys_col.is_null(row) + || counts_col.is_null(row) + || total_count_col.is_null(row) + || min_col.is_null(row) + || max_col.is_null(row) + || flags_col.is_null(row) + || quantiles.is_null(row) + { + results.append_null(); + continue; + } + + let key_start = key_offsets[row] as usize; + let key_end = key_offsets[row + 1] as usize; + let count_start = count_offsets[row] as usize; + let count_end = count_offsets[row + 1] as usize; + + match quantile_for_row( + &self.config, + &key_values[key_start..key_end], + &count_values[count_start..count_end], + total_count_col.value(row), + min_col.value(row), + max_col.value(row), + flags_col.value(row), + quantiles.value(row), + )? { + Some(value) => results.append_value(value), + None => results.append_null(), + } + } + + Ok(ColumnarValue::Array(Arc::new(results.finish()))) + } +} + +pub(crate) fn create_dd_sketch_udaf() -> AggregateUDF { + AggregateUDF::from(DdSketchUdaf::new()) +} + +pub(crate) fn create_dd_quantile_udf() -> ScalarUDF { + ScalarUDF::from(DdQuantileUdf::new(SketchConfig::default())) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn merge_accumulator_coalesces_keys() { + let mut acc = DdSketchAccumulator::new(); + acc.update_single(&[1338, 1784], &[131_072, 1], 131_073, 1.0, 1000.0, 0) + .unwrap(); + acc.update_single(&[1338, 1784], &[128, 4], 132, 1.0, 1000.0, 0) + .unwrap(); + + assert_eq!(acc.total_count, 131_205); + assert_eq!(acc.merged_buckets[&1338], 131_200); + assert_eq!(acc.merged_buckets[&1784], 5); + } + + #[test] + fn merge_batch_combines_partial_states() { + let mut left = DdSketchAccumulator::new(); + left.update_single(&[1338, 1400], &[3, 1], 4, 1.0, 2.0, 0) + .unwrap(); + let mut right = DdSketchAccumulator::new(); + right + .update_single(&[1338, 1450], &[2, 1], 3, 1.0, 6.0, 0) + .unwrap(); + + let left_state = left.state().unwrap(); + let right_state = right.state().unwrap(); + let state_arrays = (0..left_state.len()) + .map(|idx| { + ScalarValue::iter_to_array(vec![left_state[idx].clone(), right_state[idx].clone()]) + .unwrap() + }) + .collect::>(); + + let mut merged = DdSketchAccumulator::new(); + merged.merge_batch(&state_arrays).unwrap(); + + let mut one_pass = DdSketchAccumulator::new(); + one_pass + .update_single(&[1338, 1400], &[3, 1], 4, 1.0, 2.0, 0) + .unwrap(); + one_pass + .update_single(&[1338, 1450], &[2, 1], 3, 1.0, 6.0, 0) + .unwrap(); + + assert_eq!(merged.total_count, one_pass.total_count); + assert_eq!(merged.global_min, one_pass.global_min); + assert_eq!(merged.global_max, one_pass.global_max); + assert_eq!(merged.flags, one_pass.flags); + assert_eq!(merged.merged_buckets, one_pass.merged_buckets); + } + + #[test] + fn merge_accumulator_ignores_empty_sketch_bounds() { + let mut acc = DdSketchAccumulator::new(); + acc.update_single(&[], &[], 0, -999.0, 999.0, 0).unwrap(); + acc.update_single(&[1338], &[2], 2, 1.0, 2.0, 0).unwrap(); + acc.update_single(&[999], &[0], 0, -123.0, 456.0, 0) + .unwrap(); + + assert_eq!(acc.total_count, 2); + assert_eq!(acc.global_min, 1.0); + assert_eq!(acc.global_max, 2.0); + assert_eq!(acc.merged_buckets[&1338], 2); + assert!(!acc.merged_buckets.contains_key(&999)); + } + + #[test] + fn merge_accumulator_rejects_invalid_bounds_for_non_empty_sketch() { + let mut acc = DdSketchAccumulator::new(); + let reversed = acc.update_single(&[1], &[1], 1, 2.0, 1.0, 0).unwrap_err(); + assert!(reversed.to_string().contains("invalid sketch bounds")); + + let nan = acc + .update_single(&[1], &[1], 1, f64::NAN, 1.0, 0) + .unwrap_err(); + assert!(nan.to_string().contains("invalid sketch bounds")); + } + + #[test] + fn merge_accumulator_rejects_unsupported_flags() { + let mut acc = DdSketchAccumulator::new(); + let err = acc.update_single(&[1], &[1], 1, 1.0, 1.0, 1).unwrap_err(); + assert!(err.to_string().contains("unsupported sketch flags")); + } + + #[test] + fn merge_accumulator_rejects_inconsistent_summary_count() { + let mut acc = DdSketchAccumulator::new(); + let err = acc + .update_single(&[1, 2], &[3, 4], 99, 1.0, 2.0, 0) + .unwrap_err(); + + assert!( + err.to_string().contains("does not match sum(counts)"), + "{err}" + ); + } + + #[test] + fn quantile_for_key_matches_sparse_config_anchor() { + let config = SketchConfig::default(); + + assert!((config.quantile_for_key(1338) - 1.0).abs() < f64::EPSILON); + assert!((config.quantile_for_key(1784) - 1000.0).abs() < 10.0); + assert_eq!(config.quantile_for_key(0), 0.0); + } + + #[test] + fn quantile_for_row_validates_inputs() { + let config = SketchConfig::default(); + let invalid_q = quantile_for_row(&config, &[1], &[1], 1, 1.0, 1.0, 0, 1.5).unwrap_err(); + assert!(invalid_q.to_string().contains("quantile must be")); + + let reversed_bounds = + quantile_for_row(&config, &[1], &[1], 1, 2.0, 1.0, 0, 0.5).unwrap_err(); + assert!( + reversed_bounds + .to_string() + .contains("invalid sketch bounds") + ); + + let nan_bounds = + quantile_for_row(&config, &[1], &[1], 1, f64::NAN, 1.0, 0, 0.5).unwrap_err(); + assert!(nan_bounds.to_string().contains("invalid sketch bounds")); + + let flags = quantile_for_row(&config, &[1], &[1], 1, 1.0, 1.0, 1, 0.5).unwrap_err(); + assert!(flags.to_string().contains("unsupported sketch flags")); + + let mismatched_lengths = + quantile_for_row(&config, &[1, 2], &[1], 1, 1.0, 1.0, 0, 0.5).unwrap_err(); + assert!( + mismatched_lengths + .to_string() + .contains("keys/counts length mismatch") + ); + + let mismatched_total = + quantile_for_row(&config, &[1], &[2], 1, 1.0, 1.0, 0, 0.5).unwrap_err(); + assert!(mismatched_total.to_string().contains("does not match")); + } + + #[test] + fn quantile_for_row_handles_bounds_and_empty_sketches() { + let config = SketchConfig::default(); + + assert_eq!( + quantile_for_row( + &config, + &[], + &[], + 0, + f64::INFINITY, + f64::NEG_INFINITY, + 0, + 0.5 + ) + .unwrap(), + None + ); + assert_eq!( + quantile_for_row(&config, &[1338], &[2], 2, 1.0, 2.0, 0, 0.0).unwrap(), + Some(1.0) + ); + assert_eq!( + quantile_for_row(&config, &[1338], &[2], 2, 1.0, 2.0, 0, 1.0).unwrap(), + Some(2.0) + ); + } +} diff --git a/quickwit/quickwit-datafusion/tests/common/mod.rs b/quickwit/quickwit-datafusion/tests/common/mod.rs index 2c8c9eb6406..95f1d655c8f 100644 --- a/quickwit/quickwit-datafusion/tests/common/mod.rs +++ b/quickwit/quickwit-datafusion/tests/common/mod.rs @@ -15,7 +15,9 @@ //! Shared helpers for quickwit-datafusion integration tests. pub mod sandbox; +#[allow(dead_code)] pub mod splits; pub use sandbox::TestSandbox; -pub use splits::{create_metrics_index, publish_split}; +#[allow(unused_imports)] +pub use splits::{create_metrics_index, publish_sketch_split, publish_split}; diff --git a/quickwit/quickwit-datafusion/tests/common/splits.rs b/quickwit/quickwit-datafusion/tests/common/splits.rs index 6498679291a..75a9544f4d4 100644 --- a/quickwit/quickwit-datafusion/tests/common/splits.rs +++ b/quickwit/quickwit-datafusion/tests/common/splits.rs @@ -26,7 +26,7 @@ use quickwit_parquet_engine::storage::{ParquetWriter, ParquetWriterConfig}; use quickwit_parquet_engine::table_config::TableConfig; use quickwit_proto::metastore::{ CreateIndexRequest, MetastoreService, MetastoreServiceClient, PublishMetricsSplitsRequest, - StageMetricsSplitsRequest, + PublishSketchSplitsRequest, StageMetricsSplitsRequest, StageSketchSplitsRequest, }; use quickwit_proto::types::IndexUid; @@ -172,3 +172,123 @@ pub async fn publish_split( .await .expect("publish_metrics_splits"); } + +/// Write `batch` as a sketch parquet file, stage it in the sketch split table, +/// and publish it. +/// +/// This intentionally writes the Arrow batch directly through Parquet's +/// `ArrowWriter`: the production split writer still assumes metrics rows have +/// a `timeseries_id` sort key, while the sketch schema under test stores only +/// the DDSketch payload columns plus tags. +pub async fn publish_sketch_split( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + data_dir: &std::path::Path, + split_name: &str, + batch: &RecordBatch, +) { + let file_path = data_dir.join(format!("{split_name}.parquet")); + let file = std::fs::File::create(&file_path).expect("create sketch parquet file"); + let mut writer = + parquet::arrow::ArrowWriter::try_new(file, batch.schema(), None).expect("arrow writer"); + writer.write(batch).expect("write sketch parquet batch"); + writer.close().expect("close sketch parquet writer"); + let size_bytes = std::fs::metadata(&file_path) + .expect("sketch parquet metadata") + .len(); + + let batch_schema = batch.schema(); + let ts_idx = batch_schema.index_of("timestamp_secs").unwrap(); + let ts_col = batch + .column(ts_idx) + .as_any() + .downcast_ref::() + .unwrap(); + let min_ts = (0..ts_col.len()) + .map(|i| ts_col.value(i)) + .min() + .unwrap_or(0); + let max_ts = (0..ts_col.len()) + .map(|i| ts_col.value(i)) + .max() + .unwrap_or(0); + + let mn_idx = batch_schema.index_of("metric_name").unwrap(); + let dict = batch + .column(mn_idx) + .as_any() + .downcast_ref::>() + .unwrap(); + let values = dict + .values() + .as_any() + .downcast_ref::() + .unwrap(); + let metric_names: HashSet = (0..values.len()) + .filter(|i| !values.is_null(*i)) + .map(|i| values.value(i).to_string()) + .collect(); + + let mut builder = ParquetSplitMetadata::sketches_builder() + .split_id(ParquetSplitId::new(split_name)) + .index_uid(index_uid.to_string()) + .time_range(TimeRange::new(min_ts, max_ts + 1)) + .num_rows(batch.num_rows() as u64) + .size_bytes(size_bytes); + for name in &metric_names { + builder = builder.add_metric_name(name.clone()); + } + + for tag_col in &["service", "env", "datacenter", "region", "host"] { + if let Ok(col_idx) = batch_schema.index_of(tag_col) { + let col = batch.column(col_idx); + let values: HashSet = if let Some(dict) = + col.as_any() + .downcast_ref::>() + { + let keys = dict + .keys() + .as_any() + .downcast_ref::() + .unwrap(); + let vals = dict + .values() + .as_any() + .downcast_ref::() + .unwrap(); + (0..batch.num_rows()) + .filter(|i| !keys.is_null(*i)) + .map(|i| vals.value(keys.value(i) as usize).to_string()) + .collect() + } else { + HashSet::new() + }; + for v in values { + builder = builder.add_low_cardinality_tag(tag_col.to_string(), v); + } + } + } + + metastore + .clone() + .stage_sketch_splits( + StageSketchSplitsRequest::try_from_splits_metadata( + index_uid.clone(), + &[builder.build()], + ) + .unwrap(), + ) + .await + .expect("stage_sketch_splits"); + metastore + .clone() + .publish_sketch_splits(PublishSketchSplitsRequest { + index_uid: Some(index_uid.clone()), + staged_split_ids: vec![split_name.to_string()], + replaced_split_ids: vec![], + index_checkpoint_delta_json_opt: None, + publish_token_opt: None, + }) + .await + .expect("publish_sketch_splits"); +} diff --git a/quickwit/quickwit-datafusion/tests/sketches.rs b/quickwit/quickwit-datafusion/tests/sketches.rs new file mode 100644 index 00000000000..0f2d8e3dd39 --- /dev/null +++ b/quickwit/quickwit-datafusion/tests/sketches.rs @@ -0,0 +1,282 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Integration tests for DDSketch DataFusion queries. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::array::{Float64Array, RecordBatch}; +use datafusion::arrow; +use quickwit_datafusion::service::split_sql_statements; +use quickwit_datafusion::sources::metrics::MetricsDataSource; +use quickwit_datafusion::{DataFusionSessionBuilder, QuickwitObjectStoreRegistry}; +use quickwit_parquet_engine::ingest::{ArrowSketchBatchBuilder, SketchDataPoint}; + +mod common; + +use common::{TestSandbox, create_metrics_index, publish_sketch_split}; + +async fn start_sandbox() -> TestSandbox { + quickwit_common::setup_logging_for_tests(); + TestSandbox::start().await +} + +fn session_builder(sandbox: &TestSandbox) -> DataFusionSessionBuilder { + let source = Arc::new(MetricsDataSource::new(sandbox.metastore.clone())); + let schema_source = Arc::clone(&source); + let registry = Arc::new(QuickwitObjectStoreRegistry::new( + sandbox.storage_resolver.clone(), + )); + DataFusionSessionBuilder::new() + .with_object_store_registry(registry) + .expect("install object store registry") + .with_runtime_plugin(Arc::clone(&source) as Arc<_>) + .with_substrait_consumer(source as Arc<_>) + .with_schema_provider_factory("quickwit", "public", move || { + schema_source.schema_provider() + }) +} + +async fn run_sql(builder: &DataFusionSessionBuilder, sql: &str) -> Vec { + let ctx = builder.build_session().unwrap(); + let mut statements = split_sql_statements(&ctx, sql).unwrap(); + let last = statements.pop().unwrap(); + for statement in statements { + ctx.sql(&statement).await.unwrap().collect().await.unwrap(); + } + ctx.sql(&last).await.unwrap().collect().await.unwrap() +} + +fn make_doc_example_batch() -> RecordBatch { + let mut builder = ArrowSketchBatchBuilder::with_capacity(3); + for (count, sum, max, keys, counts, host) in [ + (4, 50.0, 20.0, vec![1338, 1400], vec![3, 1], "a"), + (3, 45.0, 18.0, vec![1338, 1450], vec![2, 1], "a"), + (5, 70.0, 22.0, vec![1338, 1500], vec![4, 1], "b"), + ] { + let tags = HashMap::from([ + ("service".to_string(), "api".to_string()), + ("host".to_string(), host.to_string()), + ("region".to_string(), "us".to_string()), + ]); + builder.append(SketchDataPoint { + metric_name: "req.latency".to_string(), + timestamp_secs: 600, + count, + sum, + min: 0.5, + max, + flags: 0, + keys, + counts, + tags, + }); + } + builder.finish() +} + +fn float_value(batch: &RecordBatch, name: &str) -> f64 { + batch + .column_by_name(name) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0) +} + +fn display_value(batch: &RecordBatch, name: &str) -> String { + let col = batch.column_by_name(name).unwrap(); + arrow::util::display::array_value_to_string(col, 0).unwrap() +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_sketch_merge_and_quantile_sql() { + let sandbox = start_sandbox().await; + let metastore = sandbox.metastore.clone(); + let data_dir = &sandbox.data_dir; + let builder = session_builder(&sandbox); + + let index_uid = create_metrics_index(&metastore, "sketches-latency", data_dir.path()).await; + let batch = make_doc_example_batch(); + publish_sketch_split( + &metastore, + &index_uid, + data_dir.path(), + "sketch_split_1", + &batch, + ) + .await; + + let sql = r#" + SELECT + SUM("count") AS total_count, + SUM("sum") AS total_sum, + MIN("min") AS global_min, + MAX("max") AS global_max, + dd_quantile(dd_sketch(keys, counts, "count", "min", "max", flags), 0.50) AS p50, + dd_quantile(dd_sketch(keys, counts, "count", "min", "max", flags), 0.99) AS p99 + FROM "sketches-latency" + WHERE metric_name = 'req.latency' AND timestamp_secs = 600 + "#; + let batches = run_sql(&builder, sql).await; + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + + assert_eq!(display_value(&batches[0], "total_count"), "12"); + assert_eq!(float_value(&batches[0], "total_sum"), 165.0); + assert_eq!(float_value(&batches[0], "global_min"), 0.5); + assert_eq!(float_value(&batches[0], "global_max"), 22.0); + + let p50 = float_value(&batches[0], "p50"); + let p99 = float_value(&batches[0], "p99"); + assert!((p50 - 1.0).abs() < f64::EPSILON, "p50={p50}"); + assert!((p99 - 5.677260965422914).abs() < 1e-12, "p99={p99}"); + + let multiple_sketch_states_sql = r#" + SELECT + dd_quantile(dd_sketch(keys, counts, "count", "min", "max", flags), 0.99) AS p99_all, + dd_quantile( + dd_sketch(keys, counts, "count", "min", "max", flags) + FILTER (WHERE "max" <= 20.0), + 0.99 + ) AS p99_filtered + FROM "sketches-latency" + WHERE metric_name = 'req.latency' AND timestamp_secs = 600 + "#; + let multiple_state_batches = run_sql(&builder, multiple_sketch_states_sql).await; + assert_eq!(multiple_state_batches.len(), 1); + assert_eq!(multiple_state_batches[0].num_rows(), 1); + let p99_all = float_value(&multiple_state_batches[0], "p99_all"); + let p99_filtered = float_value(&multiple_state_batches[0], "p99_filtered"); + assert!( + (p99_all - 5.677260965422914).abs() < 1e-12, + "p99_all={p99_all}" + ); + assert!( + (p99_filtered - 2.614988148096247).abs() < 1e-12, + "p99_filtered={p99_filtered}" + ); + + let ddl_sql = r#" + CREATE EXTERNAL TABLE "sketches_explicit" ( + metric_name VARCHAR NOT NULL, + timestamp_secs BIGINT UNSIGNED NOT NULL, + count BIGINT UNSIGNED NOT NULL, + sum DOUBLE NOT NULL, + min DOUBLE NOT NULL, + max DOUBLE NOT NULL, + flags INT UNSIGNED NOT NULL, + keys ARRAY NOT NULL, + counts ARRAY NOT NULL + ) STORED AS sketches LOCATION 'sketches-latency'; + + SELECT + dd_quantile(dd_sketch(keys, counts, "count", "min", "max", flags), 0.50) AS p50 + FROM "sketches_explicit" + WHERE metric_name = 'req.latency' AND timestamp_secs = 600 + "#; + let ddl_batches = run_sql(&builder, ddl_sql).await; + assert_eq!(ddl_batches.len(), 1); + assert_eq!(ddl_batches[0].num_rows(), 1); + let ddl_p50 = float_value(&ddl_batches[0], "p50"); + assert!((ddl_p50 - 1.0).abs() < f64::EPSILON, "p50={ddl_p50}"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_sketch_merge_and_quantile_substrait() { + use datafusion_substrait::logical_plan::producer::to_substrait_plan; + use datafusion_substrait::substrait::proto::extensions::simple_extension_declaration::MappingType; + use prost::Message; + + let sandbox = start_sandbox().await; + let metastore = sandbox.metastore.clone(); + let data_dir = &sandbox.data_dir; + let builder = session_builder(&sandbox); + + let index_uid = create_metrics_index(&metastore, "datadog-sketches", data_dir.path()).await; + let batch = make_doc_example_batch(); + publish_sketch_split( + &metastore, + &index_uid, + data_dir.path(), + "sketch_split_1", + &batch, + ) + .await; + + let ctx = builder.build_session().unwrap(); + ctx.sql( + r#" + CREATE OR REPLACE EXTERNAL TABLE "datadog-sketches" ( + metric_name VARCHAR NOT NULL, + timestamp_secs BIGINT UNSIGNED NOT NULL, + count BIGINT UNSIGNED NOT NULL, + sum DOUBLE NOT NULL, + min DOUBLE NOT NULL, + max DOUBLE NOT NULL, + flags INT UNSIGNED NOT NULL, + keys ARRAY NOT NULL, + counts ARRAY NOT NULL + ) STORED AS sketches LOCATION 'datadog-sketches' + "#, + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let df = ctx + .sql( + r#" + SELECT + dd_quantile(dd_sketch(keys, counts, "count", "min", "max", flags), 0.50) AS p50 + FROM "datadog-sketches" + WHERE metric_name = 'req.latency' AND timestamp_secs = 600 + "#, + ) + .await + .unwrap(); + let plan = df.into_optimized_plan().unwrap(); + let substrait_plan = to_substrait_plan(&plan, &ctx.state()).unwrap(); + let extension_functions: Vec<&str> = substrait_plan + .extensions + .iter() + .filter_map(|extension| match &extension.mapping_type { + Some(MappingType::ExtensionFunction(function)) => Some(function.name.as_str()), + _ => None, + }) + .collect(); + assert!( + extension_functions.contains(&"dd_sketch"), + "Substrait plan must declare dd_sketch aggregate UDF; got {extension_functions:?}" + ); + assert!( + extension_functions.contains(&"dd_quantile"), + "Substrait plan must declare dd_quantile scalar UDF; got {extension_functions:?}" + ); + + let plan_bytes = substrait_plan.encode_to_vec(); + let stream = builder.execute_substrait(&plan_bytes).await.unwrap(); + let batches = datafusion::physical_plan::common::collect(stream) + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + let p50 = float_value(&batches[0], "p50"); + assert!((p50 - 1.0).abs() < f64::EPSILON, "p50={p50}"); +} diff --git a/quickwit/quickwit-df-core/src/data_source.rs b/quickwit/quickwit-df-core/src/data_source.rs index 71a05ff1583..6648d96ad4a 100644 --- a/quickwit/quickwit-df-core/src/data_source.rs +++ b/quickwit/quickwit-df-core/src/data_source.rs @@ -35,7 +35,7 @@ use datafusion::catalog::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::error::Result as DFResult; use datafusion::execution::SessionStateBuilder; -use datafusion::logical_expr::ScalarUDF; +use datafusion::logical_expr::{AggregateUDF, ScalarUDF}; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::prelude::SessionConfig; @@ -53,6 +53,7 @@ pub struct QuickwitRuntimeRegistration { session_config_setters: Vec, physical_optimizer_rules: Vec>, udfs: Vec>, + udafs: Vec>, table_factories: Vec<(String, Arc)>, } @@ -70,6 +71,11 @@ impl QuickwitRuntimeRegistration { self } + pub fn with_udaf(mut self, udaf: Arc) -> Self { + self.udafs.push(udaf); + self + } + pub fn with_physical_optimizer_rule( mut self, rule: Arc, @@ -91,6 +97,13 @@ impl QuickwitRuntimeRegistration { self.udfs.iter().map(|udf| udf.name().to_string()).collect() } + pub(crate) fn udaf_names(&self) -> Vec { + self.udafs + .iter() + .map(|udaf| udaf.name().to_string()) + .collect() + } + pub fn apply_to_config(&self, config: &mut SessionConfig) { for setter in &self.session_config_setters { setter(config); @@ -109,6 +122,13 @@ impl QuickwitRuntimeRegistration { .extend(self.udfs); } + if !self.udafs.is_empty() { + builder + .aggregate_functions() + .get_or_insert_default() + .extend(self.udafs); + } + for (key, factory) in self.table_factories { builder = builder.with_table_factory(key.clone(), Arc::clone(&factory)); let upper = key.to_uppercase(); @@ -126,6 +146,7 @@ impl QuickwitRuntimeRegistration { self.physical_optimizer_rules .extend(other.physical_optimizer_rules); self.udfs.extend(other.udfs); + self.udafs.extend(other.udafs); self.table_factories.extend(other.table_factories); } } diff --git a/quickwit/quickwit-df-core/src/session.rs b/quickwit/quickwit-df-core/src/session.rs index 6cd2752318a..ecc63a3fcb7 100644 --- a/quickwit/quickwit-df-core/src/session.rs +++ b/quickwit/quickwit-df-core/src/session.rs @@ -255,6 +255,7 @@ impl DataFusionSessionBuilder { pub fn check_invariants(&self) -> DFResult<()> { let mut seen_udfs: HashSet = HashSet::new(); + let mut seen_udafs: HashSet = HashSet::new(); for plugin in &self.runtime_plugins { let registration = plugin.registration(); for name in registration.udf_names() { @@ -264,6 +265,13 @@ impl DataFusionSessionBuilder { ))); } } + for name in registration.udaf_names() { + if !seen_udafs.insert(name.clone()) { + return Err(DataFusionError::Configuration(format!( + "two runtime plugins both register an aggregate UDF named '{name}'" + ))); + } + } } Ok(()) }