From af6737e90d677eb9c14320f047c2503987acb2ee Mon Sep 17 00:00:00 2001 From: Alex Bianchi Date: Mon, 27 Apr 2026 14:04:13 -0400 Subject: [PATCH 1/2] support sorted series column --- .../src/sources/metrics/mod.rs | 13 +- .../src/sources/metrics/optimizer.rs | 139 ++++++++++++++ .../src/sources/metrics/table_provider.rs | 180 ++++++++++++++++-- .../src/sources/metrics/test_utils.rs | 3 +- .../tests/common/splits.rs | 3 +- .../quickwit-datafusion/tests/distributed.rs | 56 ++++++ quickwit/quickwit-datafusion/tests/metrics.rs | 58 +++++- 7 files changed, 422 insertions(+), 30 deletions(-) create mode 100644 quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs index bd4f794df86..6b91e436c5c 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs @@ -23,6 +23,7 @@ pub(crate) mod factory; pub(crate) mod index_resolver; pub(crate) mod metastore_provider; +pub(crate) mod optimizer; pub(crate) mod predicate; pub(crate) mod table_provider; @@ -46,6 +47,7 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient}; use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory}; use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver}; +use self::optimizer::SortedSeriesStreamingAggregateRule; use self::table_provider::MetricsTableProvider; /// Returns `true` when `err` wraps a [`MetastoreError::NotFound`]. @@ -210,7 +212,16 @@ impl QuickwitRuntimePlugin for MetricsDataSource { let factory: Arc = Arc::new(MetricsTableProviderFactory::new( Arc::clone(&self.index_resolver), )); - QuickwitRuntimeRegistration::default().with_table_factory(METRICS_FILE_TYPE, factory) + QuickwitRuntimeRegistration::default() + .with_session_config_setter(|config| { + config + .options_mut() + .optimizer + .enable_round_robin_repartition = false; + config.options_mut().optimizer.repartition_file_scans = false; + }) + .with_physical_optimizer_rule(Arc::new(SortedSeriesStreamingAggregateRule)) + .with_table_factory(METRICS_FILE_TYPE, factory) } } diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs b/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs new file mode 100644 index 00000000000..31ebfed147a --- /dev/null +++ b/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs @@ -0,0 +1,139 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Physical rewrites for sorted-series metrics rollups. + +use std::sync::Arc; + +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::config::ConfigOptions; +use datafusion::error::Result as DFResult; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_expr::{LexOrdering, Partitioning}; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN; + +/// Replaces the inner sorted-series hash repartition in rollup plans with a +/// sort-preserving merge into a single final aggregate. +/// +/// This keeps worker/file-local partial aggregation parallel, then lets the +/// coordinator stitch ordered per-series partial rows without hash-shuffling +/// those partials by `sorted_series`. +#[derive(Debug, Default)] +pub struct SortedSeriesStreamingAggregateRule; + +impl PhysicalOptimizerRule for SortedSeriesStreamingAggregateRule { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> DFResult> { + let transformed = plan.transform_up(|plan| { + if let Some(rewritten) = rewrite_sorted_series_final_aggregate(&plan)? { + Ok(Transformed::yes(rewritten)) + } else { + Ok(Transformed::no(plan)) + } + })?; + Ok(transformed.data) + } + + fn name(&self) -> &str { + "sorted_series_streaming_aggregate" + } + + fn schema_check(&self) -> bool { + true + } +} + +fn rewrite_sorted_series_final_aggregate( + plan: &Arc, +) -> DFResult>> { + let Some(final_agg) = plan.as_any().downcast_ref::() else { + return Ok(None); + }; + if final_agg.mode() != &AggregateMode::FinalPartitioned + || !aggregate_groups_on_sorted_series(final_agg) + { + return Ok(None); + } + + let Some(sort) = final_agg.input().as_any().downcast_ref::() else { + return Ok(None); + }; + if !sort.preserve_partitioning() + || sort.fetch().is_some() + || !ordering_starts_with_sorted_series(sort.expr()) + { + return Ok(None); + } + + let Some(repartition) = sort.input().as_any().downcast_ref::() else { + return Ok(None); + }; + if !hash_partitioning_contains_sorted_series(repartition.partitioning()) { + return Ok(None); + } + + let ordering = sort.expr().clone(); + let partition_sort: Arc = Arc::new( + SortExec::new(ordering.clone(), Arc::clone(repartition.input())) + .with_preserve_partitioning(true), + ); + let merged: Arc = + Arc::new(SortPreservingMergeExec::new(ordering, partition_sort)); + + let rewritten = AggregateExec::try_new( + AggregateMode::Final, + final_agg.group_expr().clone(), + final_agg.aggr_expr().to_vec(), + final_agg.filter_expr().to_vec(), + merged, + final_agg.input_schema(), + )? + .with_limit_options(final_agg.limit_options()); + + Ok(Some(Arc::new(rewritten))) +} + +fn aggregate_groups_on_sorted_series(aggregate: &AggregateExec) -> bool { + aggregate + .group_expr() + .expr() + .iter() + .any(|(expr, alias)| alias == SORTED_SERIES_COLUMN || is_sorted_series_column(expr)) +} + +fn hash_partitioning_contains_sorted_series(partitioning: &Partitioning) -> bool { + let Partitioning::Hash(exprs, _) = partitioning else { + return false; + }; + exprs.iter().any(is_sorted_series_column) +} + +fn ordering_starts_with_sorted_series(ordering: &LexOrdering) -> bool { + is_sorted_series_column(&ordering.first().expr) +} + +fn is_sorted_series_column(expr: &Arc) -> bool { + expr.as_any() + .downcast_ref::() + .is_some_and(|column| column.name() == SORTED_SERIES_COLUMN) +} diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs index 59b5941e4e7..e4d870696a1 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs @@ -34,11 +34,14 @@ use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType}; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; use datafusion::physical_plan::ExecutionPlan; use datafusion_datasource::PartitionedFile; +use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource_parquet::source::ParquetSource; use datafusion_physical_plan::expressions::Column; use quickwit_common::uri::Uri; +use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN; use quickwit_parquet_engine::split::ParquetSplitMetadata; +use quickwit_parquet_engine::table_config::ProductType; use tracing::debug; use super::predicate; @@ -207,9 +210,15 @@ impl TableProvider for MetricsTableProvider { let mut builder = FileScanConfigBuilder::new(self.object_store_url.clone(), Arc::new(parquet_source)); - // Add each split as its own file group (one file per partition) - for file in file_groups { - builder = builder.with_file(file); + // Add each split as its own file group (one file per partition). Empty + // scans still need one empty partition so single-partition operators + // such as COUNT(*) can satisfy their distribution requirement. + if file_groups.is_empty() { + builder = builder.with_file_group(FileGroup::new(vec![])); + } else { + for file in file_groups { + builder = builder.with_file(file); + } } if let Some(proj) = projection { @@ -220,11 +229,14 @@ impl TableProvider for MetricsTableProvider { builder = builder.with_limit(Some(lim)); } - // Advertise only the contiguous prefix of the writer sort order present - // in the table schema. Later keys are not globally ordered if an - // earlier discriminator is hidden by the declared schema. - if let Some(ordering) = metrics_output_ordering(&self.schema) { - builder = builder.with_output_ordering(vec![ordering]); + // Advertise every ordering the writer physically guarantees for each + // split. `sorted_series` is a materialized composite prefix of the + // writer sort schema, so it is the preferred single-column series key + // for streaming rollups. The expanded sort schema remains useful for + // queries that group/filter on the raw tag columns. + let output_orderings = metrics_output_orderings(&self.schema, &splits); + if !output_orderings.is_empty() { + builder = builder.with_output_ordering(output_orderings); } let file_scan_config = builder.build(); @@ -265,25 +277,78 @@ fn column_name_from_expr(expr: &Expr) -> Option { predicate::column_name(expr) } -fn metrics_output_ordering(schema: &SchemaRef) -> Option { - let sort_options = SortOptions { +fn sort_expr( + schema: &SchemaRef, + col_name: &str, + sort_options: SortOptions, +) -> Option { + schema + .index_of(col_name) + .ok() + .map(|idx| PhysicalSortExpr::new(Arc::new(Column::new(col_name, idx)), sort_options)) +} + +fn metrics_output_orderings( + schema: &SchemaRef, + splits: &[ParquetSplitMetadata], +) -> Vec { + if !splits_have_default_metrics_sort(splits) { + return Vec::new(); + } + + let ascending = SortOptions { descending: false, nulls_first: false, }; - let sort_exprs: Vec = METRICS_SORT_ORDER + let timestamp_descending = SortOptions { + descending: true, + nulls_first: false, + }; + + let mut orderings = Vec::new(); + + if let Some(sorted_series) = sort_expr(schema, SORTED_SERIES_COLUMN, ascending) { + let mut sort_exprs = vec![sorted_series]; + if let Some(timestamp) = sort_expr(schema, "timestamp_secs", timestamp_descending) { + sort_exprs.push(timestamp); + } + if let Some(ordering) = LexOrdering::new(sort_exprs) { + orderings.push(ordering); + } + } + + // Advertise only the contiguous prefix of the expanded writer sort order + // present in the table schema. Later keys are not globally ordered if an + // earlier discriminator is hidden by the declared schema. + let expanded_sort_exprs: Vec = METRICS_SORT_ORDER .iter() .map_while(|col_name| { - schema.index_of(col_name).ok().map(|idx| { - PhysicalSortExpr::new(Arc::new(Column::new(col_name, idx)), sort_options) - }) + let sort_options = if *col_name == "timestamp_secs" { + timestamp_descending + } else { + ascending + }; + sort_expr(schema, col_name, sort_options) }) .collect(); - LexOrdering::new(sort_exprs) + if let Some(ordering) = LexOrdering::new(expanded_sort_exprs) { + orderings.push(ordering); + } + + orderings +} + +fn splits_have_default_metrics_sort(splits: &[ParquetSplitMetadata]) -> bool { + splits.is_empty() + || splits + .iter() + .all(|split| split.sort_fields.as_str() == ProductType::Metrics.default_sort_fields()) } #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field, Schema}; + use quickwit_parquet_engine::split::TimeRange; use super::*; @@ -309,31 +374,106 @@ mod tests { .collect() } + fn ordering_column_options(ordering: &LexOrdering) -> Vec { + ordering.iter().map(|expr| expr.options).collect() + } + fn expected_names(names: &[&str]) -> Vec { names.iter().map(|name| name.to_string()).collect() } + fn split_with_sort_fields(sort_fields: &str) -> ParquetSplitMetadata { + ParquetSplitMetadata::metrics_builder() + .index_uid("test-index") + .time_range(TimeRange::new(0, 1)) + .sort_fields(sort_fields) + .build() + } + #[test] fn metrics_output_ordering_stops_at_first_missing_sort_key() { let schema = schema_with_columns(&["metric_name", "service", "timestamp_secs"]); - let ordering = metrics_output_ordering(&schema).unwrap(); + let orderings = metrics_output_orderings(&schema, &[]); assert_eq!( - ordering_column_names(&ordering), + ordering_column_names(&orderings[0]), expected_names(&["metric_name", "service"]) ); } #[test] - fn metrics_output_ordering_keeps_timestamp_after_all_discriminators() { + fn metrics_output_ordering_keeps_descending_timestamp_after_all_discriminators() { let schema = schema_with_columns(METRICS_SORT_ORDER); - let ordering = metrics_output_ordering(&schema).unwrap(); + let orderings = metrics_output_orderings(&schema, &[]); assert_eq!( - ordering_column_names(&ordering), + ordering_column_names(&orderings[0]), expected_names(METRICS_SORT_ORDER) ); + let options = ordering_column_options(&orderings[0]); + assert!( + !options[..options.len() - 1] + .iter() + .any(|option| option.descending), + "non-timestamp metrics sort columns should be ascending" + ); + assert!( + options.last().unwrap().descending, + "timestamp_secs must be advertised as descending to match the writer" + ); + } + + #[test] + fn metrics_output_ordering_prefers_sorted_series_when_declared() { + let schema = schema_with_columns(&[ + "metric_name", + "timestamp_secs", + SORTED_SERIES_COLUMN, + "service", + ]); + + let orderings = metrics_output_orderings(&schema, &[]); + + assert_eq!( + ordering_column_names(&orderings[0]), + expected_names(&[SORTED_SERIES_COLUMN, "timestamp_secs"]) + ); + assert!( + ordering_column_options(&orderings[0])[1].descending, + "timestamp_secs must remain descending within sorted_series" + ); + assert_eq!( + ordering_column_names(&orderings[1]), + expected_names(&["metric_name", "service"]) + ); + } + + #[test] + fn metrics_output_ordering_requires_known_default_sort_fields() { + let schema = schema_with_columns(&[ + "metric_name", + "timestamp_secs", + SORTED_SERIES_COLUMN, + "service", + ]); + let unknown_sort_split = split_with_sort_fields(""); + let descending_tag_split = + split_with_sort_fields("metric_name|-service|timeseries_id|timestamp_secs/V2"); + let default_sort_split = split_with_sort_fields(ProductType::Metrics.default_sort_fields()); + + assert!( + metrics_output_orderings(&schema, &[unknown_sort_split]).is_empty(), + "must not advertise ordering for old or unknown sort metadata" + ); + assert!( + metrics_output_orderings(&schema, &[descending_tag_split]).is_empty(), + "must not advertise sorted_series ordering for non-default sort metadata" + ); + assert!( + !metrics_output_orderings(&schema, &[default_sort_split]).is_empty(), + "default metrics sort metadata should enable the advertised ordering" + ); } } diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/test_utils.rs b/quickwit/quickwit-datafusion/src/sources/metrics/test_utils.rs index 888b3dfa157..58272cfc47e 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/test_utils.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/test_utils.rs @@ -360,7 +360,8 @@ async fn write_split( .index_uid("test-index:00000000000000000000000000") .time_range(TimeRange::new(min_ts, max_ts + 1)) .num_rows(batch.num_rows() as u64) - .size_bytes(size_bytes); + .size_bytes(size_bytes) + .sort_fields(TableConfig::default().effective_sort_fields()); for name in &metric_names { builder = builder.add_metric_name(name.clone()); } diff --git a/quickwit/quickwit-datafusion/tests/common/splits.rs b/quickwit/quickwit-datafusion/tests/common/splits.rs index 6498679291a..abbced537d8 100644 --- a/quickwit/quickwit-datafusion/tests/common/splits.rs +++ b/quickwit/quickwit-datafusion/tests/common/splits.rs @@ -114,7 +114,8 @@ pub async fn publish_split( .index_uid(index_uid.to_string()) .time_range(TimeRange::new(min_ts, max_ts + 1)) .num_rows(batch.num_rows() as u64) - .size_bytes(size_bytes); + .size_bytes(size_bytes) + .sort_fields(TableConfig::default().effective_sort_fields()); for name in &metric_names { builder = builder.add_metric_name(name.clone()); } diff --git a/quickwit/quickwit-datafusion/tests/distributed.rs b/quickwit/quickwit-datafusion/tests/distributed.rs index 2182d201a2e..6797328c13c 100644 --- a/quickwit/quickwit-datafusion/tests/distributed.rs +++ b/quickwit/quickwit-datafusion/tests/distributed.rs @@ -171,6 +171,62 @@ async fn test_distributed_tasks_not_shuffles() { .value(0); assert_eq!(cnt, 8); + let rollup_ddl = r#"CREATE OR REPLACE EXTERNAL TABLE "dist-rollup" ( + metric_name VARCHAR NOT NULL, metric_type TINYINT, + timestamp_secs BIGINT NOT NULL, value DOUBLE NOT NULL, + sorted_series BYTEA NOT NULL, service VARCHAR + ) STORED AS metrics LOCATION 'dist-test'"#; + let rollup_sql = format!( + "{rollup_ddl}; + SELECT service, MAX(value) AS max_val + FROM \"dist-rollup\" + WHERE metric_name = 'cpu.usage' + GROUP BY sorted_series, service" + ); + let mut statements = split_sql_statements(&ctx, &rollup_sql).unwrap(); + let query = statements.pop().unwrap(); + for statement in statements { + ctx.sql(&statement).await.unwrap().collect().await.unwrap(); + } + + let df = ctx.sql(&query).await.unwrap(); + let plan = df.clone().create_physical_plan().await.unwrap(); + let plan_str = format!( + "{}", + datafusion::physical_plan::displayable(plan.as_ref()).indent(true) + ); + assert!( + plan_str.contains("NetworkCoalesceExec"), + "expected sorted-series worker streams to be coalesced at the coordinator:\n{plan_str}" + ); + assert!( + plan_str.contains("SortPreservingMergeExec: [sorted_series"), + "expected sorted-series worker streams to be merge-sorted:\n{plan_str}" + ); + assert!( + !plan_str.contains("NetworkShuffleExec"), + "expected no network shuffle for sorted-series finalization:\n{plan_str}" + ); + assert!( + !plan_str.contains("RoundRobinBatch"), + "expected scan/partial stage parallelism to stay split-bounded:\n{plan_str}" + ); + assert!( + !plan_str.contains("Hash([sorted_series"), + "expected no hash repartition by sorted_series:\n{plan_str}" + ); + + let batches = df.collect().await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); + let max_val = batches[0] + .column_by_name("max_val") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert!((max_val - 0.4).abs() < f64::EPSILON); + // Keep the worker handles alive until we've finished collecting results. drop(worker_a); drop(worker_b); diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index 66f8572b79f..6dd6228c194 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -486,10 +486,11 @@ async fn test_in_list_tag_filter_returns_all_matching_rows() { /// is compiled to SQL over two tables joined on `bhandle` (a tag hash). /// /// With our wide-format parquet model every data point carries its own tags -/// as columns, so the same query is a single two-level aggregation: +/// as columns, and `sorted_series` carries the per-series handle, so the same +/// query is a single two-level aggregation: /// -/// 1. Inner GROUP BY (service, host, time_bin) → MAX(value) per series per bin -/// 2. Outer GROUP BY (service, time_bin) → AVG(max) across hosts per bin +/// 1. Inner GROUP BY (sorted_series, service, time_bin) → MAX(value) per series per bin +/// 2. Outer GROUP BY (service, time_bin) → AVG(max) across series per bin /// /// Three prod series, one staging series (must be filtered out): /// web / host=web-01: values 1,2,3,4,5,6 at t=0,15,30,45,60,75 @@ -594,7 +595,7 @@ async fn test_rollup_nested_aggregation() { // The query mirrors the Datadog rollup pattern without a context/points join: // avg:cpu.usage{env:prod} by {service}.rollup(max, 30) // - // Step 1 (inner): MAX per series (service + host) per 30-second bin. + // Step 1 (inner): MAX per sorted_series per 30-second bin. // Step 2 (outer): AVG of those per-series maxes, grouped by service. // // to_timestamp_seconds() converts the stored epoch-seconds UInt64 to a @@ -605,14 +606,15 @@ async fn test_rollup_nested_aggregation() { metric_type TINYINT, timestamp_secs BIGINT NOT NULL, value DOUBLE NOT NULL, + sorted_series BYTEA NOT NULL, service VARCHAR, env VARCHAR, host VARCHAR ) STORED AS metrics LOCATION 'rollup-test'; WITH bin_max AS ( SELECT + sorted_series AS bhdl, service, - host, date_bin( INTERVAL '30 seconds', to_timestamp_seconds(timestamp_secs) @@ -621,7 +623,7 @@ async fn test_rollup_nested_aggregation() { FROM "rollup-test" WHERE metric_name = 'cpu.usage' AND env = 'prod' - GROUP BY service, host, time_bin + GROUP BY bhdl, time_bin, service ) SELECT service, @@ -632,7 +634,49 @@ async fn test_rollup_nested_aggregation() { ORDER BY time_bin, service "#; - let batches = run_sql(&builder, sql).await; + let ctx = builder.build_session().unwrap(); + let mut statements = split_sql_statements(&ctx, sql).unwrap(); + let query = statements.pop().unwrap(); + for statement in statements { + ctx.sql(&statement).await.unwrap().collect().await.unwrap(); + } + let df = ctx.sql(&query).await.unwrap(); + let plan = df.clone().create_physical_plan().await.unwrap(); + let plan_str = format!( + "{}", + datafusion::physical_plan::displayable(plan.as_ref()).indent(true) + ); + assert!( + plan_str.contains("sorted_series"), + "expected the rollup plan to group on sorted_series:\n{plan_str}" + ); + assert!( + plan_str.contains("ordering_mode=PartiallySorted") + || plan_str.contains("ordering_mode=Sorted"), + "expected sorted_series ordering to reach AggregateExec:\n{plan_str}" + ); + assert!( + plan_str.contains("AggregateExec: mode=Final, gby=[sorted_series"), + "expected sorted_series finalization to be single-partition streaming after merge:\n{plan_str}" + ); + assert!( + plan_str.contains("SortPreservingMergeExec: [sorted_series"), + "expected sorted_series partials to be merge-sorted before finalization:\n{plan_str}" + ); + assert!( + !plan_str.contains("Hash([sorted_series"), + "expected no hash repartition by sorted_series:\n{plan_str}" + ); + assert!( + !plan_str.contains("RoundRobinBatch"), + "expected scan/partial stage parallelism to stay split-bounded:\n{plan_str}" + ); + assert!( + plan_str.contains("file_groups={4 groups"), + "expected one scan partition per split, not byte-range split partitions:\n{plan_str}" + ); + + let batches = df.collect().await.unwrap(); // 3 bins × 2 services (web, api) = 6 result rows. assert_eq!( From 26f903d525ed636dc4d466984712be724ee16b15 Mon Sep 17 00:00:00 2001 From: Alex Bianchi Date: Mon, 27 Apr 2026 20:04:48 -0400 Subject: [PATCH 2/2] respond to review --- .../src/sources/metrics/optimizer.rs | 24 ++++++++++++------- .../quickwit-datafusion/tests/distributed.rs | 5 ++++ quickwit/quickwit-datafusion/tests/metrics.rs | 8 ++++++- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs b/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs index 31ebfed147a..3dbfc8b786a 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs @@ -22,11 +22,11 @@ use datafusion::error::Result as DFResult; use datafusion::physical_expr::expressions::Column; use datafusion::physical_expr::{LexOrdering, Partitioning}; use datafusion::physical_optimizer::PhysicalOptimizerRule; -use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode}; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN; /// Replaces the inner sorted-series hash repartition in rollup plans with a @@ -93,10 +93,17 @@ fn rewrite_sorted_series_final_aggregate( } let ordering = sort.expr().clone(); - let partition_sort: Arc = Arc::new( - SortExec::new(ordering.clone(), Arc::clone(repartition.input())) - .with_preserve_partitioning(true), - ); + let repartition_input = Arc::clone(repartition.input()); + let partition_sort: Arc = if repartition_input + .equivalence_properties() + .ordering_satisfy(ordering.clone())? + { + repartition_input + } else { + Arc::new( + SortExec::new(ordering.clone(), repartition_input).with_preserve_partitioning(true), + ) + }; let merged: Arc = Arc::new(SortPreservingMergeExec::new(ordering, partition_sort)); @@ -133,7 +140,8 @@ fn ordering_starts_with_sorted_series(ordering: &LexOrdering) -> bool { } fn is_sorted_series_column(expr: &Arc) -> bool { - expr.as_any() - .downcast_ref::() - .is_some_and(|column| column.name() == SORTED_SERIES_COLUMN) + match expr.as_any().downcast_ref::() { + Some(column) => column.name() == SORTED_SERIES_COLUMN, + None => false, + } } diff --git a/quickwit/quickwit-datafusion/tests/distributed.rs b/quickwit/quickwit-datafusion/tests/distributed.rs index 6797328c13c..a4d08e10147 100644 --- a/quickwit/quickwit-datafusion/tests/distributed.rs +++ b/quickwit/quickwit-datafusion/tests/distributed.rs @@ -203,6 +203,11 @@ async fn test_distributed_tasks_not_shuffles() { plan_str.contains("SortPreservingMergeExec: [sorted_series"), "expected sorted-series worker streams to be merge-sorted:\n{plan_str}" ); + assert!( + !plan_str.contains("SortExec: expr=[sorted_series"), + "expected sorted-series worker streams to use preserved ordering without an explicit \ + sort:\n{plan_str}" + ); assert!( !plan_str.contains("NetworkShuffleExec"), "expected no network shuffle for sorted-series finalization:\n{plan_str}" diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index 6dd6228c194..ccee9a6c648 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -657,12 +657,18 @@ async fn test_rollup_nested_aggregation() { ); assert!( plan_str.contains("AggregateExec: mode=Final, gby=[sorted_series"), - "expected sorted_series finalization to be single-partition streaming after merge:\n{plan_str}" + "expected sorted_series finalization to be single-partition streaming after \ + merge:\n{plan_str}" ); assert!( plan_str.contains("SortPreservingMergeExec: [sorted_series"), "expected sorted_series partials to be merge-sorted before finalization:\n{plan_str}" ); + assert!( + !plan_str.contains("SortExec: expr=[sorted_series"), + "expected sorted_series partials to use preserved ordering without an explicit \ + sort:\n{plan_str}" + ); assert!( !plan_str.contains("Hash([sorted_series"), "expected no hash repartition by sorted_series:\n{plan_str}"