feat: column-major streaming Parquet reader primitive #6386
Issues exactly two streaming reads per input file in the common path: one footer GET (via `RemoteByteSource::get_slice`) and one body GET (via `RemoteByteSource::get_slice_stream`). Yields one `Page` at a time in storage order — row-group-major, column-major-within-row-group, page-major-within-column. Peak memory is one page's compressed bytes (~1 MiB for typical metrics splits) plus a small Thrift header buffer, regardless of file size or row-group size. Symmetric with PR-2's `StreamingParquetWriter` on the read side. The two PRs together form the I/O substrate for PR-6's streaming column-major merge engine, including direct page copy (no decode/recode) when an output column comes entirely from one input.

Why pages, not column chunks: PR-3 cuts ingest over to single-row-group files. Under that layout, one column chunk = all rows of a column in the entire file, scaling linearly with file size. Column-chunk granularity would defeat the streaming budget for large compacted splits.

quickwit-parquet-engine deliberately doesn't depend on quickwit-storage. The reader takes a minimal `RemoteByteSource` trait (`file_size` + `get_slice` + `get_slice_stream`); PR-6 will provide a ~10-line adapter from `quickwit_storage::Storage` in quickwit-indexing.

Page index loading deferred (`offset_index_policy = Skip`): page indexes live in the file body, not the footer, and loading them would either require extra GETs or extend the body GET range earlier than the first column chunk. PR-6's direct page copy reads `column_chunk.offset_index_offset`/`length` and decodes the offset index from body bytes lazily when needed.
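The trait surface described above can be sketched roughly as follows. This is a simplified, synchronous stand-in for illustration only — the real trait is presumably async and bytes-based, and the `InMemorySource` type is invented here for the example:

```rust
use std::io::Read;
use std::ops::Range;

// Hypothetical, simplified shape of the RemoteByteSource trait.
// The real trait in quickwit-parquet-engine is async and stream-based;
// this synchronous sketch only illustrates the three-method surface.
trait RemoteByteSource {
    /// Total length of the remote object in bytes.
    fn file_size(&self) -> u64;
    /// One ranged GET, fully buffered (used for the footer).
    fn get_slice(&self, range: Range<u64>) -> std::io::Result<Vec<u8>>;
    /// One ranged GET, consumed incrementally (used for the body).
    fn get_slice_stream(&self, range: Range<u64>) -> std::io::Result<Box<dyn Read>>;
}

// An in-memory implementation, e.g. for tests.
struct InMemorySource(Vec<u8>);

impl RemoteByteSource for InMemorySource {
    fn file_size(&self) -> u64 {
        self.0.len() as u64
    }
    fn get_slice(&self, range: Range<u64>) -> std::io::Result<Vec<u8>> {
        Ok(self.0[range.start as usize..range.end as usize].to_vec())
    }
    fn get_slice_stream(&self, range: Range<u64>) -> std::io::Result<Box<dyn Read>> {
        Ok(Box::new(std::io::Cursor::new(self.get_slice(range)?)))
    }
}
```

A narrow trait like this is what lets the reader be tested against in-memory sources while counting the exact number of GETs issued.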
12 tests cover: footer-only-on-construct, two-GETs after drain, body range correctness, metadata equivalence vs sync reader, page-byte concatenation matches column chunk bytes (single-page and multi-page columns), storage-order advance with page indices, EOF idempotency, pending-buffer drain after each page (bounded memory contract), prefetch retry, truncated-file rejection, and data-page num_values consistency.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9380648f5b
`parse_page_header_streaming` previously did `let _ = fill_pending_best_effort(...).await;` to tolerate EOF (where the function returns Ok), but the same pattern silenced *real* I/O errors. A transient failure on the body stream — e.g. an S3 throttle or network blip — would surface as `ThriftPageHeader` or `PageHeaderTooLarge`, since the parse step ran on whatever partial buffer existed. Callers that drive retry/backoff off `Io` couldn't distinguish "transient, retry" from "malformed file, give up."

Switch to `?` propagation: `fill_pending_best_effort` already returns `Ok(())` on EOF (treating short reads as success), so propagation only surfaces real I/O errors.

Audit pass over both `streaming_reader.rs` and PR-2's `streaming_writer.rs` for similar error-swallowing patterns (`let _ = ...await`, `.ok()` on Results, `unwrap_or` on Result-typed values, etc.) found nothing else in production paths. The `unwrap_or_default()` calls remaining in both files are inside test code, reading optional KV metadata where the absence-as-empty treatment is correct.

New test `test_body_read_failure_surfaces_as_io_error`: wires a `FailingBodySource` whose body stream errors immediately on read. Asserts the error surfaces as `ParquetReadError::Io` (with the underlying message preserved), not as `ThriftPageHeader` / `PageHeaderTooLarge`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
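The bug class being fixed can be illustrated with a minimal stand-alone sketch. All names here are simplified stand-ins for the real functions: `fill_pending` mimics `fill_pending_best_effort`'s EOF-is-Ok contract, and the error enum is reduced to the two variants that matter for the argument:

```rust
// Illustration of the error-swallowing pattern: discarding a Result with
// `let _ =` silences a real transport failure, so the parse step runs on
// a partial buffer and reports a misleading "malformed file" error.
#[derive(Debug, PartialEq)]
enum ParquetReadError {
    Io(String),
    ThriftPageHeader,
}

// Stand-in for fill_pending_best_effort: Ok(()) on EOF (short reads are
// success), Err only on a real transport failure.
fn fill_pending(buf: &mut Vec<u8>, transport_ok: bool) -> Result<(), ParquetReadError> {
    if !transport_ok {
        return Err(ParquetReadError::Io("connection reset".to_string()));
    }
    buf.extend_from_slice(b"\x15\x00"); // pretend we read some header bytes
    Ok(())
}

fn parse_header(buf: &[u8]) -> Result<(), ParquetReadError> {
    if buf.len() < 2 {
        return Err(ParquetReadError::ThriftPageHeader); // partial buffer misparses
    }
    Ok(())
}

// Buggy shape: the transport error is swallowed; the caller sees a
// non-retryable "malformed file" error instead of a retryable Io error.
fn next_header_buggy(transport_ok: bool) -> Result<(), ParquetReadError> {
    let mut buf = Vec::new();
    let _ = fill_pending(&mut buf, transport_ok);
    parse_header(&buf)
}

// Fixed shape: `?` propagates the real failure kind.
fn next_header_fixed(transport_ok: bool) -> Result<(), ParquetReadError> {
    let mut buf = Vec::new();
    fill_pending(&mut buf, transport_ok)?;
    parse_header(&buf)
}
```

Because the EOF case already returns `Ok(())` inside `fill_pending`, switching to `?` changes behavior only for genuine I/O failures, which is exactly the distinction retry/backoff callers need.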
The thrift crate (added in this PR for Parquet page-header parsing) transitively pulls in `threadpool`. Regenerated via `make update-licenses`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
PR-4 of the streaming-merge stack. Symmetric with PR-2 (#6384) on the read side: yields one `Page` at a time in storage order via `RemoteByteSource`, with exactly one footer GET + one body GET in the common path. Peak memory is one page's compressed bytes (~1 MiB for typical metrics splits) plus a small Thrift header buffer, regardless of file size or row-group size.

No production callers in this PR. PR-5 will wrap a legacy adapter for multi-RG files with no `qh.rg_partition_prefix_len` claim; PR-6's streaming merge engine consumes both shapes through the same trait; PR-7 wires `ParquetMergeExecutor`.

Why pages, not column chunks
PR-3 cuts ingest over to single-row-group files (last in the
execution order). But the merge engine reads existing splits from
day one, including post-PR-6 multi-GB compaction outputs. Under
single-RG, "one column chunk" = "all rows of that column in the
entire file" — column-chunk granularity scales linearly with file
size and would defeat the streaming budget for large compacted
splits (a 5 GB super-merged split would hold 100 MB column chunks
in memory per input × N inputs alive). Page granularity decouples
memory from file size.
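To make the scaling argument concrete, a back-of-envelope sketch. Only the 5 GB file size, 100 MB chunk size, and ~1 MiB page size come from the text above; the 50-column and 10-input figures are invented illustration values:

```rust
// Back-of-envelope peak-memory budgets for the two granularities.
// Column-chunk granularity: one whole chunk per alive input, and chunk
// size grows linearly with file size under single-row-group layout.
fn chunk_budget_bytes(file_bytes: u64, num_columns: u64, num_inputs: u64) -> u64 {
    (file_bytes / num_columns) * num_inputs
}

// Page granularity: one compressed page per alive input, independent of
// file size.
fn page_budget_bytes(page_bytes: u64, num_inputs: u64) -> u64 {
    page_bytes * num_inputs
}
```

With a 5 GB file and 50 columns, each chunk is 100 MB, so 10 alive inputs hold ~1 GB at chunk granularity versus ~10 MiB at page granularity.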
Caller contract
`Page::bytes` is exactly the on-disk page's `compressed_page_size` bytes, ref-counted (`bytes::Bytes`). `Page::header` is the Thrift-decoded `parquet::format::PageHeader`. PR-6 can:
- match on `header.type_` to distinguish dictionary / data / index pages
- copy `bytes` to an output writer when the column comes entirely from one input (no decompress/decode in the hot path) — gated on a future arrow-rs upstream PR; see PR-2 (feat: column-major streaming Parquet writer primitive #6384) for context
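A hedged sketch of how a consumer might use that contract. The `Page` and `PageType` types below are local stand-ins, not the real `parquet::format` types, and `copy_page` is an invented helper name:

```rust
// Local stand-ins for the caller contract described above.
#[derive(Clone, Copy, PartialEq, Debug)]
enum PageType {
    Dictionary,
    Data,
    Index,
}

struct Page {
    page_type: PageType,   // stand-in for header.type_
    header_bytes: Vec<u8>, // re-encoded Thrift page header
    bytes: Vec<u8>,        // exactly compressed_page_size on-disk bytes
}

// Direct page copy: when an output column comes entirely from one input,
// emit `header || bytes` verbatim — no decompress, no decode.
fn copy_page(out: &mut Vec<u8>, page: &Page) {
    out.extend_from_slice(&page.header_bytes);
    out.extend_from_slice(&page.bytes);
}
```

This is the same `header || bytes` concatenation property the round-trip tests assert against the file's on-disk column-chunk byte range.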
Layering
`quickwit-parquet-engine` does NOT take a `quickwit-storage` dependency. The reader takes a minimal `RemoteByteSource` trait (`file_size`, `get_slice`, `get_slice_stream`); PR-6 will add a ~10-line adapter from `quickwit_storage::Storage` in `quickwit-indexing`. New direct deps on this branch: `async-trait`, `bytes`, `tokio`, `thrift = 0.17` (the last for `TCompactInputProtocol`, which the parquet 58 page header parser uses).
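The adapter shape might look like the following sketch. Both traits are local, synchronous stand-ins invented for illustration (`BlobStore` approximating the relevant slice of `quickwit_storage::Storage`); the real definitions are async, so the real adapter would forward async calls the same way:

```rust
use std::ops::Range;

// Stand-in for the slice of quickwit_storage::Storage the reader needs.
trait BlobStore {
    fn len(&self) -> u64;
    fn get_range(&self, range: Range<u64>) -> Vec<u8>;
}

// Stand-in for the reader-side trait.
trait RemoteByteSource {
    fn file_size(&self) -> u64;
    fn get_slice(&self, range: Range<u64>) -> Vec<u8>;
}

// The "~10-line adapter": wrap a store handle and forward each call.
struct StorageByteSource<S: BlobStore>(S);

impl<S: BlobStore> RemoteByteSource for StorageByteSource<S> {
    fn file_size(&self) -> u64 {
        self.0.len()
    }
    fn get_slice(&self, range: Range<u64>) -> Vec<u8> {
        self.0.get_range(range)
    }
}

// In-memory store for exercising the adapter.
struct MemStore(Vec<u8>);

impl BlobStore for MemStore {
    fn len(&self) -> u64 {
        self.0.len() as u64
    }
    fn get_range(&self, range: Range<u64>) -> Vec<u8> {
        self.0[range.start as usize..range.end as usize].to_vec()
    }
}
```

Keeping the adapter in `quickwit-indexing` (PR-6) rather than here is what lets `quickwit-parquet-engine` avoid the `quickwit-storage` dependency entirely.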
Page index loading
Deferred (`PageIndexPolicy::Skip` for both `offset_index` and `column_index`). Page indexes live in the file body, not the footer; loading them either requires extra GETs or extends the body GET range to start before the first column chunk. PR-6's direct page copy reads `column_chunk.offset_index_offset()` / `offset_index_length()` from the column metadata and decodes the offset index from body bytes lazily when it needs page boundaries for skip-page-index decisions.
Footer prefetch
Configurable via `StreamingReaderConfig::footer_prefetch_bytes`, defaulting to 256 KiB. If the configured prefetch is smaller than the actual footer, the reader transparently issues one retry GET sized to the parser-reported needed length. The "two-GETs" contract holds in the common path; the extra retry GET occurs only when the configured prefetch undershoots the real footer size.
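The retry decision can be sketched from the Parquet footer layout — a file ends with a 4-byte little-endian metadata length followed by the `PAR1` magic. The function name and error strings below are invented for illustration:

```rust
// Given the file size and the prefetched suffix (the last
// `footer_prefetch_bytes` of the file), decide whether one retry GET is
// needed, and of what size. Returns Ok(None) when the prefetch already
// covers the whole footer, Ok(Some(needed)) when a retry is required.
fn footer_fetch_plan(file_size: u64, suffix: &[u8]) -> Result<Option<u64>, String> {
    // A Parquet file ends with: [4-byte LE metadata length]["PAR1"].
    if suffix.len() < 8 || suffix[suffix.len() - 4..] != *b"PAR1" {
        return Err("truncated or not a Parquet file".to_string());
    }
    let s = &suffix[suffix.len() - 8..suffix.len() - 4];
    let metadata_len = u32::from_le_bytes([s[0], s[1], s[2], s[3]]) as u64;
    let needed = metadata_len + 8; // metadata + trailing length/magic
    if needed > file_size {
        return Err("footer larger than file".to_string());
    }
    if needed > suffix.len() as u64 {
        return Ok(Some(needed)); // one retry GET of exactly `needed` bytes
    }
    Ok(None) // footer fully contained in the prefetch: common path
}
```

With the 256 KiB default, most metrics-split footers fit on the first GET and the retry branch never fires.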
Tests (12)
PR-A (two-GETs):
- `test_footer_get_only_at_construction`
- `test_two_gets_for_full_consumption`
- `test_body_get_starts_at_first_column_chunk_offset`

PR-B (metadata equivalence vs sync reader):
- `test_metadata_matches_sync_reader` — schema, KV, sorting_columns, num_rows

PR-C (page round-trip):
- `test_pages_concatenate_to_column_chunk_bytes` — single-page-per-column case
- `test_pages_concatenate_to_column_chunk_bytes_multi_page` — forces 8 pages/column, asserts re-encoding `header || bytes` per page concatenates byte-for-byte to the file's `byte_range()` slice for that column

PR-D (order):
- `test_storage_order_advance` — pages emitted in lex order on `(rg_idx, col_idx, page_idx_in_col)`, no skips, no duplicates

Bounded memory contract:
- `test_pending_buffer_drained_after_each_page` — internal `pending` buffer is drained after each page yield (proof of bounded peak)

Edge cases:
- `test_eof_idempotent`
- `test_small_prefetch_retries_with_correct_size`
- `test_truncated_file_returns_footer_too_large`
- `test_data_page_num_values_sum_matches_row_count` — sum of `header.data_page_header.num_values` across data pages equals row group's `num_rows`, per column

Position in stack (execution order)
PR labels (PR-1..PR-7) are stable IDs that match opened PR
descriptions. The execution order below is updated 2026-05-05 to
put PR-3 (ingest cutover) last — the OOM concern lives in
compaction, not ingest.
- `qh.rg_partition_prefix_len` marker
- `ParquetMergeExecutor`; delete downloader

PR-4 is independent of PR-1 / PR-2 / PR-3 review.
Test plan
- `cargo +nightly fmt` (per files touched)
- `RUSTFLAGS="-Dwarnings --cfg tokio_unstable" cargo clippy -p quickwit-parquet-engine --tests`
- `cargo doc -p quickwit-parquet-engine --no-deps`
- `cargo machete`
- `bash quickwit/scripts/check_license_headers.sh`
- `bash quickwit/scripts/check_log_format.sh`
- `cargo nextest run -p quickwit-parquet-engine --all-features` — 368 tests, all pass

🤖 Generated with Claude Code