feat(parquet): fuse level encoding passes and compact level representation #9653
HippoBaro wants to merge 9 commits into apache:main
Conversation
Force-pushed 335fb81 to 44dae05
This is a continuation of the work done in #9447 to improve runtime performance around sparse and/or highly uniform columns. As such, this may be of interest to @alamb and @etseidl. 5a1d3d7 adds three benchmarks that exercise the code path this series optimizes. I created a PR (#9654) to merge those separately if needed, so the benchmark bot can have a baseline to compare against. Thanks!
Force-pushed 44dae05 to 7902e69
Thanks @HippoBaro, this looks impressive. I'm still looking, but haven't found any obvious problems yet. Gads, every time I delve this deep into parquet I go a little mad 😵💫. I think the RLE encoder could use a little refactoring/comment improvement to make the flow more obvious. Not as part of this PR, though.
etseidl left a comment
Flushing a few comments. More tomorrow.
```rust
let mut values_to_write = 0usize;
let max_def = self.descr.max_def_level();
self.def_levels_encoder
    .put_with_observer(levels, |level, count| {
```
❤️ When I added the histograms I wasn't happy with the redundancy here. Nice fix!
Force-pushed 7902e69 to c891c35
Thanks for the reviews! I've reworked the branch to address all feedback. Sorry for the delay, it took me a while to experiment. The main structural change is a `LevelData` enum:

```rust
enum LevelData {
    Absent,
    Materialized(Vec<i16>),
    Uniform { value: i16, count: usize },
}
```

The resulting refactor has a larger LoC footprint, but the API is arguably much cleaner and more robust. Also rebased as per #9656 (review).
Thanks @HippoBaro. I'll try to make some time to review the changes. Probably not today, but hopefully tomorrow... 🤞
run benchmark arrow_writer
🤖 Arrow criterion benchmark running (GKE): comparing faster_sparse_columns_encoding (c891c35) to aac969d (merge-base)
# Which issue does this PR close?

- None, but relates to #9653

# Rationale for this change

#9653 introduces optimizations related to non-null uniform workloads. This adds benchmarks so we can quantify them.

# What changes are included in this PR?

Add three new benchmark cases to the arrow_writer benchmark suite for evaluating write performance on struct columns at varying null densities:

* `struct_non_null`: a nullable struct with 0% null rows and non-nullable primitive children;
* `struct_sparse_99pct_null`: a nullable struct with 99% null rows, exercising null batching through one level of struct nesting;
* `struct_all_null`: a nullable struct with 100% null rows, exercising the uniform-null path through struct nesting.

Baseline results (Apple M1 Max):

```
struct_non_null/default             29.9 ms
struct_non_null/parquet_2           38.2 ms
struct_non_null/zstd_parquet_2      50.9 ms
struct_sparse_99pct_null/default     7.2 ms
struct_sparse_99pct_null/parquet_2   7.3 ms
struct_sparse_99pct_null/zstd_p2     8.1 ms
struct_all_null/default             83.3 µs
struct_all_null/parquet_2           82.5 µs
struct_all_null/zstd_parquet_2     106.6 µs
```

# Are these changes tested?

N/A

# Are there any user-facing changes?

None

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
run benchmark arrow_writer
🤖 Arrow criterion benchmark running (GKE): comparing faster_sparse_columns_encoding (6c73ac7) to adf9308 (merge-base)
🤖 Arrow criterion benchmark completed (GKE)
I am surprised by the few regressions above; I can't reproduce them locally. Are these benchmarks known to be noisy?
Yes. They are extremely twitchy. I always take them with a grain of salt or ten. 😅
I've now run multiple passes of the arrow_writer bench on my workstation, and there appear to be no regressions due to this PR. And the speed-ups are quite impressive 😄
@kszucs do you have time to look at this PR? It touches on your CDC code.
I am hoping to review this tomorrow.
alamb left a comment
Thank you @HippoBaro -- this is really exciting. I am sorry it is taking so long to review, but this is in some of the most performance-critical and tricky code in the parquet writer.
I went through most of it pretty carefully and I really like where it is heading, but as you can probably tell by the number of comments, it is a pretty large change.
What I would like to request is that we break this PR into smaller chunks to make it easier to review and verify before getting this one in.
Some suggested parts to break out:

- `BIT_PACK_GROUP_SIZE`
- The fast path changes to `parquet/src/arrow/arrow_writer/levels.rs`
- The introduction of `LevelData`
- Change to use `put_with_observer` rather than `put`
Test coverage for fast paths
One thing that came up during my review is that many of the newly added fast paths are not covered by tests.
To see this, you can run `cargo llvm-cov --html test -p parquet`
Here is a copy of the result: llvm.zip
Here is an example showing that the fast paths aren't covered
```diff
-fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
-    unreachable!("should call write_gather instead")
+fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()> {
+    downcast_op!(
```
is this code actually callable now?
Yes! The code is now able to distinguish between `Dense { offset, len }` and `Sparse(Vec<usize>)`. When the column has no nulls, `write_leaf` produces `Dense` directly without materializing a vec like previously, and `write_mini_batch` then calls `encoder.write(values, offset, len)` based on that. Neat!
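For illustration, the dense/sparse split described above can be sketched like this. The `ValueSelection` name, its shape, and the `select` helper are assumptions modeled on the comment, not the actual parquet-rs types:

```rust
/// Hypothetical sketch of the Dense/Sparse dispatch discussed above.
#[derive(Debug)]
enum ValueSelection {
    /// Slice has no nulls: values can be written as one contiguous range.
    Dense { offset: usize, len: usize },
    /// Nulls present: only the listed indices hold real values.
    Sparse(Vec<usize>),
}

fn select(values_len: usize, validity: Option<&[bool]>) -> ValueSelection {
    match validity {
        // No null buffer at all: take the whole range in one shot.
        None => ValueSelection::Dense { offset: 0, len: values_len },
        Some(mask) => {
            let indices: Vec<usize> = mask
                .iter()
                .enumerate()
                .filter_map(|(i, &valid)| valid.then_some(i))
                .collect();
            if indices.len() == values_len {
                // All valid: collapse back to the dense fast path.
                ValueSelection::Dense { offset: 0, len: values_len }
            } else {
                ValueSelection::Sparse(indices)
            }
        }
    }
}
```

The payoff is that the no-null case never allocates an index vector, which matters for the uniform workloads this series targets.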
```diff
-/// Maximum groups of 8 values per bit-packed run. Current value is 64.
+/// Number of values in one bit-packed group. The Parquet RLE/bit-packing hybrid
+/// format always bit-packs values in multiples of this count (see the format spec:
```
Can you please provide a link in the comments for this statement?
```rust
/// needed. Callers may use [`extend_run`](Self::extend_run) to add further
/// repetitions in O(1) once this returns `true`.
#[inline]
pub fn is_accumulating(&self, value: u64) -> bool {
```
Perhaps calling it `is_accumulating_rle` would help readers understand that this is specific to the RLE mode.
```diff
 if self.current_value == value {
     self.repeat_count += 1;
-    if self.repeat_count > 8 {
+    if self.repeat_count > BIT_PACK_GROUP_SIZE {
```
The change to use a named constant is better than hard-coded literals -- thank you.
```rust
/// Increments the count for a level value by `count`.
#[inline]
pub fn update_n(&mut self, level: i16, count: i64) {
```
What does the `n` stand for here? As in, why not call this `update_count` to match the inner?
Agreed, `_n` isn’t great (I’ll probably rename the `put_n_*` symbols as well). For now, I’ll go with `increment_by`. `update_n`/`update_count` can be interpreted as replacing the count with a new value. The function increments (as the doc already states), so this seems like the clearest option.
```rust
match nulls {
    Some(nulls) => {
        let null_offset = range.start;
        let mut pending_nulls: usize = 0;
```
I think it might also help future readers to define what "empties" means in this context (and how it is different from null).
```rust
    }
}

match info.logical_nulls.clone() {
```
To appease the borrow-checker gods: the match arms need `&mut info` (for `extend_def_levels` etc.). Matching on `&info.logical_nulls` would hold a shared borrow on `info` for the entire arm, blocking the `&mut self` calls.
No clean way around it AFAICT.
Could `std::mem::take` and then restoring the field afterwards work?
Or inlining `extend_value_indices`/`extend_def_levels` again could work.
Maybe those are the alternatives that you considered not clean :)
Yes, I considered the `mem::take` approach, but it feels brittle if the control flow changes in the future or if there is a panic. Cloning the `Arc` is a bit sad because of the atomic, but it is foolproof 🤷
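A minimal standalone reproduction of this borrow conflict and the Arc-clone workaround may help. The `Info` struct and its methods here are invented for illustration; only the `logical_nulls.clone()` pattern mirrors the actual change:

```rust
use std::sync::Arc;

// Invented stand-in for the writer state discussed above.
struct Info {
    logical_nulls: Option<Arc<Vec<bool>>>,
    def_levels: Vec<i16>,
}

impl Info {
    fn extend_def_levels(&mut self, n: usize, level: i16) {
        self.def_levels.extend(std::iter::repeat(level).take(n));
    }

    fn process(&mut self) {
        // Matching on `&self.logical_nulls` directly would hold a shared
        // borrow of `self` across the whole arm, so the `&mut self` call
        // below would not compile. Cloning the Arc ends that borrow at the
        // cost of one atomic increment.
        match self.logical_nulls.clone() {
            Some(nulls) => {
                let null_count = nulls.iter().filter(|v| !**v).count();
                self.extend_def_levels(null_count, 0); // `&mut self`: OK now
            }
            None => self.extend_def_levels(1, 1),
        }
    }
}
```

Unlike `mem::take`, the clone leaves the field intact even if a later refactor adds an early return or a panic path inside the arm.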
```rust
/// incrementally across multiple batches without forcing run boundaries.
/// The encoder is flushed automatically when [`consume`](Self::consume) is called.
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> usize {
```
In theory this is a breaking API change.
However, `LevelEncoder` is part of the "experimental" API, which is documented as not being stable.
```rust
#[derive(Debug, Clone, Copy)]
pub(crate) enum LevelDataRef<'a> {
```
This seems pretty similar to `LevelData` -- why can't we just use `&LevelData`?
If there is a good reason, I think we need an explanation in comments.
`LevelDataRef` is to `LevelData` what `&str` is to `String`. We need both because `ArrayLevels` owns its level data (`LevelData::Materialized(Vec<i16>)`), but `write_batch_internal` must also accept borrowed `&[i16]` slices from the public `write_batch` API without allocating.
Even if we could change the public interface, that seems fundamental.
I will write a proper doc on both of the types so the above context is easily accessible.
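A sketch of the owned/borrowed pair and the cheap view conversion between them, following the `String`/`&str` analogy. The variant shapes follow the enum posted earlier in the thread, but the `as_ref` and `len` helpers are illustrative, not the crate's actual API:

```rust
/// Owned level data, as held by the array-levels structure.
#[derive(Debug)]
enum LevelData {
    Absent,
    Materialized(Vec<i16>),
    Uniform { value: i16, count: usize },
}

/// Borrowed view: lets callers pass `&[i16]` slices without allocating.
#[derive(Debug, Clone, Copy)]
enum LevelDataRef<'a> {
    Absent,
    Materialized(&'a [i16]),
    Uniform { value: i16, count: usize },
}

impl LevelData {
    /// Cheap view conversion, analogous to `String::as_str`.
    fn as_ref(&self) -> LevelDataRef<'_> {
        match self {
            LevelData::Absent => LevelDataRef::Absent,
            LevelData::Materialized(v) => LevelDataRef::Materialized(v),
            LevelData::Uniform { value, count } => LevelDataRef::Uniform {
                value: *value,
                count: *count,
            },
        }
    }
}

impl<'a> LevelDataRef<'a> {
    /// Number of logical level values, without materializing `Uniform`.
    fn len(&self) -> usize {
        match self {
            LevelDataRef::Absent => 0,
            LevelDataRef::Materialized(v) => v.len(),
            LevelDataRef::Uniform { count, .. } => *count,
        }
    }
}
```

The `Uniform` arm is where the compact-representation win shows up: length (and, by extension, encoding) is answerable in O(1) without a `Vec<i16>`.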
```rust
#[derive(Debug, Clone, Copy)]
pub(crate) enum ValueSelectionRef<'a> {
```
Likewise here -- an explanation of this and how it relates to `ValueSelection` would be really helpful.
I can't wait to get this in -- so good
Thanks again for the thorough reviews! I’ll keep working on this branch/PR to address the feedback (hopefully tomorrow) and for discussion purposes, but we can otherwise close it. I’ll make individual PRs as requested. Bear with me as I work through the many comments and break the commits into more digestible pieces 🙇
100% -- thank you for being willing to do so |
I had a quick look at the levels and CDC changes, and it seems like a strict improvement without any noticeable issues; I will try to take a closer look tomorrow.
Marking as draft as I think this PR is no longer waiting on feedback and I am trying to make it easier to find PRs in need of review. Please mark it as ready for review when it is ready for another look |
The literal `8` appeared in two distinct roles throughout `RleEncoder`, `RleDecoder`, and their tests. Replacing each with a named constant makes the intent explicit and prevents the two meanings from being confused.

* `BIT_PACK_GROUP_SIZE = 8`: The Parquet RLE/bit-packing hybrid format always bit-packs values in multiples of this count (spec: "we always bit-pack a multiple of 8 values at a time"). Every occurrence related to the staging buffer size, the repeat-count threshold that triggers the RLE decision, and the group-count arithmetic in bit-packed headers now uses this name.
* `u8::BITS` (= 8, from std): Used wherever a bit-count is divided by 8 to obtain a byte-count (e.g. `ceil(bit_width, u8::BITS as usize)`). This is a bits-per-byte conversion, a fundamentally different concept from the packing-group size.

No behaviour change.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
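The two roles of `8` can be seen side by side in a small sketch. The constant matches the commit message; the size helper itself is illustrative, not the crate's actual function:

```rust
/// Number of values per bit-packed group: the Parquet RLE/bit-packing
/// hybrid always packs a multiple of 8 values at a time.
const BIT_PACK_GROUP_SIZE: usize = 8;

/// Illustrative helper: (group count, byte count) needed to bit-pack
/// `num_values` values at `bit_width` bits each.
fn bit_packed_size(num_values: usize, bit_width: usize) -> (usize, usize) {
    // Role 1: BIT_PACK_GROUP_SIZE -- round up to whole 8-value groups.
    let groups = (num_values + BIT_PACK_GROUP_SIZE - 1) / BIT_PACK_GROUP_SIZE;
    // Role 2: u8::BITS -- bits-to-bytes conversion, a different concept
    // that merely happens to share the value 8.
    let bits = groups * BIT_PACK_GROUP_SIZE * bit_width;
    let bytes = (bits + u8::BITS as usize - 1) / u8::BITS as usize;
    (groups, bytes)
}
```

Naming the two constants separately means that if either concept ever needed to change independently, the call sites already distinguish them.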
Add `put_with_observer()` to `LevelEncoder` that calls an `FnMut(i16, usize)` observer for each value during encoding. This allows callers to piggyback counting and histogram updates onto the encoding pass without extra iterations over the level buffer.

Previously, `write_mini_batch()` made three separate passes over each level array: one to count non-null values or row boundaries, one to update the level histogram, and one to RLE-encode. Now all three operations happen in a single pass via the observer closure.

Replace `LevelHistogram::update_from_levels()` with a new `LevelHistogram::increment_by()` that accepts a count, and remove the now-unnecessary `update_definition_level_histogram()` and `update_repetition_level_histogram()` methods from `PageMetrics`.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
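A rough sketch of the fused pass: the closure signature follows the commit message, but the run-grouping loop below is a simplified stand-in for the real RLE encoder:

```rust
/// Simplified "encoder" demonstrating the fused pass: the observer is
/// invoked once per run of equal levels, so counting and histogram
/// updates ride along with encoding instead of needing separate passes.
fn put_with_observer(levels: &[i16], mut observer: impl FnMut(i16, usize)) {
    let mut i = 0;
    while i < levels.len() {
        let value = levels[i];
        // Find where this run of identical levels ends.
        let run_end = levels[i..]
            .iter()
            .position(|&v| v != value)
            .map_or(levels.len(), |p| i + p);
        // One observer call covers the whole run (count + histogram fused).
        observer(value, run_end - i);
        i = run_end;
    }
}
```

A caller can then fold the old three passes into one closure, e.g. counting non-nulls and bumping a histogram bucket in the same invocation.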
Add `is_accumulating_rle()` and `extend_run()` methods to `RleEncoder` that allow callers to detect when the encoder is in RLE accumulation mode and bulk-extend runs without per-element overhead.

Upgrade `put_with_observer()` in `LevelEncoder` to exploit this: after each `put()`, it checks whether the encoder entered accumulation mode. If so, it scans ahead for the rest of the run, calls `extend_run()` to batch it in O(1), and fires the observer once with the full run length.

This turns the previous O(n) per-value encoding + observation into O(1) amortized per RLE run, which is a significant improvement for sparse columns where long runs of identical levels are common.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
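A toy encoder illustrating the `is_accumulating_rle`/`extend_run` protocol and the scan-ahead loop. All names and signatures here are simplified stand-ins for the real `RleEncoder`, which also handles bit-packed runs and byte output:

```rust
/// Toy run-length encoder: a list of (value, count) runs.
struct ToyRle {
    runs: Vec<(u64, usize)>,
}

impl ToyRle {
    fn new() -> Self { Self { runs: Vec::new() } }

    fn put(&mut self, value: u64) {
        match self.runs.last_mut() {
            Some((v, n)) if *v == value => *n += 1,
            _ => self.runs.push((value, 1)),
        }
    }

    /// Would another `put(value)` just extend the current run?
    fn is_accumulating_rle(&self, value: u64) -> bool {
        matches!(self.runs.last(), Some((v, _)) if *v == value)
    }

    /// Bulk-extend the current run in O(1) instead of per-value puts.
    fn extend_run(&mut self, count: usize) {
        self.runs.last_mut().expect("active run").1 += count;
    }
}

/// Encode a buffer using the scan-ahead pattern from the commit message.
fn encode(enc: &mut ToyRle, buffer: &[u64]) {
    let mut i = 0;
    while i < buffer.len() {
        enc.put(buffer[i]);
        i += 1;
        if enc.is_accumulating_rle(buffer[i - 1]) {
            // Scan ahead for the rest of the run and batch it in one call.
            let value = buffer[i - 1];
            let run = buffer[i..].iter().take_while(|&&v| v == value).count();
            enc.extend_run(run);
            i += run;
        }
    }
}
```

The per-value cost disappears for long runs: a 4096-value all-zero buffer becomes one `put` plus one `extend_run`.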
Restructure `write_list()` to accumulate consecutive null and empty rows and flush them in a single `visit_leaves()` call using `extend(repeat_n(...))`, instead of calling `visit_leaves()` per row.

With sparse data (99% nulls), a 4096-row batch previously triggered ~4000 individual tree traversals, each pushing a single value per leaf. Now consecutive null/empty runs are collapsed into one traversal that extends all leaf level buffers in bulk. This follows the same pattern already used by `write_struct()`.

The `write_non_null_slice` path is unchanged since each non-null row has different offsets and cannot be batched.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
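The batching pattern can be sketched as follows. This is a simplified stand-in for the real `visit_leaves` tree walk: the row model and the def-level values 0 and 3 are arbitrary placeholders, and `repeat(..).take(n)` stands in for `repeat_n`:

```rust
/// Sketch: collapse consecutive null rows into one bulk extension of the
/// leaf def-level buffer, instead of one "traversal" per null row.
fn write_levels(rows: &[Option<&[i32]>], def_levels: &mut Vec<i16>) {
    let mut i = 0;
    while i < rows.len() {
        match rows[i] {
            None => {
                // Accumulate the whole run of null rows, then flush once.
                let run = rows[i..].iter().take_while(|r| r.is_none()).count();
                def_levels.extend(std::iter::repeat(0i16).take(run));
                i += run;
            }
            Some(values) => {
                // Non-null rows have distinct offsets: handled per row,
                // mirroring the unchanged `write_non_null_slice` path.
                def_levels.extend(std::iter::repeat(3i16).take(values.len()));
                i += 1;
            }
        }
    }
}
```

At 99% nulls this turns ~4000 single-value pushes per batch into a handful of bulk `extend` calls.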
Adds a bulk encoding method for repeated level values. After a small warmup to enter RLE accumulation mode, remaining values are extended in O(1) via the existing `extend_run` path. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
Force-pushed 6c73ac7 to d07829b
Introduces a `LevelData` enum (`Absent`, `Materialized`, `Uniform`) to replace `Option<Vec<i16>>` for definition and repetition levels, and a borrowed `LevelDataRef` counterpart for the writer path. Uniform columns (e.g. required fields, all-null pages) are now encoded in O(1) without materializing a dense list. The CDC chunker, column writer, and arrow writer are migrated to the new types. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
When an entire list, struct, fixed-size list, or leaf array is null, skip per-row iteration and emit bulk uniform def/rep levels via `extend_uniform_levels` in O(1). Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
Changes `byte_array` encoder methods (`FallbackEncoder::encode`, `DictEncoder::encode`, etc) and all `get_*_array_slice` functions from `&[usize]` to `impl ExactSizeIterator<Item = usize>`. Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
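The shape of this signature change can be sketched with a hypothetical `encode` helper (the real methods live in the `byte_array` encoders; this one just concatenates bytes to show the dispatch):

```rust
/// Accepting any exact-size iterator of indices lets dense ranges
/// (`offset..offset + len`) and sparse index vectors share one encode
/// path without materializing a `&[usize]` slice for the dense case.
fn encode(values: &[&str], indices: impl ExactSizeIterator<Item = usize>) -> Vec<u8> {
    let mut out = Vec::new();
    // ExactSizeIterator (vs. plain Iterator) keeps pre-sizing possible.
    out.reserve(indices.len());
    for i in indices {
        out.extend_from_slice(values[i].as_bytes());
    }
    out
}
```

Dense callers pass a range, sparse callers pass their index vector's iterator; neither side pays an extra allocation.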
Force-pushed d07829b to 0b1fd2f
@alamb @etseidl Sorry for the delay; it took me longer than expected to reorganize the series into more individually reviewable commits. Test coverage is now substantial as well (hence the ballooning LoC diff). PRs #9751 and #9752 are up and can be reviewed/merged independently. The remaining commits build on that base, and on each other, so they can’t really be reviewed in parallel. I will publish them once those two are merged. I expect a total of 5 to 6 PRs 😅. Thank you again for all the reviews!
# Which issue does this PR close?

# Rationale for this change

See issue for details. The Parquet column writer currently does per-value work during level encoding regardless of data sparsity, even though the output encoding (RLE) is proportional to the number of runs.
# What changes are included in this PR?

Three incremental commits, each building on the previous:

1. **Fuse level encoding with counting and histogram updates.** `write_mini_batch()` previously made three separate passes over each level array: count non-nulls, update the level histogram, and RLE-encode. Now all three happen in a single pass via an observer callback on `LevelEncoder`. When the RLE encoder enters accumulation mode, the loop scans ahead for the full run length and batches the observer call. This makes counting and histogram updates O(1) per run.
2. **Batch consecutive null/empty rows in `write_list`.** Consecutive null or empty list entries are now collapsed into a single `visit_leaves()` call that bulk-extends all leaf level buffers, instead of one tree traversal per null row. This mirrors the approach already used by `write_struct()`.
3. **Short-circuit entirely-null columns.** When every element in an array is null, skip `Vec<i16>` level-buffer materialization entirely and store a compact `(def_value, rep_value, count)` tuple. The writer encodes this via `RleEncoder::put_n()` in O(1) amortized time, bypassing the normal mini-batch loop.

# Are these changes tested?
All tests passing. I added some benchmarks to exercise the heavy and all-null code paths, alongside the existing 25% sparseness benchmarks:
Non-nullable column benchmarks are within noise, as expected since they have no definition levels to optimize.
# Are there any user-facing changes?
None.