From 910c5208b934b5ca12aaa076b612706a6a866e69 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:38:47 +0000 Subject: [PATCH 01/10] feat: reorder row groups by statistics during sort pushdown When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown. - Add reorder_by_statistics to PreparedAccessPlan that sorts row_group_indexes by the first sort column's min values - Pass sort order from ParquetSource::try_pushdown_sort through to the opener via sort_order_for_reorder field - Reorder happens after pruning but before reverse (they compose) - Gracefully skips reorder when statistics unavailable, sort expr is not a simple column, row_selection present, or <=1 row groups Closes #21317 --- .../datasource-parquet/src/access_plan.rs | 326 ++++++++++++++++++ .../src/access_plan_optimizer.rs | 107 ++++++ datafusion/datasource-parquet/src/opener.rs | 22 +- datafusion/datasource-parquet/src/source.rs | 10 +- 4 files changed, 462 insertions(+), 3 deletions(-) create mode 100644 datafusion/datasource-parquet/src/access_plan_optimizer.rs diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a44..d98f64044db37 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -16,7 +16,12 @@ // under the License. 
use crate::sort::reverse_row_selection; +use arrow::datatypes::Schema; use datafusion_common::{Result, assert_eq_or_internal_err}; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use log::debug; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; @@ -377,6 +382,106 @@ impl PreparedAccessPlan { }) } + /// Reorder row groups by their min statistics for the given sort order. + /// + /// This helps TopK queries find optimal values first. For ASC sort, + /// row groups with the smallest min values come first. For DESC sort, + /// row groups with the largest min values come first. + /// + /// Gracefully skips reordering when: + /// - There is a row_selection (too complex to remap) + /// - 0 or 1 row groups (nothing to reorder) + /// - Sort expression is not a simple column reference + /// - Statistics are unavailable + pub(crate) fn reorder_by_statistics( + mut self, + sort_order: &LexOrdering, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + // Skip if row_selection present (too complex to remap) + if self.row_selection.is_some() { + debug!("Skipping RG reorder: row_selection present"); + return Ok(self); + } + + // Nothing to reorder + if self.row_group_indexes.len() <= 1 { + return Ok(self); + } + + // Get the first sort expression + // LexOrdering is guaranteed non-empty, so first() returns &PhysicalSortExpr + let first_sort_expr = sort_order.first(); + + // Extract column name from sort expression + let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::() + { + Some(col) => col, + None => { + debug!("Skipping RG reorder: sort expr is not a simple column"); + return Ok(self); + } + }; + + let descending = first_sort_expr.options.descending; + + // Build statistics converter for this column + let converter = match 
StatisticsConverter::try_new( + column.name(), + arrow_schema, + file_metadata.file_metadata().schema_descr(), + ) { + Ok(c) => c, + Err(e) => { + debug!("Skipping RG reorder: cannot create stats converter: {e}"); + return Ok(self); + } + }; + + // Get min values for the selected row groups + let rg_metadata: Vec<&RowGroupMetaData> = self + .row_group_indexes + .iter() + .map(|&idx| file_metadata.row_group(idx)) + .collect(); + + let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } + }; + + // Sort indices by min values + let sort_options = arrow::compute::SortOptions { + descending, + nulls_first: first_sort_expr.options.nulls_first, + }; + let sorted_indices = match arrow::compute::sort_to_indices( + &min_values, + Some(sort_options), + None, + ) { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; + + // Apply the reordering + let original_indexes = self.row_group_indexes.clone(); + self.row_group_indexes = sorted_indices + .values() + .iter() + .map(|&i| original_indexes[i as usize]) + .collect(); + + Ok(self) + } + /// Reverse the access plan for reverse scanning pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { // Get the row group indexes before reversing @@ -614,4 +719,225 @@ mod test { .unwrap(); Arc::new(SchemaDescriptor::new(Arc::new(schema))) } + + // ---- reorder_by_statistics tests ---- + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::FileMetaData; + use parquet::file::statistics::Statistics; + use parquet::schema::types::Type as SchemaType; + + /// Create ParquetMetaData with row groups that have Int32 
min/max stats + fn make_metadata_with_stats(min_max_pairs: &[(i32, i32)]) -> ParquetMetaData { + let field = SchemaType::primitive_type_builder("id", PhysicalType::INT32) + .build() + .unwrap(); + let schema = SchemaType::group_type_builder("schema") + .with_fields(vec![Arc::new(field)]) + .build() + .unwrap(); + let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(schema))); + + let row_groups: Vec = min_max_pairs + .iter() + .map(|(min, max)| { + let stats = + Statistics::int32(Some(*min), Some(*max), None, Some(100), false); + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(100) + .set_statistics(stats) + .build() + .unwrap(); + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(100) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect(); + + let file_meta = FileMetaData::new( + 1, + min_max_pairs.len() as i64 * 100, + None, + None, + schema_descr, + None, + ); + ParquetMetaData::new(file_meta, row_groups) + } + + fn make_sort_order_asc() -> LexOrdering { + LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new(Column::new( + "id", 0, + )))]) + .unwrap() + } + + fn make_sort_order_desc() -> LexOrdering { + use arrow::compute::SortOptions; + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("id", 0)), + SortOptions { + descending: true, + nulls_first: false, + }, + )]) + .unwrap() + } + + fn make_arrow_schema() -> Schema { + Schema::new(vec![Field::new("id", DataType::Int32, false)]) + } + + #[test] + fn test_reorder_by_statistics_asc() { + // RGs in wrong order: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should be reordered: RG2(1-30), RG0(50-99), RG1(200-299) + 
assert_eq!(plan.row_group_indexes, vec![2, 0, 1]); + } + + #[test] + fn test_reorder_by_statistics_desc() { + // RGs: [50-99, 200-299, 1-30] + let metadata = make_metadata_with_stats(&[(50, 99), (200, 299), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30) + assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + } + + #[test] + fn test_reorder_by_statistics_single_rg() { + let metadata = make_metadata_with_stats(&[(1, 100)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Single RG, no reorder + assert_eq!(plan.row_group_indexes, vec![0]); + } + + #[test] + fn test_reorder_by_statistics_with_skipped_rgs() { + // 4 RGs but only 0, 2, 3 are selected (RG1 was pruned) + let metadata = + make_metadata_with_stats(&[(300, 400), (100, 200), (1, 50), (50, 99)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 2, 3], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Reorder selected RGs by min: RG2(1-50), RG3(50-99), RG0(300-400) + assert_eq!(plan.row_group_indexes, vec![2, 3, 0]); + } + + #[test] + fn test_reorder_by_statistics_skips_with_row_selection() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let selection = RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + RowSelector::select(100), + ]); + + let plan = PreparedAccessPlan::new(vec![0, 1], 
Some(selection)).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because row_selection is present + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_already_sorted() { + // Already in correct ASC order + let metadata = make_metadata_with_stats(&[(1, 30), (50, 99), (200, 299)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_asc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Already sorted, order preserved + assert_eq!(plan.row_group_indexes, vec![0, 1, 2]); + } + + #[test] + fn test_reorder_by_statistics_skips_non_column_expr() { + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::BinaryExpr; + + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + let schema = make_arrow_schema(); + + // Sort expression is a binary expression (id + 1), not a simple column + let expr = Arc::new(BinaryExpr::new( + Arc::new(Column::new("id", 0)), + Operator::Plus, + Arc::new(datafusion_physical_expr::expressions::Literal::new( + datafusion_common::ScalarValue::Int32(Some(1)), + )), + )); + let sort_order = + LexOrdering::new(vec![PhysicalSortExpr::new_default(expr)]).unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because sort expr is not a simple column + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } + + #[test] + fn test_reorder_by_statistics_skips_missing_column() { + let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); + // Schema has "id" but sort order references "nonexistent" + let schema = make_arrow_schema(); + let sort_order = LexOrdering::new(vec![PhysicalSortExpr::new_default(Arc::new( + 
Column::new("nonexistent", 99), + ))]) + .unwrap(); + + let plan = PreparedAccessPlan::new(vec![0, 1], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Should NOT reorder because column not found in schema + assert_eq!(plan.row_group_indexes, vec![0, 1]); + } } diff --git a/datafusion/datasource-parquet/src/access_plan_optimizer.rs b/datafusion/datasource-parquet/src/access_plan_optimizer.rs new file mode 100644 index 0000000000000..885dc0b5656ee --- /dev/null +++ b/datafusion/datasource-parquet/src/access_plan_optimizer.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`AccessPlanOptimizer`] trait and implementations for optimizing +//! row group access order during parquet scans. +//! +//! Applied after row group pruning but before building the decoder, +//! these optimizers reorder (or reverse) the row groups to improve +//! query performance — e.g., placing the "best" row groups first +//! so TopK's dynamic filter threshold tightens quickly. 
+ +use crate::access_plan::PreparedAccessPlan; +use arrow::datatypes::Schema; +use datafusion_common::Result; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use parquet::file::metadata::ParquetMetaData; +use std::fmt::Debug; + +/// Optimizes the row group access order for a prepared access plan. +/// +/// Implementations can reorder, reverse, or otherwise transform the +/// row group read order to improve scan performance. The optimizer +/// is applied once per file, after all pruning passes are complete. +/// +/// # Examples +/// +/// - [`ReverseRowGroups`]: simple O(n) reversal for DESC on ASC-sorted data +/// - [`ReorderByStatistics`]: sort row groups by min/max statistics +/// so TopK queries find optimal values first +pub(crate) trait AccessPlanOptimizer: Send + Sync + Debug { + /// Transform the prepared access plan. + /// + /// Implementations should return the plan unchanged if they cannot + /// apply their optimization (e.g., missing statistics). + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result; +} + +/// Reverse the row group order — simple O(n) reversal. +/// +/// Used as a fallback when the sort column has no statistics available. +/// For ASC-sorted files with a DESC query, reversing row groups places +/// the highest-value row groups first. +#[derive(Debug)] +pub(crate) struct ReverseRowGroups; + +impl AccessPlanOptimizer for ReverseRowGroups { + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + _arrow_schema: &Schema, + ) -> Result { + plan.reverse(file_metadata) + } +} + +/// Reorder row groups by min/max statistics of the sort column. +/// +/// For ASC sort: row groups with the smallest min come first. +/// For DESC sort: row groups with the largest max come first. 
+/// +/// This is more effective than [`ReverseRowGroups`] when row groups +/// are out of order (e.g., append-heavy workloads), because it uses +/// actual statistics rather than assuming the original order is sorted. +/// +/// Gracefully falls back to the original order when statistics are +/// unavailable, the sort expression is not a simple column, etc. +#[derive(Debug)] +pub(crate) struct ReorderByStatistics { + sort_order: LexOrdering, +} + +impl ReorderByStatistics { + pub(crate) fn new(sort_order: LexOrdering) -> Self { + Self { sort_order } + } +} + +impl AccessPlanOptimizer for ReorderByStatistics { + fn optimize( + &self, + plan: PreparedAccessPlan, + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + ) -> Result { + plan.reorder_by_statistics(&self.sort_order, file_metadata, arrow_schema) + } +} diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..5a381f6bf975f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -50,6 +50,7 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::{ PhysicalExpr, is_dynamic_physical_expr, }; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, PruningMetrics, @@ -136,6 +137,10 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by their min/max statistics. + /// When set, row groups are reordered before reading so that row groups likely + /// to contain optimal values (for TopK queries) are read first. 
+ pub sort_order_for_reorder: Option, } impl fmt::Debug for ParquetMorselizer { @@ -286,6 +291,7 @@ struct PreparedParquetOpen { predicate_creation_errors: Count, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, @@ -655,6 +661,7 @@ impl ParquetMorselizer { predicate_creation_errors, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, @@ -1126,6 +1133,16 @@ impl RowGroupsPrunedParquetOpen { // Prepare the access plan (extract row groups and row selection) let mut prepared_plan = access_plan.prepare(rg_metadata)?; + // Reorder row groups by statistics if sort order is known. + // This helps TopK queries find optimal values first. + if let Some(sort_order) = &prepared.sort_order_for_reorder { + prepared_plan = prepared_plan.reorder_by_statistics( + sort_order, + file_metadata.as_ref(), + &prepared.physical_file_schema, + )?; + } + // Potentially reverse the access plan for performance. // See `ParquetSource::try_pushdown_sort` for the rationale. 
if prepared.reverse_row_groups { @@ -1644,6 +1661,7 @@ mod test { use datafusion_physical_expr_adapter::{ DefaultPhysicalExprAdapterFactory, replace_columns_with_literals, }; + use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::StreamExt; use futures::stream::BoxStream; @@ -1675,6 +1693,7 @@ mod test { coerce_int96: Option, max_predicate_cache_size: Option, reverse_row_groups: bool, + sort_order_for_reorder: Option, preserve_order: bool, } @@ -1701,6 +1720,7 @@ mod test { coerce_int96: None, max_predicate_cache_size: None, reverse_row_groups: false, + sort_order_for_reorder: None, preserve_order: false, } } @@ -1816,6 +1836,7 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder, } } } @@ -1840,7 +1861,6 @@ mod test { let Some(planner) = planners.pop_front() else { return Ok(Box::pin(futures::stream::empty())); - }; if let Some(mut plan) = planner.plan()? { morsels.extend(plan.take_morsels()); diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..5586e29a9eae5 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -55,7 +55,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use itertools::Itertools; use object_store::ObjectStore; #[cfg(feature = "parquet_encryption")] @@ -294,6 +294,8 @@ pub struct ParquetSource { /// so we still need to sort them after reading, so the reverse scan is inexact. /// Used to optimize ORDER BY ... DESC on sorted data. 
reverse_row_groups: bool, + /// Optional sort order used to reorder row groups by min/max statistics. + sort_order_for_reorder: Option, } impl ParquetSource { @@ -319,6 +321,7 @@ impl ParquetSource { #[cfg(feature = "parquet_encryption")] encryption_factory: None, reverse_row_groups: false, + sort_order_for_reorder: None, } } @@ -580,6 +583,7 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + sort_order_for_reorder: self.sort_order_for_reorder.clone(), })) } @@ -821,7 +825,9 @@ impl FileSource for ParquetSource { // Return Inexact because we're only reversing row group order, // not guaranteeing perfect row-level ordering - let new_source = self.clone().with_reverse_row_groups(true); + let sort_order = LexOrdering::new(order.iter().cloned()); + let mut new_source = self.clone().with_reverse_row_groups(true); + new_source.sort_order_for_reorder = sort_order; Ok(SortOrderPushdownResult::Inexact { inner: Arc::new(new_source) as Arc, }) From 10803e2f29369a5df84ab602871bbeade7746994 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:48:06 +0000 Subject: [PATCH 02/10] test: add SLT tests for row group reorder by statistics --- .../sqllogictest/test_files/sort_pushdown.slt | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index b6c75f3977010..4f41c4b08aecf 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2271,6 +2271,89 @@ DROP TABLE tg_src_high; statement ok DROP TABLE tg_buffer; +# =========================================================== +# Test H: Row group reorder by statistics for TopK queries. 
+# When a file has multiple row groups with overlapping or +# out-of-order statistics, sort pushdown returns Inexact and +# `reorder_by_statistics` reorders row groups within the file +# so TopK finds optimal values first. +# =========================================================== + +# Create a table with 30 rows and write to parquet with small row groups +# so we get multiple row groups per file. Rows are inserted in a mixed +# order so row groups span overlapping ranges (forcing Inexact path). +statement ok +CREATE TABLE th_mixed(id INT, value INT) AS VALUES + (15, 150), (5, 50), (25, 250), + (10, 100), (20, 200), (1, 10), + (30, 300), (3, 30), (18, 180); + +# Write with row_group_size=3 → 3 rows per RG, 3 RGs total +# RG statistics (unsorted order): RG0(5-25), RG1(1-20), RG2(3-30) +# Note: files are overlapping → Inexact path → TopK retained +query I +COPY (SELECT * FROM th_mixed) +TO 'test_files/scratch/sort_pushdown/th_reorder/data.parquet' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' '3'); +---- +9 + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +CREATE EXTERNAL TABLE th_reorder(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'; + +# Test H.1: ASC ORDER BY with LIMIT — reorder helps TopK find min values first +# Results must be correct regardless of RG reorder. 
+query II +SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query II +SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +30 300 +25 250 +20 200 + +# Test H.3: Full sort (no LIMIT) — output must still be correctly sorted +query II +SELECT * FROM th_reorder ORDER BY id ASC; +---- +1 10 +3 30 +5 50 +10 100 +15 150 +18 180 +20 200 +25 250 +30 300 + +# Test H.4: ORDER BY expression (not a simple column) — reorder should +# gracefully skip, results still correct +query II +SELECT id, value FROM th_reorder ORDER BY id + 1 ASC LIMIT 3; +---- +1 10 +3 30 +5 50 + +# Cleanup Test H +statement ok +DROP TABLE th_mixed; + +statement ok +DROP TABLE th_reorder; + # Reset settings (SLT runner uses target_partitions=4, not system default) statement ok SET datafusion.execution.target_partitions = 4; From 714477aeaebc4c5f0a3eb661199620ede11815a3 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 06:56:39 +0000 Subject: [PATCH 03/10] test: add EXPLAIN assertions for row group reorder tests --- .../sqllogictest/test_files/sort_pushdown.slt | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 4f41c4b08aecf..a9d512228fc14 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2307,7 +2307,19 @@ CREATE EXTERNAL TABLE th_reorder(id INT, value INT) STORED AS PARQUET LOCATION 'test_files/scratch/sort_pushdown/th_reorder/data.parquet'; -# Test H.1: ASC ORDER BY with LIMIT — reorder helps TopK find min values first +# Test H.1: ASC ORDER BY with LIMIT — Inexact path (file has no declared ordering) +# Plan: SortExec(TopK) preserved. RG reorder happens inside DataSourceExec +# (not visible in EXPLAIN, but verified by unit tests in access_plan.rs). 
+query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id ASC NULLS LAST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + # Results must be correct regardless of RG reorder. query II SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; @@ -2317,6 +2329,16 @@ SELECT * FROM th_reorder ORDER BY id ASC LIMIT 3; 5 50 # Test H.2: DESC ORDER BY with LIMIT — reorder + reverse compose +query TT +EXPLAIN SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: th_reorder.id DESC NULLS FIRST, fetch=3 +02)--TableScan: th_reorder projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/th_reorder/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ] + query II SELECT * FROM th_reorder ORDER BY id DESC LIMIT 3; ---- From 768e44d4329785ec3ea6e4f0e64f0d9cc153d670 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 13 Apr 2026 07:35:19 +0000 Subject: [PATCH 04/10] fix: use max statistics for DESC sort reorder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For overlapping row group ranges, sorting by min for DESC can pick a worse first RG. Example: RG0(50-60) vs RG1(40-100) — min DESC picks RG0 first (max=60), but RG1 contains the largest values (max=100). Use min for ASC and max for DESC to correctly prioritize the row group most likely to contain the optimal values for TopK. 
--- .../datasource-parquet/src/access_plan.rs | 77 ++++++++++++++----- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index d98f64044db37..3f1a0a675e361 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -439,37 +439,51 @@ impl PreparedAccessPlan { } }; - // Get min values for the selected row groups + // Get the relevant statistics for the selected row groups. + // For ASC sort: use min values — we want the RG with the smallest min + // to come first (best candidate for "smallest values"). + // For DESC sort: use max values — we want the RG with the largest max + // to come first (best candidate for "largest values"). Using min for + // DESC can pick a worse first RG when ranges overlap (e.g., RG0 50-60 + // vs RG1 40-100 — RG1 has larger values but smaller min). let rg_metadata: Vec<&RowGroupMetaData> = self .row_group_indexes .iter() .map(|&idx| file_metadata.row_group(idx)) .collect(); - let min_values = match converter.row_group_mins(rg_metadata.iter().copied()) { - Ok(vals) => vals, - Err(e) => { - debug!("Skipping RG reorder: cannot get min values: {e}"); - return Ok(self); + let stat_values = if descending { + match converter.row_group_maxes(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get max values: {e}"); + return Ok(self); + } + } + } else { + match converter.row_group_mins(rg_metadata.iter().copied()) { + Ok(vals) => vals, + Err(e) => { + debug!("Skipping RG reorder: cannot get min values: {e}"); + return Ok(self); + } } }; - // Sort indices by min values + // Sort indices by statistic values (min for ASC, max for DESC) let sort_options = arrow::compute::SortOptions { descending, nulls_first: first_sort_expr.options.nulls_first, }; - let sorted_indices = match arrow::compute::sort_to_indices( - &min_values, - 
Some(sort_options), - None, - ) { - Ok(indices) => indices, - Err(e) => { - debug!("Skipping RG reorder: sort failed: {e}"); - return Ok(self); - } - }; + let sorted_indices = + match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None) + { + Ok(indices) => indices, + Err(e) => { + debug!("Skipping RG reorder: sort failed: {e}"); + return Ok(self); + } + }; // Apply the reordering let original_indexes = self.row_group_indexes.clone(); @@ -821,7 +835,7 @@ mod test { .reorder_by_statistics(&sort_order, &metadata, &schema) .unwrap(); - // DESC: largest min first: RG1(200-299), RG0(50-99), RG2(1-30) + // DESC: largest max first: RG1(max=299), RG0(max=99), RG2(max=30) assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); } @@ -922,6 +936,31 @@ mod test { assert_eq!(plan.row_group_indexes, vec![0, 1]); } + #[test] + fn test_reorder_by_statistics_desc_uses_max_for_overlapping_rgs() { + // Overlapping ranges where min DESC would pick worse RG than max DESC: + // RG0: 50-60 (small range, moderate max) + // RG1: 40-100 (wide range, high max but lower min) + // RG2: 20-30 (low max) + // + // For ORDER BY DESC LIMIT N: + // Using min DESC: [RG0(50), RG1(40), RG2(20)] → reads RG0 first (max=60 only) + // Using max DESC: [RG1(100), RG0(60), RG2(30)] → reads RG1 first (max=100) + // + // RG1 is the better first choice for DESC because it contains the largest values. 
+ let metadata = make_metadata_with_stats(&[(50, 60), (40, 100), (20, 30)]); + let schema = make_arrow_schema(); + let sort_order = make_sort_order_desc(); + + let plan = PreparedAccessPlan::new(vec![0, 1, 2], None).unwrap(); + let plan = plan + .reorder_by_statistics(&sort_order, &metadata, &schema) + .unwrap(); + + // Expected: RG1 (max=100) first, then RG0 (max=60), then RG2 (max=30) + assert_eq!(plan.row_group_indexes, vec![1, 0, 2]); + } + #[test] fn test_reorder_by_statistics_skips_missing_column() { let metadata = make_metadata_with_stats(&[(50, 99), (1, 30)]); From 1d06fe771850a55abe3c917106e7842faa9f6ea2 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 14 Apr 2026 06:23:36 +0000 Subject: [PATCH 05/10] fix: prevent reorder+reverse double-reordering of row groups When sort_order_for_reorder is set, reorder_by_statistics already handles the sort direction (min for ASC, max for DESC). Applying reverse on top would undo the reorder. Use else-if so only one strategy is applied. Also adds sort_pushdown_inexact benchmark with pushdown_filters enabled to measure RG reorder benefit on wide-row TopK queries. --- benchmarks/src/sort_pushdown.rs | 9 ++++++++- datafusion/datasource-parquet/src/opener.rs | 13 +++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/benchmarks/src/sort_pushdown.rs b/benchmarks/src/sort_pushdown.rs index e7fce1921e7a8..e2a4615a3ef39 100644 --- a/benchmarks/src/sort_pushdown.rs +++ b/benchmarks/src/sort_pushdown.rs @@ -159,7 +159,14 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let sql = self.load_query(query_id)?; - let config = self.common.config()?; + let mut config = self.common.config()?; + // Enable parquet filter pushdown + late materialization. 
This is + // essential for the Inexact sort pushdown path: TopK's dynamic + // filter is pushed to the parquet reader, so only sort-column + // rows pass the filter's Decode non-sort columns are skipped for + // rows that don't pass the filter — this is where RG reorder's + // tight-threshold-first strategy pays off for wide-row queries. + config.options_mut().execution.parquet.pushdown_filters = true; let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 5a381f6bf975f..fa5bbaced9878 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1134,18 +1134,19 @@ impl RowGroupsPrunedParquetOpen { let mut prepared_plan = access_plan.prepare(rg_metadata)?; // Reorder row groups by statistics if sort order is known. - // This helps TopK queries find optimal values first. + // This helps TopK queries find optimal values first by placing + // row groups with optimal min/max values at the front. + // When reorder is active, skip reverse — reorder already encodes + // the direction (uses min for ASC, max for DESC). if let Some(sort_order) = &prepared.sort_order_for_reorder { prepared_plan = prepared_plan.reorder_by_statistics( sort_order, file_metadata.as_ref(), &prepared.physical_file_schema, )?; - } - - // Potentially reverse the access plan for performance. - // See `ParquetSource::try_pushdown_sort` for the rationale. - if prepared.reverse_row_groups { + } else if prepared.reverse_row_groups { + // Fallback: simple reverse when no sort order statistics available. + // See `ParquetSource::try_pushdown_sort` for the rationale. 
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } From d1357631630038ff08df4d0e5905e192cffdc909 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 16 Apr 2026 10:41:56 +0000 Subject: [PATCH 06/10] fix: rebase conflicts and compilation errors --- datafusion/datasource-parquet/src/access_plan.rs | 3 +-- datafusion/datasource-parquet/src/opener.rs | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 3f1a0a675e361..77b538bd52249 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -415,8 +415,7 @@ impl PreparedAccessPlan { let first_sort_expr = sort_order.first(); // Extract column name from sort expression - let column: &Column = match first_sort_expr.expr.as_any().downcast_ref::() - { + let column: &Column = match first_sort_expr.expr.downcast_ref::() { Some(col) => col, None => { debug!("Skipping RG reorder: sort expr is not a simple column"); diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index fa5bbaced9878..d91fc9900f445 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1862,6 +1862,7 @@ mod test { let Some(planner) = planners.pop_front() else { return Ok(Box::pin(futures::stream::empty())); + }; if let Some(mut plan) = planner.plan()? { morsels.extend(plan.take_morsels()); From 32b1b951d31f5ffbb86a36e2c80b1f90f3136da4 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 16 Apr 2026 12:13:51 +0000 Subject: [PATCH 07/10] refactor: introduce AccessPlanOptimizer trait for row group reordering Address review feedback: extract row group optimization into a trait instead of post-pass flags. prepare_with_optimizer() integrates the optimization into the access plan preparation step. 
- AccessPlanOptimizer trait with ReverseRowGroups and ReorderByStatistics - prepare_with_optimizer() applies optimizer inside prepare flow - Original prepare() unchanged for backward compatibility --- .../datasource-parquet/src/access_plan.rs | 15 ++++++- datafusion/datasource-parquet/src/mod.rs | 1 + datafusion/datasource-parquet/src/opener.rs | 44 ++++++++++++------- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 77b538bd52249..32085fa8a177a 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -351,9 +351,22 @@ impl ParquetAccessPlan { ) -> Result { let row_group_indexes = self.row_group_indexes(); let row_selection = self.into_overall_row_selection(row_group_meta_data)?; - PreparedAccessPlan::new(row_group_indexes, row_selection) } + + /// Like [`prepare`](Self::prepare), but also applies an + /// [`AccessPlanOptimizer`] to reorder/reverse row groups after + /// preparing the plan. 
+ pub(crate) fn prepare_with_optimizer( + self, + row_group_meta_data: &[RowGroupMetaData], + file_metadata: &ParquetMetaData, + arrow_schema: &Schema, + optimizer: &dyn crate::access_plan_optimizer::AccessPlanOptimizer, + ) -> Result { + let plan = self.prepare(row_group_meta_data)?; + optimizer.optimize(plan, file_metadata, arrow_schema) + } } /// Represents a prepared, fully resolved [`ParquetAccessPlan`] diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index 9a907f4118a86..de42c527845fb 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -25,6 +25,7 @@ #![cfg_attr(test, allow(clippy::needless_pass_by_value))] pub mod access_plan; +pub(crate) mod access_plan_optimizer; pub mod file_format; pub mod metadata; mod metrics; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index d91fc9900f445..41dacd04dc3e8 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1130,25 +1130,35 @@ impl RowGroupsPrunedParquetOpen { ); } - // Prepare the access plan (extract row groups and row selection) - let mut prepared_plan = access_plan.prepare(rg_metadata)?; - - // Reorder row groups by statistics if sort order is known. - // This helps TopK queries find optimal values first by placing - // row groups with optimal min/max values at the front. - // When reorder is active, skip reverse — reorder already encodes - // the direction (uses min for ASC, max for DESC). - if let Some(sort_order) = &prepared.sort_order_for_reorder { - prepared_plan = prepared_plan.reorder_by_statistics( - sort_order, + // Build the access plan optimizer from sort pushdown hints. + // ReorderByStatistics is preferred (handles both ASC and DESC via + // min/max stats). ReverseRowGroups is a fallback when no statistics + // are available on the sort column. 
+        let optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+            Some(Box::new(
+                crate::access_plan_optimizer::ReorderByStatistics::new(
+                    sort_order.clone(),
+                ),
+            ))
+        } else if prepared.reverse_row_groups {
+            Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
+        } else {
+            None
+        };
+
+        // Prepare the access plan and apply row group optimizer if configured.
+        let prepared_plan = if let Some(opt) = &optimizer {
+            access_plan.prepare_with_optimizer(
+                rg_metadata,
+                file_metadata.as_ref(),
+                &prepared.physical_file_schema,
+                opt.as_ref(),
+            )?
+        } else {
+            access_plan.prepare(rg_metadata)?
+        };

         let arrow_reader_metrics = ArrowReaderMetrics::enabled();
         let read_plan = build_projection_read_plan(

From 0a72fe020136fdc149fb3c0ed3a1de3b4f43a8b2 Mon Sep 17 00:00:00 2001
From: Qi Zhu <821684824@qq.com>
Date: Thu, 16 Apr 2026 12:20:27 +0000
Subject: [PATCH 08/10] chore: remove benchmark from this PR (tracked in #21582)

---
 benchmarks/bench.sh                           | 130 ------------------
 .../queries/sort_pushdown_inexact/q1.sql      |   8 --
 .../queries/sort_pushdown_inexact/q2.sql      |   7 -
 .../queries/sort_pushdown_inexact/q3.sql      |   8 --
 .../queries/sort_pushdown_inexact/q4.sql      |   7 -
 benchmarks/src/sort_pushdown.rs               |   9 +-
 6 files changed, 1 insertion(+), 168 deletions(-)
 delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q1.sql
 delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q2.sql
 delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q3.sql
 delete mode 100644 benchmarks/queries/sort_pushdown_inexact/q4.sql

diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index aa1ec477345c6..9dce4cf77b933 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -109,9 +109,6 @@ 
clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files -sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder -sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder -sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -319,9 +316,6 @@ main() { sort_pushdown|sort_pushdown_sorted) data_sort_pushdown ;; - sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap) - data_sort_pushdown_inexact - ;; sort_tpch) # same data as for tpch data_tpch "1" "parquet" @@ -528,15 +522,6 @@ main() { sort_pushdown_sorted) run_sort_pushdown_sorted ;; - sort_pushdown_inexact) - run_sort_pushdown_inexact - ;; - sort_pushdown_inexact_unsorted) - run_sort_pushdown_inexact_unsorted - ;; - sort_pushdown_inexact_overlap) - run_sort_pushdown_inexact_overlap - ;; sort_tpch) run_sort_tpch "1" ;; @@ -1152,121 +1137,6 @@ run_sort_pushdown_sorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } -# Generates data for sort pushdown Inexact benchmark. -# -# Produces a single large lineitem parquet file where row groups have -# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally -# sorted, RGs shuffled). This simulates append-heavy workloads where data -# is written in batches at different times. 
-data_sort_pushdown_inexact() { - INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" - if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then - echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}" - return - fi - - echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." - - # Re-use the sort_pushdown data as the source (generate if missing) - data_sort_pushdown - - mkdir -p "${INEXACT_DIR}" - SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" - - # Use datafusion-cli to bucket rows into 64 groups by a deterministic - # scrambler, then sort within each bucket by orderkey. This produces - # ~64 RG-sized segments where each has a tight orderkey range but the - # segments appear in scrambled (non-sorted) order in the file. - (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " - CREATE EXTERNAL TABLE src - STORED AS PARQUET - LOCATION '${SRC_DIR}'; - - COPY ( - SELECT * FROM src - ORDER BY - (l_orderkey * 1664525 + 1013904223) % 64, - l_orderkey - ) - TO '${INEXACT_DIR}/shuffled.parquet' - STORED AS PARQUET - OPTIONS ('format.max_row_group_size' '100000'); - ") - - echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" - ls -la "${INEXACT_DIR}" - - # Also generate a file with partially overlapping row groups. - # Simulates streaming data with network delays: each chunk is mostly - # in order but has a small overlap with the next chunk (±5% of the - # chunk range). This is the pattern described by @adriangb — data - # arriving with timestamps that are generally increasing but with - # network-induced jitter causing small overlaps between row groups. 
- OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" - if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then - echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" - return - fi - - echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." - mkdir -p "${OVERLAP_DIR}" - - (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " - CREATE EXTERNAL TABLE src - STORED AS PARQUET - LOCATION '${SRC_DIR}'; - - -- Add jitter to l_orderkey: shift each row by a random-ish offset - -- proportional to its position. This creates overlap between adjacent - -- row groups while preserving the general ascending trend. - -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 - -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. - COPY ( - SELECT * FROM src - ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 - ) - TO '${OVERLAP_DIR}/overlapping.parquet' - STORED AS PARQUET - OPTIONS ('format.max_row_group_size' '100000'); - ") - - echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" - ls -la "${OVERLAP_DIR}" -} - -# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). -# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet -# reader for late materialization (only needed for Inexact path). -run_sort_pushdown_inexact() { - INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" - RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" - echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." - DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ - debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} -} - -# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering. 
-# Tests the Unsupported path in try_pushdown_sort where RG reorder by -# statistics can still help TopK queries without any file ordering guarantee. -run_sort_pushdown_inexact_unsorted() { - INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" - RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json" - echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..." - DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ - debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} -} - -# Runs the sort pushdown benchmark with partially overlapping RGs. -# Simulates streaming data with network jitter — RGs are mostly in order -# but have small overlaps (±2500 orderkey jitter between adjacent RGs). -run_sort_pushdown_inexact_overlap() { - OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" - RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" - echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." - DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ - debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} -} - # Runs the sort integration benchmark run_sort_tpch() { SCALE_FACTOR=$1 diff --git a/benchmarks/queries/sort_pushdown_inexact/q1.sql b/benchmarks/queries/sort_pushdown_inexact/q1.sql deleted file mode 100644 index d772bc486a12b..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q1.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Inexact path: TopK + DESC LIMIT on ASC-declared file. 
--- With RG reorder, the first RG read contains the highest max value, --- so TopK's threshold tightens quickly and subsequent RGs get filtered --- efficiently via dynamic filter pushdown. -SELECT l_orderkey, l_partkey, l_suppkey -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q2.sql b/benchmarks/queries/sort_pushdown_inexact/q2.sql deleted file mode 100644 index 6e2bef44fc37e..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q2.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Inexact path: TopK + DESC LIMIT with larger fetch (1000). --- Larger LIMIT means more row_replacements; RG reorder reduces the --- total replacement count by tightening the threshold faster. -SELECT l_orderkey, l_partkey, l_suppkey -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown_inexact/q3.sql b/benchmarks/queries/sort_pushdown_inexact/q3.sql deleted file mode 100644 index d858ec79a67c9..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q3.sql +++ /dev/null @@ -1,8 +0,0 @@ --- Inexact path: wide projection (all columns) + DESC LIMIT. --- Shows the row-level filter benefit: with a tight threshold from the --- first RG, subsequent RGs skip decoding non-sort columns for filtered --- rows — bigger wins for wide tables. -SELECT * -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q4.sql b/benchmarks/queries/sort_pushdown_inexact/q4.sql deleted file mode 100644 index bd2efc5d3b992..0000000000000 --- a/benchmarks/queries/sort_pushdown_inexact/q4.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Inexact path: wide projection + DESC LIMIT with larger fetch. --- Combines wide-row row-level filter benefit with larger LIMIT to --- demonstrate cumulative gains from RG reorder. 
-SELECT * -FROM lineitem -ORDER BY l_orderkey DESC -LIMIT 1000 diff --git a/benchmarks/src/sort_pushdown.rs b/benchmarks/src/sort_pushdown.rs index e2a4615a3ef39..e7fce1921e7a8 100644 --- a/benchmarks/src/sort_pushdown.rs +++ b/benchmarks/src/sort_pushdown.rs @@ -159,14 +159,7 @@ impl RunOpt { async fn benchmark_query(&self, query_id: usize) -> Result> { let sql = self.load_query(query_id)?; - let mut config = self.common.config()?; - // Enable parquet filter pushdown + late materialization. This is - // essential for the Inexact sort pushdown path: TopK's dynamic - // filter is pushed to the parquet reader, so only sort-column - // rows pass the filter's Decode non-sort columns are skipped for - // rows that don't pass the filter — this is where RG reorder's - // tight-threshold-first strategy pays off for wide-row queries. - config.options_mut().execution.parquet.pushdown_filters = true; + let config = self.common.config()?; let rt = self.common.build_runtime()?; let state = SessionStateBuilder::new() .with_config(config) From ad8f3562b955f148be792601be704c203b1252a9 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 17 Apr 2026 14:10:54 +0000 Subject: [PATCH 09/10] fix: resolve doc link for AccessPlanOptimizer --- datafusion/datasource-parquet/src/access_plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 32085fa8a177a..f8bcb5b74c19e 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -355,7 +355,7 @@ impl ParquetAccessPlan { } /// Like [`prepare`](Self::prepare), but also applies an - /// [`AccessPlanOptimizer`] to reorder/reverse row groups after + /// `AccessPlanOptimizer` to reorder/reverse row groups after /// preparing the plan. 
pub(crate) fn prepare_with_optimizer( self, From 2eb2351a18050864f28e2c27cee45fcf4dbebf2c Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 17 Apr 2026 14:32:50 +0000 Subject: [PATCH 10/10] fix: restore benchmark files from upstream main --- benchmarks/bench.sh | 130 ++++++++++++++++++ .../queries/sort_pushdown_inexact/q1.sql | 8 ++ .../queries/sort_pushdown_inexact/q2.sql | 7 + .../queries/sort_pushdown_inexact/q3.sql | 8 ++ .../queries/sort_pushdown_inexact/q4.sql | 7 + 5 files changed, 160 insertions(+) create mode 100644 benchmarks/queries/sort_pushdown_inexact/q1.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q2.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q3.sql create mode 100644 benchmarks/queries/sort_pushdown_inexact/q4.sql diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 9dce4cf77b933..aa1ec477345c6 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,6 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files +sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder +sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder +sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -316,6 +319,9 @@ main() { sort_pushdown|sort_pushdown_sorted) data_sort_pushdown ;; + sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap) + data_sort_pushdown_inexact + ;; sort_tpch) # same data as for tpch data_tpch 
"1" "parquet" @@ -522,6 +528,15 @@ main() { sort_pushdown_sorted) run_sort_pushdown_sorted ;; + sort_pushdown_inexact) + run_sort_pushdown_inexact + ;; + sort_pushdown_inexact_unsorted) + run_sort_pushdown_inexact_unsorted + ;; + sort_pushdown_inexact_overlap) + run_sort_pushdown_inexact_overlap + ;; sort_tpch) run_sort_tpch "1" ;; @@ -1137,6 +1152,121 @@ run_sort_pushdown_sorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } +# Generates data for sort pushdown Inexact benchmark. +# +# Produces a single large lineitem parquet file where row groups have +# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally +# sorted, RGs shuffled). This simulates append-heavy workloads where data +# is written in batches at different times. +data_sort_pushdown_inexact() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem" + if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then + echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}" + return + fi + + echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..." + + # Re-use the sort_pushdown data as the source (generate if missing) + data_sort_pushdown + + mkdir -p "${INEXACT_DIR}" + SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem" + + # Use datafusion-cli to bucket rows into 64 groups by a deterministic + # scrambler, then sort within each bucket by orderkey. This produces + # ~64 RG-sized segments where each has a tight orderkey range but the + # segments appear in scrambled (non-sorted) order in the file. + (cd "${SCRIPT_DIR}/.." 
&& cargo run --release -p datafusion-cli -- -c " + CREATE EXTERNAL TABLE src + STORED AS PARQUET + LOCATION '${SRC_DIR}'; + + COPY ( + SELECT * FROM src + ORDER BY + (l_orderkey * 1664525 + 1013904223) % 64, + l_orderkey + ) + TO '${INEXACT_DIR}/shuffled.parquet' + STORED AS PARQUET + OPTIONS ('format.max_row_group_size' '100000'); + ") + + echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}" + ls -la "${INEXACT_DIR}" + + # Also generate a file with partially overlapping row groups. + # Simulates streaming data with network delays: each chunk is mostly + # in order but has a small overlap with the next chunk (±5% of the + # chunk range). This is the pattern described by @adriangb — data + # arriving with timestamps that are generally increasing but with + # network-induced jitter causing small overlaps between row groups. + OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem" + if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then + echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}" + return + fi + + echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..." + mkdir -p "${OVERLAP_DIR}" + + (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c " + CREATE EXTERNAL TABLE src + STORED AS PARQUET + LOCATION '${SRC_DIR}'; + + -- Add jitter to l_orderkey: shift each row by a random-ish offset + -- proportional to its position. This creates overlap between adjacent + -- row groups while preserving the general ascending trend. + -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500 + -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs. 
+ COPY ( + SELECT * FROM src + ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500 + ) + TO '${OVERLAP_DIR}/overlapping.parquet' + STORED AS PARQUET + OPTIONS ('format.max_row_group_size' '100000'); + ") + + echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}" + ls -la "${OVERLAP_DIR}" +} + +# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics). +# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet +# reader for late materialization (only needed for Inexact path). +run_sort_pushdown_inexact() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json" + echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..." + DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + +# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering. +# Tests the Unsupported path in try_pushdown_sort where RG reorder by +# statistics can still help TopK queries without any file ordering guarantee. +run_sort_pushdown_inexact_unsorted() { + INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json" + echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..." + DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + +# Runs the sort pushdown benchmark with partially overlapping RGs. +# Simulates streaming data with network jitter — RGs are mostly in order +# but have small overlaps (±2500 orderkey jitter between adjacent RGs). 
+run_sort_pushdown_inexact_overlap() { + OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap" + RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json" + echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..." + DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \ + debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} +} + # Runs the sort integration benchmark run_sort_tpch() { SCALE_FACTOR=$1 diff --git a/benchmarks/queries/sort_pushdown_inexact/q1.sql b/benchmarks/queries/sort_pushdown_inexact/q1.sql new file mode 100644 index 0000000000000..d772bc486a12b --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q1.sql @@ -0,0 +1,8 @@ +-- Inexact path: TopK + DESC LIMIT on ASC-declared file. +-- With RG reorder, the first RG read contains the highest max value, +-- so TopK's threshold tightens quickly and subsequent RGs get filtered +-- efficiently via dynamic filter pushdown. +SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q2.sql b/benchmarks/queries/sort_pushdown_inexact/q2.sql new file mode 100644 index 0000000000000..6e2bef44fc37e --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q2.sql @@ -0,0 +1,7 @@ +-- Inexact path: TopK + DESC LIMIT with larger fetch (1000). +-- Larger LIMIT means more row_replacements; RG reorder reduces the +-- total replacement count by tightening the threshold faster. 
+SELECT l_orderkey, l_partkey, l_suppkey +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000 diff --git a/benchmarks/queries/sort_pushdown_inexact/q3.sql b/benchmarks/queries/sort_pushdown_inexact/q3.sql new file mode 100644 index 0000000000000..d858ec79a67c9 --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q3.sql @@ -0,0 +1,8 @@ +-- Inexact path: wide projection (all columns) + DESC LIMIT. +-- Shows the row-level filter benefit: with a tight threshold from the +-- first RG, subsequent RGs skip decoding non-sort columns for filtered +-- rows — bigger wins for wide tables. +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 100 diff --git a/benchmarks/queries/sort_pushdown_inexact/q4.sql b/benchmarks/queries/sort_pushdown_inexact/q4.sql new file mode 100644 index 0000000000000..bd2efc5d3b992 --- /dev/null +++ b/benchmarks/queries/sort_pushdown_inexact/q4.sql @@ -0,0 +1,7 @@ +-- Inexact path: wide projection + DESC LIMIT with larger fetch. +-- Combines wide-row row-level filter benefit with larger LIMIT to +-- demonstrate cumulative gains from RG reorder. +SELECT * +FROM lineitem +ORDER BY l_orderkey DESC +LIMIT 1000