diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 9dce4cf77b93..aa1ec477345c 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -109,6 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet # Sort Pushdown Benchmarks sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1) sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files +sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder +sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder +sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario) # Sorted Data Benchmarks (ORDER BY Optimization) clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) @@ -316,6 +319,9 @@ main() { sort_pushdown|sort_pushdown_sorted) data_sort_pushdown ;; + sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap) + data_sort_pushdown_inexact + ;; sort_tpch) # same data as for tpch data_tpch "1" "parquet" @@ -522,6 +528,15 @@ main() { sort_pushdown_sorted) run_sort_pushdown_sorted ;; + sort_pushdown_inexact) + run_sort_pushdown_inexact + ;; + sort_pushdown_inexact_unsorted) + run_sort_pushdown_inexact_unsorted + ;; + sort_pushdown_inexact_overlap) + run_sort_pushdown_inexact_overlap + ;; sort_tpch) run_sort_tpch "1" ;; @@ -1137,6 +1152,121 @@ run_sort_pushdown_sorted() { debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG} } +# Generates data for sort pushdown Inexact benchmark. 
+#
+# Produces a single large lineitem parquet file where row groups have
+# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
+# sorted, RGs shuffled). This simulates append-heavy workloads where data
+# is written in batches at different times.
+#
+# Also produces a second dataset (under sort_pushdown_inexact_overlap/) whose
+# row groups partially overlap. The two datasets are checked independently,
+# so a missing one is generated even when the other one already exists.
+#
+# Globals read:    DATA_DIR, SCRIPT_DIR
+# Globals written: INEXACT_DIR, OVERLAP_DIR, SRC_DIR
+data_sort_pushdown_inexact() {
+    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
+    OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
+    SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"
+
+    local need_shuffled=true
+    local need_overlap=true
+
+    # Decide per dataset whether it has to be generated. An early return on
+    # the first check would leave a missing overlap dataset missing forever.
+    if [ -d "${INEXACT_DIR}" ] && [ -n "$(ls -A "${INEXACT_DIR}"/*.parquet 2>/dev/null)" ]; then
+        echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
+        need_shuffled=false
+    fi
+    if [ -d "${OVERLAP_DIR}" ] && [ -n "$(ls -A "${OVERLAP_DIR}"/*.parquet 2>/dev/null)" ]; then
+        echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
+        need_overlap=false
+    fi
+
+    # Both datasets present: nothing to do, do not touch the source data.
+    if [ "${need_shuffled}" = false ] && [ "${need_overlap}" = false ]; then
+        return 0
+    fi
+
+    # Re-use the sort_pushdown data as the source (generate if missing)
+    data_sort_pushdown
+
+    if [ "${need_shuffled}" = true ]; then
+        echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
+        mkdir -p "${INEXACT_DIR}"
+
+        # Use datafusion-cli to bucket rows into 64 groups by a deterministic
+        # scrambler, then sort within each bucket by orderkey. This produces
+        # ~64 RG-sized segments where each has a tight orderkey range but the
+        # segments appear in scrambled (non-sorted) order in the file.
+        #
+        # NOTE(review): the bucket id depends only on l_orderkey % 64, so a
+        # bucket presumably holds keys from across the whole key range rather
+        # than one contiguous slice — verify that the resulting RG min/max
+        # statistics really are tight and non-overlapping as described.
+        (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
+            CREATE EXTERNAL TABLE src
+            STORED AS PARQUET
+            LOCATION '${SRC_DIR}';
+
+            COPY (
+                SELECT * FROM src
+                ORDER BY
+                    (l_orderkey * 1664525 + 1013904223) % 64,
+                    l_orderkey
+            )
+            TO '${INEXACT_DIR}/shuffled.parquet'
+            STORED AS PARQUET
+            OPTIONS ('format.max_row_group_size' '100000');
+        ")
+
+        echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
+        ls -la "${INEXACT_DIR}"
+    fi
+
+    if [ "${need_overlap}" = true ]; then
+        # Also generate a file with partially overlapping row groups.
+        # Simulates streaming data with network delays: each chunk is mostly
+        # in order but has a small overlap with the next chunk (±5% of the
+        # chunk range). This is the pattern described by @adriangb — data
+        # arriving with timestamps that are generally increasing but with
+        # network-induced jitter causing small overlaps between row groups.
+        echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
+        mkdir -p "${OVERLAP_DIR}"
+
+        (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
+            CREATE EXTERNAL TABLE src
+            STORED AS PARQUET
+            LOCATION '${SRC_DIR}';
+
+            -- Add jitter to l_orderkey: shift each row by a random-ish offset
+            -- proportional to its position. This creates overlap between adjacent
+            -- row groups while preserving the general ascending trend.
+            -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
+            -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs.
+            COPY (
+                SELECT * FROM src
+                ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
+            )
+            TO '${OVERLAP_DIR}/overlapping.parquet'
+            STORED AS PARQUET
+            OPTIONS ('format.max_row_group_size' '100000');
+        ")
+
+        echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
+        ls -la "${OVERLAP_DIR}"
+    fi
+}
+
+# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
+# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet
+# reader for late materialization (only needed for Inexact path).
+run_sort_pushdown_inexact() {
+    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
+    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
+    echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..."
+    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
+        debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
+}
+
+# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering.
+# Tests the Unsupported path in try_pushdown_sort (dfbench runs without --sorted):
+# RG reorder by statistics can still help TopK queries with no file ordering guarantee.
+run_sort_pushdown_inexact_unsorted() {
+    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"  # same dataset as run_sort_pushdown_inexact
+    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json"
+    echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..."
+    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
+        debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
+}
+
+# Runs the sort pushdown benchmark (with --sorted) on partially overlapping RGs.
+# Simulates streaming data with network jitter — RGs are mostly in order
+# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
+run_sort_pushdown_inexact_overlap() {
+    OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"  # parent of the lineitem/ dir written by data_sort_pushdown_inexact
+    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
+    echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
+    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
+        debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
+}
+
 # Runs the sort integration benchmark
 run_sort_tpch() {
     SCALE_FACTOR=$1
diff --git a/benchmarks/queries/sort_pushdown_inexact/q1.sql b/benchmarks/queries/sort_pushdown_inexact/q1.sql
new file mode 100644
index 000000000000..d772bc486a12
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact/q1.sql
@@ -0,0 +1,8 @@
+-- Inexact path: TopK + DESC LIMIT on ASC-declared file.
+-- With RG reorder, the first RG read contains the highest max value,
+-- so TopK's threshold tightens quickly and subsequent RGs get filtered
+-- efficiently via dynamic filter pushdown.
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact/q2.sql b/benchmarks/queries/sort_pushdown_inexact/q2.sql
new file mode 100644
index 000000000000..6e2bef44fc37
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact/q2.sql
@@ -0,0 +1,7 @@
+-- Inexact path: TopK + DESC LIMIT with larger fetch (1000).
+-- Larger LIMIT means more row_replacements; RG reorder reduces the
+-- total replacement count by tightening the threshold faster.
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 1000
diff --git a/benchmarks/queries/sort_pushdown_inexact/q3.sql b/benchmarks/queries/sort_pushdown_inexact/q3.sql
new file mode 100644
index 000000000000..d858ec79a67c
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact/q3.sql
@@ -0,0 +1,8 @@
+-- Inexact path: wide projection (all columns) + DESC LIMIT.
+-- Shows the row-level filter benefit: with a tight threshold from the
+-- first RG, subsequent RGs skip decoding non-sort columns for filtered
+-- rows — bigger wins for wide tables.
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact/q4.sql b/benchmarks/queries/sort_pushdown_inexact/q4.sql
new file mode 100644
index 000000000000..bd2efc5d3b99
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact/q4.sql
@@ -0,0 +1,7 @@
+-- Inexact path: wide projection + DESC LIMIT with larger fetch (1000).
+-- Combines wide-row row-level filter benefit with larger LIMIT to
+-- demonstrate cumulative gains from RG reorder.
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 1000
diff --git a/benchmarks/queries/sort_pushdown_inexact_overlap/q1.sql b/benchmarks/queries/sort_pushdown_inexact_overlap/q1.sql
new file mode 100644
index 000000000000..0e978bddbed0
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_overlap/q1.sql
@@ -0,0 +1,7 @@
+-- Overlapping RGs: TopK + DESC LIMIT on file with partially overlapping
+-- row groups (simulates streaming data with network jitter).
+-- RG reorder places highest-max RG first for fastest threshold convergence.
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact_overlap/q2.sql b/benchmarks/queries/sort_pushdown_inexact_overlap/q2.sql
new file mode 100644
index 000000000000..34d0a910cbf3
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_overlap/q2.sql
@@ -0,0 +1,5 @@
+-- Overlapping RGs: TopK + DESC LIMIT with larger fetch (1000).
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 1000
diff --git a/benchmarks/queries/sort_pushdown_inexact_overlap/q3.sql b/benchmarks/queries/sort_pushdown_inexact_overlap/q3.sql
new file mode 100644
index 000000000000..08b30b24d3dd
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_overlap/q3.sql
@@ -0,0 +1,6 @@
+-- Overlapping RGs: wide projection (all columns) + DESC LIMIT.
+-- Row-level filter benefit: tight threshold skips decoding non-sort columns.
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact_overlap/q4.sql b/benchmarks/queries/sort_pushdown_inexact_overlap/q4.sql
new file mode 100644
index 000000000000..4c091424f901
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_overlap/q4.sql
@@ -0,0 +1,5 @@
+-- Overlapping RGs: wide projection + DESC LIMIT with larger fetch (1000).
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 1000
diff --git a/benchmarks/queries/sort_pushdown_inexact_unsorted/q1.sql b/benchmarks/queries/sort_pushdown_inexact_unsorted/q1.sql
new file mode 100644
index 000000000000..06748b72a98a
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_unsorted/q1.sql
@@ -0,0 +1,7 @@
+-- Unsupported path: TopK + ASC LIMIT on file without declared ordering.
+-- Tests RG reorder benefit when no WITH ORDER is declared — the
+-- Unsupported path in try_pushdown_sort triggers RG reorder.
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact_unsorted/q2.sql b/benchmarks/queries/sort_pushdown_inexact_unsorted/q2.sql
new file mode 100644
index 000000000000..384e4647eb0d
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_unsorted/q2.sql
@@ -0,0 +1,5 @@
+-- Unsupported path: TopK + ASC LIMIT with larger fetch (1000).
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey
+LIMIT 1000
diff --git a/benchmarks/queries/sort_pushdown_inexact_unsorted/q3.sql b/benchmarks/queries/sort_pushdown_inexact_unsorted/q3.sql
new file mode 100644
index 000000000000..d48a2d969c46
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_unsorted/q3.sql
@@ -0,0 +1,6 @@
+-- Unsupported path: wide projection (all columns) + ASC LIMIT.
+-- Shows row-level filter benefit when RG reorder tightens TopK threshold.
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact_unsorted/q4.sql b/benchmarks/queries/sort_pushdown_inexact_unsorted/q4.sql
new file mode 100644
index 000000000000..d12d48f43a62
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_unsorted/q4.sql
@@ -0,0 +1,5 @@
+-- Unsupported path: wide projection + ASC LIMIT with larger fetch (1000).
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey
+LIMIT 1000
diff --git a/benchmarks/queries/sort_pushdown_inexact_unsorted/q5.sql b/benchmarks/queries/sort_pushdown_inexact_unsorted/q5.sql
new file mode 100644
index 000000000000..ab1dddab408f
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_unsorted/q5.sql
@@ -0,0 +1,5 @@
+-- Unsupported path: TopK + DESC LIMIT (no declared ordering = no reverse scan).
+SELECT l_orderkey, l_partkey, l_suppkey
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 100
diff --git a/benchmarks/queries/sort_pushdown_inexact_unsorted/q6.sql b/benchmarks/queries/sort_pushdown_inexact_unsorted/q6.sql
new file mode 100644
index 000000000000..8366e9696919
--- /dev/null
+++ b/benchmarks/queries/sort_pushdown_inexact_unsorted/q6.sql
@@ -0,0 +1,5 @@
+-- Unsupported path: wide projection (all columns) + DESC LIMIT.
+SELECT *
+FROM lineitem
+ORDER BY l_orderkey DESC
+LIMIT 100