From 1d84e85841bb41603642068f3e56d585b10c41a5 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Thu, 7 May 2026 22:03:40 -0400 Subject: [PATCH 1/2] feat: streaming column-major single-RG merge engine (PR-6b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `merge::streaming::streaming_merge_sorted_parquet_files`, an async N-input → M-output merge that consumes inputs as `Box` (PR-5a's trait) and writes each output column-by-column via `StreamingParquetWriter` (PR-2). Each output is single row group; multi-RG output at metric_name boundaries lands in PR-6c. Compared to the existing whole-file engine, the win is on the output side: the standard `ArrowWriter` materialises a full row-group worth of column-chunk buffers before serialising, whereas this writer flushes one column chunk at a time, so output peak memory is bounded by the largest single column chunk plus bookkeeping (page index, bloom filters), not by the total row group. Inputs are drained one row group at a time via PR-6a's `StreamDecoder`, then concatenated per input. Per-RG decode memory is bounded; the per-input concat matches the existing engine's input shape for the merge planner. Truly per-RG streaming inputs (one input RG at a time across all inputs) lands when prefix=1 multi-RG inputs become the dominant compaction path — PR-6b is correct without it. Reuses the existing permutation, KV metadata, sorting columns, MC-3 sort-order check (`super::writer`), and union schema / output optimisation (`super::schema`) helpers — they're now `pub(super)` so both the existing whole-file engine and the new streaming engine can share them. PR-7 will fold the non-streaming path away. Tests (10, all passing): two-input simple merge, single-RG output contract, total row count preservation (MC-1), sort-schema mismatch rejected, window mismatch rejected, output has page-level statistics, KV metadata propagated (with `qh.num_merge_ops` incremented), all- empty inputs produce no output, one empty among non-empty handled, output drainable through `StreamDecoder` round-trip. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../quickwit-parquet-engine/src/merge/mod.rs | 12 +- .../src/merge/streaming.rs | 1034 +++++++++++++++++ .../src/merge/writer.rs | 13 +- 3 files changed, 1049 insertions(+), 10 deletions(-) create mode 100644 quickwit/quickwit-parquet-engine/src/merge/streaming.rs diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index 5e29998217d..3f248e2eb9b 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -24,6 +24,7 @@ mod merge_order; pub mod metadata_aggregation; pub mod policy; mod schema; +pub mod streaming; mod writer; #[cfg(test)] @@ -63,11 +64,11 @@ pub struct MergeConfig { /// Metadata extracted from input files' Parquet KV metadata. /// All inputs must agree on sort_fields, window_start, and window_duration. -struct InputMetadata { - sort_fields: String, - window_start_secs: Option, - window_duration_secs: u32, - num_merge_ops: u32, +pub(super) struct InputMetadata { + pub(super) sort_fields: String, + pub(super) window_start_secs: Option, + pub(super) window_duration_secs: u32, + pub(super) num_merge_ops: u32, } /// Result of a single output file from the merge. @@ -76,6 +77,7 @@ struct InputMetadata { /// logical metadata (metric names, tags, time range) extracted from the /// actual rows in this output file. 
When the merge produces multiple /// outputs, each has metadata reflecting only its own rows. +#[derive(Debug)] pub struct MergeOutputFile { /// Path to the output Parquet file. pub path: PathBuf, diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs new file mode 100644 index 00000000000..68839eb6b69 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs @@ -0,0 +1,1034 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Streaming column-major merge engine. +//! +//! Consumes inputs as [`ColumnPageStream`] and writes outputs +//! column-by-column via the crate-private streaming writer primitive. +//! Each output is **single-row-group** — multi-RG output at +//! metric_name boundaries lands in PR-6c. +//! +//! Compared to the existing whole-file engine in `super::writer`: +//! +//! - **Inputs**: drained one row group at a time via PR-6a's +//! [`StreamDecoder`], then concatenated per input. Per-RG decode +//! memory is bounded; concatenation into a per-input `RecordBatch` +//! matches the existing engine's input contract for the merge +//! planner. Truly per-RG streaming inputs lands when prefix=1 multi- +//! RG inputs become the dominant compaction path. +//! +//! - **Outputs**: written column-by-column. The standard arrow writer +//! materialises a full row group worth of column-chunk buffers before +//! serialising; the column-major writer flushes one column chunk at +//! a time, so output peak memory is bounded by the *largest single +//! column chunk* plus bookkeeping (page index, bloom filters), not by +//! the row group total. +//! +//! Reuses the existing permutation / KV metadata / sorting columns / +//! MC-3 sort-order helpers from `super::writer`, and the union-schema +//! and optimise-output helpers from `super::schema`. PR-7 will fold +//! the non-streaming path away; until then both engines coexist and +//! share these helpers via `pub(super)`. +//! +//! [`ColumnPageStream`]: crate::storage::ColumnPageStream +//! 
[`StreamDecoder`]: crate::storage::page_decoder::StreamDecoder + +use std::collections::{HashMap, HashSet}; +use std::ops::Range; +use std::path::Path; +use std::sync::Arc; + +use anyhow::{Context, Result, bail}; +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; +use parquet::file::metadata::ParquetMetaData; +use tracing::info; +use ulid::Ulid; + +use super::merge_order::{compute_merge_order, compute_output_boundaries}; +use super::schema::{align_inputs_to_union_schema, optimize_output_batch}; +use super::writer::{ + apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, + resolve_sort_field_names, verify_sort_order, +}; +use super::{InputMetadata, MergeConfig, MergeOutputFile}; +use crate::row_keys; +use crate::sort_fields::{equivalent_schemas_for_compaction, parse_sort_fields}; +use crate::sorted_series::SORTED_SERIES_COLUMN; +use crate::split::TAG_SERVICE; +use crate::storage::page_decoder::StreamDecoder; +use crate::storage::split_writer::{ + extract_metric_names, extract_service_names, extract_time_range, +}; +use crate::storage::streaming_writer::StreamingParquetWriter; +use crate::storage::{ + ColumnPageStream, PARQUET_META_NUM_MERGE_OPS, PARQUET_META_SORT_FIELDS, + PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, +}; +use crate::zonemap::{self, ZonemapOptions}; + +/// Streaming N-input → M-output merge. +/// +/// Same overall contract as [`super::merge_sorted_parquet_files`]: all +/// inputs must share the same sort schema and window, the merge key is +/// `(sorted_series ASC, timestamp_secs )`, and outputs are +/// split at `sorted_series` boundaries to keep ranges non-overlapping. +/// The difference is *how* inputs are read and outputs are written — +/// see the module docs. +/// +/// Each output file is **single row group**. Multi-row-group output +/// at metric_name boundaries lands in PR-6c. +pub async fn streaming_merge_sorted_parquet_files( + inputs: Vec>, + output_dir: &Path, + config: &MergeConfig, +) -> Result> { + if inputs.is_empty() { + bail!("merge requires at least one input"); + } + if config.num_outputs == 0 { + bail!("num_outputs must be at least 1"); + } + + // Step 0: extract and validate metadata from each input. Reads + // `qh.*` keys from `ColumnPageStream::metadata()` — no draining yet. + let input_meta = extract_and_validate_input_metadata(&inputs)?; + + info!( + num_inputs = inputs.len(), + num_outputs = config.num_outputs, + sort_fields = %input_meta.sort_fields, + "starting streaming sorted parquet merge" + ); + + // Step 1: drain each input ColumnPageStream into a per-input + // RecordBatch (concatenated across the input's row groups). Per-RG + // decode memory is bounded by `StreamDecoder::next_rg`'s contract; + // the concatenation is the existing engine's input shape. + let mut input_batches: Vec = Vec::with_capacity(inputs.len()); + for stream in inputs.into_iter() { + let batch = drain_stream_to_record_batch(stream).await?; + if batch.num_columns() > 0 && batch.schema().index_of(SORTED_SERIES_COLUMN).is_err() { + bail!( + "input is missing the '{}' column required for merge", + SORTED_SERIES_COLUMN, + ); + } + input_batches.push(batch); + } + + let total_rows: usize = input_batches.iter().map(|b| b.num_rows()).sum(); + if total_rows == 0 { + info!("all inputs empty, producing no output"); + return Ok(Vec::new()); + } + + // Step 2: align inputs to a common union schema. 
Missing columns + // are filled with nulls; string-like types are normalised to Utf8. + let (union_schema, aligned_inputs) = + align_inputs_to_union_schema(&input_batches, &input_meta.sort_fields)?; + + // MC-4: verify union schema covers every input column. + check_mc4_union_schema(&input_batches, &union_schema); + + // Step 3: compute merge order across all inputs. + let merge_order = compute_merge_order(&aligned_inputs, &input_meta.sort_fields)?; + + // Step 4: split the merge order into output boundaries at + // `sorted_series` transitions. + let boundaries = compute_output_boundaries(&merge_order, &aligned_inputs, config.num_outputs)?; + + info!( + total_rows, + num_outputs = boundaries.len(), + "streaming merge order computed" + ); + + // Step 5: write each output file column-major. + let outputs = write_streaming_outputs( + &aligned_inputs, + &union_schema, + &merge_order, + &boundaries, + output_dir, + config, + &input_meta, + )?; + + // MC-1: total row count preserved. + let output_total: usize = outputs.iter().map(|o| o.num_rows).sum(); + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::MC1, + output_total == total_rows, + ": streaming merge input rows={}, output rows={}", + total_rows, + output_total, + ); + + Ok(outputs) +} + +/// Drain a `ColumnPageStream` into a single concatenated `RecordBatch`. +/// +/// Uses [`StreamDecoder`] to decode one row group at a time; each call +/// drops the previous RG's reconstructed bytes before starting the +/// next, so peak per-call memory is bounded by one input row group. +/// The concatenation step here does materialise the whole input — +/// per-RG streaming through the merge driver lands in a follow-up +/// once prefix=1 multi-RG inputs become the dominant case. +async fn drain_stream_to_record_batch(stream: Box) -> Result { + let mut stream = stream; + let metadata: Arc = Arc::clone(stream.metadata()); + + let mut batches: Vec = Vec::new(); + { + let mut decoder = StreamDecoder::new(&mut *stream); + while let Some(batch) = decoder + .next_rg() + .await + .context("draining input row group via StreamDecoder")? + { + batches.push(batch); + } + } + + if batches.is_empty() { + // Empty file: synthesise an empty `RecordBatch` with the file's + // arrow schema so downstream merge logic sees a uniform shape. + let arrow_meta = ArrowReaderMetadata::try_new(metadata, ArrowReaderOptions::default()) + .context("loading arrow schema from input metadata for empty input")?; + return Ok(RecordBatch::new_empty(arrow_meta.schema().clone())); + } + if batches.len() == 1 { + return Ok(batches.into_iter().next().expect("len == 1")); + } + let schema = batches[0].schema(); + arrow::compute::concat_batches(&schema, &batches) + .context("concatenating drained input row groups") +} + +/// Extract sort schema, window, and merge-ops metadata from each input +/// stream and validate consistency across inputs. Mirrors +/// [`super::extract_and_validate_input_metadata`] but reads from +/// [`ColumnPageStream::metadata`] instead of opening files. 
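Stepping back to the entry point for a moment: a minimal sketch of how a compaction caller would drive the streaming merge, assuming PR-5a's `ColumnPageStream` trait objects are already opened. The `open_input_stream` helper here is hypothetical and stands in for however those streams are obtained; it is not part of this patch.

    // Hypothetical call site; generic shapes follow the prose above
    // (boxed ColumnPageStream inputs, MergeOutputFile outputs).
    let inputs: Vec<Box<dyn ColumnPageStream>> = vec![
        open_input_stream("window_a.parquet").await?,
        open_input_stream("window_b.parquet").await?,
    ];
    let outputs = streaming_merge_sorted_parquet_files(inputs, output_dir, &config).await?;
    for out in &outputs {
        // Each output is a single-row-group parquet file plus its logical metadata.
        println!("{} rows -> {}", out.num_rows, out.path.display());
    }
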
+fn extract_and_validate_input_metadata( + inputs: &[Box], +) -> Result { + let mut consensus_sort_fields: Option = None; + let mut consensus_window_start: Option> = None; + let mut consensus_window_duration: Option = None; + let mut max_merge_ops: u32 = 0; + + for (idx, stream) in inputs.iter().enumerate() { + let metadata = stream.metadata(); + let kv_metadata = metadata.file_metadata().key_value_metadata(); + + let find_kv = |key: &str| -> Option { + kv_metadata.and_then(|kvs| { + kvs.iter() + .find(|kv| kv.key == key) + .and_then(|kv| kv.value.clone()) + }) + }; + + // Sort fields: required, must be consistent across inputs. + let file_sort_fields = match find_kv(PARQUET_META_SORT_FIELDS) { + Some(s) => s, + None => bail!( + "input {idx} is missing {} metadata", + PARQUET_META_SORT_FIELDS, + ), + }; + + match &consensus_sort_fields { + Some(expected) => { + let expected_schema = parse_sort_fields(expected)?; + let file_schema = parse_sort_fields(&file_sort_fields).with_context(|| { + format!("parsing sort schema from input {idx}: '{file_sort_fields}'") + })?; + if !equivalent_schemas_for_compaction(&expected_schema, &file_schema) { + bail!( + "sort schema mismatch in input {idx}: expected '{expected}', found \ + '{file_sort_fields}'", + ); + } + } + None => { + parse_sort_fields(&file_sort_fields).with_context(|| { + format!("parsing sort schema from input {idx}: '{file_sort_fields}'") + })?; + consensus_sort_fields = Some(file_sort_fields.clone()); + } + } + + let file_window_start = find_kv(PARQUET_META_WINDOW_START) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing window_start from input {idx}"))?; + + match &consensus_window_start { + Some(expected) => { + if file_window_start != *expected { + bail!( + "window_start mismatch in input {idx}: expected {:?}, found {:?}", + expected, + file_window_start, + ); + } + } + None => { + consensus_window_start = Some(file_window_start); + } + } + + let file_window_duration = find_kv(PARQUET_META_WINDOW_DURATION) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing window_duration from input {idx}"))? + .unwrap_or(0); + + match &consensus_window_duration { + Some(expected) => { + if file_window_duration != *expected { + bail!( + "window_duration_secs mismatch in input {idx}: expected {expected}, found \ + {file_window_duration}", + ); + } + } + None => { + consensus_window_duration = Some(file_window_duration); + } + } + + let file_merge_ops = find_kv(PARQUET_META_NUM_MERGE_OPS) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing num_merge_ops from input {idx}"))? + .unwrap_or(0); + + if file_merge_ops > max_merge_ops { + max_merge_ops = file_merge_ops; + } + } + + let sort_fields = match consensus_sort_fields { + Some(s) => s, + None => bail!("at least one input is required"), + }; + let window_start_secs = consensus_window_start.unwrap_or(None); + let window_duration_secs = consensus_window_duration.unwrap_or(0); + + Ok(InputMetadata { + sort_fields, + window_start_secs, + window_duration_secs, + num_merge_ops: max_merge_ops + 1, + }) +} + +/// MC-4: union schema must contain every input column. 
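An aside on the metadata consensus enforced by the helper above, seen from the producer side: the `qh.*` entries an input must carry look roughly like the sketch below (key constants from `crate::storage`; the values are illustrative only). The merge writes the same keys back to its output, with `qh.num_merge_ops` set to the maximum over the inputs plus one — inputs at 0, 2 and 1 would produce an output carrying 3.

    use parquet::file::metadata::KeyValue;

    // Illustrative input-side KV metadata; any disagreement on sort_fields,
    // window_start or window_duration across inputs makes the merge bail
    // before reading any rows.
    let input_kvs = vec![
        KeyValue::new(
            PARQUET_META_SORT_FIELDS.to_string(),
            "metric_name|-timestamp_secs/V2".to_string(),
        ),
        KeyValue::new(PARQUET_META_WINDOW_START.to_string(), "1700000000".to_string()),
        KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()),
        KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()),
    ];
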
+fn check_mc4_union_schema(inputs: &[RecordBatch], union_schema: &SchemaRef) { + let union_field_names: HashSet<&str> = union_schema + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect(); + for (idx, input) in inputs.iter().enumerate() { + for field in input.schema().fields() { + quickwit_dst::check_invariant!( + quickwit_dst::invariants::InvariantId::MC4, + union_field_names.contains(field.name().as_str()), + ": streaming merge input {} column '{}' missing from union schema", + idx, + field.name(), + ); + } + } +} + +/// Apply merge permutation per output and write each output as a +/// **single-row-group** parquet file via [`StreamingParquetWriter`]. +fn write_streaming_outputs( + aligned_inputs: &[RecordBatch], + union_schema: &SchemaRef, + merge_order: &[super::merge_order::MergeRun], + boundaries: &[Range], + output_dir: &Path, + config: &MergeConfig, + input_meta: &InputMetadata, +) -> Result> { + let mut outputs = Vec::with_capacity(boundaries.len()); + + for boundary in boundaries { + let runs = &merge_order[boundary.clone()]; + + let sorted_batch = apply_merge_permutation(aligned_inputs, union_schema, runs)?; + if sorted_batch.num_rows() == 0 { + continue; + } + + // MC-3 sort order check, same as the non-streaming engine. + verify_sort_order(&sorted_batch, &input_meta.sort_fields); + + // Per-output schema optimisation: drop all-null columns, + // dictionary-encode strings whose cardinality is low. This + // changes the schema before we write, so we must compute + // metadata against the optimised batch. + let sorted_batch = optimize_output_batch(&sorted_batch); + + let output = write_single_rg_output(&sorted_batch, output_dir, config, input_meta)?; + outputs.push(output); + } + + Ok(outputs) +} + +/// Write one output file: builds row keys + zonemap + KV metadata, +/// then column-major-streams the batch via [`StreamingParquetWriter`]. +fn write_single_rg_output( + sorted_batch: &RecordBatch, + output_dir: &Path, + config: &MergeConfig, + input_meta: &InputMetadata, +) -> Result { + let row_keys_proto = row_keys::extract_row_keys(&input_meta.sort_fields, sorted_batch) + .context("extracting row keys from streaming merge output")? 
+ .map(|rk| row_keys::encode_row_keys_proto(&rk)); + + let zonemap_opts = ZonemapOptions::default(); + let zonemap_regexes = + zonemap::extract_zonemap_regexes(&input_meta.sort_fields, sorted_batch, &zonemap_opts) + .context("extracting zonemap regexes from streaming merge output")?; + + let kv_entries = build_merge_kv_metadata(input_meta, &row_keys_proto, &zonemap_regexes); + let sorting_cols = build_sorting_columns(sorted_batch, &input_meta.sort_fields)?; + let sort_field_names = resolve_sort_field_names(&input_meta.sort_fields)?; + + let props = config.writer_config.to_writer_properties_with_metadata( + &sorted_batch.schema(), + sorting_cols, + Some(kv_entries), + &sort_field_names, + ); + + let metric_names = extract_metric_names(sorted_batch) + .context("extracting metric names from streaming merge output")?; + let time_range = extract_time_range(sorted_batch) + .context("extracting time range from streaming merge output")?; + let service_names = extract_service_names(sorted_batch) + .context("extracting service names from streaming merge output")?; + let mut low_cardinality_tags: HashMap> = HashMap::new(); + if !service_names.is_empty() { + low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names); + } + + let output_filename = format!("merge_output_{}.parquet", Ulid::new()); + let output_path = output_dir.join(&output_filename); + + let size_bytes = column_major_write_single_rg(sorted_batch, &output_path, props)?; + + Ok(MergeOutputFile { + path: output_path, + num_rows: sorted_batch.num_rows(), + size_bytes, + row_keys_proto, + zonemap_regexes, + metric_names, + time_range, + low_cardinality_tags, + }) +} + +/// Column-major write of one row group via [`StreamingParquetWriter`]. +fn column_major_write_single_rg( + sorted_batch: &RecordBatch, + output_path: &Path, + props: parquet::file::properties::WriterProperties, +) -> Result { + let mut file = std::fs::File::create(output_path) + .with_context(|| format!("creating output file: {}", output_path.display()))?; + + { + let mut writer = + StreamingParquetWriter::try_new(&mut file, sorted_batch.schema(), props) + .with_context(|| format!("opening streaming writer: {}", output_path.display()))?; + + let mut row_group = writer + .start_row_group() + .with_context(|| format!("starting row group: {}", output_path.display()))?; + for col_idx in 0..sorted_batch.num_columns() { + row_group + .write_next_column(sorted_batch.column(col_idx)) + .with_context(|| { + format!( + "writing column {col_idx} ({}): {}", + sorted_batch.schema().field(col_idx).name(), + output_path.display(), + ) + })?; + } + row_group + .finish() + .with_context(|| format!("finishing row group: {}", output_path.display()))?; + + writer + .close() + .with_context(|| format!("closing streaming writer: {}", output_path.display()))?; + } + + let size = std::fs::metadata(output_path) + .with_context(|| format!("reading output file size: {}", output_path.display()))? 
+ .len(); + Ok(size) +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + use std::sync::Arc; + + use arrow::array::{ + ArrayRef, BinaryArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt8Array, + UInt64Array, + }; + use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema}; + use bytes::Bytes; + use parquet::arrow::ArrowWriter; + use parquet::file::metadata::KeyValue; + use parquet::file::properties::WriterProperties; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use tempfile::TempDir; + use tokio::io::AsyncRead; + + use super::*; + use crate::storage::page_decoder::StreamDecoder; + use crate::storage::streaming_reader::{RemoteByteSource, StreamingParquetReader}; + use crate::storage::{Compression, ParquetWriterConfig}; + + // -------- Fixtures -------- + + /// Build a sorted metrics RecordBatch with `num_rows` rows and + /// optional `start_series_idx` so multiple inputs interleave. + /// `metric_name` is the only column with non-trivial sort behaviour + /// (alternates between two values), `service` carries nulls every + /// 5th row to exercise the null path. + fn make_sorted_batch(num_rows: usize, start_series_idx: u64) -> RecordBatch { + let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("metric_name", dict_type.clone(), false), + Field::new("metric_type", DataType::UInt8, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("value", DataType::Float64, false), + Field::new("timeseries_id", DataType::Int64, false), + Field::new("service", dict_type, true), + Field::new("sorted_series", DataType::Binary, false), + ])); + + // Two distinct metric names, alternating. sorted_series is a + // monotonically-increasing 8-byte big-endian series id so the + // batch is sorted (sorted_series ASC). + let metric_keys: Vec = (0..num_rows as i32).map(|_| 0).collect(); + let metric_values = StringArray::from(vec!["cpu.usage", "memory.used"]); + let metric_name: ArrayRef = Arc::new( + DictionaryArray::::try_new( + arrow::array::Int32Array::from(metric_keys), + Arc::new(metric_values), + ) + .expect("test dict array"), + ); + let metric_type: ArrayRef = Arc::new(UInt8Array::from(vec![0u8; num_rows])); + // Timestamps DESCENDING within a series (matches "timestamp_secs desc" + // sort schema below). + let timestamps: Vec = (0..num_rows as u64) + .map(|i| 1_700_000_000 + (num_rows as u64 - i)) + .collect(); + let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps)); + let values: Vec = (0..num_rows).map(|i| i as f64).collect(); + let value: ArrayRef = Arc::new(Float64Array::from(values)); + let tsids: Vec = (0..num_rows as i64).map(|i| 1000 + i).collect(); + let timeseries_id: ArrayRef = Arc::new(Int64Array::from(tsids)); + let svc_keys: Vec> = (0..num_rows as i32) + .map(|i| if i % 5 == 0 { None } else { Some(i % 3) }) + .collect(); + let svc_values = StringArray::from(vec!["api", "db", "cache"]); + let service: ArrayRef = Arc::new( + DictionaryArray::::try_new( + arrow::array::Int32Array::from(svc_keys), + Arc::new(svc_values), + ) + .expect("test dict array"), + ); + // sorted_series: 8-byte big-endian series id, ascending. 
+ let mut series_bytes: Vec> = Vec::with_capacity(num_rows); + for i in 0..num_rows as u64 { + let id = start_series_idx + i; + series_bytes.push(id.to_be_bytes().to_vec()); + } + let series_refs: Vec<&[u8]> = series_bytes.iter().map(|v| v.as_slice()).collect(); + let sorted_series: ArrayRef = Arc::new(BinaryArray::from(series_refs)); + + RecordBatch::try_new( + schema, + vec![ + metric_name, + metric_type, + timestamp_secs, + value, + timeseries_id, + service, + sorted_series, + ], + ) + .expect("test batch") + } + + /// Write a fixture parquet file with the standard `qh.*` KVs that + /// the merge engine validates. + fn write_input_parquet(batches: &[RecordBatch], extra_kvs: &[(&str, &str)]) -> Bytes { + let schema = batches[0].schema(); + let cfg = ParquetWriterConfig { + compression: Compression::Snappy, + ..ParquetWriterConfig::default() + }; + let sort_fields = "metric_name|-timestamp_secs/V2"; + let sort_field_names = vec!["metric_name".to_string(), "timestamp_secs".to_string()]; + let mut kvs = vec![ + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + sort_fields.to_string(), + ), + KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + "1700000000".to_string(), + ), + KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()), + KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()), + ]; + for (k, v) in extra_kvs { + kvs.push(KeyValue::new(k.to_string(), v.to_string())); + } + let sorting_cols = vec![ + parquet::file::metadata::SortingColumn { + column_idx: schema.index_of("metric_name").expect("test schema") as i32, + descending: false, + nulls_first: false, + }, + parquet::file::metadata::SortingColumn { + column_idx: schema.index_of("timestamp_secs").expect("test schema") as i32, + descending: true, + nulls_first: false, + }, + ]; + let props: WriterProperties = cfg.to_writer_properties_with_metadata( + &schema, + sorting_cols, + Some(kvs), + &sort_field_names, + ); + let mut buf: Vec = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).expect("arrow writer"); + for b in batches { + writer.write(b).expect("test write"); + } + writer.close().expect("test close"); + Bytes::from(buf) + } + + // -------- In-memory byte source -------- + + struct InMemorySource { + bytes: Bytes, + } + + #[async_trait::async_trait] + impl RemoteByteSource for InMemorySource { + async fn file_size(&self, _path: &std::path::Path) -> std::io::Result { + Ok(self.bytes.len() as u64) + } + async fn get_slice( + &self, + _path: &std::path::Path, + range: std::ops::Range, + ) -> std::io::Result { + Ok(self.bytes.slice(range.start as usize..range.end as usize)) + } + async fn get_slice_stream( + &self, + _path: &std::path::Path, + range: std::ops::Range, + ) -> std::io::Result> { + let slice = self.bytes.slice(range.start as usize..range.end as usize); + Ok(Box::new(std::io::Cursor::new(slice.to_vec()))) + } + } + + async fn open_stream(bytes: Bytes) -> Box { + let source = Arc::new(InMemorySource { bytes }); + let reader = StreamingParquetReader::try_open(source, PathBuf::from("test.parquet")) + .await + .expect("open reader"); + Box::new(reader) + } + + /// Read an output parquet file back into a single concatenated + /// `RecordBatch` via the standard arrow reader. 
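A quick aside on why the fixture encodes `sorted_series` as big-endian bytes: lexicographic byte order then matches numeric order, which is what the ascending merge-key comparison relies on. A tiny check, plain std only:

    // Big-endian keeps byte-wise (lexicographic) order aligned with numeric order.
    assert!(1u64.to_be_bytes() < 2u64.to_be_bytes());
    // Little-endian would invert this pair ([255,0,..] vs [0,1,..]).
    assert!(255u64.to_be_bytes() < 256u64.to_be_bytes());
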
+ fn read_output_to_record_batch(path: &Path) -> RecordBatch { + let bytes = std::fs::read(path).expect("read output"); + let builder = parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new( + Bytes::from(bytes), + ) + .expect("read output builder"); + let schema = builder.schema().clone(); + let reader = builder.build().expect("read output build"); + let batches: Vec = reader.collect::, _>>().expect("read output"); + if batches.is_empty() { + RecordBatch::new_empty(schema) + } else { + arrow::compute::concat_batches(&schema, &batches).expect("concat") + } + } + + fn merge_config(num_outputs: usize) -> MergeConfig { + MergeConfig { + num_outputs, + writer_config: ParquetWriterConfig { + compression: Compression::Snappy, + ..ParquetWriterConfig::default() + }, + } + } + + // -------- Tests -------- + + /// Two inputs, single output, total row count and sort order + /// preserved. + #[tokio::test] + async fn test_two_inputs_simple_merge() { + let batch_a = make_sorted_batch(50, 0); + let batch_b = make_sorted_batch(50, 50); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + assert_eq!(outputs.len(), 1); + assert_eq!(outputs[0].num_rows, 100); + + let merged = read_output_to_record_batch(&outputs[0].path); + assert_eq!(merged.num_rows(), 100); + // sorted_series must be ascending across the merged output. + let ss_array = merged.column(merged.schema().index_of("sorted_series").expect("col")); + let ss = ss_array + .as_any() + .downcast_ref::() + .expect("binary"); + for i in 0..ss_array.len().saturating_sub(1) { + assert!( + ss.value(i) <= ss.value(i + 1), + "row {i}: sorted_series not ascending", + ); + } + } + + /// Single output is always single-row-group (PR-6b's contract). + #[tokio::test] + async fn test_output_is_single_row_group() { + let batch_a = make_sorted_batch(200, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let inputs: Vec> = vec![open_stream(bytes_a).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + assert_eq!(outputs.len(), 1); + + let bytes = std::fs::read(&outputs[0].path).expect("read"); + let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader"); + assert_eq!( + reader.metadata().num_row_groups(), + 1, + "PR-6b output must be single row group", + ); + } + + /// Total row count is preserved (MC-1). 
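The single-row-group test above checks the row-group count; the `SortingColumn` entries built by `build_sorting_columns` should ride along on that row group too. A hedged sketch of that extra assertion — it assumes parquet-rs exposes `RowGroupMetaData::sorting_columns`, and is not part of this PR's test set:

    // Sketch only: read back the sorting-columns metadata on the single output RG.
    let bytes = std::fs::read(&outputs[0].path).expect("read");
    let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader");
    if let Some(cols) = reader.metadata().row_group(0).sorting_columns() {
        assert!(!cols.is_empty(), "output row group should declare sorting columns");
    }
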
+ #[tokio::test] + async fn test_total_rows_preserved() { + let batch_a = make_sorted_batch(75, 0); + let batch_b = make_sorted_batch(50, 100); + let batch_c = make_sorted_batch(25, 200); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]); + let bytes_c = write_input_parquet(std::slice::from_ref(&batch_c), &[]); + + let inputs: Vec> = vec![ + open_stream(bytes_a).await, + open_stream(bytes_b).await, + open_stream(bytes_c).await, + ]; + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(2)) + .await + .expect("merge"); + + let total: usize = outputs.iter().map(|o| o.num_rows).sum(); + assert_eq!(total, 150); + } + + /// Sort schema mismatch across inputs is rejected. + #[tokio::test] + async fn test_sort_schema_mismatch_rejected() { + let batch_a = make_sorted_batch(20, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + + // Override the sort_fields KV to a parseable but incompatible + // schema (different leading sort column). Both schemas include + // the required `timestamp_secs` column. + let cfg = ParquetWriterConfig { + compression: Compression::Snappy, + ..ParquetWriterConfig::default() + }; + let kvs = vec![ + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "service|-timestamp_secs/V2".to_string(), + ), + KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + "1700000000".to_string(), + ), + KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()), + KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()), + ]; + let props: WriterProperties = cfg.to_writer_properties_with_metadata( + &batch_a.schema(), + Vec::new(), + Some(kvs), + &["service".to_string(), "timestamp_secs".to_string()], + ); + let mut buf: Vec = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut buf, batch_a.schema(), Some(props)).expect("arrow writer"); + writer.write(&batch_a).expect("write"); + writer.close().expect("close"); + let bytes_b = Bytes::from(buf); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + let tmp = TempDir::new().expect("tmpdir"); + let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect_err("must reject mismatched sort schema"); + let s = err.to_string(); + assert!( + s.contains("sort schema mismatch"), + "expected 'sort schema mismatch', got: {s}", + ); + } + + /// Window mismatch across inputs is rejected. + #[tokio::test] + async fn test_window_start_mismatch_rejected() { + let batch_a = make_sorted_batch(20, 0); + + // `write_input_parquet` always sets WINDOW_START; build the + // KV set manually so the two inputs declare different windows. 
+ let cfg = ParquetWriterConfig { + compression: Compression::Snappy, + ..ParquetWriterConfig::default() + }; + let make = |window_start: &str| { + let kvs = vec![ + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "metric_name|-timestamp_secs/V2".to_string(), + ), + KeyValue::new( + PARQUET_META_WINDOW_START.to_string(), + window_start.to_string(), + ), + KeyValue::new(PARQUET_META_WINDOW_DURATION.to_string(), "60".to_string()), + KeyValue::new(PARQUET_META_NUM_MERGE_OPS.to_string(), "0".to_string()), + ]; + let props: WriterProperties = cfg.to_writer_properties_with_metadata( + &batch_a.schema(), + Vec::new(), + Some(kvs), + &["metric_name".to_string(), "timestamp_secs".to_string()], + ); + let mut buf: Vec = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, batch_a.schema(), Some(props)) + .expect("arrow writer"); + writer.write(&batch_a).expect("write"); + writer.close().expect("close"); + Bytes::from(buf) + }; + let bytes_a = make("1700000000"); + let bytes_b = make("1700000060"); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + let tmp = TempDir::new().expect("tmpdir"); + let err = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect_err("must reject mismatched window_start"); + let s = err.to_string(); + assert!( + s.contains("window_start mismatch"), + "expected 'window_start mismatch', got: {s}", + ); + } + + /// Output has page-level statistics (column index + offset index) + /// that the streaming writer is configured to produce. This is the + /// premise PR-1 ships and the merge engine inherits. + #[tokio::test] + async fn test_output_has_page_index_metadata() { + let batch_a = make_sorted_batch(2048, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let inputs: Vec> = vec![open_stream(bytes_a).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + let bytes = std::fs::read(&outputs[0].path).expect("read"); + + // Use the standard arrow reader builder configured to load the + // page index. If the column index isn't present the policy + // setting is moot — we can still read the file. To assert + // the index *exists* we use ParquetMetaDataReader directly. + let meta = parquet::file::metadata::ParquetMetaDataReader::new() + .with_column_index_policy(parquet::file::metadata::PageIndexPolicy::Required) + .with_offset_index_policy(parquet::file::metadata::PageIndexPolicy::Required) + .parse_and_finish(&Bytes::from(bytes)) + .expect("parse footer with page index"); + assert!( + meta.column_index().is_some(), + "output must carry column index for query pruning", + ); + assert!( + meta.offset_index().is_some(), + "output must carry offset index for query pruning", + ); + } + + /// `qh.*` KV metadata is propagated to the output. The merge + /// engine increments num_merge_ops; window_start, window_duration + /// and sort_fields carry through unchanged. 
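The page-index assertion above is about what downstream readers can do with the output: with the column and offset indexes present, the standard arrow reader can be asked to load them and prune pages instead of decoding them. A minimal sketch using stock parquet-rs reader options (illustrative, not part of this PR):

    use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

    // Sketch: open the merge output with the page index loaded so row selection /
    // predicate pushdown can skip pages entirely.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let bytes = Bytes::from(std::fs::read(&outputs[0].path).expect("read"));
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, options)
        .expect("open with page index");
    let _reader = builder.build().expect("build pruning-capable reader");
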
+ #[tokio::test] + async fn test_kv_metadata_propagated_to_output() { + let batch_a = make_sorted_batch(40, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let inputs: Vec> = vec![open_stream(bytes_a).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + let bytes = std::fs::read(&outputs[0].path).expect("read"); + let reader = SerializedFileReader::new(Bytes::from(bytes)).expect("ser reader"); + let kvs = reader + .metadata() + .file_metadata() + .key_value_metadata() + .cloned() + .unwrap_or_default(); + let find = |k: &str| -> Option { + kvs.iter() + .find(|kv| kv.key == k) + .and_then(|kv| kv.value.clone()) + }; + assert_eq!( + find(PARQUET_META_SORT_FIELDS).as_deref(), + Some("metric_name|-timestamp_secs/V2"), + ); + assert_eq!( + find(PARQUET_META_WINDOW_START).as_deref(), + Some("1700000000"), + ); + assert_eq!(find(PARQUET_META_WINDOW_DURATION).as_deref(), Some("60")); + assert_eq!( + find(PARQUET_META_NUM_MERGE_OPS).as_deref(), + Some("1"), + "num_merge_ops must increment by 1 over input's max", + ); + } + + /// All-empty inputs produce no output. + #[tokio::test] + async fn test_all_empty_inputs_no_output() { + let empty = make_sorted_batch(0, 0); + let bytes = write_input_parquet(std::slice::from_ref(&empty), &[]); + let inputs: Vec> = vec![open_stream(bytes).await]; + + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + assert!(outputs.is_empty()); + } + + /// One empty input among several non-empty inputs is handled. + #[tokio::test] + async fn test_one_empty_input_among_nonempty() { + let empty = make_sorted_batch(0, 0); + let batch = make_sorted_batch(30, 0); + let bytes_a = write_input_parquet(std::slice::from_ref(&empty), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch), &[]); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + assert_eq!(outputs.len(), 1); + assert_eq!(outputs[0].num_rows, 30); + } + + /// Each input drained through `StreamDecoder` yields a batch the + /// merge engine can consume. Sanity: merge → output, drain output + /// through the same `StreamDecoder` and confirm row count round- + /// trips. 
+ #[tokio::test] + async fn test_output_drainable_by_stream_decoder() { + let batch_a = make_sorted_batch(40, 0); + let batch_b = make_sorted_batch(40, 40); + let bytes_a = write_input_parquet(std::slice::from_ref(&batch_a), &[]); + let bytes_b = write_input_parquet(std::slice::from_ref(&batch_b), &[]); + + let inputs: Vec> = + vec![open_stream(bytes_a).await, open_stream(bytes_b).await]; + let tmp = TempDir::new().expect("tmpdir"); + let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(1)) + .await + .expect("merge"); + + let bytes = std::fs::read(&outputs[0].path).expect("read"); + let mut output_stream = open_stream(Bytes::from(bytes)).await; + let mut decoder = StreamDecoder::new(&mut *output_stream); + let first = decoder.next_rg().await.expect("decode").expect("rg"); + assert!(decoder.next_rg().await.expect("decode").is_none()); + assert_eq!(first.num_rows(), 80); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs index 44cb85d37ec..5f716225e75 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs @@ -152,7 +152,7 @@ pub fn write_merge_outputs( /// /// Takes the relevant row ranges from each input according to the merge runs, /// concatenates into a single batch, and applies the permutation via `take`. -fn apply_merge_permutation( +pub(super) fn apply_merge_permutation( inputs: &[RecordBatch], union_schema: &SchemaRef, runs: &[MergeRun], @@ -205,7 +205,7 @@ fn apply_merge_permutation( } /// Build Parquet KV metadata entries for a merge output file. -fn build_merge_kv_metadata( +pub(super) fn build_merge_kv_metadata( input_meta: &InputMetadata, row_keys_proto: &Option>, zonemap_regexes: &std::collections::HashMap, @@ -267,7 +267,10 @@ fn build_merge_kv_metadata( } /// Build `SortingColumn` entries for Parquet file metadata. -fn build_sorting_columns(batch: &RecordBatch, sort_fields_str: &str) -> Result> { +pub(super) fn build_sorting_columns( + batch: &RecordBatch, + sort_fields_str: &str, +) -> Result> { let sort_schema = parse_sort_fields(sort_fields_str)?; let schema = batch.schema(); @@ -290,7 +293,7 @@ fn build_sorting_columns(batch: &RecordBatch, sort_fields_str: &str) -> Result Result> { +pub(super) fn resolve_sort_field_names(sort_fields_str: &str) -> Result> { let sort_schema = parse_sort_fields(sort_fields_str)?; Ok(sort_schema .column @@ -304,7 +307,7 @@ fn resolve_sort_field_names(sort_fields_str: &str) -> Result> { /// /// Checks that sorted_series values are non-decreasing, and within equal /// sorted_series values, timestamp_secs respects the schema's sort direction. -fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) { +pub(super) fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) { if batch.num_rows() <= 1 { return; } From add52f061fecb3b5cc5e5ad17501fa65cd50c45d Mon Sep 17 00:00:00 2001 From: George Talbot Date: Fri, 8 May 2026 09:33:10 -0400 Subject: [PATCH 2/2] style: nightly fmt fixup for streaming.rs doc comments Newer nightly rustfmt's `wrap_comments` reflows the module-level doc comments tighter than my local nightly at original push time. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/merge/streaming.rs | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs index 68839eb6b69..03d35792b2a 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/streaming.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/streaming.rs @@ -21,19 +21,15 @@ //! //! Compared to the existing whole-file engine in `super::writer`: //! -//! - **Inputs**: drained one row group at a time via PR-6a's -//! [`StreamDecoder`], then concatenated per input. Per-RG decode -//! memory is bounded; concatenation into a per-input `RecordBatch` -//! matches the existing engine's input contract for the merge -//! planner. Truly per-RG streaming inputs lands when prefix=1 multi- -//! RG inputs become the dominant compaction path. +//! - **Inputs**: drained one row group at a time via PR-6a's [`StreamDecoder`], then concatenated +//! per input. Per-RG decode memory is bounded; concatenation into a per-input `RecordBatch` +//! matches the existing engine's input contract for the merge planner. Truly per-RG streaming +//! inputs lands when prefix=1 multi- RG inputs become the dominant compaction path. //! -//! - **Outputs**: written column-by-column. The standard arrow writer -//! materialises a full row group worth of column-chunk buffers before -//! serialising; the column-major writer flushes one column chunk at -//! a time, so output peak memory is bounded by the *largest single -//! column chunk* plus bookkeeping (page index, bloom filters), not by -//! the row group total. +//! - **Outputs**: written column-by-column. The standard arrow writer materialises a full row group +//! worth of column-chunk buffers before serialising; the column-major writer flushes one column +//! chunk at a time, so output peak memory is bounded by the *largest single column chunk* plus +//! bookkeeping (page index, bloom filters), not by the row group total. //! //! Reuses the existing permutation / KV metadata / sorting columns / //! MC-3 sort-order helpers from `super::writer`, and the union-schema