feat: page-stream → RecordBatch decoder (PR-6a) #6407
Draft
g-talbot wants to merge 2 commits into gtt/column-page-stream-trait from
Conversation
Bridges PR-4's `ColumnPageStream` (raw compressed pages in storage order) to arrow's standard `ParquetRecordBatchReaderBuilder` (decoded arrays). PR-6's streaming merge engine drains each input row group through this to keep per-RG memory bounded — only one input RG's worth of bytes is materialised at a time, rather than the whole file.

Approach: reconstruct one row group's column-chunk byte layout in a buffer (column chunks placed at their original offsets, gaps zero-padded), wrap the buffer in `Bytes`, and feed it to `ParquetRecordBatchReaderBuilder::new_with_metadata` with `with_row_groups([rg_idx])`. Reconstruction is byte-exact because each page's original Thrift-compact `header_bytes` are carried through PR-4's streaming reader — no re-encoding, so encoder-version drift inside the compactor cannot silently corrupt outputs.

Adds `header_bytes: Bytes` to `Page` and captures the drained header bytes inside `parse_page_header_streaming`. New `StreamDecoder` borrows the stream and exposes `next_rg()`, returning one `RecordBatch` per input row group, idempotent at EOF.

Tests (9, all passing): single-RG and multi-RG drains, multi-page columns, dict columns, null preservation, compression codec roundtrip (uncompressed/snappy/zstd — LZ4 not enabled in our parquet feature set), idempotent EOF, byte-exact reconstruction proof, and I/O failures surfacing as `PageDecodeError::PageStream` rather than being masked as decode errors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
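The buffer-reconstruction step above can be sketched with std types only. `ChunkSlice` and `rebuild_row_group` are hypothetical stand-ins for illustration, not the PR's actual types; they show the layout logic (buffer sized to the largest chunk end, chunks copied at their original offsets, gaps left zero-padded):

```rust
/// Hypothetical stand-in for one drained column chunk: its original
/// file offset plus the reassembled `header_bytes ++ bytes` payload.
struct ChunkSlice {
    file_offset: usize,
    data: Vec<u8>,
}

/// Rebuild one row group's byte layout: allocate a buffer sized to the
/// largest chunk end, copy each chunk at its original offset, and leave
/// the gaps zero-padded (the reader only touches bytes inside chunk ranges).
fn rebuild_row_group(chunks: &[ChunkSlice]) -> Vec<u8> {
    let end = chunks
        .iter()
        .map(|c| c.file_offset + c.data.len())
        .max()
        .unwrap_or(0);
    let mut buf = vec![0u8; end];
    for c in chunks {
        buf[c.file_offset..c.file_offset + c.data.len()].copy_from_slice(&c.data);
    }
    buf
}

fn main() {
    // Two chunks with a gap between them.
    let chunks = vec![
        ChunkSlice { file_offset: 4, data: b"AAAA".to_vec() },
        ChunkSlice { file_offset: 12, data: b"BB".to_vec() },
    ];
    let buf = rebuild_row_group(&chunks);
    assert_eq!(buf.len(), 14);          // sized to max chunk end
    assert_eq!(&buf[4..8], b"AAAA");    // chunk at its original offset
    assert_eq!(&buf[8..12], &[0u8; 4]); // zero-padded gap
}
```

In the PR the resulting buffer would then be wrapped in `Bytes` and handed to the arrow reader restricted to the one row group.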
CI nightly rustfmt (newer than my local at the time of the original push) wraps `write_parquet(...)` onto multiple lines. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
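The decoder contract described in the PR summary — `next_rg()` yielding one `RecordBatch` per input row group and staying at EOF once drained — can be illustrated with a std-only mock. `MockDecoder` and `Batch` are hypothetical stand-ins for this sketch, not the PR's types:

```rust
/// Hypothetical stand-in for arrow's `RecordBatch`.
struct Batch {
    rows: usize,
}

/// Hypothetical mock of the decoder contract: one batch per input row
/// group, and `None` on every call after EOF (idempotent), mirroring
/// the behaviour the PR describes for `StreamDecoder::next_rg()`.
struct MockDecoder {
    remaining: Vec<usize>, // row counts per row group, in storage order
}

impl MockDecoder {
    fn next_rg(&mut self) -> Option<Batch> {
        if self.remaining.is_empty() {
            None // EOF: safe to call again, still None
        } else {
            Some(Batch { rows: self.remaining.remove(0) })
        }
    }
}

fn main() {
    let mut dec = MockDecoder { remaining: vec![100, 250] };
    let mut total = 0;
    // Drain loop, as a merge engine would run it — one RG at a time.
    while let Some(batch) = dec.next_rg() {
        total += batch.rows;
    }
    assert_eq!(total, 350);
    assert!(dec.next_rg().is_none()); // idempotent at EOF
    assert!(dec.next_rg().is_none());
}
```

The drain loop is what bounds memory: only the current row group's batch is alive at any point.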
Summary

Bridges `ColumnPageStream` (raw compressed pages) to arrow's standard `ParquetRecordBatchReaderBuilder` (decoded arrays). PR-6b's merge engine drains each input row group through this. Adds `header_bytes: Bytes` to `Page` and a new `StreamDecoder` whose `next_rg()` yields one `RecordBatch` per input row group, idempotent at EOF.

How byte-exact reconstruction works

Each `Page` now carries the original Thrift-compact bytes for its header (`header_bytes`) alongside the parsed `PageHeader` and the raw compressed body (`bytes`). Concatenating `header_bytes ++ bytes` for every page in a column chunk reproduces the on-disk byte layout. The decoder allocates one buffer per RG sized to `max(col_byte_range_end)`, places each column chunk at its original offset, and asks `ParquetRecordBatchReaderBuilder::new_with_metadata` (built on the original `ParquetMetaData`) to read just that RG via `with_row_groups([rg_idx])`.

We could in principle re-encode the parsed `PageHeader` structs (Thrift-compact is deterministic for a given struct), but "deterministic in practice" is not a contract — encoder-version drift inside the compactor would silently corrupt outputs. Carrying the original header bytes sidesteps the problem.

Stack
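The `header_bytes ++ bytes` concatenation above can be sketched with std types only. `Page` here is a hypothetical simplification of the PR's struct (the real one also carries the parsed `PageHeader`):

```rust
/// Hypothetical, simplified page: the original Thrift-compact header
/// bytes plus the raw (still compressed) page body.
struct Page {
    header_bytes: Vec<u8>,
    bytes: Vec<u8>,
}

/// Concatenating `header_bytes ++ bytes` for every page reproduces the
/// column chunk exactly as it sat on disk — no Thrift re-encoding, so
/// there is nothing for encoder-version drift to change.
fn reassemble_chunk(pages: &[Page]) -> Vec<u8> {
    let cap: usize = pages
        .iter()
        .map(|p| p.header_bytes.len() + p.bytes.len())
        .sum();
    let mut out = Vec::with_capacity(cap);
    for p in pages {
        out.extend_from_slice(&p.header_bytes);
        out.extend_from_slice(&p.bytes);
    }
    out
}

fn main() {
    // Toy on-disk chunk: two pages, each header followed by its body.
    let on_disk = b"HDR1body-oneHDR2body-two".to_vec();
    let pages = vec![
        Page { header_bytes: b"HDR1".to_vec(), bytes: b"body-one".to_vec() },
        Page { header_bytes: b"HDR2".to_vec(), bytes: b"body-two".to_vec() },
    ];
    // Byte-exact: the reassembled chunk equals the original on-disk bytes.
    assert_eq!(reassemble_chunk(&pages), on_disk);
}
```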
Test plan

- `page_decoder.rs`: single/multi-RG drains, multi-page columns, dict columns, null preservation, codec roundtrip (uncompressed/snappy/zstd), idempotent EOF, byte-exact reconstruction proof, I/O failure → `PageDecodeError::PageStream`
- `streaming_reader.rs` tests still pass with the `header_bytes` field addition (401/401 crate-wide)
- `cargo clippy --workspace --all-features --tests` with `-D warnings`
- `cargo +nightly fmt --all -- --check`
- `cargo doc --no-deps -p quickwit-parquet-engine` with `-D warnings`
- `cargo machete`
- `bash quickwit/scripts/check_license_headers.sh`
- `typos` clean on changed files

🤖 Generated with Claude Code