
feat: Phase 3a — merge metadata aggregation, message types, replaced_split_ids#6352

Open
g-talbot wants to merge 14 commits into main from gtt/parquet-merge-pipeline

Conversation

@g-talbot
Contributor

Summary

First PR of Phase 3 (pipeline integration), building on Phase 1 (merge engine, #6335) and Phase 2 (merge policy, #6351).

  • merge_parquet_split_metadata() — aggregates input split metadata with MergeOutputFile physical metadata to produce complete ParquetSplitMetadata for merged output. Validates invariant fields (kind, index_uid, partition_id, sort_fields, window), unions metric_names and tags, finalizes tag cardinality after merge. 17 unit tests covering all aggregation rules and error cases.
  • Message types — ParquetNewSplits, ParquetMergeTask, ParquetMergeScratch for the merge actor chain (planner → scheduler → downloader → executor).
  • replaced_split_ids — added to ParquetSplitBatch and propagated through ParquetUploader (was hardcoded Vec::new()). Enables the merge executor (PR 3c) to specify which splits are being replaced during atomic publish-and-replace.
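The aggregation rules described above might be sketched as follows. This is a hypothetical, heavily simplified model (the real ParquetSplitMetadata carries more fields, e.g. tags, sort_fields, and window); it only illustrates invariant-field validation, set union, time-range widening, and the num_merge_ops bump:

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for ParquetSplitMetadata, reduced to a few fields.
#[derive(Debug, Clone, PartialEq)]
struct SplitMeta {
    index_uid: String,             // invariant: must agree across inputs
    metric_names: BTreeSet<String>, // unioned across inputs
    time_range: (i64, i64),        // widened to cover all inputs
    num_merge_ops: u32,            // output level is max(inputs) + 1
}

/// Aggregate input split metadata into metadata for the merged output.
fn merge_split_metadata(inputs: &[SplitMeta]) -> Result<SplitMeta, String> {
    let first = inputs.first().ok_or("no input splits")?;
    let mut out = first.clone();
    for meta in &inputs[1..] {
        if meta.index_uid != first.index_uid {
            return Err(format!(
                "index_uid mismatch: {} vs {}",
                meta.index_uid, first.index_uid
            ));
        }
        out.metric_names.extend(meta.metric_names.iter().cloned());
        out.time_range.0 = out.time_range.0.min(meta.time_range.0);
        out.time_range.1 = out.time_range.1.max(meta.time_range.1);
        out.num_merge_ops = out.num_merge_ops.max(meta.num_merge_ops);
    }
    out.num_merge_ops += 1;
    Ok(out)
}
```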

Test plan

  • 17 unit tests for merge_parquet_split_metadata() (basic aggregation, metric_names union, tag merging, cardinality promotion, time_range, num_merge_ops, invariant validation errors, edge cases)
  • 4 existing ParquetUploader tests pass with new replaced_split_ids field
  • cargo clippy clean
  • cargo doc compiles
  • cargo machete clean

🤖 Generated with Claude Code

mattmkim and others added 14 commits April 24, 2026 14:21
Adds a constant write amplification merge policy for Parquet splits,
adapted from the existing ConstWriteAmplificationMergePolicy but using
byte size instead of document count as the primary size metric. This is
Phase 2 of the Parquet compaction project — the decision layer that
determines which splits to merge within each compaction scope.

Key components:
- ParquetMergePolicy trait mirroring the MergePolicy interface
- CompactionScope grouping by (index_uid, sort_fields, window_start)
- ConstWriteAmplificationParquetMergePolicy with bounded write amp
- finalize_operations() for cold window compaction
- 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
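A byte-size-driven selection step, as opposed to the document-count metric of the existing policy, could look roughly like this. This is a minimal sketch under assumed parameters (merge_factor, max_output_bytes are illustrative names, not the real policy's fields), and it omits the write-amplification bookkeeping:

```rust
/// Simplified merge selection within one num_merge_ops level: sort splits by
/// byte size so similar-sized splits merge together, then emit full groups of
/// `merge_factor` splits whose combined size stays under `max_output_bytes`.
/// Leftovers wait for more splits (or for finalize on cold windows).
fn plan_merges(mut sizes: Vec<u64>, merge_factor: usize, max_output_bytes: u64) -> Vec<Vec<u64>> {
    sizes.sort_unstable();
    let mut ops = Vec::new();
    for chunk in sizes.chunks(merge_factor) {
        let total: u64 = chunk.iter().sum();
        if chunk.len() == merge_factor && total <= max_output_bytes {
            ops.push(chunk.to_vec());
        }
    }
    ops
}
```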
Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340),
include it in CompactionScope so splits with different partitions are
never merged together. Adds 2 new scope tests for partition isolation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
finalize_operations() was running single_merge_operation over all young
splits without grouping by num_merge_ops level. This could merge level-0
and level-1 splits together, stamping the output with max(levels) + 1
and prematurely maturing lower-level data.

Fix: group by num_merge_ops in finalize just like operations() does,
then apply the lower merge factor within each level independently.

Added test_finalize_respects_mc_level_invariant (unit) and
proptest_finalize_respects_mc_level (property test) — both caught the
bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
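The grouping step the fix introduces can be sketched as below (hypothetical split representation; the real code groups full split metadata, not id/level pairs):

```rust
use std::collections::BTreeMap;

/// Group young splits by num_merge_ops before planning finalize merges, so
/// that a level-0 split is never merged with a level-1 split (which would
/// stamp the output with max(levels) + 1 and prematurely mature data).
fn group_by_level(splits: &[(&'static str, u32)]) -> BTreeMap<u32, Vec<&'static str>> {
    let mut by_level: BTreeMap<u32, Vec<&'static str>> = BTreeMap::new();
    for &(split_id, level) in splits {
        by_level.entry(level).or_default().push(split_id);
    }
    by_level
}
```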
CompactionScope only used window_start_secs, so splits with the same
start but different durations (e.g. after a window config change) would
be grouped together. The merge engine requires all inputs to agree on
both window_start and window_duration, so merging across durations would
fail validation.

Added test_different_window_duration which caught the bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
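The scope key after this fix might be modeled as below. Field names are illustrative, not the real struct's definition; the point is that window_duration_secs is now part of the equality key, so splits with the same start but different durations fall into different scopes:

```rust
/// Hypothetical, simplified CompactionScope. Splits may only be merged when
/// every component of the key matches; keying on window_start_secs alone
/// could group splits with different window durations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CompactionScope {
    index_uid: String,
    partition_id: u64,
    sort_fields: Vec<String>,
    window_start_secs: i64,
    window_duration_secs: i64,
}

fn same_scope(a: &CompactionScope, b: &CompactionScope) -> bool {
    a == b
}
```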
Add three merge-policy invariants to the shared verification layer
(quickwit-dst) with check_invariant! enforcement in
ParquetMergeOperation::new():

- MP-1: all splits share the same num_merge_ops level
- MP-2: merge op has at least 2 input splits
- MP-3: all splits share the same compaction scope (sort_fields + window)

Shared pure functions in quickwit_dst::invariants::merge_policy are the
single source of truth, usable by Stateright models and production code.
Debug builds panic on violation; all builds emit metrics via the
invariant recorder.

Tests written first (4 should_panic tests failed before adding checks,
pass after). Plus 1 positive test and 3 unit tests for shared functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
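As pure predicates, the first two invariants might look like this (illustrative names, not the real quickwit_dst::invariants::merge_policy API):

```rust
/// MP-1: all input splits share the same num_merge_ops level.
fn mp1_same_level(levels: &[u32]) -> bool {
    levels.windows(2).all(|pair| pair[0] == pair[1])
}

/// MP-2: a merge operation has at least 2 input splits.
fn mp2_at_least_two_inputs(num_inputs: usize) -> bool {
    num_inputs >= 2
}
```

Keeping these as side-effect-free functions is what lets both the Stateright models and the production check_invariant! call sites share one definition.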
…line

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…it_ids (Phase 3a)

Phase 3 pipeline integration, first PR:

- merge_parquet_split_metadata(): aggregates input split metadata with
  MergeOutputFile physical metadata to produce complete ParquetSplitMetadata
  for merged output. Validates invariant fields, unions metric_names and tags,
  finalizes tag cardinality after merge. 17 tests.

- ParquetNewSplits, ParquetMergeTask, ParquetMergeScratch message types for
  the merge actor chain (planner → scheduler → downloader → executor).

- Add replaced_split_ids to ParquetSplitBatch and propagate through
  ParquetUploader (was hardcoded Vec::new()). Enables merge executor to
  specify which splits are being replaced.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…der stub (Phase 3b)

Phase 3 pipeline integration, second PR:

- ParquetMergePlanner: receives ParquetNewSplits, groups by CompactionScope,
  invokes ParquetMergePolicy::operations(), dispatches to scheduler. Handles
  startup seeding of immature splits, deduplication via known_split_ids,
  RunFinalizeMergePolicyAndQuit for cold-window finalization. 6 tests.

- MergeSchedulerService extension: ScheduleParquetMerge message with shared
  merge_semaphore for global concurrency control across Tantivy and Parquet
  merges. Feature-gated behind cfg(feature = "metrics"). Existing scheduler
  tests unaffected.

- ParquetMergeSplitDownloader stub: minimal Actor + Handler<ParquetMergeTask>
  for the scheduler and planner to reference. Full download implementation
  comes in PR 3c.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
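The planner's deduplication via known_split_ids, mentioned above, reduces to a small filtering step. A minimal sketch (assumed shapes; the real planner tracks split metadata, not bare id strings):

```rust
use std::collections::HashSet;

/// Keep only splits not seen before; `known` persists across NewSplits
/// messages so re-delivered splits are not fed to the merge policy twice.
fn dedup_new_splits(known: &mut HashSet<String>, incoming: Vec<String>) -> Vec<String> {
    // HashSet::insert returns true only for ids not already present.
    incoming.into_iter().filter(|id| known.insert(id.clone())).collect()
}
```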
…ase 3c)

Phase 3 pipeline integration, third PR:

- ParquetMergeSplitDownloader: downloads each input split's Parquet file
  from object storage to a local temp directory, forwards ParquetMergeScratch
  to the executor. Replaces the stub from PR 3b.

- ParquetMergeExecutor: runs merge_sorted_parquet_files via run_cpu_intensive,
  builds output ParquetSplitMetadata via merge_parquet_split_metadata, renames
  output files to match generated split IDs, sends ParquetSplitBatch with
  replaced_split_ids to the uploader.

- ParquetSplitBatch.checkpoint_delta -> checkpoint_delta_opt: now Option to
  support merge operations (no checkpoint delta for data reorganization).
  Ingest path passes Some(delta), merge path passes None.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
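The checkpoint_delta_opt change above can be illustrated with a toy batch type (stand-in fields; the real delta is not a u64): ingest batches carry Some(delta) to advance the source checkpoint, while merge batches carry None because they only reorganize already-published data, and instead name the splits they replace.

```rust
/// Simplified stand-in for ParquetSplitBatch after the Option change.
struct SplitBatch {
    checkpoint_delta_opt: Option<u64>, // Some(delta) on ingest, None on merge
    replaced_split_ids: Vec<String>,   // non-empty only for merge batches
}

fn is_merge_batch(batch: &SplitBatch) -> bool {
    batch.checkpoint_delta_opt.is_none() && !batch.replaced_split_ids.is_empty()
}
```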