feat: Phase 3a — merge metadata aggregation, message types, replaced_split_ids #6352
Open
Conversation
Adds a constant write amplification merge policy for Parquet splits, adapted from the existing ConstWriteAmplificationMergePolicy but using byte size instead of document count as the primary size metric. This is Phase 2 of the Parquet compaction project: the decision layer that determines which splits to merge within each compaction scope.

Key components:
- ParquetMergePolicy trait mirroring the MergePolicy interface
- CompactionScope grouping by (index_uid, sort_fields, window_start)
- ConstWriteAmplificationParquetMergePolicy with bounded write amplification
- finalize_operations() for cold window compaction
- 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
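As a rough illustration of the byte-size selection idea (the function name, signature, and chunking strategy here are assumptions for illustration, not the actual Quickwit implementation), a policy with bounded write amplification can sort a level's splits by size and cut them into fixed-size merge groups:

```rust
/// Hypothetical sketch: within one level, sort splits by byte size and
/// propose merges of `merge_factor` splits at a time. Merging smallest
/// splits first keeps the bytes rewritten per merge step bounded.
fn plan_merges(mut sizes: Vec<(String, u64)>, merge_factor: usize) -> Vec<Vec<String>> {
    // Smallest first, so the cheapest merges happen at the lowest levels.
    sizes.sort_by_key(|(_, bytes)| *bytes);
    sizes
        .chunks(merge_factor)
        // A trailing group of fewer than 2 splits is not a useful merge.
        .filter(|chunk| chunk.len() >= 2)
        .map(|chunk| chunk.iter().map(|(id, _)| id.clone()).collect())
        .collect()
}
```

The actual policy also has to respect maturity and finalize semantics; this only shows the grouping shape.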
Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340), include it in CompactionScope so splits with different partitions are never merged together. Adds 2 new scope tests for partition isolation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
finalize_operations() was running single_merge_operation over all young splits without grouping by num_merge_ops level. This could merge level-0 and level-1 splits together, stamping the output with max(levels) + 1 and prematurely maturing lower-level data.

Fix: group by num_merge_ops in finalize just like operations() does, then apply the lower merge factor within each level independently.

Added test_finalize_respects_mc_level_invariant (unit) and proptest_finalize_respects_mc_level (property test); both caught the bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
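A minimal sketch of the grouping fix, assuming a simplified Split type (the real code operates on ParquetSplitMetadata):

```rust
use std::collections::BTreeMap;

#[derive(Clone, Debug)]
struct Split {
    split_id: String,
    num_merge_ops: usize,
}

/// Partition splits by their num_merge_ops level. Each inner Vec holds
/// splits from exactly one level, so any merge operation proposed within
/// a group automatically satisfies the MC-LEVEL invariant.
fn group_by_level(splits: Vec<Split>) -> Vec<Vec<Split>> {
    let mut by_level: BTreeMap<usize, Vec<Split>> = BTreeMap::new();
    for split in splits {
        by_level.entry(split.num_merge_ops).or_default().push(split);
    }
    by_level.into_values().collect()
}
```

Finalize then runs its (lower) merge factor over each group independently instead of over the flat list.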
CompactionScope only used window_start_secs, so splits with the same start but different durations (e.g. after a window config change) would be grouped together. The merge engine requires all inputs to agree on both window_start and window_duration, so merging across durations would fail validation. Added test_different_window_duration which caught the bug before the fix. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
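Putting the partition and window-duration fixes together, the scope key can be pictured as a hashable struct; the field names below are assumptions reconstructed from the commit messages:

```rust
use std::collections::HashMap;

/// Splits are merge candidates only when they agree on every field here.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct CompactionScope {
    index_uid: String,
    partition_id: u64,
    sort_fields: Vec<String>,
    window_start_secs: i64,
    window_duration_secs: i64,
}

/// Group split IDs by scope so the policy never proposes a cross-scope
/// merge (e.g. across partitions or across window durations).
fn group_by_scope(
    splits: Vec<(String, CompactionScope)>,
) -> HashMap<CompactionScope, Vec<String>> {
    let mut groups: HashMap<CompactionScope, Vec<String>> = HashMap::new();
    for (split_id, scope) in splits {
        groups.entry(scope).or_default().push(split_id);
    }
    groups
}
```

With window_duration_secs in the key, splits left over from an old window configuration simply land in their own group rather than failing merge-engine validation.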
Add three merge-policy invariants to the shared verification layer (quickwit-dst) with check_invariant! enforcement in ParquetMergeOperation::new():
- MP-1: all splits share the same num_merge_ops level
- MP-2: merge op has at least 2 input splits
- MP-3: all splits share the same compaction scope (sort_fields + window)

Shared pure functions in quickwit_dst::invariants::merge_policy are the single source of truth, usable by Stateright models and production code. Debug builds panic on violation; all builds emit metrics via the invariant recorder.

Tests written first (4 should_panic tests failed before adding checks, pass after), plus 1 positive test and 3 unit tests for the shared functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
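The three invariants can be sketched as pure predicates (a simplified stand-in for quickwit_dst::invariants::merge_policy; the SplitInfo type and these signatures are assumptions):

```rust
/// Minimal split view carrying just the fields the invariants inspect.
#[derive(Clone, PartialEq, Eq, Debug)]
struct SplitInfo {
    num_merge_ops: usize,
    sort_fields: Vec<String>,
    window_start_secs: i64,
}

/// MP-1: all inputs share the same num_merge_ops level.
fn mp1_same_level(splits: &[SplitInfo]) -> bool {
    splits.windows(2).all(|w| w[0].num_merge_ops == w[1].num_merge_ops)
}

/// MP-2: a merge operation needs at least two input splits.
fn mp2_min_inputs(splits: &[SplitInfo]) -> bool {
    splits.len() >= 2
}

/// MP-3: all inputs share the same compaction scope.
fn mp3_same_scope(splits: &[SplitInfo]) -> bool {
    splits.windows(2).all(|w| {
        w[0].sort_fields == w[1].sort_fields
            && w[0].window_start_secs == w[1].window_start_secs
    })
}
```

Keeping these as pure functions means a model checker and the production constructor can call the exact same logic.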
…line Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…it_ids (Phase 3a)

Phase 3 pipeline integration, first PR:
- merge_parquet_split_metadata(): aggregates input split metadata with MergeOutputFile physical metadata to produce complete ParquetSplitMetadata for the merged output. Validates invariant fields, unions metric_names and tags, finalizes tag cardinality after merge. 17 tests.
- ParquetNewSplits, ParquetMergeTask, ParquetMergeScratch message types for the merge actor chain (planner → scheduler → downloader → executor).
- Add replaced_split_ids to ParquetSplitBatch and propagate it through ParquetUploader (was hardcoded Vec::new()). Enables the merge executor to specify which splits are being replaced.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
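A hedged sketch of the aggregation rules, using stand-in types (the real ParquetSplitMetadata has more fields, and the validation of invariant fields is omitted here):

```rust
use std::collections::BTreeSet;

#[derive(Clone, Debug)]
struct SplitMeta {
    time_range: (i64, i64),
    num_merge_ops: usize,
    metric_names: BTreeSet<String>,
}

/// Fold the input splits' metadata into metadata for the merged output:
/// widen the time range, union metric_names, and stamp the output one
/// merge level deeper than the deepest input.
fn aggregate(inputs: &[SplitMeta]) -> Option<SplitMeta> {
    let first = inputs.first()?;
    let mut out = first.clone();
    for m in &inputs[1..] {
        out.time_range.0 = out.time_range.0.min(m.time_range.0);
        out.time_range.1 = out.time_range.1.max(m.time_range.1);
        out.num_merge_ops = out.num_merge_ops.max(m.num_merge_ops);
        out.metric_names.extend(m.metric_names.iter().cloned());
    }
    out.num_merge_ops += 1; // merged output is one level deeper
    Some(out)
}
```

Note that under MP-1 all inputs share a level anyway, so the max is a defensive formulation.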
…der stub (Phase 3b)

Phase 3 pipeline integration, second PR:
- ParquetMergePlanner: receives ParquetNewSplits, groups by CompactionScope, invokes ParquetMergePolicy::operations(), dispatches to the scheduler. Handles startup seeding of immature splits, deduplication via known_split_ids, and RunFinalizeMergePolicyAndQuit for cold-window finalization. 6 tests.
- MergeSchedulerService extension: ScheduleParquetMerge message with a shared merge_semaphore for global concurrency control across Tantivy and Parquet merges. Feature-gated behind cfg(feature = "metrics"). Existing scheduler tests unaffected.
- ParquetMergeSplitDownloader stub: minimal Actor + Handler<ParquetMergeTask> for the scheduler and planner to reference. Full download implementation comes in PR 3c.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
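The deduplication via known_split_ids might look roughly like this (the Planner struct and record_new are hypothetical names for illustration, not the actor's real interface):

```rust
use std::collections::HashSet;

/// Hypothetical planner state: remembers every split ID it has already
/// fed to the merge policy, so re-delivered splits are not planned twice.
struct Planner {
    known_split_ids: HashSet<String>,
}

impl Planner {
    /// Returns only the splits not seen before, recording them as known.
    /// HashSet::insert returns true exactly when the ID was new.
    fn record_new(&mut self, incoming: Vec<String>) -> Vec<String> {
        incoming
            .into_iter()
            .filter(|id| self.known_split_ids.insert(id.clone()))
            .collect()
    }
}
```

Startup seeding of immature splits would flow through the same gate, so a split reported both at seed time and by a live message is still planned once.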
…ase 3c)

Phase 3 pipeline integration, third PR:
- ParquetMergeSplitDownloader: downloads each input split's Parquet file from object storage to a local temp directory, then forwards ParquetMergeScratch to the executor. Replaces the stub from PR 3b.
- ParquetMergeExecutor: runs merge_sorted_parquet_files via run_cpu_intensive, builds the output ParquetSplitMetadata via merge_parquet_split_metadata, renames output files to match generated split IDs, and sends a ParquetSplitBatch with replaced_split_ids to the uploader.
- ParquetSplitBatch.checkpoint_delta -> checkpoint_delta_opt: now an Option to support merge operations, which carry no checkpoint delta because they only reorganize data. The ingest path passes Some(delta); the merge path passes None.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
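A minimal sketch of the checkpoint_delta_opt split between the two paths, with stand-in types (the real ParquetSplitBatch and checkpoint delta types are richer):

```rust
/// Stand-in for the source checkpoint delta attached to ingested data.
#[derive(Debug)]
struct CheckpointDelta(u64);

/// Stand-in for the batch sent to the uploader.
#[derive(Debug)]
struct ParquetSplitBatch {
    checkpoint_delta_opt: Option<CheckpointDelta>,
    replaced_split_ids: Vec<String>,
}

/// Ingest advances the source checkpoint, so it carries Some(delta)
/// and replaces nothing.
fn ingest_batch(delta: CheckpointDelta) -> ParquetSplitBatch {
    ParquetSplitBatch {
        checkpoint_delta_opt: Some(delta),
        replaced_split_ids: Vec::new(),
    }
}

/// A merge is pure data reorganization: no checkpoint delta, but it
/// names the input splits the output atomically replaces.
fn merge_batch(replaced: Vec<String>) -> ParquetSplitBatch {
    ParquetSplitBatch {
        checkpoint_delta_opt: None,
        replaced_split_ids: replaced,
    }
}
```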
Summary
First PR of Phase 3 (pipeline integration), building on Phase 1 (merge engine, #6335) and Phase 2 (merge policy, #6351).
- merge_parquet_split_metadata(): aggregates input split metadata with MergeOutputFile physical metadata to produce complete ParquetSplitMetadata for the merged output. Validates invariant fields (kind, index_uid, partition_id, sort_fields, window), unions metric_names and tags, finalizes tag cardinality after merge. 17 unit tests covering all aggregation rules and error cases.
- ParquetNewSplits, ParquetMergeTask, ParquetMergeScratch: message types for the merge actor chain (planner → scheduler → downloader → executor).
- replaced_split_ids: added to ParquetSplitBatch and propagated through ParquetUploader (was hardcoded Vec::new()). Enables the merge executor (PR 3c) to specify which splits are being replaced during atomic publish-and-replace.

Test plan
- merge_parquet_split_metadata() tests (basic aggregation, metric_names union, tag merging, cardinality promotion, time_range, num_merge_ops, invariant validation errors, edge cases)
- ParquetUploader tests pass with the new replaced_split_ids field
- cargo clippy clean
- cargo doc compiles
- cargo machete clean

🤖 Generated with Claude Code