
feat: column-major streaming Parquet reader primitive #6386

Open
g-talbot wants to merge 4 commits into main from gtt/streaming-parquet-reader

Conversation

@g-talbot
Contributor

@g-talbot commented May 5, 2026

Summary

PR-4 of the streaming-merge stack, the read-side counterpart of
PR-2 (#6384): yields one Page at a time in storage order via
RemoteByteSource, issuing exactly one footer GET plus one body GET
in the common path. Peak memory is one page's compressed bytes
(~1 MiB for typical metrics splits) plus a small Thrift header
buffer, regardless of file size or row-group size.

No production callers in this PR. PR-5 will add a legacy adapter
for multi-RG files that carry no qh.rg_partition_prefix_len claim;
PR-6's streaming merge engine consumes both shapes through the same
trait; PR-7 wires ParquetMergeExecutor.

Why pages, not column chunks

PR-3 cuts ingest over to single-row-group files (last in the
execution order). But the merge engine reads existing splits from
day one, including post-PR-6 multi-GB compaction outputs. Under
single-RG, "one column chunk" = "all rows of that column in the
entire file" — column-chunk granularity scales linearly with file
size and would defeat the streaming budget for large compacted
splits (a 5 GB super-merged split would hold 100 MB column chunks
in memory per input × N inputs alive). Page granularity decouples
memory from file size.

Caller contract

1. RemoteByteSource (caller-implemented; ~10 lines on top of Storage)
2. StreamingParquetReader::try_open(source, path)   → footer GET, parse metadata
3. .metadata()                                       → ParquetMetaData accessor
4. .next_page() repeatedly                           → first call issues the body GET
                                                       → returns Page (header + bytes)
                                                         in storage order
                                                       → returns Ok(None) at EOF
                                                         (idempotent thereafter)
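
A minimal sketch of that contract as a caller might drive it; the
type and method names follow the list above, but the error type and
exact signatures are assumptions, not this PR's literal code:

```rust
// Sketch only: RemoteByteSource, StreamingParquetReader, and Page are the
// types this PR introduces; ParquetReadError is assumed as the error type.
async fn drain_all_pages<S: RemoteByteSource>(
    source: S,
    path: &str,
) -> Result<(), ParquetReadError> {
    // Step 2: one footer GET; metadata is parsed eagerly.
    let mut reader = StreamingParquetReader::try_open(source, path).await?;

    // Step 3: metadata accessor (schema, sorting_columns, num_rows, ...).
    let num_rows = reader.metadata().file_metadata().num_rows();

    // Step 4: the first call issues the single body GET; later calls
    // stream from it in storage order.
    let mut pages = 0usize;
    while let Some(page) = reader.next_page().await? {
        // page.bytes is the compressed on-disk page (bytes::Bytes);
        // page.header is the Thrift-decoded parquet::format::PageHeader.
        println!("{:?}: {} compressed bytes", page.header.type_, page.bytes.len());
        pages += 1;
    }
    // next_page() now returns Ok(None) on every further call (idempotent EOF).
    println!("{pages} pages over {num_rows} rows");
    Ok(())
}
```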

Page::bytes is exactly the on-disk page's compressed_page_size
bytes, ref-counted (bytes::Bytes). Page::header is the Thrift-
decoded parquet::format::PageHeader. PR-6 can:

  • Inspect header.type_ to distinguish dictionary / data / index pages
  • Direct-copy bytes to an output writer when the column comes
    entirely from one input (no decompress/decode) — gated on a future
    arrow-rs upstream PR; see PR-2 (#6384) for context
  • Decompress + decode for sort-column inspection on the merge driver's
    hot path
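
As a rough shape of that dispatch (the header is assumed to come
from the Page described above; what each branch actually does is
PR-6's concern and only sketched in comments):

```rust
use parquet::format::{PageHeader, PageType};

// Placeholder classification of a yielded page header; not PR-6 code.
fn classify(header: &PageHeader) -> &'static str {
    if header.type_ == PageType::DICTIONARY_PAGE {
        // Dictionary pages must be carried along when a column is direct-copied.
        "dictionary"
    } else if header.type_ == PageType::DATA_PAGE
        || header.type_ == PageType::DATA_PAGE_V2
    {
        // Candidates for direct copy, or decompress + decode on sort columns.
        "data"
    } else {
        // INDEX_PAGE and anything else.
        "other"
    }
}
```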

Layering

quickwit-parquet-engine does NOT take a quickwit-storage
dependency. The reader takes a minimal RemoteByteSource trait
(file_size, get_slice, get_slice_stream); PR-6 will add a
~10-line adapter from quickwit_storage::Storage in
quickwit-indexing. New direct deps on this branch: async-trait,
bytes, tokio, thrift = 0.17 (the last for TCompactInputProtocol,
which the parquet 58 page header parser uses).
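
For orientation, a minimal sketch of the shape such a trait could
take; the range, stream, and error types here are assumptions, not
the signatures in this PR:

```rust
use std::ops::Range;

// Three-method surface described above; actual signatures may differ.
#[async_trait::async_trait]
pub trait RemoteByteSource: Send + Sync + 'static {
    /// Object size in bytes; used to position the footer GET.
    async fn file_size(&self) -> std::io::Result<u64>;
    /// Ranged GET materialized as one buffer (the footer read).
    async fn get_slice(&self, range: Range<u64>) -> std::io::Result<bytes::Bytes>;
    /// Ranged GET exposed as a stream (the one large body read).
    async fn get_slice_stream(
        &self,
        range: Range<u64>,
    ) -> std::io::Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>>;
}
```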

Page index loading

Deferred (PageIndexPolicy::Skip for both offset_index and
column_index). Page indexes live in the file body, not the
footer; loading them either requires extra GETs or extends the body
GET range to start before the first column chunk. PR-6's direct
page copy reads column_chunk.offset_index_offset() /
offset_index_length() from the column metadata and decodes the
offset index from body bytes lazily when it needs page boundaries
for skip-page-index decisions.
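
A hedged sketch of that lazy lookup, translating the column-metadata
offsets into a slice of the already-fetched body; the body_start
argument (file offset where the body GET began) is an assumed name:

```rust
use std::ops::Range;
use parquet::file::metadata::ColumnChunkMetaData;

// Both accessors return None when the writer emitted no page index.
fn offset_index_range_in_body(
    col: &ColumnChunkMetaData,
    body_start: u64,
) -> Option<Range<usize>> {
    let offset = u64::try_from(col.offset_index_offset()?).ok()?;
    let length = usize::try_from(col.offset_index_length()?).ok()?;
    let start = usize::try_from(offset.checked_sub(body_start)?).ok()?;
    Some(start..start + length)
}
```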

Footer prefetch

Configurable via StreamingReaderConfig::footer_prefetch_bytes,
defaulting to 256 KiB. If the configured prefetch is smaller than
the actual footer, the reader transparently issues one retry GET
sized to the parser-reported needed length. The "two-GETs" contract
holds in the common path; the rare extra GET occurs only when the
configured prefetch undershoots the actual footer size.
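
The control flow, roughly; the parse callback and FooterParse enum
below are placeholders for whatever footer parser the reader
actually uses, not a real API:

```rust
// Outcome of trying to parse a footer from a prefetched tail.
enum FooterParse {
    Complete,
    /// Parser-reported number of bytes needed from the end of the file.
    NeedBytes(u64),
}

// Sketch of the fallback described above.
async fn fetch_footer<S: RemoteByteSource>(
    source: &S,
    footer_prefetch_bytes: u64,
    parse: impl Fn(&[u8]) -> FooterParse,
) -> std::io::Result<bytes::Bytes> {
    let size = source.file_size().await?;
    let tail = source
        .get_slice(size.saturating_sub(footer_prefetch_bytes)..size)
        .await?;
    match parse(&tail) {
        // Common path: the prefetch covered the footer; one GET total.
        FooterParse::Complete => Ok(tail),
        // Rare path: one retry GET sized to the parser-reported length.
        FooterParse::NeedBytes(needed) => {
            source.get_slice(size.saturating_sub(needed)..size).await
        }
    }
}
```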

Tests (12)

PR-A (two-GETs):

  • test_footer_get_only_at_construction
  • test_two_gets_for_full_consumption
  • test_body_get_starts_at_first_column_chunk_offset

PR-B (metadata equivalence vs sync reader):

  • test_metadata_matches_sync_reader — schema, KV, sorting_columns, num_rows

PR-C (page round-trip):

  • test_pages_concatenate_to_column_chunk_bytes — single-page-per-column case
  • test_pages_concatenate_to_column_chunk_bytes_multi_page — forces 8 pages/column,
    asserts re-encoding header || bytes per page concatenates byte-for-byte to
    the file's byte_range() slice for that column

PR-D (order):

  • test_storage_order_advance — pages emitted in lex order on
    (rg_idx, col_idx, page_idx_in_col), no skips, no duplicates

Bounded memory contract:

  • test_pending_buffer_drained_after_each_page — internal pending
    buffer is drained after each page yield (proof of bounded peak)

Edge cases:

  • test_eof_idempotent
  • test_small_prefetch_retries_with_correct_size
  • test_truncated_file_returns_footer_too_large
  • test_data_page_num_values_sum_matches_row_count — sum of
    header.data_page_header.num_values across data pages equals row
    group's num_rows, per column

Position in stack (execution order)

PR labels (PR-1..PR-7) are stable IDs that match opened PR
descriptions. The execution order below is updated 2026-05-05 to
put PR-3 (ingest cutover) last — the OOM concern lives in
compaction, not ingest.

Order | PR   | Status       | Description
----- | ---- | ------------ | -----------
1     | PR-1 | open (#6377) | Page-level stats + qh.rg_partition_prefix_len marker
2     | PR-2 | open (#6384) | Streaming column-major writer primitive
3     | PR-4 | this PR      | Streaming column-major reader, page-level
4     | PR-5 | next         | Legacy multi-RG input adapter
5     | PR-6 |              | Streaming merge engine; consumes PR-2 + PR-4
6     | PR-7 |              | Wire ParquetMergeExecutor; delete downloader
7     | PR-3 | last         | Ingest writer cuts over to single-RG (least critical — splits already small)

PR-4 is independent of PR-1 / PR-2 / PR-3 review.

Test plan

  • cargo +nightly fmt (on the files touched)
  • RUSTFLAGS="-Dwarnings --cfg tokio_unstable" cargo clippy -p quickwit-parquet-engine --tests
  • cargo doc -p quickwit-parquet-engine --no-deps
  • cargo machete
  • bash quickwit/scripts/check_license_headers.sh
  • bash quickwit/scripts/check_log_format.sh
  • cargo nextest run -p quickwit-parquet-engine --all-features — 368 tests, all pass

🤖 Generated with Claude Code

Issues exactly two streaming reads per input file in the common path:
one footer GET (via RemoteByteSource::get_slice) and one body GET
(via RemoteByteSource::get_slice_stream). Yields one Page at a time
in storage order — row-group-major, column-major-within-row-group,
page-major-within-column. Peak memory is one page's compressed
bytes (~1 MiB for typical metrics splits) plus a small Thrift
header buffer, regardless of file size or row-group size.

Symmetric with PR-2's StreamingParquetWriter on the read side. The
two PRs together form the I/O substrate for PR-6's streaming
column-major merge engine, including direct page copy (no
decode/recode) when an output column comes entirely from one input.

Why pages, not column chunks: PR-3 cuts ingest over to single-row-
group files. Under that layout, one column chunk = all rows of a
column in the entire file, scaling linearly with file size.
Column-chunk granularity would defeat the streaming budget for
large compacted splits.

quickwit-parquet-engine deliberately doesn't depend on
quickwit-storage. The reader takes a minimal RemoteByteSource trait
(file_size + get_slice + get_slice_stream); PR-6 will provide a
~10-line adapter from quickwit_storage::Storage in
quickwit-indexing.

Page index loading deferred (offset_index_policy = Skip): page
indexes live in the file body, not the footer, and loading them
would either require extra GETs or extend the body GET range
earlier than the first column chunk. PR-6's direct page copy reads
column_chunk.offset_index_offset/length and decodes the offset
index from body bytes lazily when needed.

12 tests cover: footer-only-on-construct, two-GETs after drain,
body range correctness, metadata equivalence vs sync reader,
page-byte concatenation matches column chunk bytes (single-page
and multi-page columns), storage-order advance with page
indices, EOF idempotency, pending-buffer drain after each page
(bounded memory contract), prefetch retry, truncated-file
rejection, and data-page num_values consistency.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot force-pushed the gtt/streaming-parquet-reader branch from 67c7276 to 9380648 on May 6, 2026 21:02
@g-talbot requested review from a team as code owners on May 6, 2026 21:02
@g-talbot
Contributor Author

g-talbot commented May 7, 2026

@codex review


@chatgpt-codex-connector (Bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9380648f5b


Comment thread on quickwit/quickwit-parquet-engine/src/storage/streaming_reader.rs (outdated)
g-talbot and others added 3 commits May 7, 2026 17:08
`parse_page_header_streaming` previously did `let _ =
fill_pending_best_effort(...).await;` to tolerate EOF (where the
function returns Ok), but the same pattern silenced *real* I/O
errors. A transient failure on the body stream — e.g. an S3 throttle
or network blip — would surface as `ThriftPageHeader` or
`PageHeaderTooLarge`, since the parse step ran on whatever partial
buffer existed. Callers that drive retry/backoff off `Io` couldn't
distinguish "transient, retry" from "malformed file, give up."

Switch to `?` propagation: `fill_pending_best_effort` already returns
`Ok(())` on EOF (treating short reads as success), so propagation only
surfaces real I/O errors.
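
Schematically, with an assumed signature for the helper (the real
one lives in streaming_reader.rs; this only shows the shape of the
change):

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// Assumed shape: read whatever is available into `pending`, treating EOF /
// short reads as success and returning Err only for genuine I/O failures.
async fn fill_pending_best_effort<R: AsyncRead + Unpin>(
    body: &mut R,
    pending: &mut Vec<u8>,
) -> std::io::Result<()> {
    let mut chunk = [0u8; 8 * 1024];
    let n = body.read(&mut chunk).await?; // n == 0 at EOF, still Ok(())
    pending.extend_from_slice(&chunk[..n]);
    Ok(())
}

async fn refill<R: AsyncRead + Unpin>(
    body: &mut R,
    pending: &mut Vec<u8>,
) -> std::io::Result<()> {
    // Before: `let _ = fill_pending_best_effort(body, pending).await;`
    // discarded real I/O errors, letting the later Thrift parse misreport
    // them as malformed-header errors.
    // After: `?` propagates only genuine failures; EOF still returns Ok(()).
    fill_pending_best_effort(body, pending).await?;
    Ok(())
}
```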

Audit pass over both `streaming_reader.rs` and PR-2's
`streaming_writer.rs` for similar error-swallowing patterns
(`let _ = ...await`, `.ok()` on Results, `unwrap_or` on Result-
typed values, etc.) found nothing else in production paths. The
`unwrap_or_default()` calls remaining in both files are inside test
code, reading optional KV metadata where the absence-as-empty
treatment is correct.

New test `test_body_read_failure_surfaces_as_io_error`: wires a
`FailingBodySource` whose body stream errors immediately on read.
Asserts the error surfaces as `ParquetReadError::Io` (with the
underlying message preserved), not as `ThriftPageHeader` /
`PageHeaderTooLarge`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The thrift crate (added in this PR for Parquet page-header parsing)
transitively pulls in threadpool. Regenerated via `make update-licenses`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>