feat(parquet): make PushBuffers boundary-agnostic for prefetch IO#9697

Open
HippoBaro wants to merge 3 commits into apache:main from HippoBaro:boundary_agnostic_pushbuffers

Conversation

@HippoBaro
Contributor

@HippoBaro HippoBaro commented Apr 13, 2026

Which issue does this PR close?

Rationale for this change

The PushDecoder (introduced in #7997, #8080) is designed to decouple IO and CPU. It holds non-contiguous byte ranges, with a NeedsData/push_range protocol. However, it requires each logical read to be satisfied in full by a single physical buffer: has_range, get_bytes, and Read::read all search for one buffer that entirely covers the requested range.

This assumption conflates two orthogonal IO strategies:

  • Coalescing: the IO layer merges adjacent requested ranges into fewer, larger fetches.
  • Prefetching: the IO layer pushes data ahead of what the decoder has requested. This is an inversion of control: the IO layer speculatively fills buffers at offsets not yet requested and for arbitrary buffer sizes.

These two strategies interact poorly with the current release mechanism (clear_ranges), which matches buffers by exact range equality:

  • Coalescing is both rewarded and punished. It is load-bearing because without it, the number of physical buffers scales with the number of requested ranges, and clear_ranges performs an O(N×M) scan to remove consumed ranges, producing quadratic overhead on wide schemas. But it is also punished because a coalesced buffer never exactly matches any individual requested range, so clear_ranges silently skips it: the buffer leaks in PushBuffers until the decoder finishes or the caller manually calls release_all_ranges (#9624). This increases peak RSS proportionally to the amount of data coalesced ahead of the current row group.

  • Prefetching is structurally impossible: speculatively pushed buffers will straddle future read boundaries, so the decoder cannot consume them, and clear_ranges cannot release them.
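The exact-match release failure described above can be demonstrated with a small sketch (hypothetical, simplified types; the actual code operates on Bytes buffers, not Vec<u8>):

```rust
use std::ops::Range;

// Sketch of the pre-PR release path: `clear_ranges` removes only buffers
// whose range *exactly* equals a consumed range, so a coalesced buffer
// covering several logical reads never matches and is never freed.
fn clear_ranges(buffers: &mut Vec<(Range<u64>, Vec<u8>)>, consumed: &[Range<u64>]) {
    // O(N×M) scan by exact range equality.
    buffers.retain(|(range, _)| !consumed.contains(range));
}

fn main() {
    // One coalesced physical buffer satisfying two logical reads.
    let mut buffers = vec![(0..200u64, vec![0u8; 200])];
    // The decoder consumed 0..100 and 100..200, but neither equals the
    // coalesced range 0..200 exactly: the buffer leaks.
    clear_ranges(&mut buffers, &[0..100, 100..200]);
    assert_eq!(buffers.len(), 1);
}
```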

What changes are included in this PR?

This commit makes PushBuffers boundary-agnostic, completing the prefetching story, and changes the internals to scale with buffer count instead of range count:

  • Buffer stitching: has_range, get_bytes, and Read::read resolve logical ranges across multiple contiguous physical buffers via binary search, so the IO layer is free to push arbitrarily-sized parts without knowing future read boundaries. This is a nice improvement because some IO layers can be made much more efficient when using uniform buffers and vectorized reads.

  • Incremental release (release_through): replaces clear_ranges with a watermark-based release that drops all buffers below a byte offset, trimming straddling buffers via zero-copy Bytes::slice. The decoder calls this automatically at row-group boundaries.
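The stitched lookup can be sketched as follows, assuming buffers keyed by start offset in a BTreeMap (hypothetical names and simplified types; this is not the actual PushBuffers implementation):

```rust
use std::collections::BTreeMap;

/// Sketch: physical buffers keyed by start offset, sorted by construction.
struct StitchedBuffers {
    buffers: BTreeMap<u64, Vec<u8>>,
}

impl StitchedBuffers {
    fn new() -> Self {
        Self { buffers: BTreeMap::new() }
    }

    /// The IO layer pushes a buffer at an arbitrary offset, with no
    /// knowledge of the decoder's future read boundaries.
    fn push(&mut self, offset: u64, data: Vec<u8>) {
        self.buffers.insert(offset, data);
    }

    /// Resolve a logical range by stitching contiguous physical buffers.
    /// Returns None if any byte of the range is missing.
    fn get_bytes(&self, start: u64, len: u64) -> Option<Vec<u8>> {
        let mut out = Vec::with_capacity(len as usize);
        let mut pos = start;
        let end = start + len;
        while pos < end {
            // O(log n): last buffer starting at or before `pos`.
            let (&buf_start, buf) = self.buffers.range(..=pos).next_back()?;
            let buf_end = buf_start + buf.len() as u64;
            if pos >= buf_end {
                return None; // gap: `pos` falls past this buffer's end
            }
            let take_end = buf_end.min(end);
            out.extend_from_slice(
                &buf[(pos - buf_start) as usize..(take_end - buf_start) as usize],
            );
            pos = take_end;
        }
        Some(out)
    }
}

fn main() {
    let mut bufs = StitchedBuffers::new();
    // Two uniform 4-byte buffers pushed speculatively.
    bufs.push(0, vec![0, 1, 2, 3]);
    bufs.push(4, vec![4, 5, 6, 7]);
    // A logical read straddling both buffers is stitched together.
    assert_eq!(bufs.get_bytes(2, 4), Some(vec![2, 3, 4, 5]));
    // A read running past the buffered data reports a gap.
    assert_eq!(bufs.get_bytes(6, 4), None);
}
```

The key property is that no pushed buffer needs to align with any logical read boundary; only contiguous coverage matters.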

Are these changes tested?

Significant coverage added, all tests passing. Benchmark results (vs baseline in #9696):

  push_decoder/1buf/1000ranges       321.9 µs   (was 323.5 µs,  −1%)
  push_decoder/1buf/10000ranges       3.26 ms   (was  3.25 ms,  +0%)
  push_decoder/1buf/100000ranges      34.9 ms   (was  34.6 ms,  +1%)
  push_decoder/1buf/500000ranges     192.2 ms   (was 185.3 ms,  +4%)
  push_decoder/Nbuf/1000ranges       363.9 µs   (was 437.2 µs, −17%)
  push_decoder/Nbuf/10000ranges       3.82 ms   (was  10.7 ms, −64%)
  push_decoder/Nbuf/100000ranges      42.1 ms   (was 711.6 ms, −94%)

Are there any user-facing changes?

PushBuffers::clear_all_ranges is marked as deprecated in favor of the new PushBuffers::clear_all.

@github-actions github-actions bot added the parquet Changes to the parquet crate label Apr 13, 2026
@HippoBaro HippoBaro force-pushed the boundary_agnostic_pushbuffers branch from 5eab5b9 to 5d60935 Compare April 13, 2026 07:11
@HippoBaro
Contributor Author

@alamb This PR changes code you previously wrote, and I’d value your take on the direction.

Beyond fixing the quadratic complexity issue, it also pushes PushBuffers—pun intended—further along the path you laid out. More broadly, better support for I/O coalescing and speculative prefetching would be very valuable for us, and this feels like a step in that direction.

@alamb
Contributor

alamb commented Apr 13, 2026

I will review it -- I am currently working on reviewing

Thanks @HippoBaro

@HippoBaro HippoBaro force-pushed the boundary_agnostic_pushbuffers branch 2 times, most recently from 504035a to 2f891e7 Compare April 13, 2026 21:16
@HippoBaro
Contributor Author

Rebased onto main to resolve conflicts and pick up the new benchmarks (#9696). We can now run the fancy bot and check whether it shows the expected speedups.

@HippoBaro HippoBaro force-pushed the boundary_agnostic_pushbuffers branch from 2f891e7 to d2ea6c4 Compare April 13, 2026 21:26
Contributor

@etseidl etseidl left a comment

Thanks @HippoBaro. This looks interesting. I still have to do a deep dive on #9653 before diving into this.

Comment thread parquet/src/util/push_buffers.rs Outdated
@nathanb9
Contributor

Nice, @HippoBaro. One finding: this change introduces a regression in reverse row-group scans, which surfaced to me in a Datafusion test:
https://github.com/apache/datafusion/blob/9dab336ee08eb778e23cd04acdd05f93fd0196f4/datafusion/datasource-parquet/src/opener.rs#L2471-L2477

// Test reverse scan
let opener = make_opener(true);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let reverse_values = collect_int32_values(stream).await;

// The forward scan should return data in the order written
assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);

// With reverse scan, row groups are reversed, so we expect:
// Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);

Result: panic at buffers.rs:193:9: has_range(57..110) below watermark 163

Basically, the incremental release watermark invariant conflicts with reverse row-group traversal. From what I understand, incremental release decodes a row group, marks its ending byte offset as the watermark, and then expects all subsequent ranges to stay at or above that offset. So it is not possible to do reverse traversal and safely release this way.

Could we potentially support a symmetric reverse version of the same monotonic watermark tracking you've introduced here?

Comment thread parquet/src/arrow/push_decoder/mod.rs Outdated
Comment thread parquet/src/util/push_buffers.rs Outdated
@alamb
Contributor

alamb commented Apr 14, 2026

run benchmark push_decoder

@adriangbot

🤖 Arrow criterion benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4244008571-1224-mblm4 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing boundary_agnostic_pushbuffers (d2ea6c4) to 711fac8 (merge-base) diff
BENCH_NAME=push_decoder
BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental,object_store --bench push_decoder
BENCH_FILTER=
Results will be posted here when complete


File an issue against this benchmark runner

@adriangbot

🤖 Arrow criterion benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)


group                             boundary_agnostic_pushbuffers          main
-----                             -----------------------------          ----
push_decoder/1buf/100000ranges    1.05     53.0±0.48ms        ? ?/sec    1.00     50.7±0.64ms        ? ?/sec
push_decoder/1buf/10000ranges     1.02      4.8±0.04ms        ? ?/sec    1.00      4.7±0.05ms        ? ?/sec
push_decoder/1buf/1000ranges      1.01    476.2±2.55µs        ? ?/sec    1.00    469.4±2.66µs        ? ?/sec
push_decoder/1buf/500000ranges    1.00    282.2±2.24ms        ? ?/sec    1.06   298.0±43.35ms        ? ?/sec
push_decoder/Nbuf/100000ranges    1.00     57.5±0.40ms        ? ?/sec    14.52  834.3±76.53ms        ? ?/sec
push_decoder/Nbuf/10000ranges     1.00      5.5±0.13ms        ? ?/sec    2.25     12.4±0.03ms        ? ?/sec
push_decoder/Nbuf/1000ranges      1.00    530.3±5.49µs        ? ?/sec    1.10    582.9±2.43µs        ? ?/sec

Resource Usage

base (merge-base)

Metric Value
Wall time 183.4s
Peak memory 5.8 GiB
Avg memory 5.1 GiB
CPU user 179.1s
CPU sys 4.1s
Peak spill 0 B

branch

Metric Value
Wall time 102.7s
Peak memory 5.7 GiB
Avg memory 4.7 GiB
CPU user 101.4s
CPU sys 1.3s
Peak spill 0 B


@HippoBaro
Contributor Author

@nathanb9 Thank you for the feedback; I'll see what I can do there

@alamb
Contributor

alamb commented Apr 14, 2026

push_decoder/Nbuf/100000ranges 1.00 57.5±0.40ms ? ?/sec 14.52 834.3±76.53ms ? ?/sec

That is certainly nice looking

@alamb
Contributor

alamb commented Apr 14, 2026

run benchmark arrow_reader

@adriangbot

🤖 Arrow criterion benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4246036406-1241-wplv5 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux


Comparing boundary_agnostic_pushbuffers (d2ea6c4) to 711fac8 (merge-base) diff
BENCH_NAME=arrow_reader
BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental,object_store --bench arrow_reader
BENCH_FILTER=
Results will be posted here when complete



Contributor

@alamb alamb left a comment

Thank you @HippoBaro and @nathanb9 -- this is a great start.

In general I have two major concerns:

  1. The assumption of range usage in the parquet decoder
  2. The manual management of the sorted flag / parallel buffers

Let me know what you think

Comment thread parquet/src/arrow/push_decoder/reader_builder/mod.rs
// All data for this row group has been extracted into the
// InMemoryRowGroup. Release physical buffers up to the end
// of this row group so streaming IO can reclaim memory.
self.buffers
Contributor

You can configure the parquet reader to read row groups in some arbitrary order with_row_groups

Also technically there is no reason that row groups have to be written in order (though most writers will do that) -- for example, you could have a file where the bytes for row group 0 are after the bytes for row group 1.

So I think assuming that the reader will never want any bytes prior to the current row group should be reconsidered.

Can we instead perhaps release data for the start/end of the row group, rather than just a one-sided range?

Contributor

Also technically there is no reason that row groups have to be written in order (though most writers will do that) -- for example, you could have a file where the bytes for row group 0 are after the bytes for row group 1.

Indeed, coworkers of mine are using this property as a means to do deletions from parquet files. Rewrite a single row group, tack it onto the end of the file, and then modify the footer to point to the new row group and ignore the original.

Contributor Author

Thanks to you both; I wasn't aware of this! This certainly changes things. My watermark approach was nice and optimal, but this makes it fundamentally flawed. Oh well, back to the drawing board!

Comment thread parquet/src/file/metadata/mod.rs Outdated
self.file_offset
}

/// Returns the byte offset just past the last column chunk in this row group.
Contributor

I think practically speaking most parquet files will have the column chunks for one row group written contiguously in the file, but I am not sure the spec requires this. I do think it effectively requires all pages for a column to be in a contiguous range

Contributor Author

My reading of the spec makes me think the guarantee is at the column chunk level: "Column chunks are composed of pages written back to back." But there is no equivalent guarantee for column chunks within a row group. It explicitly says "There is no physical structure that is guaranteed for a row group."

Another reason, on top of your previous remark, that puts the final nail in the coffin of the watermark release mechanism.

///
/// Thus, the implementation defers to the caller to coalesce subsequent requests
/// if desired.
/// # No Speculative Prefetching
Contributor

👍

Comment thread parquet/src/util/push_buffers.rs Outdated
/// The buffers of data that can be used to decode the Parquet file
buffers: Vec<Bytes>,
/// High-water mark set by [`Self::release_through`]. After a release,
/// no push, has_range, or read may target offsets below this value.
Contributor

It is probably good to point out what "may not" means (for example, does the code panic if it is tried?)

Comment thread parquet/src/util/push_buffers.rs Outdated
Comment thread parquet/src/util/push_buffers.rs Outdated
return;
}

// Insertion sort: zero-allocation and linear on nearly-sorted input
Contributor

this is n^2 on reverse sorted input though, right?

Contributor Author

Yes, this was fine under my mistaken assumption that files were always laid out such that reading them back would be sequential. In that case, most reads would happen at the edge, and inserting buffers here would mostly amount to inserting into an ordered, or nearly ordered, vec.

Comment thread parquet/src/util/push_buffers.rs Outdated
watermark: u64,
/// Whether `ranges`/`buffers` are sorted by range start.
/// Set to `false` on every `push_range`, restored lazily before reads.
sorted: bool,
Contributor

What if we encoded the sort invariant in the type system rather than relying on the flag to be set correctly? Something like

enum Buffers {
  Sorted {
    ranges: Vec<Range<u64>>,
    buffers: Vec<Bytes>,
  }
  UnSorted {
    ranges: Vec<Range<u64>>,
    buffers: Vec<Bytes>,
  }
}

Maybe it is overly complicated, but it makes it much clearer that all paths correctly update the sorting

Contributor Author

Latest version uses a Btree which takes care of the invariant for us, thanks!

Comment thread parquet/src/util/push_buffers.rs Outdated
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
ranges: Vec<Range<u64>>,
Contributor

If the goal is to keep a list sorted by start range, did you consider using a BTreeSet? You could then define some sort of wrapper over Range/Bytes like

struct RangeAndData {
  range: Range<u64>,
  buffer: Bytes
}
impl PartialOrd for RangeAndData {
  // define comparison from start range
}

pub(crate) struct PushBuffers {
...
    buffers: BTreeSet<RangeAndData>,
}

That would probably simplify the accounting significantly

@alamb
Contributor

alamb commented Apr 14, 2026

I was thinking more about this change and I thought maybe we could take a step back and figure out what you are trying to accomplish

For example, if your usecase is to clear all IO after reading a row group's data, I wonder if you could just call clear_all_buffers after reading the row group's data? One challenge with this is that there is currently no way to know when the row group's data is no longer needed externally (as in, there is no way to introspect the push decoder about when it is done reading a particular row group)

Recently, I have been working on something downstream in DataFusion where I would like a similar API

One solution could be to make a way to split off RemainingRowGroups, perhaps into a different decoder 🤔

@HippoBaro
Contributor Author

HippoBaro commented Apr 15, 2026

@alamb @nathanb9 Thank you both for your feedback — it's been very helpful.

The watermark mechanism is fundamentally flawed. I worked under the assumption that all Parquet files are laid out like mine are, but the spec makes no such guarantee: "There is no physical structure that is guaranteed for a row group." Column chunks within a row group need not be contiguous or ordered, so a monotonic watermark won't be correct in general. @nathanb9's reverse-scan regression is one concrete consequence, but it's not the only one. The deletion trick is actually quite neat.

I was thinking more about this change and I thought maybe we could take a step back and figure out what you are trying to accomplish

Fair point. I should have been clearer in the original description. Here are the high-level goals driving this work:

  • Linear scaling with column count. My schemas are very high cardinality. Loads of columns. PushBuffers must scale linearly (ideally sub-linearly) with the number of columns. The current implementation scales with the number of requested ranges from the reader, which can reach millions on adversarial inputs.
  • Total decoupling from the IO layer. The decoder should own all logic related to what to read; the IO layer should be exclusively concerned with how best to exploit the characteristics of the underlying storage medium along whatever axis the user cares about (throughput, latency, cost). The current design leaks information across that boundary: buffers have to align with logical range requests or they leak, and can't straddle request boundaries.
  • Coalescing should not be load bearing. PushBuffers should scale sub-linearly to the number of buffers it's given. Coalescing should be decided based on the characteristics of storage, not because the code can't deal with too many small buffers.
  • First-class support for prefetching. My data lives in blob storage. The economics of blob storage incentivize pulling as much as possible: you pay a toll per GET request. If we need two row groups separated by 100 MiB of pruned row groups, it is most likely cheaper to stream all of it and discard the excess than to seek. One can also imagine cases where the reader doesn't know which row groups to pull next, because that depends on the content of the current row group (i.e., there are data dependencies). So prefetching can't simply be "give a list of row groups of interest ahead of time."
  • No leaks. Whatever the IO layer does (coalescing, prefetch, etc.), regardless of access pattern or pruning ratio, we should never leak buffers. This doesn't mean every byte must be freed at the earliest opportunity, but at row-group granularity, everything should be released when we move on to the next. A single decoder should be able to stream-decode TBs worth of data, and the user should have RSS guarantees derived from the max row-group size of the data they are dealing with (and modulo how aggressive prefetching is allowed to be, which they can easily control).

Hopefully this helps y'all understand where I'm coming from!

if your usecase is to clear all IO after reading a row group's data, I wonder if we could you just call clear_all_buffers after reading the row group's data?

The problem is that by the time I finish decoding a row group, I only know I'm done with that row group. The IO layer may have already prefetched data I'm about to need. Calling release_all would throw away that prefetched work.

The watermark was a nice idea for this: if we know that after reading byte offset N we'll never read anything below N, we can prefetch freely above N and safely release everything below. But since we can't guarantee monotonic access, that doesn't hold.

I'll push a revised version that addresses all the feedback shortly!

@HippoBaro
Contributor Author

HippoBaro commented Apr 15, 2026

So prefetching can't simply be "give a list of row groups of interest ahead of time."

Although come to think of it, we could give the IO layer a hint. Not a list of offsets we need right now (DataRequest) but a list of "regions" we "may" need and IO may use that to inform if/what it prefetches.

@HippoBaro HippoBaro force-pushed the boundary_agnostic_pushbuffers branch from 58ef433 to 2460bed Compare April 15, 2026 04:09
The `release_through` watermark assumed monotonically increasing byte
offsets: all future reads must stay above the high-water mark. This
invariant is violated by reverse row-group scans (@nathanb9 — DataFusion
panics with `has_range(...) below watermark`), by files whose row groups
are not laid out sequentially (@alamb, @etseidl), and by the Parquet
spec itself, which guarantees no physical structure for a row group.

The manual `sorted` flag / `ensure_sorted()` pattern was also fragile:
callers had to sort before reads, and the insertion sort was O(n²) on
reverse-sorted input.

This commit replaces both mechanisms:

- `BTreeMap<u64, BufferValue>` replaces `Vec<Range> + Vec<Bytes>`,
  always sorted by construction. All read-side operations (`has_range`,
  `get_bytes`, `Read::read`) are O(log n) in buffer count, not in range
  count. This means `PushBuffers` scales with the number of IO buffers
  the caller pushes, not with the number of column chunks in the schema,
  and coalescing is no longer load-bearing for performance.

- `release_range(Range<u64>)` replaces the one-sided watermark. Each
  buffer tracks a `live_bytes` counter decremented by the overlap on
  release; when it reaches zero the entry is dropped. Release works in
  any access order.

- `release_row_group(idx)` releases individual column chunk byte ranges
  after a row group is fully consumed or skipped. It is called at every
  exit from the row group state machine, guaranteeing that no column
  chunk outlives its row group regardless of access pattern, pruning
  ratio, or file layout.

- The public `release_all`, `release_through`, and `clear_all_ranges`
  APIs are removed — buffer release is now fully automatic and internal.

Tests are expanded to cover arbitrary row group orderings (forward,
reverse, single-RG) and verify incremental release for each.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
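
The BTreeMap-backed lookup and the `live_bytes` release described above can be sketched roughly like this (a minimal illustration, not the PR's actual code: `BufferValue`, the `Vec<u8>` stand-in for `Bytes`, and the method bodies are all simplified assumptions):

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Simplified stand-in for a pushed IO buffer (the real code holds `Bytes`).
struct BufferValue {
    data: Vec<u8>,
    /// Decremented as sub-ranges are released; the entry is dropped at zero.
    live_bytes: u64,
}

struct PushBuffers {
    /// Keyed by the buffer's start offset, so it is always sorted.
    buffers: BTreeMap<u64, BufferValue>,
}

impl PushBuffers {
    /// O(log n) per hop: find the last buffer starting at or before `pos`
    /// and stitch coverage across buffer boundaries.
    fn has_range(&self, range: &Range<u64>) -> bool {
        let mut pos = range.start;
        while pos < range.end {
            let Some((start, buf)) = self.buffers.range(..=pos).next_back() else {
                return false; // no buffer starts at or before `pos`
            };
            let end = start + buf.data.len() as u64;
            if end <= pos {
                return false; // gap: the candidate buffer ends before `pos`
            }
            pos = end;
        }
        true
    }

    /// Release works in any order: decrement each overlapped buffer's
    /// live byte count and drop buffers that reach zero.
    fn release_range(&mut self, range: Range<u64>) {
        let mut emptied = Vec::new();
        for (&start, buf) in self.buffers.range_mut(..range.end) {
            let end = start + buf.data.len() as u64;
            let overlap = end.min(range.end).saturating_sub(start.max(range.start));
            if overlap > 0 {
                buf.live_bytes -= overlap;
                if buf.live_bytes == 0 {
                    emptied.push(start);
                }
            }
        }
        for start in emptied {
            self.buffers.remove(&start);
        }
    }
}
```

The key point is that both operations scale with the number of physical buffers, not with the number of requested ranges, which is what makes coalescing optional rather than load-bearing.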
@HippoBaro HippoBaro force-pushed the boundary_agnostic_pushbuffers branch from 2460bed to 871700c Compare April 15, 2026 04:22
@HippoBaro
Contributor Author

HippoBaro commented Apr 15, 2026

I've pushed two follow-up commits that address the feedback.

The watermark is gone, replaced by per-range release backed by a BTreeMap (as @alamb suggested). Row group column chunks are released individually when consumed or skipped — no monotonicity assumption, no restriction on file layout or access order. This fixes the reverse-scan regression @nathanb9 found.

On the prefetching side, incoming buffers are now filtered at push time against the column chunk byte ranges of the queued row groups, so data the decoder will never consume is discarded before it enters PushBuffers. The IO layer can push anything — coalesced, prefetched, the entire file — and the decoder sheds what it doesn't need, releasing the rest in row-group increments, regardless of how the row groups are laid out in the file.

As for perf, this new version performs slightly worse than the original watermark-based approach, but scales logarithmically with buffer count, which is where we want to be:

Benchmark results compared with previous commit in the series
push_decoder/1buf/1000ranges
                        time:   [339.10 µs 339.87 µs 340.73 µs]
                        change: [+0.6171% +0.8846% +1.1728%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe
push_decoder/1buf/10000ranges
                        time:   [3.4756 ms 3.4923 ms 3.5152 ms]
                        change: [+1.5727% +2.2242% +3.0099%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
push_decoder/1buf/100000ranges
                        time:   [37.775 ms 37.868 ms 37.968 ms]
                        change: [+3.7284% +4.0739% +4.4491%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
Benchmarking push_decoder/1buf/500000ranges: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 21.2s, or reduce sample count to 20.
push_decoder/1buf/500000ranges
                        time:   [211.96 ms 212.60 ms 213.25 ms]
                        change: [+6.6703% +7.0475% +7.4395%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

push_decoder/Nbuf/1000ranges
                        time:   [470.69 µs 475.92 µs 481.77 µs]
                        change: [+5.2790% +5.9914% +6.7459%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe
push_decoder/Nbuf/10000ranges
                        time:   [5.6163 ms 5.6440 ms 5.6881 ms]
                        change: [+1.7046% +2.8093% +3.8623%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
Benchmarking push_decoder/Nbuf/100000ranges: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.3s, or reduce sample count to 70.
push_decoder/Nbuf/100000ranges
                        time:   [62.593 ms 62.858 ms 63.231 ms]
                        change: [+4.4007% +5.1085% +5.8527%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

I was thinking more about this change and I thought maybe we could take a step back and figure out what you are trying to accomplish

Interested to know your thoughts about this last version, but completely open to shifting direction if you feel this is not the right way to go about it!

Per-range release guarantees that every consumed row group is freed. But
when the IO layer prefetches aggressively (e.g. streaming entire files or
over-reading across row group boundaries), data for row groups the
decoder will never process enters `PushBuffers` and is never consumed,
so `release_range` is never called for it.

Add `RetentionSet`, a sorted set of byte ranges derived from column
chunk metadata for the queued row groups. Incoming buffers are filtered
at push time: only portions overlapping a retained range are stored
(zero-copy via `Bytes::slice`); everything else is discarded before
reaching `PushBuffers`.

The filter is advisory: ranges the decoder explicitly requests via
`NeedsData` are merged into the retention set, so the decoder can
always make progress even when the retention set is incomplete.

Together with the per-range release in the previous commit, this closes
the loop on memory management: the IO layer is free to push data in any
shape — coalesced, prefetched, uniform-sized, or even the entire file —
without knowledge of Parquet layout. The decoder admits only what it
will consume, and releases it at row-group boundaries.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
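
The push-time filtering described in the commit message can be sketched as follows (a hypothetical illustration: the real code slices `Bytes` zero-copy, while this sketch copies into `Vec<u8>` for simplicity, and `filter_buffer` is an invented name):

```rust
use std::ops::Range;

/// Intersect an incoming buffer at `offset` against a sorted,
/// non-overlapping retention set, keeping only the overlapping slices.
/// Everything else is discarded before reaching PushBuffers.
fn filter_buffer(
    offset: u64,
    data: &[u8],
    retained: &[Range<u64>], // sorted by start, non-overlapping
) -> Vec<(u64, Vec<u8>)> {
    let buf_end = offset + data.len() as u64;
    let mut kept = Vec::new();
    for r in retained {
        // Overlap between the retained range and the incoming buffer.
        let start = r.start.max(offset);
        let end = r.end.min(buf_end);
        if start < end {
            let slice = data[(start - offset) as usize..(end - offset) as usize].to_vec();
            kept.push((start, slice));
        }
    }
    kept
}
```

An IO layer that streams the whole file would see most of each pushed buffer dropped here, with only the column chunks of queued row groups admitted.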
@HippoBaro
Contributor Author

@alamb from the above PR you linked to:

I also think we may need to adjust the Parquet PushDecoder somehow to communicate better its internal boundaries (e.g. so we can easily / efficiently pick off RowGroups without recreating it)

I see what you mean. Exposing internal boundaries to the caller is in tension with the IO decoupling I'm pursuing here (ideally the decoder shouldn't dictate how data is fetched or partitioned). That said, take a look at the RetentionSet introduced in c02b044: if work is split across consumer threads, each could own its own PushBuffers with a RetentionSet scoped to its row groups. The IO layer pushes the same data to all of them, and each decoder admits only what it needs. No boundary negotiation required.

My 2 cents.

@alamb
Contributor

alamb commented Apr 16, 2026

To summarize your use case, @HippoBaro, in my own words:

  1. You want to control prefetching / coalescing of IO to optimize for your environment, especially object-store
    access.
  2. You want to know when previously prefetched data will no longer be needed so its resources can be freed.
  3. For very wide schemas, you want buffer management to scale efficiently, not quadratically with the number of
    columns.

What seems hard about the current setup is:

  1. The push decoder knows which ranges it explicitly requested, so it can reason about those.
  2. If an external system pushes data at a different granularity, such as speculative prefetch or coalesced
    reads, the decoder does not really know what data may still be needed later versus what can safely be released.

My main concern with this PR is that it adds a specific IO buffer management policy for one usage pattern into ParquetPushDecoder. That may work well for your object-store case, but it may not generalize as well to other environments or IO strategies (I am thinking of io_uring, for example).

Thus my preference would be to keep buffer management policy above ParquetPushDecoder, and make sure arrow-rs exposes the primitives needed to support it.

For example:

let decoder = make_decoder();

// Plan and prefetch what is likely needed for row group 1
let row_group1_bytes = /* calculate likely bytes needed for row group 1 */;
let prefetched_data = fancy_prefetcher.get(row_group1_bytes);
decoder.push_data(prefetched_data);

decoder.decode(); // ideally does not need more data until row group 2

// Once row group 1 is fully consumed, release any staged buffered data
decoder.release_all();

// Move on to row group 2
let row_group2_bytes = /* calculate likely bytes needed for row group 2 */;
let prefetched_data = fancy_prefetcher.get(row_group2_bytes);
decoder.push_data(prefetched_data);

decoder.decode(); // ideally does not need more data until row group 3

From that perspective, I think we are close to having the right pieces already. What still seems missing is:

  1. A reliable way to know when the push decoder has consumed everything it will need from previously pushed data.
  2. (Maybe) an easier API to calculate likely byte ranges for a given set of row groups / columns. I think it can be derived from the metadata APIs.
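
Point 2 above can probably be derived from the existing metadata: each column chunk exposes its byte position and length (e.g. via `ColumnChunkMetaData::byte_range` in the parquet crate), so planning is essentially sorting and coalescing those intervals. A minimal sketch over plain `(offset, length)` pairs, with `plan_ranges` as an invented helper name:

```rust
use std::ops::Range;

/// Given the (offset, length) of each column chunk in the row groups of
/// interest, produce a sorted, coalesced list of byte ranges to prefetch.
fn plan_ranges(chunks: &[(u64, u64)]) -> Vec<Range<u64>> {
    let mut ranges: Vec<Range<u64>> =
        chunks.iter().map(|&(off, len)| off..off + len).collect();
    ranges.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in ranges {
        match merged.last_mut() {
            // Adjacent or overlapping: extend the previous range.
            Some(last) if r.start <= last.end => last.end = last.end.max(r.end),
            _ => merged.push(r),
        }
    }
    merged
}
```

A caller could feed the merged ranges to whatever prefetcher it owns, keeping the planning above the decoder as proposed.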

@alamb
Contributor

alamb commented Apr 16, 2026

Actually, I dug around and I think we DO have the API that is needed:

https://docs.rs/parquet/58.0.0/parquet/arrow/push_decoder/struct.ParquetPushDecoder.html#method.try_next_reader

So the idea would be instead of calling try_decode the caller could call try_next_reader

And then once a reader is returned, we could then clear out all the cached data (clear_all_ranges) and wait for the next request

What do you think @HippoBaro ?

@alamb
Contributor

alamb commented Apr 16, 2026

We may need to add some API to the PushDecoder that says not to try and release used ranges though 🤔

@HippoBaro
Contributor Author

HippoBaro commented Apr 17, 2026

My main concern with this PR is that it adds a specific IO buffer management policy for one usage pattern into ParquetPushDecoder

Yeah, I agree with that. RetentionSet feels hacky and breaks with dynamic assignments.

The current code lets the IO layer push more data than was requested and then filters out what is not useful, but as you pointed out, that creates a lot of issues, especially once there are multiple destinations, such as with dynamic work-stealing row group consumption.

So instead, let’s assume the IO layer is responsible for owning prefetched data until it is actually requested. That does not solve the release problem so much as move it around.

A reliable way to know when the push decoder has consumed everything it will need from previously pushed data.

I think that is partially solved if we use try_next_reader. Once you are done with the reader, you drop it, and the buffers go away, as you suggested. I like that. It does not help with speculatively fetched data still sitting in the IO layer, though. For that, we will still need some mechanism to tell the IO layer that certain ranges will never be consumed.

Maybe we could add a new DecodeResult::SkipData(Vec<Range<u64>>) that tells the caller: “I will never NeedsData this range, so you may free any buffered data for it and cancel associated in-flight IO.” We would return that when the decoder decides to skip a row group, for example.

If I put it all together, we get something like this:

let decoder = make_decoder();

// retrieve all row groups of interest with metadata
let ranges = predicted_row_group(&metadata);
io_layer.hint(ranges); // maybe start prefetching, maybe not

for rg_idx in row_groups {
    match decoder.try_next_reader()? {
        DecodeResult::Data(reader) => { 
            // All pushed buffers go away when the reader is dropped
            consume(reader);
        },
        DecodeResult::NeedsData(ranges) => decoder.push_ranges(ranges, io_layer.fetch(&ranges))?,
        DecodeResult::SkipData(ranges) => io_layer.release(ranges),
        DecodeResult::Finished => break,
    }
}

This gives us fairly clean decoupling: the IO layer can do whatever it wants, but can't push unsolicited buffers, which seems like a reasonable constraint.

WDYT @alamb?


Labels

parquet (Changes to the parquet crate), performance

Development

Successfully merging this pull request may close these issues.

PushBuffers::clear_ranges is quadratic and leaks

5 participants