feat(parquet): make PushBuffers boundary-agnostic for prefetch IO #9697

HippoBaro wants to merge 3 commits into apache:main
Force-pushed from 5eab5b9 to 5d60935.
@alamb This PR changes code you previously wrote, and I'd value your take on the direction. Beyond fixing the quadratic complexity issue, it also pushes the design toward fuller IO decoupling.
I will review it -- I am currently working on another review. Thanks @HippoBaro
Force-pushed from 504035a to 2f891e7.
Rebased.
The `PushDecoder` (introduced in apache#7997, apache#8080) is designed to decouple IO and CPU. It holds non-contiguous byte ranges, with a `NeedsData`/`push_range` protocol. However, it requires each logical read to be satisfied in full by a single physical buffer: `has_range`, `get_bytes`, and `Read::read` all searched for one buffer that entirely covered the requested range.

This assumption conflates two orthogonal IO strategies:

- Coalescing: the IO layer merges adjacent requested ranges into fewer, larger fetches.
- Prefetching: the IO layer pushes data ahead of what the decoder has requested. This is an inversion of control: the IO layer speculatively fills buffers at offsets not yet requested and for arbitrary buffer sizes.

These two strategies interact poorly with the current release mechanism (`clear_ranges`), which matches buffers by exact range equality:

- Coalescing is both rewarded and punished. It is load bearing because without it, the number of physical buffers scales with the number of ranges requested, and `clear_ranges` performs an O(N×M) scan to remove consumed ranges, producing quadratic overhead on wide schemas. But it is also punished because a coalesced buffer never exactly matches any individual requested range, so `clear_ranges` silently skips it: the buffer leaks in `PushBuffers` until the decoder finishes or the caller manually calls `release_all_ranges` (apache#9624). This increases peak RSS proportionally to the amount of data coalesced ahead of the current row group.
- Prefetching is structurally impossible: speculatively pushed buffers will straddle future read boundaries, so the decoder cannot consume them, and `clear_ranges` cannot release them.
This commit makes `PushBuffers` boundary-agnostic, completing the prefetching story, and changes the internals to scale with buffer count instead of range count:

- Buffer stitching: `has_range`, `get_bytes`, and `Read::read` resolve logical ranges across multiple contiguous physical buffers via binary search, so the IO layer is free to push arbitrarily-sized parts without knowing future read boundaries. This is a nice improvement, because some IO layers can be made much more efficient when using uniform buffers and vectorized reads.
- Incremental release (`release_through`): replaces `clear_ranges` with a watermark-based release that drops all buffers below a byte offset, trimming straddling buffers via zero-copy `Bytes::slice`. The decoder calls this automatically at row-group boundaries.

Benchmark results (vs baseline):

    push_decoder/1buf/1000ranges     321.9 µs  (was 323.5 µs,  −1%)
    push_decoder/1buf/10000ranges    3.26 ms   (was 3.25 ms,   +0%)
    push_decoder/1buf/100000ranges   34.9 ms   (was 34.6 ms,   +1%)
    push_decoder/1buf/500000ranges   192.2 ms  (was 185.3 ms,  +4%)
    push_decoder/Nbuf/1000ranges     363.9 µs  (was 437.2 µs, −17%)
    push_decoder/Nbuf/10000ranges    3.82 ms   (was 10.7 ms,  −64%)
    push_decoder/Nbuf/100000ranges   42.1 ms   (was 711.6 ms, −94%)

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
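The buffer-stitching idea described above can be sketched as follows. This is a minimal, hypothetical model, not the PR's actual code: plain `Vec<u8>` stands in for zero-copy `Bytes`, and buffers are `(start_offset, data)` pairs kept sorted by start offset.

```rust
/// (start_offset, data) pairs, sorted by start_offset.
type Buffers = Vec<(u64, Vec<u8>)>;

/// Returns the bytes for `range` if it is fully covered by contiguous
/// physical buffers, stitching across buffer boundaries as needed.
fn get_bytes(buffers: &Buffers, range: std::ops::Range<u64>) -> Option<Vec<u8>> {
    let mut out = Vec::with_capacity((range.end - range.start) as usize);
    let mut pos = range.start;
    // Binary search for the last buffer starting at or before `pos`.
    let mut idx = buffers.partition_point(|(start, _)| *start <= pos).checked_sub(1)?;
    while pos < range.end {
        let (start, data) = buffers.get(idx)?;
        let end = *start + data.len() as u64;
        if pos < *start || pos >= end {
            return None; // gap: the logical range is not fully covered
        }
        let take_end = end.min(range.end);
        out.extend_from_slice(&data[(pos - *start) as usize..(take_end - *start) as usize]);
        pos = take_end;
        idx += 1;
    }
    Some(out)
}

fn main() {
    // Two contiguous 10-byte buffers at offsets 0 and 10.
    let buffers: Buffers = vec![(0, (0u8..10).collect()), (10, (10u8..20).collect())];
    // A read straddling the buffer boundary is stitched from both buffers.
    assert_eq!(get_bytes(&buffers, 5..15), Some((5u8..15).collect::<Vec<u8>>()));
    // A read past the pushed data cannot be satisfied.
    assert_eq!(get_bytes(&buffers, 15..25), None);
}
```

Because resolution starts with one binary search and then walks only the buffers it consumes, the cost scales with buffer count rather than with the number of requested ranges.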
Force-pushed from 2f891e7 to d2ea6c4.
etseidl left a comment
Thanks @HippoBaro. This looks interesting. I still have to do a deep dive on #9653 before diving into this.
Nice, @HippoBaro. One finding: this change introduces a regression in reverse row-group scans, which surfaced in a DataFusion test:

```rust
// Test reverse scan
let opener = make_opener(true);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let reverse_values = collect_int32_values(stream).await;
// The forward scan should return data in the order written
assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
// With reverse scan, row groups are reversed, so we expect:
// Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
```

Basically, the incremental release watermark invariant conflicts with reverse row-group traversal. From what I understand, incremental release decodes a row group, marks its ending byte offset as the watermark, and then expects subsequent ranges to stay at or above that offset. So reverse traversal cannot release safely this way. Could we potentially support a symmetric reverse version of the same monotonic watermark tracking you've introduced here?
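The conflict can be shown with a minimal sketch (hypothetical types, not the PR's code): a monotonic watermark makes every byte below the highest released offset unreachable, which is exactly where a reverse scan needs to read next.

```rust
/// Toy model of a watermark-based release: once `release_through(n)` runs,
/// no read below `n` can be satisfied, and the watermark only moves forward.
struct Watermarked {
    watermark: u64,
}

impl Watermarked {
    fn release_through(&mut self, offset: u64) {
        // Monotonic: the watermark never moves backward.
        self.watermark = self.watermark.max(offset);
    }
    fn can_read(&self, start: u64) -> bool {
        start >= self.watermark
    }
}

fn main() {
    let mut bufs = Watermarked { watermark: 0 };
    // Forward scan: read row group 1 [0, 100), release it, read row group 2.
    assert!(bufs.can_read(0));
    bufs.release_through(100);
    assert!(bufs.can_read(100)); // row group 2 is still reachable
    // Reverse scan: after consuming row group 2 [100, 200) first...
    bufs.release_through(200);
    // ...row group 1's bytes sit below the watermark and are unreachable.
    assert!(!bufs.can_read(0));
}
```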
run benchmark push_decoder
🤖 Arrow criterion benchmark running (GKE): comparing boundary_agnostic_pushbuffers (d2ea6c4) to 711fac8 (merge-base)

🤖 Arrow criterion benchmark completed (GKE)
@nathanb9 Thank you for the feedback; I'll see what I can do there
That is certainly nice looking |
run benchmark arrow_reader

🤖 Arrow criterion benchmark running (GKE): comparing boundary_agnostic_pushbuffers (d2ea6c4) to 711fac8 (merge-base)
alamb left a comment
Thank you @HippoBaro and @nathanb9 -- this is a great start.
In general I have two major concerns:
- The assumption of range usage in the parquet decoder
- The manual management of the sorted flag / parallel buffers
Let me know what you think
```rust
// All data for this row group has been extracted into the
// InMemoryRowGroup. Release physical buffers up to the end
// of this row group so streaming IO can reclaim memory.
self.buffers
```
You can configure the parquet reader to read row groups in some arbitrary order using `with_row_groups`.

Also technically there is no reason that row groups have to be written in order (though most writers will do that) -- for example, you could have a file where the bytes for row group 0 are after the bytes for row group 1.

So I think assuming that the reader will never want any bytes prior to the current row group should be reconsidered.

Can we instead perhaps release data for the start/end of the row group, rather than just a one-sided range?
> Also technically there is no reason that row groups have to be written in order (though most writers will do that) -- for example, you could have a file where the bytes for row group 0 are after the bytes for row group 1.
Indeed, coworkers of mine are using this property as a means to do deletions from parquet files. Rewrite a single row group, tack it onto the end of the file, and then modify the footer to point to the new row group and ignore the original.
Thanks to you both, I wasn't aware of this! This certainly changes things. My watermark approach was nice and optimal, but this makes it fundamentally flawed. Oh well, back to the drawing board!
```rust
    self.file_offset
}

/// Returns the byte offset just past the last column chunk in this row group.
```
I think practically speaking most parquet files will have the column chunks for one row group written contiguously in the file, but I am not sure the spec requires this. I do think it effectively requires all pages for a column to be in a contiguous range.
My reading of the spec makes me think the guarantee is at the column chunk level: "Column chunks are composed of pages written back to back." But there is no equivalent guarantee for column chunks within a row group. It explicitly says "There is no physical structure that is guaranteed for a row group."
Another reason, on top of your previous remark, that puts the final nail in the coffin of the watermark release mechanism.
```rust
/// Thus, the implementation defers to the caller to coalesce subsequent requests
/// if desired.
/// # No Speculative Prefetching
```

```rust
/// The buffers of data that can be used to decode the Parquet file
buffers: Vec<Bytes>,
/// High-water mark set by [`Self::release_through`]. After a release,
/// no push, has_range, or read may target offsets below this value.
```
it is probably good to point out what "may not" means (like, does the code panic if it is tried?)
```rust
    return;
}

// Insertion sort: zero-allocation and linear on nearly-sorted input
```
this is n^2 on reverse sorted input though, right?
Yes, this was fine under my mistaken assumption that files were always laid out such that reading them back would be sequential. In that case, most reads would happen at the edge, and inserting buffers here would mostly amount to inserting into an ordered, or nearly ordered, vec.
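To illustrate the point, here is a hedged sketch (not the PR's code) that counts element shifts: insertion into a sorted vector does no work on already-sorted input, but on reverse-sorted input every new element lands at the front, giving n(n−1)/2 shifts.

```rust
/// Counts how many existing elements must shift to keep `v` sorted
/// while inserting each element of `input` in order.
fn insertion_sort_shifts(input: &[u64]) -> usize {
    let mut v: Vec<u64> = Vec::new();
    let mut shifts = 0;
    for &x in input {
        // Binary search for the insertion point (first element > x).
        let pos = v.partition_point(|&y| y <= x);
        shifts += v.len() - pos; // elements moved to make room
        v.insert(pos, x);
    }
    shifts
}

fn main() {
    let n = 1000u64;
    let forward: Vec<u64> = (0..n).collect();
    let reverse: Vec<u64> = (0..n).rev().collect();
    // Already-sorted input: no shifts at all (linear behavior).
    assert_eq!(insertion_sort_shifts(&forward), 0);
    // Reverse-sorted input: n*(n-1)/2 shifts, i.e. O(n^2).
    assert_eq!(insertion_sort_shifts(&reverse), 1000 * 999 / 2);
}
```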
```rust
watermark: u64,
/// Whether `ranges`/`buffers` are sorted by range start.
/// Set to `false` on every `push_range`, restored lazily before reads.
sorted: bool,
```
What if we encoded the sort invariant in the type system rather than relying on the flag to be set correctly? Something like:

```rust
enum Buffers {
    Sorted {
        ranges: Vec<Range<u64>>,
        buffers: Vec<Bytes>,
    },
    UnSorted {
        ranges: Vec<Range<u64>>,
        buffers: Vec<Bytes>,
    },
}
```

Maybe it is overly complicated, but it makes it much clearer that all paths correctly update the sorting.
The latest version uses a `BTreeMap`, which takes care of the invariant for us, thanks!
```rust
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
    ranges: Vec<Range<u64>>,
```
If the goal is to keep a list sorted by start range, did you consider using a `BTreeSet`? You could then define some sort of wrapper over `Range`/`Bytes` like:

```rust
struct RangeAndData {
    range: Range<u64>,
    buffer: Bytes,
}

impl PartialOrd for RangeAndData {
    // define comparison from start range
}

pub(crate) struct PushBuffers {
    ...
    buffers: BTreeSet<RangeAndData>,
}
```

That would probably simplify the accounting significantly.
I was thinking more about this change, and I thought maybe we could take a step back and figure out what you are trying to accomplish. For example, if your use case is to clear all IO after reading a row group's data, I wonder if you could just call the existing release API after each row group.

Recently, I have been working on something downstream in DataFusion where I would like a similar API.

One solution could be to make a way to split off the remaining row groups, perhaps into a different decoder 🤔
@alamb @nathanb9 Thank you both for your feedback — it's been very helpful. The watermark mechanism is fundamentally flawed. I worked under the assumption that all Parquet files are laid out like mine are, but the spec makes no such guarantee: "There is no physical structure that is guaranteed for a row group." Column chunks within a row group need not be contiguous or ordered, so a monotonic watermark won't be correct in general. @nathanb9's reverse-scan regression is one concrete consequence, but it's not the only one. The deletion trick is actually quite neat.
Fair point. I should have been clearer in the original description. Here are the high-level goals driving this work:
Hopefully this helps y'all understand where I'm coming from!
The problem is that by the time I finish decoding a row group, I only know I'm done with that row group. The IO layer may have already prefetched data I'm about to need, and a blanket release call would discard it.

The watermark was a nice idea for this: if we know that after reading byte offset N we'll never read anything below N, we can prefetch freely above N and safely release everything below. But since we can't guarantee monotonic access, that doesn't hold. I'll push a revised version that addresses all the feedback shortly!
Although, come to think of it, we could give the IO layer a hint: not a list of offsets we need right now, but ranges we expect to need later.
Force-pushed from 58ef433 to 2460bed.
The `release_through` watermark assumed monotonically increasing byte offsets: all future reads must stay above the high-water mark. This invariant is violated by reverse row-group scans (@nathanb9: DataFusion panics with `has_range(...) below watermark`), by files whose row groups are not laid out sequentially (@alamb, @etseidl), and by the Parquet spec itself, which guarantees no physical structure for a row group.

The manual `sorted` flag / `ensure_sorted()` pattern was also fragile: callers had to sort before reads, and the insertion sort was O(n²) on reverse-sorted input.

This commit replaces both mechanisms:

- `BTreeMap<u64, BufferValue>` replaces `Vec<Range> + Vec<Bytes>`, always sorted by construction. All read-side operations (`has_range`, `get_bytes`, `Read::read`) are O(log n) in buffer count, not in range count. This means `PushBuffers` scales with the number of IO buffers the caller pushes, not with the number of column chunks in the schema, and coalescing is no longer load-bearing for performance.
- `release_range(Range<u64>)` replaces the one-sided watermark. Each buffer tracks a `live_bytes` counter decremented by the overlap on release; when it reaches zero the entry is dropped. Release works in any access order.
- `release_row_group(idx)` releases individual column chunk byte ranges after a row group is fully consumed or skipped. It is called at every exit from the row group state machine, guaranteeing that no column chunk outlives its row group regardless of access pattern, pruning ratio, or file layout.
- The public `release_all`, `release_through`, and `clear_all_ranges` APIs are removed: buffer release is now fully automatic and internal.

Tests are expanded to cover arbitrary row group orderings (forward, reverse, single-RG) and verify incremental release for each.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
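The `release_range` mechanism described above can be sketched roughly like this. It is a hypothetical simplification, not the PR's code: plain `Vec<u8>` stands in for `Bytes`, and partially live buffers are kept whole rather than trimmed.

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Each buffer tracks how many of its bytes are still live; releasing a
/// range subtracts the overlap, and a fully dead entry is dropped. This
/// works in any access order, unlike a monotonic watermark.
struct Buffers {
    /// start offset -> (data, live byte count)
    map: BTreeMap<u64, (Vec<u8>, usize)>,
}

impl Buffers {
    fn push(&mut self, start: u64, data: Vec<u8>) {
        let live = data.len();
        self.map.insert(start, (data, live));
    }

    fn release_range(&mut self, range: Range<u64>) {
        let mut dead = Vec::new();
        for (&start, (data, live)) in self.map.iter_mut() {
            let end = start + data.len() as u64;
            // Overlap between this buffer and the released range.
            let overlap = end.min(range.end).saturating_sub(start.max(range.start));
            *live = live.saturating_sub(overlap as usize);
            if *live == 0 {
                dead.push(start);
            }
        }
        for start in dead {
            self.map.remove(&start);
        }
    }

    fn buffer_count(&self) -> usize {
        self.map.len()
    }
}

fn main() {
    let mut b = Buffers { map: BTreeMap::new() };
    // One coalesced buffer covering two row groups: [0, 100) and [100, 200).
    b.push(0, vec![0u8; 200]);
    // Releasing row groups in *reverse* order still frees the buffer.
    b.release_range(100..200);
    assert_eq!(b.buffer_count(), 1); // half the bytes are still live
    b.release_range(0..100);
    assert_eq!(b.buffer_count(), 0); // fully released, entry dropped
}
```

Note that this sketch assumes each byte is released at most once; the real accounting presumably has to guard against double release.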
Force-pushed from 2460bed to 871700c.
I've pushed two follow-up commits that address the feedback. The watermark is gone, replaced by per-range release backed by a `BTreeMap`.

On the prefetching side, incoming buffers are now filtered at push time against the column chunk byte ranges of the queued row groups, so data the decoder will never consume is discarded before it enters `PushBuffers`.

As for perf, this new version performs slightly worse than the original watermark-based approach, but scales logarithmically with buffer count, which is where we want to be.

Benchmark results compared with the previous commit in the series:
Interested to know your thoughts about this last version, but completely open to shifting direction if you feel this is not the right way to go about it!
Per-range release guarantees that every consumed row group is freed. But when the IO layer prefetches aggressively (e.g. streaming entire files or over-reading across row group boundaries), data for row groups the decoder will never process enters `PushBuffers` and is never consumed, so `release_range` is never called for it.

Add `RetentionSet`, a sorted set of byte ranges derived from column chunk metadata for the queued row groups. Incoming buffers are filtered at push time: only portions overlapping a retained range are stored (zero-copy via `Bytes::slice`); everything else is discarded before reaching `PushBuffers`.

The filter is advisory: ranges the decoder explicitly requests via `NeedsData` are merged into the retention set, so the decoder can always make progress even when the retention set is incomplete.

Together with the per-range release in the previous commit, this closes the loop on memory management: the IO layer is free to push data in any shape (coalesced, prefetched, uniform-sized, or even the entire file) without knowledge of Parquet layout. The decoder admits only what it will consume, and releases it at row-group boundaries.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
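The push-time filtering could look roughly like this (a hedged sketch with hypothetical names; the real code slices zero-copy `Bytes`, while this copies for simplicity):

```rust
use std::ops::Range;

/// Clips an incoming buffer at `start` against the retained byte ranges
/// (non-overlapping, sorted by start); only overlapping portions are kept,
/// everything else is discarded before it would reach the buffer store.
fn filter_buffer(retained: &[Range<u64>], start: u64, data: &[u8]) -> Vec<(u64, Vec<u8>)> {
    let end = start + data.len() as u64;
    let mut kept = Vec::new();
    for r in retained {
        // Portion of the incoming buffer overlapping this retained range.
        let lo = r.start.max(start);
        let hi = r.end.min(end);
        if lo < hi {
            kept.push((lo, data[(lo - start) as usize..(hi - start) as usize].to_vec()));
        }
    }
    kept
}

fn main() {
    // Decoder only needs bytes [10, 20) and [40, 50); IO pushed [0, 60).
    let retained = [10..20, 40..50];
    let data: Vec<u8> = (0..60).collect();
    let kept = filter_buffer(&retained, 0, &data);
    // Only the two overlapping slices are admitted; the rest is discarded.
    assert_eq!(kept.len(), 2);
    assert_eq!(kept[0], (10, (10u8..20).collect::<Vec<u8>>()));
    assert_eq!(kept[1], (40, (40u8..50).collect::<Vec<u8>>()));
}
```

Because the filter only ever shrinks what is stored, an incomplete retention set degrades to extra fetches, not incorrect results, which is why merging `NeedsData` ranges into it keeps the decoder live.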
Force-pushed from 871700c to c02b044.
@alamb from the above PR you linked to:
I see what you mean. Exposing internal boundaries to the caller is in tension with the IO decoupling I'm pursuing here (ideally the decoder shouldn't dictate how data is fetched or partitioned). That said, take a look at the latest version; my 2c below.
To summarize your use case, @HippoBaro, in my own words:
What seems hard about the current setup is:
My main concern with this PR is that it adds a specific IO buffer management policy for one usage pattern into `PushBuffers`. Thus my preference would be to keep buffer management policy above the decoder. For example:

```rust
let decoder = make_decoder();

// Plan and prefetch what is likely needed for row group 1
let row_group1_bytes = /* calculate likely bytes needed for row group 1 */;
let prefetched_data = fancy_prefetcher.get(row_group1_bytes);
decoder.push_data(prefetched_data);
decoder.decode(); // ideally does not need more data until row group 2

// Once row group 1 is fully consumed, release any staged buffered data
decoder.release_all();

// Move on to row group 2
let row_group2_bytes = /* calculate likely bytes needed for row group 2 */;
let prefetched_data = fancy_prefetcher.get(row_group2_bytes);
decoder.push_data(prefetched_data);
decoder.decode(); // ideally does not need more data until row group 3
```

From that perspective, I think we are close to having the right pieces already. What still seems missing is:
Actually, I dug around and I think we DO have the API that is needed. So the idea would be, instead of calling the decode API directly, to get a reader for each row group. And then once a reader is returned, we could clear out all the cached data. What do you think @HippoBaro?
We may need to add some API to the PushDecoder that says not to try and release used ranges though 🤔 |
Yeah, I agree with that. The current code lets the IO layer push more data than was requested and then filters out what is not useful, but as you pointed out, that creates a lot of issues, especially once there are multiple destinations, such as with dynamic work-stealing row group consumption. So instead, let’s assume the IO layer is responsible for owning prefetched data until it is actually requested. That does not solve the release problem so much as move it around.
I think that is partially solved if we use `try_next_reader`. Maybe we could add a new `hint` API to the IO layer. If I put it all together, we get something like this:

```rust
let decoder = make_decoder();
// retrieve all row groups of interest with metadata
let ranges = predicted_row_group(&metadata);
io_layer.hint(ranges); // maybe start prefetching, maybe not
for rg_idx in row_groups {
    match decoder.try_next_reader()? {
        DecodeResult::Data(reader) => {
            // All pushed buffers go away when the reader is dropped
            consume(reader);
        }
        DecodeResult::NeedsData(ranges) => decoder.push_ranges(ranges, io_layer.fetch(&ranges))?,
        DecodeResult::SkipData(ranges) => io_layer.release(ranges),
        DecodeResult::Finished => break,
    }
}
```

This gives us fairly clean decoupling: the IO layer can do whatever it wants, but can't push unsolicited buffers, which seems like a reasonable constraint. WDYT @alamb?
Which issue does this PR close?

- Closes `PushBuffers::clear_ranges` is quadratic and leaks #9695.

Rationale for this change

The `PushDecoder` (introduced in #7997, #8080) is designed to decouple IO and CPU. It holds non-contiguous byte ranges, with a `NeedsData`/`push_range` protocol. However, it requires each logical read to be satisfied in full by a single physical buffer: `has_range`, `get_bytes`, and `Read::read` all searched for one buffer that entirely covered the requested range.

This assumption conflates two orthogonal IO strategies:

- Coalescing: the IO layer merges adjacent requested ranges into fewer, larger fetches.
- Prefetching: the IO layer pushes data ahead of what the decoder has requested.

These two strategies interact poorly with the current release mechanism (`clear_ranges`), which matches buffers by exact range equality:

- Coalescing is both rewarded and punished. It is load bearing because without it, the number of physical buffers scales with the number of ranges requested, and `clear_ranges` performs an O(N×M) scan to remove consumed ranges, producing quadratic overhead on wide schemas. But it is also punished because a coalesced buffer never exactly matches any individual requested range, so `clear_ranges` silently skips it: the buffer leaks in `PushBuffers` until the decoder finishes or the caller manually calls `release_all_ranges` (ParquetPushDecoder API to clear all buffered ranges #9624). This increases peak RSS proportionally to the amount of data coalesced ahead of the current row group.
- Prefetching is structurally impossible: speculatively pushed buffers will straddle future read boundaries, so the decoder cannot consume them, and `clear_ranges` cannot release them.

What changes are included in this PR?

This PR makes `PushBuffers` boundary-agnostic, completing the prefetching story, and changes the internals to scale with buffer count instead of range count:

- Buffer stitching: `has_range`, `get_bytes`, and `Read::read` resolve logical ranges across multiple contiguous physical buffers via binary search, so the IO layer is free to push arbitrarily-sized parts without knowing future read boundaries. This is a nice improvement, because some IO layers can be made much more efficient when using uniform buffers and vectorized reads.
- Incremental release (`release_through`): replaces `clear_ranges` with a watermark-based release that drops all buffers below a byte offset, trimming straddling buffers via zero-copy `Bytes::slice`. The decoder calls this automatically at row-group boundaries.

Are these changes tested?

Significant coverage added, all tests passing. Benchmark results (vs baseline in #9696):

    push_decoder/1buf/1000ranges     321.9 µs  (was 323.5 µs,  −1%)
    push_decoder/1buf/10000ranges    3.26 ms   (was 3.25 ms,   +0%)
    push_decoder/1buf/100000ranges   34.9 ms   (was 34.6 ms,   +1%)
    push_decoder/1buf/500000ranges   192.2 ms  (was 185.3 ms,  +4%)
    push_decoder/Nbuf/1000ranges     363.9 µs  (was 437.2 µs, −17%)
    push_decoder/Nbuf/10000ranges    3.82 ms   (was 10.7 ms,  −64%)
    push_decoder/Nbuf/100000ranges   42.1 ms   (was 711.6 ms, −94%)

Are there any user-facing changes?

`PushBuffers::clear_all_ranges` marked as deprecated in favor of the newer `PushBuffers::clear_all`.