diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs index 6780ee923e3..413c3f3bb6c 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs @@ -25,6 +25,7 @@ mod indexing_service_impl; mod parquet_doc_processor; mod parquet_indexer; +pub(crate) mod parquet_merge_messages; mod parquet_packager; mod parquet_splits_update; mod parquet_uploader; @@ -44,6 +45,7 @@ pub use parquet_doc_processor::{ ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc, }; pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch}; +pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits}; pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters}; pub use parquet_splits_update::ParquetSplitsUpdate; pub use parquet_uploader::ParquetUploader; diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs index dd3fd5cb4b7..bd968138142 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs @@ -100,6 +100,9 @@ pub struct ParquetSplitBatch { pub publish_lock: PublishLock, /// Optional publish token. pub publish_token_opt: Option<PublishToken>, + /// Split IDs being replaced by this batch (non-empty for merges). + /// Empty for the ingest path. + pub replaced_split_ids: Vec<String>, } /// ParquetIndexer actor that accumulates RecordBatches and forwards them to ParquetPackager. diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_messages.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_messages.rs new file mode 100644 index 00000000000..e2af97055fe --- /dev/null +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_messages.rs @@ -0,0 +1,91 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Message types for the Parquet merge pipeline. +//! +//! These messages flow through the actor chain: +//! +//! ```text +//! ParquetMergePlanner ──► MergeSchedulerService ──► ParquetMergeSplitDownloader +//! │ +//! ▼ (ParquetMergeScratch) +//! ParquetMergeExecutor +//! ``` + +use std::fmt; +use std::path::PathBuf; + +use quickwit_common::temp_dir::TempDirectory; +use quickwit_parquet_engine::merge::policy::ParquetMergeOperation; +use quickwit_parquet_engine::split::ParquetSplitMetadata; +use tantivy::TrackedObject; + +use crate::actors::MergePermit; + +/// Notification of newly created Parquet splits.
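+/// +/// A minimal construction sketch; `planner_mailbox` is a hypothetical handle to +/// the planner actor, not part of this diff: +/// +/// ```ignore +/// let notice = ParquetNewSplits { new_splits: vec![published_split_metadata] }; +/// planner_mailbox.send_message(notice).await?; +/// ```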
+/// +/// Sent to `ParquetMergePlanner` from: +/// - The publisher (feedback loop after a merge completes) +/// - The indexing service (initial seeding of immature splits on start) +#[derive(Debug)] +pub struct ParquetNewSplits { + pub new_splits: Vec<ParquetSplitMetadata>, +} + +/// A merge task dispatched by `MergeSchedulerService` to `ParquetMergeSplitDownloader`. +/// +/// Carries the merge operation (tracked by the planner's inventory) and a +/// concurrency permit from the global merge semaphore. +pub struct ParquetMergeTask { + pub merge_operation: TrackedObject<ParquetMergeOperation>, + pub merge_permit: MergePermit, +} + +impl fmt::Debug for ParquetMergeTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetMergeTask") + .field("merge_split_id", &self.merge_operation.merge_split_id) + .field("num_inputs", &self.merge_operation.splits.len()) + .finish() + } +} + +/// Downloaded Parquet files ready for merge execution. +/// +/// Sent from `ParquetMergeSplitDownloader` to `ParquetMergeExecutor` after +/// all input files have been downloaded to local storage. +pub struct ParquetMergeScratch { + /// The merge operation describing what to merge. + pub merge_operation: TrackedObject<ParquetMergeOperation>, + + /// Local paths to the downloaded Parquet files, one per input split, + /// in the same order as `merge_operation.splits`. + pub downloaded_parquet_files: Vec<PathBuf>, + + /// Temp directory containing the downloaded files. Held to prevent cleanup + /// until the merge executor is done with the files. + pub scratch_directory: TempDirectory, + + /// Concurrency permit — held until the merge completes (including upload). + pub merge_permit: MergePermit, +} + +impl fmt::Debug for ParquetMergeScratch { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetMergeScratch") + .field("merge_split_id", &self.merge_operation.merge_split_id) + .field("num_files", &self.downloaded_parquet_files.len()) + .finish() + } +} diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs index 52529029ae3..a3ac5cbd409 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs @@ -236,6 +236,7 @@ impl Handler<ParquetBatchForPackager> for ParquetPackager { checkpoint_delta, publish_lock, publish_token_opt, + replaced_split_ids: Vec::new(), }; ctx.send_message(&self.uploader_mailbox, split_batch) diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs index b1509659ab3..3f03750230b 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs @@ -221,6 +221,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader { let publish_lock = batch.publish_lock; let publish_token_opt = batch.publish_token_opt; let splits = batch.splits; + let replaced_split_ids = batch.replaced_split_ids; debug!( index_uid = %index_uid, num_splits = splits.len(), @@ -321,7 +322,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader { let update = ParquetSplitsUpdate { index_uid, new_splits: splits, - replaced_split_ids: Vec::new(), // No merging yet + replaced_split_ids, checkpoint_delta_opt: Some(checkpoint_delta), publish_lock, publish_token_opt, @@ -427,6 +428,7 @@ mod tests { checkpoint_delta, publish_lock: PublishLock::default(), publish_token_opt: None, +
replaced_split_ids: Vec::new(), }; uploader_mailbox.send_message(batch).await.unwrap(); @@ -520,6 +522,7 @@ mod tests { checkpoint_delta, publish_lock: PublishLock::default(), publish_token_opt: None, + replaced_split_ids: Vec::new(), }; uploader_mailbox.send_message(batch).await.unwrap(); @@ -594,6 +597,7 @@ mod tests { checkpoint_delta, publish_lock: PublishLock::default(), publish_token_opt: None, + replaced_split_ids: Vec::new(), }; uploader_mailbox.send_message(batch).await.unwrap(); @@ -664,6 +668,7 @@ mod tests { checkpoint_delta, publish_lock: PublishLock::default(), publish_token_opt: None, + replaced_split_ids: Vec::new(), }; uploader_mailbox.send_message(batch).await.unwrap(); } diff --git a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs new file mode 100644 index 00000000000..60f688adbe1 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs @@ -0,0 +1,547 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Metadata aggregation for Parquet merge operations. +//! +//! The merge engine produces [`MergeOutputFile`] with physical metadata (rows, +//! bytes, row_keys, zonemap_regexes). This module combines that with logical +//! metadata (metric_names, tags, time_range, window) from input splits to +//! produce complete [`ParquetSplitMetadata`] for the merged output. + +use std::collections::{HashMap, HashSet}; +use std::time::SystemTime; + +use anyhow::{Result, bail}; + +use super::MergeOutputFile; +use crate::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange}; + +/// Aggregates metadata from input splits and a merge output file to produce +/// complete [`ParquetSplitMetadata`] for the merged output. +/// +/// # Preconditions +/// +/// All input splits must share the same: +/// - `kind` (metrics or sketches) +/// - `index_uid` +/// - `partition_id` +/// - `sort_fields` +/// - `window` +/// +/// These are enforced by the compaction scope grouping (MP-3 invariant) and +/// validated here with explicit error returns. +/// +/// # Aggregation rules +/// +/// | Field | Rule | +/// |-------|------| +/// | `split_id` | Fresh `ParquetSplitId::generate()` | +/// | `time_range` | `min(inputs.start) .. 
max(inputs.end)` | +/// | `num_rows` | From `MergeOutputFile` (actual merged count) | +/// | `size_bytes` | From `MergeOutputFile` (actual file size) | +/// | `metric_names` | Union across all inputs | +/// | `low_cardinality_tags` | Merge value sets, then `finalize_tag_cardinality()` | +/// | `high_cardinality_tag_keys` | Union across all inputs | +/// | `num_merge_ops` | `max(inputs.num_merge_ops) + 1` | +/// | `row_keys_proto` | From `MergeOutputFile` | +/// | `zonemap_regexes` | From `MergeOutputFile` | +pub fn merge_parquet_split_metadata( + inputs: &[ParquetSplitMetadata], + output: &MergeOutputFile, +) -> Result<ParquetSplitMetadata> { + if inputs.is_empty() { + bail!("merge_parquet_split_metadata requires at least one input split"); + } + + let first = &inputs[0]; + + // Validate invariant fields: all inputs must agree on these. + for (i, input) in inputs.iter().enumerate().skip(1) { + if input.kind != first.kind { + bail!( + "input {} has kind {:?}, expected {:?}", + i, + input.kind, + first.kind + ); + } + if input.index_uid != first.index_uid { + bail!( + "input {} has index_uid '{}', expected '{}'", + i, + input.index_uid, + first.index_uid + ); + } + if input.partition_id != first.partition_id { + bail!( + "input {} has partition_id {}, expected {}", + i, + input.partition_id, + first.partition_id + ); + } + if input.sort_fields != first.sort_fields { + bail!( + "input {} has sort_fields '{}', expected '{}'", + i, + input.sort_fields, + first.sort_fields + ); + } + if input.window != first.window { + bail!( + "input {} has window {:?}, expected {:?}", + i, + input.window, + first.window + ); + } + } + + // time_range: min(start) .. max(end) + let start_secs = inputs + .iter() + .map(|s| s.time_range.start_secs) + .min() + .expect("at least one input"); + let end_secs = inputs + .iter() + .map(|s| s.time_range.end_secs) + .max() + .expect("at least one input"); + let time_range = TimeRange::new(start_secs, end_secs); + + // metric_names: union + let metric_names: HashSet<String> = inputs + .iter() + .flat_map(|s| s.metric_names.iter().cloned()) + .collect(); + + // Tags: merge low-cardinality value sets, then promote any that exceed threshold. + // A key that is high-cardinality in ANY input stays high-cardinality in the output. + let high_cardinality_tag_keys: HashSet<String> = inputs + .iter() + .flat_map(|s| s.high_cardinality_tag_keys.iter().cloned()) + .collect(); + + let mut low_cardinality_tags: HashMap<String, HashSet<String>> = HashMap::new(); + for input in inputs { + for (key, values) in &input.low_cardinality_tags { + // Skip keys already known to be high-cardinality from another input.
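+ // (Illustrative assumption: if input A already promoted "host" while input B + // still lists {"host-1", "host-2"} for it, B's values are dropped here and + // "host" survives only in `high_cardinality_tag_keys`.)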
+ if high_cardinality_tag_keys.contains(key) { + continue; + } + low_cardinality_tags + .entry(key.clone()) + .or_default() + .extend(values.iter().cloned()); + } + } + + // num_merge_ops: max(inputs) + 1 + let num_merge_ops = inputs + .iter() + .map(|s| s.num_merge_ops) + .max() + .expect("at least one input") + + 1; + + let split_id = ParquetSplitId::generate(first.kind); + let parquet_file = format!("{split_id}.parquet"); + + let mut metadata = ParquetSplitMetadata { + kind: first.kind, + partition_id: first.partition_id, + split_id, + index_uid: first.index_uid.clone(), + time_range, + num_rows: output.num_rows as u64, + size_bytes: output.size_bytes, + metric_names, + low_cardinality_tags, + high_cardinality_tag_keys, + created_at: SystemTime::now(), + parquet_file, + window: first.window.clone(), + sort_fields: first.sort_fields.clone(), + num_merge_ops, + row_keys_proto: output.row_keys_proto.clone(), + zonemap_regexes: output.zonemap_regexes.clone(), + }; + + // Finalize: merged tag sets may now exceed the cardinality threshold. + metadata.finalize_tag_cardinality(); + + Ok(metadata) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::path::PathBuf; + + use super::*; + use crate::split::{ParquetSplitId, ParquetSplitKind, ParquetSplitMetadata, TimeRange}; + + /// Helper to build a test split with the given properties. + fn make_test_split( + split_id: &str, + time_range: (u64, u64), + num_merge_ops: u32, + ) -> ParquetSplitMetadata { + ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new(split_id)) + .index_uid("test-index:00000000000000000000000001") + .partition_id(42) + .time_range(TimeRange::new(time_range.0, time_range.1)) + .num_rows(100) + .size_bytes(5000) + .sort_fields("metric_name|host|timestamp_secs/V2") + .window_start_secs(1000) + .window_duration_secs(3600) + .num_merge_ops(num_merge_ops) + .build() + } + + fn make_output(num_rows: usize, size_bytes: u64) -> MergeOutputFile { + MergeOutputFile { + path: PathBuf::from("/tmp/merged.parquet"), + num_rows, + size_bytes, + row_keys_proto: Some(vec![0x08, 0x01]), + zonemap_regexes: HashMap::from([("metric_name".to_string(), "cpu\\..*".to_string())]), + } + } + + #[test] + fn test_basic_two_input_merge() { + let inputs = vec![ + make_test_split("s0", (1000, 1500), 0), + make_test_split("s1", (1200, 2000), 0), + ]; + let output = make_output(200, 9000); + + let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + + assert_eq!(result.kind, ParquetSplitKind::Metrics); + assert_eq!(result.index_uid, "test-index:00000000000000000000000001"); + assert_eq!(result.partition_id, 42); + assert_eq!(result.time_range.start_secs, 1000); + assert_eq!(result.time_range.end_secs, 2000); + assert_eq!(result.num_rows, 200); + assert_eq!(result.size_bytes, 9000); + assert_eq!(result.num_merge_ops, 1); + assert_eq!(result.sort_fields, "metric_name|host|timestamp_secs/V2"); + assert_eq!(result.window, Some(1000..4600)); + assert_eq!(result.row_keys_proto, Some(vec![0x08, 0x01])); + assert_eq!( + result.zonemap_regexes.get("metric_name").unwrap(), + "cpu\\..*" + ); + assert!(result.parquet_file.ends_with(".parquet")); + } + + #[test] + fn test_metric_names_union() { + let mut s0 = make_test_split("s0", (1000, 2000), 0); + s0.metric_names.insert("cpu.usage".to_string()); + s0.metric_names.insert("mem.used".to_string()); + + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.metric_names.insert("cpu.usage".to_string()); + s1.metric_names.insert("disk.io".to_string()); + + let output = 
make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + + assert_eq!(result.metric_names.len(), 3); + assert!(result.metric_names.contains("cpu.usage")); + assert!(result.metric_names.contains("mem.used")); + assert!(result.metric_names.contains("disk.io")); + } + + #[test] + fn test_low_cardinality_tags_merged() { + let mut s0 = make_test_split("s0", (1000, 2000), 0); + s0.low_cardinality_tags + .entry("service".to_string()) + .or_default() + .insert("web".to_string()); + s0.low_cardinality_tags + .entry("env".to_string()) + .or_default() + .insert("prod".to_string()); + + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.low_cardinality_tags + .entry("service".to_string()) + .or_default() + .insert("api".to_string()); + s1.low_cardinality_tags + .entry("region".to_string()) + .or_default() + .insert("us-east".to_string()); + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + + // service: {web, api} + let service_values = result.low_cardinality_tags.get("service").unwrap(); + assert_eq!(service_values.len(), 2); + assert!(service_values.contains("web")); + assert!(service_values.contains("api")); + + // env: {prod} + assert!( + result + .low_cardinality_tags + .get("env") + .unwrap() + .contains("prod") + ); + + // region: {us-east} + assert!( + result + .low_cardinality_tags + .get("region") + .unwrap() + .contains("us-east") + ); + } + + #[test] + fn test_high_cardinality_propagated_from_any_input() { + // Input A has "host" as high-cardinality. + // Input B has "host" as low-cardinality with values. + // Merged result: "host" is high-cardinality, low_cardinality_tags drops it. + let mut s0 = make_test_split("s0", (1000, 2000), 0); + s0.high_cardinality_tag_keys.insert("host".to_string()); + + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.low_cardinality_tags + .entry("host".to_string()) + .or_default() + .insert("host-1".to_string()); + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + + assert!(result.high_cardinality_tag_keys.contains("host")); + assert!(!result.low_cardinality_tags.contains_key("host")); + } + + #[test] + fn test_tag_cardinality_promotion_after_merge() { + // Two splits each with 600 unique "host" values (below 1000 threshold). + // After merge, combined set exceeds 1000 — promoted to high cardinality. 
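+ // (The exact 1000-value threshold is assumed to be owned by `finalize_tag_cardinality()`; + // this test only relies on 600 + 600 = 1200 unique values crossing it.)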
+ let mut s0 = make_test_split("s0", (1000, 2000), 0); + for i in 0..600 { + s0.low_cardinality_tags + .entry("host".to_string()) + .or_default() + .insert(format!("host-a-{i}")); + } + + let mut s1 = make_test_split("s1", (1000, 2000), 0); + for i in 0..600 { + s1.low_cardinality_tags + .entry("host".to_string()) + .or_default() + .insert(format!("host-b-{i}")); + } + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + + // 1200 unique values > 1000 threshold → promoted + assert!(result.high_cardinality_tag_keys.contains("host")); + assert!(!result.low_cardinality_tags.contains_key("host")); + } + + #[test] + fn test_time_range_min_max() { + let inputs = vec![ + make_test_split("s0", (500, 1500), 0), + make_test_split("s1", (1200, 1800), 0), + make_test_split("s2", (100, 2500), 0), + ]; + let output = make_output(300, 12000); + + let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + + assert_eq!(result.time_range.start_secs, 100); + assert_eq!(result.time_range.end_secs, 2500); + } + + #[test] + fn test_num_merge_ops_max_plus_one() { + let inputs = vec![ + make_test_split("s0", (1000, 2000), 2), + make_test_split("s1", (1000, 2000), 2), + make_test_split("s2", (1000, 2000), 2), + ]; + let output = make_output(300, 12000); + + let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + + assert_eq!(result.num_merge_ops, 3); // max(2,2,2) + 1 + } + + #[test] + fn test_output_metadata_from_merge_output_file() { + let inputs = vec![make_test_split("s0", (1000, 2000), 0)]; + let output = MergeOutputFile { + path: PathBuf::from("/tmp/merged.parquet"), + num_rows: 42, + size_bytes: 7777, + row_keys_proto: Some(vec![0xDE, 0xAD]), + zonemap_regexes: HashMap::from([ + ("col_a".to_string(), "pattern_a".to_string()), + ("col_b".to_string(), "pattern_b".to_string()), + ]), + }; + + let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + + assert_eq!(result.num_rows, 42); + assert_eq!(result.size_bytes, 7777); + assert_eq!(result.row_keys_proto, Some(vec![0xDE, 0xAD])); + assert_eq!(result.zonemap_regexes.len(), 2); + assert_eq!(result.zonemap_regexes.get("col_a").unwrap(), "pattern_a"); + } + + #[test] + fn test_empty_inputs_error() { + let output = make_output(0, 0); + let result = merge_parquet_split_metadata(&[], &output); + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("at least one input") + ); + } + + #[test] + fn test_mismatched_kind_error() { + let s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.kind = ParquetSplitKind::Sketches; + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("kind")); + } + + #[test] + fn test_mismatched_index_uid_error() { + let s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.index_uid = "other-index:00000000000000000000000002".to_string(); + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("index_uid")); + } + + #[test] + fn test_mismatched_partition_id_error() { + let s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.partition_id = 99; + + let output = make_output(200, 9000); + let result = 
merge_parquet_split_metadata(&[s0, s1], &output); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("partition_id")); + } + + #[test] + fn test_mismatched_sort_fields_error() { + let s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.sort_fields = "different|schema/V2".to_string(); + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("sort_fields")); + } + + #[test] + fn test_mismatched_window_error() { + let s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.window = Some(2000..5600); + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("window")); + } + + #[test] + fn test_single_input_pass_through() { + let mut s0 = make_test_split("s0", (1000, 2000), 3); + s0.metric_names.insert("cpu.usage".to_string()); + + let output = make_output(100, 5000); + let result = merge_parquet_split_metadata(&[s0], &output).unwrap(); + + assert_eq!(result.num_merge_ops, 4); // 3 + 1 + assert!(result.metric_names.contains("cpu.usage")); + assert_eq!(result.num_rows, 100); + } + + #[test] + fn test_fresh_split_id_generated() { + let inputs = vec![ + make_test_split("s0", (1000, 2000), 0), + make_test_split("s1", (1000, 2000), 0), + ]; + let output = make_output(200, 9000); + + let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + + // Generated split ID should not match any input. + assert_ne!(result.split_id.as_str(), "s0"); + assert_ne!(result.split_id.as_str(), "s1"); + // Should have metrics prefix. + assert!(result.split_id.as_str().starts_with("metrics_")); + // parquet_file should match. + assert_eq!(result.parquet_file, format!("{}.parquet", result.split_id)); + } + + #[test] + fn test_none_row_keys_and_zonemaps_propagated() { + let inputs = vec![make_test_split("s0", (1000, 2000), 0)]; + let output = MergeOutputFile { + path: PathBuf::from("/tmp/merged.parquet"), + num_rows: 100, + size_bytes: 5000, + row_keys_proto: None, + zonemap_regexes: HashMap::new(), + }; + + let result = merge_parquet_split_metadata(&inputs, &output).unwrap(); + + assert!(result.row_keys_proto.is_none()); + assert!(result.zonemap_regexes.is_empty()); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index c167860fe5a..9bddefe6811 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -21,6 +21,7 @@ //! file has non-overlapping key ranges. mod merge_order; +pub mod metadata_aggregation; pub mod policy; mod schema; mod writer;
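Reviewer note: the aggregation rules above are easy to sanity-check outside the crate. The sketch below re-implements just the three numeric rules (`time_range` min/max, `num_merge_ops` max-plus-one, and tag-cardinality promotion) against simplified stand-in types. `SplitMeta` and the 1000-value `CARDINALITY_THRESHOLD` are assumptions for illustration, not the real `quickwit-parquet-engine` types.

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-in for ParquetSplitMetadata (assumption: the real type has more fields).
struct SplitMeta {
    start_secs: u64,
    end_secs: u64,
    num_merge_ops: u32,
    low_cardinality_tags: HashMap<String, HashSet<String>>,
}

// Assumed threshold, mirroring the 1000-value limit the tests above rely on.
const CARDINALITY_THRESHOLD: usize = 1000;

fn aggregate(inputs: &[SplitMeta]) -> (u64, u64, u32, HashSet<String>) {
    // time_range: min(start) .. max(end)
    let start = inputs.iter().map(|s| s.start_secs).min().unwrap();
    let end = inputs.iter().map(|s| s.end_secs).max().unwrap();
    // num_merge_ops: max(inputs) + 1
    let ops = inputs.iter().map(|s| s.num_merge_ops).max().unwrap() + 1;
    // Union the per-key value sets, then promote keys whose merged set grew too large.
    let mut merged: HashMap<String, HashSet<String>> = HashMap::new();
    for input in inputs {
        for (key, values) in &input.low_cardinality_tags {
            merged.entry(key.clone()).or_default().extend(values.iter().cloned());
        }
    }
    let promoted: HashSet<String> = merged
        .iter()
        .filter(|(_, values)| values.len() > CARDINALITY_THRESHOLD)
        .map(|(key, _)| key.clone())
        .collect();
    (start, end, ops, promoted)
}

fn main() {
    // Two inputs with disjoint 600-value "host" sets, like the promotion test above.
    let make = |prefix: &str| SplitMeta {
        start_secs: 1000,
        end_secs: 2000,
        num_merge_ops: 2,
        low_cardinality_tags: HashMap::from([(
            "host".to_string(),
            (0..600).map(|i| format!("{prefix}-{i}")).collect(),
        )]),
    };
    let (start, end, ops, promoted) = aggregate(&[make("host-a"), make("host-b")]);
    assert_eq!((start, end, ops), (1000, 2000, 3));
    assert!(promoted.contains("host")); // 1200 unique values > 1000, so "host" is promoted
    println!("time_range={start}..{end} num_merge_ops={ops} promoted={promoted:?}");
}
```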