perf(repartition): batch reservation + sends via per-partition local buffers#21677
perf(repartition): batch reservation + sends via per-partition local buffers#21677Dandandan wants to merge 1 commit intoapache:mainfrom
Conversation
…buffers Accumulate batches into per-partition local buffers in each input task and flush only when a buffer reaches 256 KiB. This amortizes the memory reservation + channel overhead: each flush performs a single `try_grow` call and spill-check for the entire buffered size, instead of once per tiny sub-batch. When hash-partitioning an input batch into N output partitions each output partition only gets ~input_size / N bytes per input batch. With many output partitions these sub-batches are very small and hitting the reservation Mutex (plus the backpressure gate mutex) on every one is a significant contention point that reduces CPU utilization for parallel-partition workloads like ClickBench. Semantics are preserved: - Memory batches and spill fallback behavior are unchanged. - If the receiver drops mid-flush, the full reserved amount is released. - End-of-input triggers a flush of any remaining buffered batches. No user-visible behavior change besides batching granularity. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing repartition-local-buffers (f2270ad) to 3b5008a (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing repartition-local-buffers (f2270ad) to 3b5008a (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing repartition-local-buffers (f2270ad) to 3b5008a (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Which issue does this PR close?
Rationale for this change
In
RepartitionExec::pull_from_input, each input task partitions an input batch and sends the resulting sub-batches to output channels. On every sub-batch, the input task:Mutexand callstry_growchannel.state.lock()and potentially the gate mutex)When hash-partitioning an input batch of size
SintoNoutput partitions, each output gets a sub-batch of size~S/N. With many output partitions (e.g. 16 cores → 16 output partitions), these sub-batches are very small, so the per-batch synchronization cost dominates.What changes are included in this PR?
Each input task now accumulates partitioned sub-batches in per-output-partition local buffers (
Vec<PartitionBuffer>) and only flushes when a buffer reachesFLUSH_THRESHOLD_BYTES(256 KiB). Each flush performs a singletry_growcall for the total buffered size instead of once per sub-batch. Iftry_growfails, all buffered batches for that partition are spilled together.End-of-input triggers a final flush of any remaining buffered data per partition.
Are these changes tested?
Covered by the existing repartition test suite (41 tests pass), including the spilling, delayed-stream, dropped-output-stream, and ordering-preservation tests.
Are there any user-facing changes?
No. The change only affects batching granularity inside
RepartitionExec; memory semantics, spilling behavior, and output ordering are preserved.🤖 Generated with Claude Code