130 changes: 130 additions & 0 deletions benchmarks/bench.sh
@@ -109,6 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet
# Sort Pushdown Benchmarks
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)

# Sorted Data Benchmarks (ORDER BY Optimization)
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -316,6 +319,9 @@ main() {
sort_pushdown|sort_pushdown_sorted)
data_sort_pushdown
;;
sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap)
data_sort_pushdown_inexact
;;
sort_tpch)
# same data as for tpch
data_tpch "1" "parquet"
@@ -522,6 +528,15 @@ main() {
sort_pushdown_sorted)
run_sort_pushdown_sorted
;;
sort_pushdown_inexact)
run_sort_pushdown_inexact
;;
sort_pushdown_inexact_unsorted)
run_sort_pushdown_inexact_unsorted
;;
sort_pushdown_inexact_overlap)
run_sort_pushdown_inexact_overlap
;;
sort_tpch)
run_sort_tpch "1"
;;
@@ -1137,6 +1152,121 @@ run_sort_pushdown_sorted() {
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Generates data for sort pushdown Inexact benchmark.
#
# Produces a single large lineitem parquet file where row groups have
# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
# sorted, RGs shuffled). This simulates append-heavy workloads where data
# is written in batches at different times.
data_sort_pushdown_inexact() {
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
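# Skip only when BOTH generated files exist; returning as soon as the
# shuffled file is found would leave missing overlap data ungenerated.
if [ "$(ls -A "${INEXACT_DIR}"/*.parquet 2>/dev/null)" ] \
&& [ "$(ls -A "${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"/*.parquet 2>/dev/null)" ]; then
echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
return
fi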

echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."

# Re-use the sort_pushdown data as the source (generate if missing)
data_sort_pushdown

mkdir -p "${INEXACT_DIR}"
SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"

# Use datafusion-cli to bucket rows into 64 groups by a deterministic
# scrambler, then sort within each bucket by orderkey. The intent is
# ~64 RG-sized segments, each with a tight orderkey range, that appear
# in scrambled (non-sorted) order in the file.
# NOTE: the bucket id depends only on l_orderkey % 64, so each bucket
# draws keys from across the full orderkey range, not from a tight slice.
Comment on lines +1176 to +1179
Contributor:

I think another interesting benchmark, and the one that matches our case, is when row groups overlap but still follow a general ascending trend. E.g.:

rg,min,max
0,0,5
1,4,7
2,5,32

In our case, this happens because a stream of timestamped data comes in and should arrive at around the time it was created, but network delays, clock skew, etc. mean the ordering is never perfect. Still, data that arrived 30 minutes later is guaranteed to have timestamps in a different range than data that arrived 30 minutes before it.
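
To make the pattern concrete, here is a minimal sketch that reports which adjacent row groups overlap, using the three-row `rg,min,max` example above as input. The function name `report_rg_overlaps` is hypothetical and not part of bench.sh; the sketch assumes the statistics are already available as CSV text rather than read from parquet metadata.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of bench.sh): given "rg,min,max" CSV rows on
# stdin (with a header line), print each pair of adjacent row groups whose
# [min, max] ranges overlap.
report_rg_overlaps() {
    awk -F, '
        NR == 1 { next }                      # skip the header row
        seen && ($2 + 0) <= prev_max {
            hi = (prev_max < $3 + 0) ? prev_max : $3 + 0
            printf "rg %s overlaps rg %s on [%d, %d]\n", prev_rg, $1, $2, hi
        }
        { prev_rg = $1; prev_max = $3 + 0; seen = 1 }
    '
}

# The example table from the comment above.
printf 'rg,min,max\n0,0,5\n1,4,7\n2,5,32\n' | report_rg_overlaps
```

For the example table this reports an overlap between row groups 0 and 1 and between row groups 1 and 2, even though the minimums and maximums both increase from one row group to the next.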

Contributor Author:

Great suggestion! Partially overlapping RGs from streaming data is a very realistic scenario. I will add a benchmark variant for this pattern when I update the PR — something like time-ordered chunks with small overlaps between adjacent chunks to simulate network delays / time skew.

Contributor:

Did you plan to include it in this PR or a followup?

Contributor Author:

Will add it in this PR — I'll create a third data variant with partially overlapping RGs (simulating streaming data with network delays) alongside the current shuffled data.

Contributor:

Amazing, thanks so much!

Contributor Author:

Added in the latest push — sort_pushdown_inexact_overlap generates a file with partially overlapping RGs (±2500 orderkey jitter between adjacent 100K-row chunks, simulating streaming data with network delays). 4 DESC LIMIT queries to match your use case.

(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${SRC_DIR}';

COPY (
SELECT * FROM src
ORDER BY
(l_orderkey * 1664525 + 1013904223) % 64,
l_orderkey
)
TO '${INEXACT_DIR}/shuffled.parquet'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")

echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
ls -la "${INEXACT_DIR}"

# Also generate a file with partially overlapping row groups.
# Simulates streaming data with network delays: each chunk is mostly
# in order but overlaps slightly with the next chunk (about 5K orderkey
# values, from the ±2500 jitter applied below). This is the pattern
# described by @adriangb: timestamps that generally increase, with
# network-induced jitter causing small overlaps between row groups.
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A "${OVERLAP_DIR}"/*.parquet 2>/dev/null)" ]; then
echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
return
fi

echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
mkdir -p "${OVERLAP_DIR}"

(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${SRC_DIR}';

-- Add jitter to the sort key: shift each row's l_orderkey by a
-- deterministic pseudo-random offset derived from the key itself.
-- Stored l_orderkey values are unchanged; only the output order is
-- perturbed, which makes adjacent row groups overlap while preserving
-- the general ascending trend.
-- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
-- The offset lies in [-2500, 2499], giving ~5K overlap between adjacent 100K-row RGs.
COPY (
SELECT * FROM src
ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
)
TO '${OVERLAP_DIR}/overlapping.parquet'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")

echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
ls -la "${OVERLAP_DIR}"
}
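
The two ORDER BY expressions in data_sort_pushdown_inexact are plain integer arithmetic, so they can be checked by hand. The sketch below reproduces them with bash arithmetic for a few sample keys. The function names `bucket_of` and `jitter_of` are hypothetical and exist only for this illustration, and the sketch assumes non-negative keys small enough to avoid 64-bit overflow.

```shell
#!/usr/bin/env bash
# Illustrative only: mirrors the integer arithmetic of the two ORDER BY
# expressions used for data generation. Function names are hypothetical.

# Bucket id used to order the shuffled file.
bucket_of() {
    echo $(( ($1 * 1664525 + 1013904223) % 64 ))
}

# Offset added to l_orderkey to build the sort key of the overlapping file.
jitter_of() {
    echo $(( ($1 * 7 % 5000) - 2500 ))
}

for key in 1 2 65 714 715; do
    echo "l_orderkey=${key} bucket=$(bucket_of "$key") jitter=$(jitter_of "$key")"
done
```

Keys 1 and 65 land in the same bucket (44) because the bucket id depends only on the key modulo 64. Keys 714 and 715 get offsets of 2498 and -2495, which shows how the jitter wraps from one end of its range to the other between neighbouring keys.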

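# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
# --sorted declares WITH ORDER (l_orderkey ASC) to trigger the reverse scan
# path in try_pushdown_sort. The file is not globally sorted, so results are
# only correct while the planner stays on the Inexact path and keeps TopK.
# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet
# reader for late materialization (only needed for Inexact path).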
run_sort_pushdown_inexact() {
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..."
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
Comment on lines +1240 to +1245
Copilot AI, Apr 16, 2026:

run_sort_pushdown_inexact passes --sorted, which tells DataFusion the file is ordered by l_orderkey (via WITH ORDER). However, the generated shuffled.parquet is intentionally not globally sorted by l_orderkey (row groups are arranged by the bucket key). This benchmark therefore relies on the planner staying on an Inexact path (keeping TopK/Sort for correctness). Please document this assumption explicitly and/or add a guard to ensure the benchmark can’t silently become incorrect if future optimizations start treating the reversed scan as Exact and eliminate the ordering operator.

Contributor:

I'm not sure what the --sorted flag is supposed to do but worth checking @zhuqi-lucas

Contributor Author (@zhuqi-lucas), Apr 16, 2026:

--sorted adds WITH ORDER (l_orderkey ASC) which is needed to trigger the reverse scan path in try_pushdown_sort — currently the only path where reorder_by_statistics is called.

After reviewing this more carefully, I realized we need two benchmark suites to cover the full optimization path:

  1. sort_pushdown_inexact (with --sorted, DESC queries) — tests the reverse scan path where RG reorder is already supported
  2. sort_pushdown_inexact_unsorted (without --sorted, ASC+DESC queries) — tests the Unsupported path where RG reorder will be supported in a follow-up PR (feat: reorder row groups by statistics during sort pushdown #21580)

Updated in the latest push. This way each follow-up PR can run its corresponding benchmark to show the improvement.

}
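
The guard suggested in the review thread above could look roughly like the following. This is a sketch under stated assumptions, not something the PR adds: `plan_keeps_sort` is a hypothetical helper, it assumes the physical plan text names the ordering operator `SortExec`, and the sample plan is illustrative rather than captured output.

```shell
#!/usr/bin/env bash
# Hypothetical guard, not part of bench.sh: succeeds only if the plan text
# on stdin still contains a SortExec node (assumed operator name).
plan_keeps_sort() {
    grep -q 'SortExec'
}

# Illustrative plan text only; real EXPLAIN output will differ in detail.
sample_plan='SortExec: TopK(fetch=100), expr=[l_orderkey@0 DESC]
  DataSourceExec: file_groups={1 group: [[shuffled.parquet]]}'

if printf '%s\n' "$sample_plan" | plan_keeps_sort; then
    echo "ok: ordering operator still present"
else
    echo "warning: ordering operator missing, results may be wrong" >&2
fi
```

In a real run the plan text would come from an EXPLAIN of one of the benchmark queries. The check is deliberately coarse: it only confirms that the planner kept a sort, which is the condition the declared ordering on the shuffled file depends on.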

# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering.
# Tests the Unsupported path in try_pushdown_sort where RG reorder by
# statistics can still help TopK queries without any file ordering guarantee.
run_sort_pushdown_inexact_unsorted() {
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json"
echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..."
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort pushdown benchmark with partially overlapping RGs.
# Simulates streaming data with network jitter — RGs are mostly in order
# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
run_sort_pushdown_inexact_overlap() {
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort integration benchmark
run_sort_tpch() {
SCALE_FACTOR=$1
8 changes: 8 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact/q1.sql
@@ -0,0 +1,8 @@
-- Inexact path: TopK + DESC LIMIT on ASC-declared file.
-- With RG reorder, the first RG read contains the highest max value,
-- so TopK's threshold tightens quickly and subsequent RGs get filtered
-- efficiently via dynamic filter pushdown.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
7 changes: 7 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact/q2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Inexact path: TopK + DESC LIMIT with larger fetch (1000).
-- Larger LIMIT means more row_replacements; RG reorder reduces the
-- total replacement count by tightening the threshold faster.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000
8 changes: 8 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact/q3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Inexact path: wide projection (all columns) + DESC LIMIT.
-- Shows the row-level filter benefit: with a tight threshold from the
-- first RG, subsequent RGs skip decoding non-sort columns for filtered
-- rows — bigger wins for wide tables.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
7 changes: 7 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact/q4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Inexact path: wide projection + DESC LIMIT with larger fetch.
-- Combines wide-row row-level filter benefit with larger LIMIT to
-- demonstrate cumulative gains from RG reorder.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000
7 changes: 7 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_overlap/q1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Overlapping RGs: TopK + DESC LIMIT on file with partially overlapping
-- row groups (simulates streaming data with network jitter).
-- RG reorder places highest-max RG first for fastest threshold convergence.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_overlap/q2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Overlapping RGs: DESC LIMIT with larger fetch.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000
6 changes: 6 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_overlap/q3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Overlapping RGs: wide projection + DESC LIMIT.
-- Row-level filter benefit: tight threshold skips decoding non-sort columns.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_overlap/q4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Overlapping RGs: wide projection + DESC LIMIT larger fetch.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000
7 changes: 7 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_unsorted/q1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Unsupported path: TopK + ASC LIMIT on file without declared ordering.
-- Tests RG reorder benefit when no WITH ORDER is declared — the
-- Unsupported path in try_pushdown_sort triggers RG reorder.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_unsorted/q2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Unsupported path: TopK + ASC LIMIT with larger fetch.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey
LIMIT 1000
6 changes: 6 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_unsorted/q3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Unsupported path: wide projection + ASC LIMIT.
-- Shows row-level filter benefit when RG reorder tightens TopK threshold.
SELECT *
FROM lineitem
ORDER BY l_orderkey
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_unsorted/q4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Unsupported path: wide projection + ASC LIMIT with larger fetch.
SELECT *
FROM lineitem
ORDER BY l_orderkey
LIMIT 1000
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_unsorted/q5.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Unsupported path: DESC LIMIT (no declared ordering = no reverse scan).
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
5 changes: 5 additions & 0 deletions benchmarks/queries/sort_pushdown_inexact_unsorted/q6.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Unsupported path: wide projection + DESC LIMIT.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100