feat: add sort_pushdown_inexact benchmark for RG reorder #21674
| Original file line number | Diff line number | Diff line change |
|---|---|---|
@@ -109,6 +109,9 @@ clickbench_extended: ClickBench "inspired" queries against a single parquet
# Sort Pushdown Benchmarks
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)

# Sorted Data Benchmarks (ORDER BY Optimization)
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -316,6 +319,9 @@ main() {
        sort_pushdown|sort_pushdown_sorted)
            data_sort_pushdown
            ;;
        sort_pushdown_inexact|sort_pushdown_inexact_unsorted|sort_pushdown_inexact_overlap)
            data_sort_pushdown_inexact
            ;;
        sort_tpch)
            # same data as for tpch
            data_tpch "1" "parquet"
@@ -522,6 +528,15 @@ main() {
        sort_pushdown_sorted)
            run_sort_pushdown_sorted
            ;;
        sort_pushdown_inexact)
            run_sort_pushdown_inexact
            ;;
        sort_pushdown_inexact_unsorted)
            run_sort_pushdown_inexact_unsorted
            ;;
        sort_pushdown_inexact_overlap)
            run_sort_pushdown_inexact_overlap
            ;;
        sort_tpch)
            run_sort_tpch "1"
            ;;
@@ -1137,6 +1152,121 @@ run_sort_pushdown_sorted() {
    debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${SORT_PUSHDOWN_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Generates data for the sort pushdown Inexact benchmark.
#
# Produces a single large lineitem parquet file where row groups have
# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
# sorted, RGs shuffled). This simulates append-heavy workloads where data
# is written in batches at different times.
data_sort_pushdown_inexact() {
    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
    if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A "${INEXACT_DIR}"/*.parquet 2>/dev/null)" ]; then
        echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
        return
    fi

    echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."

    # Re-use the sort_pushdown data as the source (generate if missing)
    data_sort_pushdown

    mkdir -p "${INEXACT_DIR}"
    SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"
    # Use datafusion-cli to bucket rows into up to 64 groups by applying a
    # deterministic scrambler to each row's 100K-wide orderkey chunk index
    # (l_orderkey / 100000), then sort within each bucket by orderkey. This
    # produces RG-sized segments where each has a tight orderkey range but
    # the segments appear in scrambled (non-sorted) order in the file.
    # (Scrambling the chunk index rather than the raw orderkey keeps each
    # segment's range tight; an LCG hash of the raw key would spread every
    # segment across the full key range.)

Comment on lines +1176 to +1179:

Contributor: I think another interesting benchmark, which is our case at least, is when there is overlap between the row groups but some general ordering still holds. E.g., in our case this happens because there's a stream of data coming in with timestamps, and it should arrive at around the same time it was created, but there are always some network delays, time skew, etc. that make it imperfect. But data that arrived 30 minutes later is guaranteed to have timestamps in a different range than data that arrived 30 minutes before it.

Author: Great suggestion! Partially overlapping RGs from streaming data is a very realistic scenario. I will add a benchmark variant for this pattern when I update the PR — something like time-ordered chunks with small overlaps between adjacent chunks to simulate network delays / time skew.

Contributor: Did you plan to include it in this PR or a follow-up?

Author: Will add it in this PR — I'll create a third data variant with partially overlapping RGs (simulating streaming data with network delays) alongside the current shuffled data.

Contributor: Amazing, thanks so much.

Author: Added in the latest push.

    (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
        CREATE EXTERNAL TABLE src
        STORED AS PARQUET
        LOCATION '${SRC_DIR}';

        COPY (
            SELECT * FROM src
            ORDER BY
                (l_orderkey / 100000 * 1664525 + 1013904223) % 64,
                l_orderkey
        )
        TO '${INEXACT_DIR}/shuffled.parquet'
        STORED AS PARQUET
        OPTIONS ('format.max_row_group_size' '100000');
    ")

    echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
    ls -la "${INEXACT_DIR}"
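The scrambler above relies on a property of the LCG constants: because the multiplier 1664525 is odd (hence invertible mod a power of two), `(x * 1664525 + 1013904223) % 64` permutes residues mod 64, so no two bucket ids collide and the buckets come out in a non-ascending order. A quick standalone check (a sketch, not part of the benchmark):

```python
# Verify the LCG-style scrambler used in the COPY above: it maps any 64
# consecutive inputs onto all 64 buckets exactly once, in scrambled order.
def bucket(x: int) -> int:
    return (x * 1664525 + 1013904223) % 64

buckets = [bucket(x) for x in range(64)]
assert sorted(buckets) == list(range(64))  # bijection: every bucket hit once
assert buckets != sorted(buckets)          # and not in ascending order
print(buckets[:8])                         # → [31, 44, 57, 6, 19, 32, 45, 58]
```

Since 1664525 ≡ 13 (mod 64), the bucket id simply steps by 13 mod 64 per input, which is enough to scatter adjacent chunks far apart in the file.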

    # Also generate a file with partially overlapping row groups.
    # Simulates streaming data with network delays: each chunk is mostly
    # in order but has a small overlap with the next chunk (±2500 orderkey
    # jitter). This is the pattern described by @adriangb — data arriving
    # with timestamps that are generally increasing but with
    # network-induced jitter causing small overlaps between row groups.
    OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
    if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A "${OVERLAP_DIR}"/*.parquet 2>/dev/null)" ]; then
        echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
        return
    fi

    echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
    mkdir -p "${OVERLAP_DIR}"

    (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
        CREATE EXTERNAL TABLE src
        STORED AS PARQUET
        LOCATION '${SRC_DIR}';

        -- Add jitter to l_orderkey: shift each row by a deterministic
        -- sawtooth offset of the key. This creates overlap between adjacent
        -- row groups while preserving the general ascending trend.
        -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
        -- This adds ±2500 jitter, creating up to ~5K keys of overlap
        -- between adjacent 100K-row RGs.
        COPY (
            SELECT * FROM src
            ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
        )
        TO '${OVERLAP_DIR}/overlapping.parquet'
        STORED AS PARQUET
        OPTIONS ('format.max_row_group_size' '100000');
    ")

    echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
    ls -la "${OVERLAP_DIR}"
}
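The overlap the jitter formula produces can be sanity-checked outside SQL. The sketch below uses synthetic consecutive keys as a stand-in for l_orderkey (real TPC-H orderkeys are sparser, so the overlap is a smaller fraction of each RG's range), sorts by the same expression as the COPY, slices the result into 100K-row "row groups", and confirms adjacent groups' key ranges overlap by at most a jitter-sized window:

```python
# Model of the overlap data: keys sorted by the jittered expression
# l_orderkey + (l_orderkey * 7 % 5000) - 2500, then split into RGs.
def jitter(k: int) -> int:
    return k + (k * 7 % 5000) - 2500

keys = sorted(range(1, 400_001), key=jitter)  # same ordering as the COPY
rg_size = 100_000
rgs = [keys[i:i + rg_size] for i in range(0, len(keys), rg_size)]
stats = [(min(rg), max(rg)) for rg in rgs]    # per-RG min/max statistics

for (_, hi_prev), (lo_next, _) in zip(stats, stats[1:]):
    assert hi_prev > lo_next, "adjacent RGs should partially overlap"
    assert hi_prev - lo_next < 5000, "overlap stays within the jitter window"
```

Because the offset sawtooths through all values in ±2500 over any 5000 consecutive keys, rows can only migrate a bounded distance past an RG boundary, which is exactly the "mostly ordered with small overlaps" shape streaming ingestion produces.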

# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
# Enables pushdown_filters so TopK's dynamic filter is pushed to the parquet
# reader for late materialization (only needed for the Inexact path).
run_sort_pushdown_inexact() {
    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
    echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..."
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
        debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort pushdown Inexact benchmark WITHOUT declared ordering.
# Tests the Unsupported path in try_pushdown_sort, where RG reorder by
# statistics can still help TopK queries without any file ordering guarantee.
run_sort_pushdown_inexact_unsorted() {
    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_unsorted.json"
    echo "Running sort pushdown Inexact benchmark (no WITH ORDER, Unsupported path)..."
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
        debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort pushdown benchmark with partially overlapping RGs.
# Simulates streaming data with network jitter — RGs are mostly in order
# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
run_sort_pushdown_inexact_overlap() {
    OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
    echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
        debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}

# Runs the sort integration benchmark
run_sort_tpch() {
    SCALE_FACTOR=$1
@@ -0,0 +1,8 @@
-- Inexact path: TopK + DESC LIMIT on ASC-declared file.
-- With RG reorder, the first RG read contains the highest max value,
-- so TopK's threshold tightens quickly and subsequent RGs get filtered
-- efficiently via dynamic filter pushdown.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
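The pruning effect this query exercises can be illustrated with a toy model (a sketch of the idea, not DataFusion's implementation): each row group carries min/max statistics, a TopK DESC scan maintains a threshold equal to the smallest value in its current top-k, and any RG whose max falls below that threshold can be skipped without decoding. Visiting the highest-max RG first tightens the threshold immediately:

```python
import heapq

# Toy model of TopK (ORDER BY x DESC LIMIT k) over row groups with
# min/max statistics; counts how many RGs must actually be decoded.
def rgs_scanned(rg_ranges, k, reorder):
    if reorder:
        # RG reorder for DESC: visit highest-max row groups first
        rg_ranges = sorted(rg_ranges, key=lambda r: r[1], reverse=True)
    heap, scanned = [], 0                    # min-heap holds current top-k
    for lo, hi in rg_ranges:
        if len(heap) == k and hi <= heap[0]:  # RG max below threshold: skip
            continue
        scanned += 1
        for v in range(lo, hi + 1):           # "decode" the row group
            if len(heap) < k:
                heapq.heappush(heap, v)
            elif v > heap[0]:
                heapq.heapreplace(heap, v)
    return scanned

# Shuffled, non-overlapping RG key ranges, like the benchmark data
rgs = [(0, 99), (200, 299), (100, 199), (300, 399)]
print(rgs_scanned(rgs, k=10, reorder=False))  # → 3 RGs decoded
print(rgs_scanned(rgs, k=10, reorder=True))   # → 1: only the highest RG
```

In file order the threshold only tightens as luck allows, while with reorder the very first RG yields the final top-k and every remaining RG is pruned by its statistics.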
@@ -0,0 +1,7 @@
-- Inexact path: TopK + DESC LIMIT with larger fetch (1000).
-- Larger LIMIT means more row_replacements; RG reorder reduces the
-- total replacement count by tightening the threshold faster.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000

@@ -0,0 +1,8 @@
-- Inexact path: wide projection (all columns) + DESC LIMIT.
-- Shows the row-level filter benefit: with a tight threshold from the
-- first RG, subsequent RGs skip decoding non-sort columns for filtered
-- rows — bigger wins for wide tables.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100

@@ -0,0 +1,7 @@
-- Inexact path: wide projection + DESC LIMIT with larger fetch.
-- Combines the wide-row row-level filter benefit with a larger LIMIT to
-- demonstrate cumulative gains from RG reorder.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000

@@ -0,0 +1,7 @@
-- Overlapping RGs: TopK + DESC LIMIT on a file with partially overlapping
-- row groups (simulates streaming data with network jitter).
-- RG reorder places the highest-max RG first for fastest threshold convergence.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100

@@ -0,0 +1,5 @@
-- Overlapping RGs: DESC LIMIT with larger fetch.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000

@@ -0,0 +1,6 @@
-- Overlapping RGs: wide projection + DESC LIMIT.
-- Row-level filter benefit: a tight threshold skips decoding non-sort columns.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100

@@ -0,0 +1,5 @@
-- Overlapping RGs: wide projection + DESC LIMIT with larger fetch.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000

@@ -0,0 +1,7 @@
-- Unsupported path: TopK + ASC LIMIT on a file without declared ordering.
-- Tests the RG reorder benefit when no WITH ORDER is declared — the
-- Unsupported path in try_pushdown_sort triggers RG reorder.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey
LIMIT 100

@@ -0,0 +1,5 @@
-- Unsupported path: TopK + ASC LIMIT with larger fetch.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey
LIMIT 1000

@@ -0,0 +1,6 @@
-- Unsupported path: wide projection + ASC LIMIT.
-- Shows the row-level filter benefit when RG reorder tightens the TopK threshold.
SELECT *
FROM lineitem
ORDER BY l_orderkey
LIMIT 100

@@ -0,0 +1,5 @@
-- Unsupported path: wide projection + ASC LIMIT with larger fetch.
SELECT *
FROM lineitem
ORDER BY l_orderkey
LIMIT 1000

@@ -0,0 +1,5 @@
-- Unsupported path: DESC LIMIT (no declared ordering = no reverse scan).
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100

@@ -0,0 +1,5 @@
-- Unsupported path: wide projection + DESC LIMIT.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100