From 34a4ca0154b1b0b9d9ba1f9d4cfb8282cb1de9a5 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Mon, 4 May 2026 14:36:10 -0400 Subject: [PATCH 1/3] feat: enable page-level Parquet stats + add rg_partition_prefix_len marker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for the streaming column-major merge engine workstream. Switches the writer's default from EnabledStatistics::Chunk to EnabledStatistics::Page so every newly-written file carries a Column Index and Offset Index in its footer. Without this, single-RG files produced by future PRs would have one min/max per file — useless for selective queries. The default is exposed as a knob (`ParquetWriterConfig::with_page_statistics`) so callers can opt out when the footer overhead isn't worth it. Adds a numeric marker `qh.rg_partition_prefix_len` in the file's KV metadata and a matching `rg_partition_prefix_len: u32` field on `ParquetSplitMetadata`. The marker records how many leading sort schema columns RG boundaries align with: 0 = no claim (legacy default), N = aligned with the first N sort columns. Single-RG files vacuously satisfy any prefix; future writers will set N = sort_schema.len(). Compaction scope now includes `rg_partition_prefix_len`. Splits with different prefix values land in different buckets; the merge engine validates input files agree on prefix and rejects mismatches at both the metastore-struct layer and the on-disk KV layer. Until the streaming engine lands, the merge writer demotes the output's prefix to 0 because it cannot enforce alignment. New developer tooling: - `quickwit_parquet_engine::storage::inspect_parquet_page_stats` library function returning a structured per-RG / per-column / per-page report, plus `verify_partition_prefix` for the strong-form alignment check. - `inspect_parquet` binary in the parquet-engine crate with `--json`, `--all-pages`, and `--verify-prefix` flags. 
Footer-size delta on a representative shape (100K rows × 6 cols): +19.5% (672 KB → 804 KB). The page index scales with column count, not data volume, so production-sized 50 MB splits show < 0.3% overhead. Test count: 367 → 382 (15 new). Clippy/doc/license/log/machete clean. --- .../src/bin/inspect_parquet.rs | 264 +++++ .../src/merge/metadata_aggregation.rs | 48 + .../quickwit-parquet-engine/src/merge/mod.rs | 42 +- .../merge/policy/const_write_amplification.rs | 2 + .../src/merge/policy/mod.rs | 1 + .../src/merge/policy/scope.rs | 85 +- .../src/merge/tests.rs | 116 +++ .../src/split/metadata.rs | 93 ++ .../src/storage/config.rs | 137 ++- .../src/storage/inspect.rs | 911 ++++++++++++++++++ .../src/storage/mod.rs | 11 +- .../src/storage/writer.rs | 19 + 12 files changed, 1716 insertions(+), 13 deletions(-) create mode 100644 quickwit/quickwit-parquet-engine/src/bin/inspect_parquet.rs create mode 100644 quickwit/quickwit-parquet-engine/src/storage/inspect.rs diff --git a/quickwit/quickwit-parquet-engine/src/bin/inspect_parquet.rs b/quickwit/quickwit-parquet-engine/src/bin/inspect_parquet.rs new file mode 100644 index 00000000000..0aa5d3ebbb9 --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/bin/inspect_parquet.rs @@ -0,0 +1,264 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Developer CLI for inspecting Parquet files written by Quickwit's +//! metrics pipeline. +//! +//! 
Reads a `.parquet` file's footer (including the page-level Column +//! Index and Offset Index produced by `EnabledStatistics::Page`) and +//! prints a per-row-group, per-column report. The output is intended +//! for debugging the on-disk format and verifying claims like +//! `qh.rg_partition_prefix_len`. +//! +//! ## Usage +//! +//! ```text +//! cargo run -p quickwit-parquet-engine --bin inspect_parquet -- [FLAGS] +//! +//! --json Emit JSON instead of human-readable output +//! --all-pages Show every page (default: first 10 per column) +//! --verify-prefix Check that the file's claimed +//! qh.rg_partition_prefix_len actually holds; exits +//! with status 2 if it doesn't +//! ``` + +use std::path::PathBuf; +use std::process::ExitCode; + +use anyhow::{Context, Result, bail}; +use quickwit_parquet_engine::storage::{ + ColumnReport, ParquetPageStatsReport, inspect_parquet_page_stats, verify_partition_prefix, +}; + +const DEFAULT_PAGE_PREVIEW_LIMIT: usize = 10; + +fn main() -> ExitCode { + let opts = match parse_args() { + Ok(opts) => opts, + Err(err) => { + eprintln!("error: {err:#}"); + eprintln!(); + print_usage(); + return ExitCode::from(2); + } + }; + + match run(opts) { + Ok(()) => ExitCode::SUCCESS, + Err(VerifyOrIoError::Verification(err)) => { + eprintln!("verification failed: {err:#}"); + ExitCode::from(2) + } + Err(VerifyOrIoError::Io(err)) => { + eprintln!("error: {err:#}"); + ExitCode::FAILURE + } + } +} + +#[derive(Debug)] +struct Options { + path: PathBuf, + json: bool, + all_pages: bool, + verify_prefix: bool, +} + +fn parse_args() -> Result { + let mut path: Option = None; + let mut json = false; + let mut all_pages = false; + let mut verify_prefix = false; + + for arg in std::env::args().skip(1) { + match arg.as_str() { + "--json" => json = true, + "--all-pages" => all_pages = true, + "--verify-prefix" => verify_prefix = true, + "-h" | "--help" => { + print_usage(); + std::process::exit(0); + } + other if other.starts_with("--") => bail!("unknown 
flag: {other}"), + other => { + if path.is_some() { + bail!("expected exactly one path argument; got an extra: {other}"); + } + path = Some(PathBuf::from(other)); + } + } + } + + Ok(Options { + path: path.context("path to a .parquet file is required")?, + json, + all_pages, + verify_prefix, + }) +} + +fn print_usage() { + eprintln!( + "usage: inspect_parquet [--json] [--all-pages] [--verify-prefix]\n\nInspects a \ + Parquet file's footer and page-level statistics.\nExit codes: 0 = success, 1 = I/O / \ + parse error, 2 = verification failed." + ); +} + +enum VerifyOrIoError { + Verification(anyhow::Error), + Io(anyhow::Error), +} + +impl> From for VerifyOrIoError { + fn from(err: E) -> Self { + Self::Io(err.into()) + } +} + +fn run(opts: Options) -> Result<(), VerifyOrIoError> { + let max_pages = if opts.all_pages { + usize::MAX + } else { + DEFAULT_PAGE_PREVIEW_LIMIT + }; + let report = inspect_parquet_page_stats(&opts.path, max_pages)?; + + if opts.json { + let json = serde_json::to_string_pretty(&report).context("serializing report as JSON")?; + println!("{json}"); + } else { + print_human_report(&opts.path, &report, opts.all_pages); + } + + if opts.verify_prefix { + verify_partition_prefix(&report).map_err(VerifyOrIoError::Verification)?; + if !opts.json { + println!(); + println!( + "verify-prefix: OK (rg_partition_prefix_len = {})", + report.rg_partition_prefix_len + ); + } + } + + Ok(()) +} + +fn print_human_report(path: &std::path::Path, report: &ParquetPageStatsReport, all_pages: bool) { + println!("== File =="); + println!(" path: {}", path.display()); + println!(" size: {} bytes", report.file_size); + println!(" num_row_groups: {}", report.num_row_groups); + println!( + " rg_partition_prefix_len: {}{}", + report.rg_partition_prefix_len, + if report.rg_partition_prefix_len == 0 { + " (absent — no alignment claimed)" + } else { + "" + } + ); + if let Some(sf) = &report.sort_fields { + println!(" sort_fields: {sf}"); + } + println!( + " page_index_coverage: 
{}", + if report.has_full_page_index_coverage() { + "full" + } else { + "partial — some columns missing column index" + } + ); + + if !report.kv_metadata.is_empty() { + println!(); + println!("== KV Metadata =="); + for (k, v) in &report.kv_metadata { + let display_value = if v.len() > 80 { + format!("{}… ({} bytes)", &v[..77], v.len()) + } else { + v.clone() + }; + println!(" {k} = {display_value}"); + } + } + + for (rg_idx, rg) in report.row_groups.iter().enumerate() { + println!(); + println!("== Row Group {rg_idx} =="); + println!(" num_rows: {}", rg.num_rows); + println!(" total_byte_size: {}", rg.total_byte_size); + for col in &rg.columns { + print_column(col, all_pages); + } + } +} + +fn print_column(col: &ColumnReport, all_pages: bool) { + println!(); + println!(" -- Column: {} --", col.column_path); + println!( + " column_index: {}, offset_index: {}, num_pages: {}", + col.has_column_index, col.has_offset_index, col.num_pages + ); + let (chunk_min, chunk_max) = ( + col.chunk_min.as_deref().unwrap_or("—"), + col.chunk_max.as_deref().unwrap_or("—"), + ); + println!(" chunk min/max: [{chunk_min}, {chunk_max}]"); + + if col.pages.is_empty() { + return; + } + + let displayed = if all_pages { + col.pages.len() + } else { + col.pages.len().min(DEFAULT_PAGE_PREVIEW_LIMIT) + }; + let truncated = col.num_pages.saturating_sub(displayed); + if displayed == 0 { + return; + } + + println!(" pages [showing {displayed} of {}]:", col.num_pages); + for (i, page) in col.pages.iter().take(displayed).enumerate() { + let min = page.min.as_deref().unwrap_or("—"); + let max = page.max.as_deref().unwrap_or("—"); + let nulls = page + .null_count + .map(|n| n.to_string()) + .unwrap_or_else(|| "—".to_string()); + let rows = page + .num_rows + .map(|n| n.to_string()) + .unwrap_or_else(|| "—".to_string()); + let off = page + .offset + .map(|n| format!("{n}")) + .unwrap_or_else(|| "—".to_string()); + let csz = page + .compressed_page_size + .map(|n| format!("{n}")) + .unwrap_or_else(|| 
"—".to_string()); + println!( + " [{i:>3}] rows={rows:>6} off={off:>10} compressed_size={csz:>8} \ + nulls={nulls:>6} min/max=[{min}, {max}]" + ); + } + if truncated > 0 { + println!(" … {truncated} more pages omitted (use --all-pages to see them)"); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs index e0e15e32e89..dbd964bb196 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs @@ -93,6 +93,15 @@ pub fn merge_parquet_split_metadata( first.window ); } + if input.rg_partition_prefix_len != first.rg_partition_prefix_len { + bail!( + "input {} has rg_partition_prefix_len {}, expected {} — splits with different \ + prefix lengths must not appear in the same merge", + i, + input.rg_partition_prefix_len, + first.rg_partition_prefix_len + ); + } } // Each merge adds one to the lineage depth. The policy uses this to @@ -112,6 +121,13 @@ pub fn merge_parquet_split_metadata( // Data-dependent fields come from the MergeOutputFile (extracted from // this output's actual rows during the merge write pass). + // + // `rg_partition_prefix_len` is reset to 0: the current merge writer + // does not enforce row group boundary alignment with sort prefix + // transitions. A future PR (the streaming column-major merge engine) + // will produce aligned output and propagate the prefix from inputs. + // Until then, claiming alignment on output would be dishonest, so we + // demote to 0 even when inputs have a higher prefix. let mut metadata = ParquetSplitMetadata { kind: first.kind, partition_id: first.partition_id, @@ -130,6 +146,7 @@ pub fn merge_parquet_split_metadata( num_merge_ops, row_keys_proto: output.row_keys_proto.clone(), zonemap_regexes: output.zonemap_regexes.clone(), + rg_partition_prefix_len: 0, }; // Finalize: tag sets may exceed the cardinality threshold. 
@@ -357,6 +374,37 @@ mod tests { assert!(result.is_err()); } + #[test] + fn test_mismatched_rg_partition_prefix_len_error() { + let s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s1.rg_partition_prefix_len = 1; + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output); + let err = result.expect_err("merge must reject mismatched prefix lengths"); + let msg = err.to_string(); + assert!( + msg.contains("rg_partition_prefix_len"), + "error should mention rg_partition_prefix_len, got: {msg}" + ); + } + + #[test] + fn test_output_prefix_len_demoted_to_zero() { + // Until the streaming column-major writer lands, the merge writer + // does not enforce alignment, so the output's prefix is 0 even when + // every input claims a higher value. This test pins that contract. + let mut s0 = make_test_split("s0", (1000, 2000), 0); + let mut s1 = make_test_split("s1", (1000, 2000), 0); + s0.rg_partition_prefix_len = 3; + s1.rg_partition_prefix_len = 3; + + let output = make_output(200, 9000); + let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); + assert_eq!(result.rg_partition_prefix_len, 0); + } + #[test] fn test_fresh_split_id_generated() { let inputs = vec![ diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs index 5e29998217d..74b3dad52e2 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs @@ -40,8 +40,8 @@ pub use self::merge_order::MergeRun; use crate::sort_fields::{equivalent_schemas_for_compaction, parse_sort_fields}; use crate::sorted_series::SORTED_SERIES_COLUMN; use crate::storage::{ - PARQUET_META_NUM_MERGE_OPS, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, - PARQUET_META_WINDOW_START, ParquetWriterConfig, + PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_SORT_FIELDS, + 
PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, ParquetWriterConfig, }; /// Configuration for a merge operation. @@ -62,12 +62,20 @@ pub struct MergeConfig { } /// Metadata extracted from input files' Parquet KV metadata. -/// All inputs must agree on sort_fields, window_start, and window_duration. +/// All inputs must agree on sort_fields, window_start, window_duration, +/// and rg_partition_prefix_len. struct InputMetadata { sort_fields: String, window_start_secs: Option, window_duration_secs: u32, num_merge_ops: u32, + /// Number of leading sort columns whose transitions align with row + /// group boundaries. All input files must agree on this value (it's + /// part of the compaction scope key). The current merge writer does + /// not enforce alignment, so the *output* file is written with prefix + /// 0 regardless of this value. + #[allow(dead_code)] // wired for PR-6 streaming engine; PR-1 only validates. + rg_partition_prefix_len: u32, } /// Result of a single output file from the merge. @@ -76,6 +84,7 @@ struct InputMetadata { /// logical metadata (metric names, tags, time range) extracted from the /// actual rows in this output file. When the merge produces multiple /// outputs, each has metadata reflecting only its own rows. +#[derive(Debug)] pub struct MergeOutputFile { /// Path to the output Parquet file. pub path: PathBuf, @@ -305,6 +314,7 @@ fn extract_and_validate_input_metadata(paths: &[PathBuf]) -> Result = None; let mut consensus_window_start: Option> = None; let mut consensus_window_duration: Option = None; + let mut consensus_prefix_len: Option = None; let mut max_merge_ops: u32 = 0; for path in paths { @@ -419,6 +429,31 @@ fn extract_and_validate_input_metadata(paths: &[PathBuf]) -> Result max_merge_ops { max_merge_ops = file_merge_ops; } + + // Row group partition prefix length: must be consistent across all + // inputs. Absent KV → 0 (legacy default; no alignment claim). 
+ let file_prefix_len = find_kv(PARQUET_META_RG_PARTITION_PREFIX_LEN) + .map(|s| s.parse::()) + .transpose() + .with_context(|| format!("parsing rg_partition_prefix_len from {}", path.display()))? + .unwrap_or(0); + + match consensus_prefix_len { + Some(expected) => { + if file_prefix_len != expected { + bail!( + "rg_partition_prefix_len mismatch in {}: expected {}, found {} — splits \ + with different prefix lengths must not appear in the same merge", + path.display(), + expected, + file_prefix_len + ); + } + } + None => { + consensus_prefix_len = Some(file_prefix_len); + } + } } Ok(InputMetadata { @@ -426,5 +461,6 @@ fn extract_and_validate_input_metadata(paths: &[PathBuf]) -> Result, window_duration: u32, + ) -> ParquetSplitMetadata { + test_split_full( + index_uid, + partition_id, + sort_fields, + window_start, + window_duration, + 0, + ) + } + + fn test_split_full( + index_uid: &str, + partition_id: u64, + sort_fields: &str, + window_start: Option, + window_duration: u32, + rg_partition_prefix_len: u32, ) -> ParquetSplitMetadata { let mut builder = ParquetSplitMetadata::metrics_builder() .split_id(ParquetSplitId::generate(ParquetSplitKind::Metrics)) .index_uid(index_uid) .partition_id(partition_id) .time_range(TimeRange::new(1000, 2000)) - .sort_fields(sort_fields); + .sort_fields(sort_fields) + .rg_partition_prefix_len(rg_partition_prefix_len); if let Some(start) = window_start { builder = builder @@ -251,6 +277,7 @@ mod tests { sort_fields: "metric_name|host|timestamp/V2".to_string(), window_start_secs: 1000, window_duration_secs: 3600, + rg_partition_prefix_len: 0, }; let scope_b = CompactionScope { index_uid: "idx:001".to_string(), @@ -258,6 +285,7 @@ mod tests { sort_fields: "metric_name|host|timestamp/V2".to_string(), window_start_secs: 4600, window_duration_secs: 3600, + rg_partition_prefix_len: 0, }; assert_eq!(result[&scope_a].len(), 3); @@ -312,5 +340,60 @@ mod tests { assert_eq!(scope.sort_fields, "metric_name|host|timestamp/V2"); 
assert_eq!(scope.window_start_secs, 7200); assert_eq!(scope.window_duration_secs, 3600); + assert_eq!(scope.rg_partition_prefix_len, 0); + } + + #[test] + fn test_different_rg_partition_prefix_len_separated() { + // A legacy split (prefix=0) and a single-RG-aligned split (prefix=3) + // share every other scope dimension but must NOT be merged together, + // because the merge engine requires uniform input format. + let sort = "metric_name|host|timestamp/V2"; + let splits = vec![ + test_split_full("idx:001", 0, sort, Some(1000), 3600, 0), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 3), + ]; + let result = group_by_compaction_scope(splits); + assert!( + result.is_empty(), + "splits with different rg_partition_prefix_len must end up in separate groups" + ); + } + + #[test] + fn test_same_rg_partition_prefix_len_grouped() { + let sort = "metric_name|host|timestamp/V2"; + let splits = vec![ + test_split_full("idx:001", 0, sort, Some(1000), 3600, 1), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 1), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 1), + ]; + let result = group_by_compaction_scope(splits); + assert_eq!(result.len(), 1, "matching prefix splits group together"); + let group = result.values().next().unwrap(); + assert_eq!(group.len(), 3); + + let scope = result.keys().next().unwrap(); + assert_eq!(scope.rg_partition_prefix_len, 1); + } + + #[test] + fn test_three_distinct_prefix_buckets() { + // Each prefix value forms its own bucket. Even with 6 splits sharing + // every other dimension, 3 different prefixes → 3 buckets of 2 each. 
+ let sort = "metric_name|host|timestamp/V2"; + let splits = vec![ + test_split_full("idx:001", 0, sort, Some(1000), 3600, 0), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 0), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 1), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 1), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 3), + test_split_full("idx:001", 0, sort, Some(1000), 3600, 3), + ]; + let result = group_by_compaction_scope(splits); + assert_eq!(result.len(), 3, "three prefix values → three buckets"); + for group in result.values() { + assert_eq!(group.len(), 2); + } } } diff --git a/quickwit/quickwit-parquet-engine/src/merge/tests.rs b/quickwit/quickwit-parquet-engine/src/merge/tests.rs index 385a42d40eb..c34845a5fb1 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/tests.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/tests.rs @@ -1204,6 +1204,13 @@ fn test_merge_cross_record_batch_interleaving() { /// Build a minimal ParquetSplitMetadata for test files. /// The merge engine validates qh.sort_fields on every input. fn build_test_metadata() -> ParquetSplitMetadata { + build_test_metadata_with_prefix_len(0) +} + +/// Build minimal `ParquetSplitMetadata` with an explicit +/// `rg_partition_prefix_len`. Used by tests that exercise the merge +/// engine's prefix-consistency validation. +fn build_test_metadata_with_prefix_len(prefix_len: u32) -> ParquetSplitMetadata { ParquetSplitMetadata::metrics_builder() .split_id(ParquetSplitId::generate(ParquetSplitKind::Metrics)) .index_uid("test-index:0") @@ -1211,9 +1218,118 @@ fn build_test_metadata() -> ParquetSplitMetadata { .window_duration_secs(900) .window_start_secs(0) .time_range(TimeRange::new(0, 1)) + .rg_partition_prefix_len(prefix_len) .build() } +/// Write a test split with a caller-supplied `rg_partition_prefix_len` +/// embedded in the file's KV metadata. Used to exercise the merge +/// engine's prefix-consistency validation on real on-disk files. 
+fn write_test_split_with_prefix_len( + dir: &std::path::Path, + name: &str, + metric_names: &[&str], + timestamps: &[i64], + values: &[f64], + timeseries_ids: &[i64], + prefix_len: u32, +) -> PathBuf { + let batch = build_test_batch(metric_names, timestamps, values, timeseries_ids); + let table_config = TableConfig { + product_type: ProductType::Metrics, + sort_fields: Some(TEST_SORT_FIELDS.to_string()), + window_duration_secs: 900, + }; + let writer = ParquetWriter::new(ParquetWriterConfig::default(), &table_config).unwrap(); + let metadata = build_test_metadata_with_prefix_len(prefix_len); + let path = dir.join(name); + writer + .write_to_file_with_metadata(&batch, &path, Some(&metadata)) + .unwrap(); + path +} + +#[test] +fn test_merge_rejects_mismatched_rg_partition_prefix_len() { + let dir = TempDir::new().unwrap(); + + // input1 declares prefix_len = 1 (metric_name boundary alignment). + let input1 = write_test_split_with_prefix_len( + dir.path(), + "input1.parquet", + &["cpu", "cpu"], + &[100, 200], + &[1.0, 2.0], + &[42, 42], + 1, + ); + + // input2 declares prefix_len = 0 (legacy default). + let input2 = write_test_split_with_prefix_len( + dir.path(), + "input2.parquet", + &["mem", "mem"], + &[100, 200], + &[3.0, 4.0], + &[99, 99], + 0, + ); + + let output_dir = dir.path().join("output"); + std::fs::create_dir_all(&output_dir).unwrap(); + let config = MergeConfig { + num_outputs: 1, + writer_config: ParquetWriterConfig::default(), + }; + + let result = merge_sorted_parquet_files(&[input1, input2], &output_dir, &config); + let err = result.expect_err("merge must reject mismatched prefix lengths"); + let msg = format!("{err:#}"); + assert!( + msg.contains("rg_partition_prefix_len"), + "error should mention rg_partition_prefix_len, got: {msg}" + ); +} + +#[test] +fn test_merge_accepts_matching_rg_partition_prefix_len() { + // Sanity check: when both inputs declare the same prefix_len, the + // merge proceeds as usual. 
The output is demoted to prefix_len = 0 + // because the current writer does not enforce alignment. + let dir = TempDir::new().unwrap(); + + let input1 = write_test_split_with_prefix_len( + dir.path(), + "input1.parquet", + &["cpu", "cpu"], + &[100, 200], + &[1.0, 2.0], + &[42, 42], + 2, + ); + + let input2 = write_test_split_with_prefix_len( + dir.path(), + "input2.parquet", + &["mem", "mem"], + &[100, 200], + &[3.0, 4.0], + &[99, 99], + 2, + ); + + let output_dir = dir.path().join("output"); + std::fs::create_dir_all(&output_dir).unwrap(); + let config = MergeConfig { + num_outputs: 1, + writer_config: ParquetWriterConfig::default(), + }; + + let outputs = merge_sorted_parquet_files(&[input1, input2], &output_dir, &config).unwrap(); + assert_eq!(outputs.len(), 1); + assert_eq!(outputs[0].num_rows, 4); +} + /// Build a test RecordBatch from raw column data (shared by test helpers). fn build_test_batch( metric_names: &[&str], diff --git a/quickwit/quickwit-parquet-engine/src/split/metadata.rs b/quickwit/quickwit-parquet-engine/src/split/metadata.rs index b926dd02e83..50943a1d80a 100644 --- a/quickwit/quickwit-parquet-engine/src/split/metadata.rs +++ b/quickwit/quickwit-parquet-engine/src/split/metadata.rs @@ -207,6 +207,21 @@ pub struct ParquetSplitMetadata { /// Per-column zonemap regex strings, keyed by column name. /// Empty for pre-Phase-31 splits. pub zonemap_regexes: HashMap, + + /// Number of leading sort schema columns whose transitions align with + /// row group boundaries in the Parquet file. + /// + /// `0` = no alignment claimed (legacy default; safe for any boundaries). + /// `1` = first sort column (e.g., `metric_name`) — RG boundaries match + /// transitions in this column. + /// `N` (1 ≤ N ≤ sort_schema.len()) = aligned with first `N` sort columns. + /// A single-RG file vacuously satisfies any prefix; writers set `N` = + /// sort schema length so the streaming reader's fast path applies. 
+ /// + /// **Compaction scope**: only splits with the same value of this field + /// merge together. Mixing prefix levels in a single merge is forbidden. + #[serde(default)] + pub rg_partition_prefix_len: u32, } /// Serde helper struct that uses `window_start` / `window_duration_secs` field @@ -248,6 +263,13 @@ struct ParquetSplitMetadataSerde { #[serde(default, skip_serializing_if = "HashMap::is_empty")] zonemap_regexes: HashMap, + + #[serde(default, skip_serializing_if = "is_zero_u32")] + rg_partition_prefix_len: u32, +} + +fn is_zero_u32(value: &u32) -> bool { + *value == 0 } impl From for ParquetSplitMetadata { @@ -274,6 +296,7 @@ impl From for ParquetSplitMetadata { num_merge_ops: s.num_merge_ops, row_keys_proto: s.row_keys_proto, zonemap_regexes: s.zonemap_regexes, + rg_partition_prefix_len: s.rg_partition_prefix_len, } } } @@ -303,6 +326,7 @@ impl From for ParquetSplitMetadataSerde { num_merge_ops: m.num_merge_ops, row_keys_proto: m.row_keys_proto, zonemap_regexes: m.zonemap_regexes, + rg_partition_prefix_len: m.rg_partition_prefix_len, } } } @@ -412,6 +436,7 @@ pub struct ParquetSplitMetadataBuilder { num_merge_ops: u32, row_keys_proto: Option>, zonemap_regexes: HashMap, + rg_partition_prefix_len: u32, } // The builder still accepts window_start and window_duration_secs separately @@ -438,6 +463,7 @@ impl ParquetSplitMetadataBuilder { num_merge_ops: 0, row_keys_proto: None, zonemap_regexes: HashMap::new(), + rg_partition_prefix_len: 0, } } @@ -545,6 +571,16 @@ impl ParquetSplitMetadataBuilder { self } + /// Set the row group partition prefix length. + /// + /// `0` (default) means no alignment claim. Higher values mean RG + /// boundaries align with the first `N` sort schema columns. See + /// [`ParquetSplitMetadata::rg_partition_prefix_len`]. 
+ pub fn rg_partition_prefix_len(mut self, prefix_len: u32) -> Self { + self.rg_partition_prefix_len = prefix_len; + self + } + pub fn build(self) -> ParquetSplitMetadata { // TW-2 (ADR-003): window_duration must evenly divide 3600. // Enforced at build time so no invalid metadata propagates to storage. @@ -596,6 +632,7 @@ impl ParquetSplitMetadataBuilder { num_merge_ops: self.num_merge_ops, row_keys_proto: self.row_keys_proto, zonemap_regexes: self.zonemap_regexes, + rg_partition_prefix_len: self.rg_partition_prefix_len, } } } @@ -836,6 +873,62 @@ mod tests { assert_eq!(recovered.zonemap_regexes.get("host").unwrap(), "host-\\d+"); } + #[test] + fn test_rg_partition_prefix_len_default_zero() { + let metadata = ParquetSplitMetadata::metrics_builder() + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .build(); + assert_eq!(metadata.rg_partition_prefix_len, 0); + } + + #[test] + fn test_rg_partition_prefix_len_round_trip() { + let metadata = ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new("prefix-roundtrip")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .rg_partition_prefix_len(3) + .build(); + + let json = serde_json::to_string(&metadata).expect("serialize"); + let recovered: ParquetSplitMetadata = serde_json::from_str(&json).expect("deserialize"); + assert_eq!(recovered.rg_partition_prefix_len, 3); + } + + #[test] + fn test_rg_partition_prefix_len_absent_in_legacy_json() { + // JSON predating the field entirely should deserialize as 0. 
+ let legacy_json = r#"{ + "split_id": "metrics_legacy", + "index_uid": "test-index:00000000000000000000000000", + "time_range": {"start_secs": 100, "end_secs": 200}, + "num_rows": 1, + "size_bytes": 1, + "metric_names": [], + "low_cardinality_tags": {}, + "high_cardinality_tag_keys": [], + "created_at": {"secs_since_epoch": 0, "nanos_since_epoch": 0} + }"#; + let metadata: ParquetSplitMetadata = + serde_json::from_str(legacy_json).expect("legacy JSON should deserialize"); + assert_eq!(metadata.rg_partition_prefix_len, 0); + } + + #[test] + fn test_rg_partition_prefix_len_zero_omitted_from_json() { + let metadata = ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new("prefix-zero")) + .index_uid("test-index:00000000000000000000000000") + .time_range(TimeRange::new(1000, 2000)) + .build(); + let json = serde_json::to_string(&metadata).expect("serialize"); + assert!( + !json.contains("rg_partition_prefix_len"), + "default value 0 should be omitted from JSON to keep payloads compact" + ); + } + #[test] fn test_skip_serializing_empty_compaction_fields() { let metadata = ParquetSplitMetadata::metrics_builder() diff --git a/quickwit/quickwit-parquet-engine/src/storage/config.rs b/quickwit/quickwit-parquet-engine/src/storage/config.rs index 02275f3fa29..2cea49926a4 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/config.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/config.rs @@ -66,6 +66,17 @@ pub struct ParquetWriterConfig { pub data_page_size: usize, /// Number of rows per write batch. pub write_batch_size: usize, + /// Whether to emit page-level statistics (Parquet Column Index + + /// Offset Index) in the footer. Default `true`. + /// + /// When `false`, only chunk-level (row-group) statistics are emitted, + /// which gives one min/max per (RG, column) and no per-page metadata. 
+ /// This loses page-level pruning in queries — fine for multi-RG files + /// where RG-level pruning is already useful, but **strongly + /// discouraged for single-RG files**, where it collapses pruning to + /// one min/max per file. Reduces footer size by tens of KB on small + /// files (a few hundred KB on very wide schemas). + pub page_statistics_enabled: bool, } impl Default for ParquetWriterConfig { @@ -76,6 +87,7 @@ impl Default for ParquetWriterConfig { row_group_size: DEFAULT_ROW_GROUP_SIZE, data_page_size: DEFAULT_DATA_PAGE_SIZE, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, + page_statistics_enabled: true, } } } @@ -116,6 +128,14 @@ impl ParquetWriterConfig { self } + /// Enable or disable page-level statistics (column index + offset + /// index). Default `true`. See + /// [`ParquetWriterConfig::page_statistics_enabled`] for the trade-off. + pub fn with_page_statistics(mut self, enabled: bool) -> Self { + self.page_statistics_enabled = enabled; + self + } + /// Convert to Parquet WriterProperties using the given Arrow schema to configure /// per-column settings like dictionary encoding and bloom filters, with an empty /// sort order and no metadata. @@ -140,13 +160,29 @@ impl ParquetWriterConfig { kv_metadata: Option>, sort_field_names: &[String], ) -> WriterProperties { + // Page-level statistics let queries prune individual data pages via + // Parquet's Column Index + Offset Index in the footer. This is the + // prerequisite for the streaming column-major merge engine, where + // outputs may be a single large row group: without per-page stats, + // pruning would collapse to one min/max per file. Truncation length + // (64 bytes) bounds the per-page metadata footprint for high- + // cardinality string columns. + // + // The choice is controlled by `page_statistics_enabled`. Disabling + // it falls back to `Chunk`-level stats only — saves a few percent + // of footer space at the cost of page-level pruning. 
+ let stats_level = if self.page_statistics_enabled { + EnabledStatistics::Page + } else { + EnabledStatistics::Chunk + }; let mut builder = WriterProperties::builder() .set_max_row_group_row_count(Some(self.row_group_size)) .set_data_page_size_limit(self.data_page_size) .set_write_batch_size(self.write_batch_size) .set_column_index_truncate_length(Some(64)) .set_sorting_columns(Some(sorting_cols)) - .set_statistics_enabled(EnabledStatistics::Chunk); + .set_statistics_enabled(stats_level); if let Some(kvs) = kv_metadata && !kvs.is_empty() @@ -346,23 +382,112 @@ mod tests { let schema = create_test_schema(); let props = config.to_writer_properties(&schema); - // Verify statistics are enabled at Chunk (row group) level + // Page-level stats are required for column index / offset index to + // land in the footer; downstream pruning relies on that data. let metric_name_path = ColumnPath::new(vec!["metric_name".to_string()]); assert_eq!( props.statistics_enabled(&metric_name_path), - EnabledStatistics::Chunk, - "statistics should be enabled at Chunk level" + EnabledStatistics::Page, + "statistics should be enabled at Page level" ); - // Verify for timestamp column as well (important for time range pruning) let timestamp_path = ColumnPath::new(vec!["timestamp_secs".to_string()]); assert_eq!( props.statistics_enabled(×tamp_path), + EnabledStatistics::Page, + "statistics should be enabled at Page level for timestamp" + ); + } + + #[test] + fn test_page_statistics_default_enabled() { + let config = ParquetWriterConfig::default(); + assert!(config.page_statistics_enabled); + } + + #[test] + fn test_page_statistics_disable_falls_back_to_chunk() { + let config = ParquetWriterConfig::new().with_page_statistics(false); + assert!(!config.page_statistics_enabled); + + let schema = create_test_schema(); + let props = config.to_writer_properties(&schema); + let metric_name_path = ColumnPath::new(vec!["metric_name".to_string()]); + assert_eq!( + 
props.statistics_enabled(&metric_name_path), EnabledStatistics::Chunk, - "statistics should be enabled at Chunk level for timestamp" + "disabling page stats should fall back to chunk-level stats" ); } + #[test] + fn test_page_statistics_disabled_writer_omits_indexes() { + // With the knob off, written files have no Column Index / Offset + // Index in the footer. The inspector should report + // `has_column_index = false` for every column. + use std::sync::Arc; + + use arrow::array::{ + DictionaryArray, Float64Array, Int64Array, RecordBatch, UInt8Array, UInt64Array, + }; + use arrow::datatypes::{Field, Int32Type}; + use parquet::arrow::ArrowWriter; + use tempfile::TempDir; + + use super::*; + use crate::storage::inspect_parquet_page_stats; + + let dir = TempDir::new().unwrap(); + let metric_name_array: DictionaryArray = (0..16) + .map(|i| Some(if i < 8 { "cpu" } else { "mem" })) + .collect(); + let timestamp_array = UInt64Array::from((0..16u64).collect::>()); + let value_array = Float64Array::from((0..16).map(|i| i as f64).collect::>()); + let tsid_array = Int64Array::from(vec![42i64; 16]); + let metric_type_array = UInt8Array::from(vec![0u8; 16]); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "metric_name", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new("metric_type", DataType::UInt8, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("value", DataType::Float64, false), + Field::new("timeseries_id", DataType::Int64, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(metric_name_array), + Arc::new(metric_type_array), + Arc::new(timestamp_array), + Arc::new(value_array), + Arc::new(tsid_array), + ], + ) + .unwrap(); + + let path = dir.path().join("no_page_index.parquet"); + let config = ParquetWriterConfig::new().with_page_statistics(false); + let props = config.to_writer_properties(&schema); + let file = 
std::fs::File::create(&path).unwrap(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + for col in &report.row_groups[0].columns { + assert!( + !col.has_column_index, + "column '{}' must NOT have a column index when page stats are disabled", + col.column_path + ); + } + } + #[test] fn test_dictionary_encoding_enabled() { let config = ParquetWriterConfig::default(); diff --git a/quickwit/quickwit-parquet-engine/src/storage/inspect.rs b/quickwit/quickwit-parquet-engine/src/storage/inspect.rs new file mode 100644 index 00000000000..5172aa742bf --- /dev/null +++ b/quickwit/quickwit-parquet-engine/src/storage/inspect.rs @@ -0,0 +1,911 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Inspect a Parquet file's footer and page-level statistics. +//! +//! This module is the foundation for the `inspect_parquet` developer CLI +//! and for tests that verify on-disk format properties such as the +//! `qh.rg_partition_prefix_len` alignment claim. +//! +//! The inspector loads the Column Index and Offset Index (Parquet's +//! per-page min/max + offsets stored in the footer) explicitly via +//! `ReadOptionsBuilder::with_page_index(true)`. Without this, the +//! footer would be parsed with chunk-level statistics only, even if the +//! 
writer emitted page-level data. + +use std::path::Path; + +use anyhow::{Context, Result, bail}; +use parquet::file::page_index::column_index::{ + ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex, +}; +use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::file::serialized_reader::ReadOptionsBuilder; +use parquet::file::statistics::Statistics; +use serde::Serialize; + +use super::writer::{PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_SORT_FIELDS}; + +/// Aggregate report of a Parquet file's page-level statistics + KV metadata. +#[derive(Debug, Clone, Serialize)] +pub struct ParquetPageStatsReport { + /// File length in bytes. + pub file_size: u64, + /// All `qh.*` (and other) entries from the file's KV metadata. + pub kv_metadata: Vec<(String, String)>, + /// Number of row groups in the file. + pub num_row_groups: usize, + /// Per-row-group reports (length == `num_row_groups`). + pub row_groups: Vec, + /// Parsed value of `qh.rg_partition_prefix_len`. `0` (or absent) means + /// no alignment claim is recorded in the file. + pub rg_partition_prefix_len: u32, + /// Parsed value of `qh.sort_fields`, if present. + pub sort_fields: Option, +} + +/// Per-row-group view of a Parquet file. +#[derive(Debug, Clone, Serialize)] +pub struct RowGroupReport { + /// Row count in this row group. + pub num_rows: i64, + /// Total size in bytes of this row group's column chunks. + pub total_byte_size: i64, + /// One entry per column in the file's schema. + pub columns: Vec, +} + +/// Per-column view inside a row group. +#[derive(Debug, Clone, Serialize)] +pub struct ColumnReport { + /// Column path as stored in the Parquet schema (e.g., `"metric_name"`). + pub column_path: String, + /// Whether the file has a Column Index entry for this column. 
When + /// `false`, page-level statistics are unavailable for this column — + /// likely because `EnabledStatistics::Page` was not set when the file + /// was written. + pub has_column_index: bool, + /// Whether the file has an Offset Index entry for this column. + pub has_offset_index: bool, + /// Number of data pages in this column chunk (0 if no offset index). + pub num_pages: usize, + /// Chunk-level (row-group-wide) min, stringified for display. + pub chunk_min: Option, + /// Chunk-level (row-group-wide) max, stringified for display. + pub chunk_max: Option, + /// Per-page reports. Length == `num_pages` when both indexes are + /// present; truncated by callers that only want a preview. + pub pages: Vec, +} + +/// Per-page view inside a column chunk. +#[derive(Debug, Clone, Serialize)] +pub struct PageReport { + /// Stringified per-page min from the Column Index, if available. + pub min: Option, + /// Stringified per-page max from the Column Index, if available. + pub max: Option, + /// Null count from the Column Index, if available. + pub null_count: Option, + /// Row count of this page, derived from the Offset Index's + /// `first_row_index` deltas. `None` for the last page. + pub num_rows: Option, + /// Byte offset of this page within the file. + pub offset: Option, + /// Compressed size of this page on disk. + pub compressed_page_size: Option, +} + +impl ParquetPageStatsReport { + /// Returns true when every (row-group, column) pair has a Column Index + /// entry. The column-major streaming reader requires this. + pub fn has_full_page_index_coverage(&self) -> bool { + self.row_groups + .iter() + .all(|rg| rg.columns.iter().all(|c| c.has_column_index)) + } +} + +/// Inspect a Parquet file at `path` and produce a structured report. +/// +/// Pages are limited to the first `max_pages_per_column` per column. +/// Pass `usize::MAX` for the unrestricted view used by the +/// `--all-pages` CLI flag. 
+pub fn inspect_parquet_page_stats( + path: &Path, + max_pages_per_column: usize, +) -> Result { + let file = std::fs::File::open(path) + .with_context(|| format!("opening parquet file: {}", path.display()))?; + let file_size = file + .metadata() + .with_context(|| format!("reading file size: {}", path.display()))? + .len(); + + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(file, options) + .with_context(|| format!("opening parquet reader: {}", path.display()))?; + let metadata = reader.metadata(); + + let kv_metadata: Vec<(String, String)> = metadata + .file_metadata() + .key_value_metadata() + .map(|kvs| { + kvs.iter() + .map(|kv| (kv.key.clone(), kv.value.clone().unwrap_or_default())) + .collect() + }) + .unwrap_or_default(); + + let rg_partition_prefix_len = kv_metadata + .iter() + .find(|(k, _)| k == PARQUET_META_RG_PARTITION_PREFIX_LEN) + .map(|(_, v)| v.parse::()) + .transpose() + .with_context(|| { + format!( + "parsing {PARQUET_META_RG_PARTITION_PREFIX_LEN} from {}", + path.display() + ) + })? 
+ .unwrap_or(0); + + let sort_fields = kv_metadata + .iter() + .find(|(k, _)| k == PARQUET_META_SORT_FIELDS) + .map(|(_, v)| v.clone()); + + let column_index = metadata.column_index(); + let offset_index = metadata.offset_index(); + let num_row_groups = metadata.num_row_groups(); + + let mut row_groups = Vec::with_capacity(num_row_groups); + for rg_idx in 0..num_row_groups { + let rg_meta = metadata.row_group(rg_idx); + let mut columns = Vec::with_capacity(rg_meta.num_columns()); + + for col_idx in 0..rg_meta.num_columns() { + let col_chunk = rg_meta.column(col_idx); + let column_path = col_chunk.column_path().string(); + + let (chunk_min, chunk_max) = match col_chunk.statistics() { + Some(stats) => stringify_chunk_stats(stats), + None => (None, None), + }; + + let col_index_entry = column_index + .and_then(|idx| idx.get(rg_idx)) + .and_then(|cols| cols.get(col_idx)); + let off_index_entry = offset_index + .and_then(|idx| idx.get(rg_idx)) + .and_then(|cols| cols.get(col_idx)); + + let has_column_index = col_index_entry + .map(|idx| !matches!(idx, ColumnIndexMetaData::NONE)) + .unwrap_or(false); + let has_offset_index = off_index_entry.is_some(); + + let pages = build_page_reports(col_index_entry, off_index_entry, max_pages_per_column); + let num_pages = off_index_entry + .map(|locs| locs.page_locations().len()) + .unwrap_or(0); + + columns.push(ColumnReport { + column_path, + has_column_index, + has_offset_index, + num_pages, + chunk_min, + chunk_max, + pages, + }); + } + + row_groups.push(RowGroupReport { + num_rows: rg_meta.num_rows(), + total_byte_size: rg_meta.total_byte_size(), + columns, + }); + } + + Ok(ParquetPageStatsReport { + file_size, + kv_metadata, + num_row_groups, + row_groups, + rg_partition_prefix_len, + sort_fields, + }) +} + +/// Verify that a file's RG boundaries actually align with transitions in +/// the first `rg_partition_prefix_len` sort columns. 
+/// +/// The contract is the strong form: within each row group, every one of +/// the first `N` sort columns is constant. Implementation: each RG's +/// chunk-level min must equal its chunk-level max for those columns. +/// +/// Returns `Ok(())` when the claim holds (or `prefix_len == 0`, in which +/// case there is no claim to verify). Returns an error with a per-RG +/// diagnostic when it doesn't. +pub fn verify_partition_prefix(report: &ParquetPageStatsReport) -> Result<()> { + if report.rg_partition_prefix_len == 0 { + return Ok(()); + } + + let sort_fields_str = match &report.sort_fields { + Some(s) if !s.is_empty() => s.clone(), + _ => bail!( + "rg_partition_prefix_len = {} but {} is missing or empty — cannot verify alignment \ + without the sort schema", + report.rg_partition_prefix_len, + PARQUET_META_SORT_FIELDS + ), + }; + + let prefix_columns = + first_n_sort_field_names(&sort_fields_str, report.rg_partition_prefix_len as usize)?; + + for (rg_idx, rg) in report.row_groups.iter().enumerate() { + for col_name in &prefix_columns { + let col = rg + .columns + .iter() + .find(|c| c.column_path == *col_name) + .ok_or_else(|| { + anyhow::anyhow!( + "RG {rg_idx}: prefix column '{col_name}' is not present in the file's \ + schema", + ) + })?; + + match (&col.chunk_min, &col.chunk_max) { + (Some(min), Some(max)) if min == max => { + // OK: column has a single value across the entire RG. + } + (Some(min), Some(max)) => bail!( + "RG {rg_idx} violates rg_partition_prefix_len={} claim: column '{col_name}' \ + has min={min:?} != max={max:?} (must be constant across the row group)", + report.rg_partition_prefix_len + ), + _ => bail!( + "RG {rg_idx} violates rg_partition_prefix_len={} claim: column '{col_name}' \ + has no chunk-level statistics", + report.rg_partition_prefix_len + ), + } + } + } + + Ok(()) +} + +/// Extract the first `n` sort field names from a Husky-style sort schema +/// string (e.g., `"metric_name|host|timestamp_secs/V2"`). 
+/// +/// The trailing `/V2` (or other version marker) is stripped from the last +/// field. Each field's bare name is returned (no `:asc`/`:desc` decoration +/// in the input is supported here; if the format expands, this helper +/// should be updated rather than the call sites). +fn first_n_sort_field_names(sort_fields: &str, n: usize) -> Result> { + let segments: Vec<&str> = sort_fields.split('|').collect(); + if n > segments.len() { + bail!( + "rg_partition_prefix_len = {n} exceeds sort schema length = {}", + segments.len() + ); + } + let mut names = Vec::with_capacity(n); + for seg in segments.iter().take(n) { + let bare = seg.split('/').next().unwrap_or(seg); + names.push(bare.to_string()); + } + Ok(names) +} + +fn stringify_chunk_stats(stats: &Statistics) -> (Option, Option) { + let min = stringify_chunk_min(stats); + let max = stringify_chunk_max(stats); + (min, max) +} + +fn stringify_chunk_min(stats: &Statistics) -> Option { + match stats { + Statistics::Boolean(v) => v.min_opt().map(|x| x.to_string()), + Statistics::Int32(v) => v.min_opt().map(|x| x.to_string()), + Statistics::Int64(v) => v.min_opt().map(|x| x.to_string()), + Statistics::Int96(v) => v.min_opt().map(|x| format!("{x:?}")), + Statistics::Float(v) => v.min_opt().map(|x| x.to_string()), + Statistics::Double(v) => v.min_opt().map(|x| x.to_string()), + Statistics::ByteArray(v) => v.min_opt().map(stringify_byte_array), + Statistics::FixedLenByteArray(v) => v.min_opt().map(stringify_fixed_byte_array), + } +} + +fn stringify_chunk_max(stats: &Statistics) -> Option { + match stats { + Statistics::Boolean(v) => v.max_opt().map(|x| x.to_string()), + Statistics::Int32(v) => v.max_opt().map(|x| x.to_string()), + Statistics::Int64(v) => v.max_opt().map(|x| x.to_string()), + Statistics::Int96(v) => v.max_opt().map(|x| format!("{x:?}")), + Statistics::Float(v) => v.max_opt().map(|x| x.to_string()), + Statistics::Double(v) => v.max_opt().map(|x| x.to_string()), + Statistics::ByteArray(v) => 
v.max_opt().map(stringify_byte_array), + Statistics::FixedLenByteArray(v) => v.max_opt().map(stringify_fixed_byte_array), + } +} + +fn stringify_byte_array(b: &parquet::data_type::ByteArray) -> String { + String::from_utf8(b.data().to_vec()).unwrap_or_else(|_| format!("0x{}", hex_encode(b.data()))) +} + +fn stringify_fixed_byte_array(b: &parquet::data_type::FixedLenByteArray) -> String { + String::from_utf8(b.data().to_vec()).unwrap_or_else(|_| format!("0x{}", hex_encode(b.data()))) +} + +fn hex_encode(bytes: &[u8]) -> String { + let mut out = String::with_capacity(bytes.len() * 2); + for byte in bytes { + out.push_str(&format!("{byte:02x}")); + } + out +} + +fn build_page_reports( + col_index_entry: Option<&ColumnIndexMetaData>, + off_index_entry: Option<&OffsetIndexMetaData>, + max_pages: usize, +) -> Vec { + let Some(off_index) = off_index_entry else { + return Vec::new(); + }; + + let page_locations = off_index.page_locations(); + let take = page_locations.len().min(max_pages); + let mut pages = Vec::with_capacity(take); + + let stats = col_index_entry.map(per_page_stats).unwrap_or_default(); + + for (page_idx, location) in page_locations.iter().take(take).enumerate() { + let num_rows = next_first_row(page_locations, page_idx) + .map(|next| (next - location.first_row_index).max(0) as usize); + + let (min, max, null_count) = stats.get(page_idx).cloned().unwrap_or((None, None, None)); + + pages.push(PageReport { + min, + max, + null_count, + num_rows, + offset: Some(location.offset), + compressed_page_size: Some(location.compressed_page_size), + }); + } + + pages +} + +fn next_first_row(locations: &[PageLocation], page_idx: usize) -> Option { + locations.get(page_idx + 1).map(|loc| loc.first_row_index) +} + +type PageStat = (Option, Option, Option); + +fn per_page_stats(index: &ColumnIndexMetaData) -> Vec { + fn primitive(idx: &PrimitiveColumnIndex, to_string: F) -> Vec + where F: Fn(&T) -> String { + let num_pages = idx.num_pages() as usize; + let mins: Vec> = 
idx.min_values_iter().collect(); + let maxs: Vec> = idx.max_values_iter().collect(); + (0..num_pages) + .map(|i| { + ( + mins.get(i).copied().flatten().map(&to_string), + maxs.get(i).copied().flatten().map(&to_string), + idx.null_count(i), + ) + }) + .collect() + } + + fn byte_array(idx: &ByteArrayColumnIndex) -> Vec { + let num_pages = idx.num_pages() as usize; + (0..num_pages) + .map(|i| { + ( + idx.min_value(i).map(stringify_bytes), + idx.max_value(i).map(stringify_bytes), + idx.null_count(i), + ) + }) + .collect() + } + + match index { + ColumnIndexMetaData::NONE => Vec::new(), + ColumnIndexMetaData::BOOLEAN(idx) => primitive(idx, |b| b.to_string()), + ColumnIndexMetaData::INT32(idx) => primitive(idx, |x| x.to_string()), + ColumnIndexMetaData::INT64(idx) => primitive(idx, |x| x.to_string()), + ColumnIndexMetaData::INT96(idx) => primitive(idx, |x| format!("{x:?}")), + ColumnIndexMetaData::FLOAT(idx) => primitive(idx, |x| x.to_string()), + ColumnIndexMetaData::DOUBLE(idx) => primitive(idx, |x| x.to_string()), + ColumnIndexMetaData::BYTE_ARRAY(idx) => byte_array(idx), + ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(idx) => byte_array(idx), + } +} + +fn stringify_bytes(bytes: &[u8]) -> String { + String::from_utf8(bytes.to_vec()).unwrap_or_else(|_| format!("0x{}", hex_encode(bytes))) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{ + DictionaryArray, Float64Array, Int64Array, RecordBatch, UInt8Array, UInt64Array, + }; + use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use parquet::arrow::ArrowWriter; + use parquet::file::metadata::KeyValue; + use tempfile::TempDir; + + use super::*; + use crate::storage::ParquetWriterConfig; + + fn write_test_file( + dir: &Path, + name: &str, + num_rows: usize, + config: ParquetWriterConfig, + kv: Option>, + ) -> std::path::PathBuf { + let metric_names: Vec<&str> = (0..num_rows) + .map(|i| { + if i < num_rows / 2 { + "cpu.usage" + } else { + "mem.used" + } + }) + .collect(); + let 
metric_name_array: DictionaryArray = + metric_names.iter().map(|s| Some(*s)).collect(); + let timestamp_array = UInt64Array::from((0..num_rows as u64).collect::>()); + let value_array = Float64Array::from((0..num_rows).map(|i| i as f64).collect::>()); + let tsid_array = Int64Array::from(vec![42i64; num_rows]); + let metric_type_array = UInt8Array::from(vec![0u8; num_rows]); + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "metric_name", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new("metric_type", DataType::UInt8, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("value", DataType::Float64, false), + Field::new("timeseries_id", DataType::Int64, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(metric_name_array), + Arc::new(metric_type_array), + Arc::new(timestamp_array), + Arc::new(value_array), + Arc::new(tsid_array), + ], + ) + .unwrap(); + + let path = dir.join(name); + let file = std::fs::File::create(&path).unwrap(); + let props = config.to_writer_properties_with_metadata( + &schema, + Vec::new(), + kv, + &["metric_name".to_string()], + ); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + path + } + + #[test] + fn test_inspect_finds_column_index_when_page_stats_enabled() { + let dir = TempDir::new().unwrap(); + let path = write_test_file( + dir.path(), + "with_pages.parquet", + 8, + ParquetWriterConfig::default(), + None, + ); + + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + assert_eq!(report.num_row_groups, 1); + assert!(report.has_full_page_index_coverage()); + for col in &report.row_groups[0].columns { + assert!( + col.has_column_index, + "column '{}' missing column index", + col.column_path + ); + assert!( + col.has_offset_index, + "column '{}' missing offset index", + col.column_path + ); + } + } + + 
#[test] + fn test_inspect_pages_have_min_max_for_sort_columns() { + // Two distinct metric names → at least one page boundary should + // reflect that. Page count depends on data_page_size_limit, but + // both pages must have populated min/max. + let dir = TempDir::new().unwrap(); + let path = write_test_file( + dir.path(), + "min_max.parquet", + 16, + ParquetWriterConfig::default(), + None, + ); + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + + let metric_name_col = &report.row_groups[0] + .columns + .iter() + .find(|c| c.column_path == "metric_name") + .expect("metric_name column present"); + for page in &metric_name_col.pages { + assert!(page.min.is_some(), "page min should be populated"); + assert!(page.max.is_some(), "page max should be populated"); + } + } + + #[test] + fn test_inspect_multi_page_column() { + // Force multiple pages by writing many rows with a small data + // page size, then assert the inspector reports more than one page. + let dir = TempDir::new().unwrap(); + let config = ParquetWriterConfig::default() + .with_data_page_size(64) + .with_write_batch_size(8); + let path = write_test_file(dir.path(), "multi_page.parquet", 1024, config, None); + let report = inspect_parquet_page_stats(&path, usize::MAX).unwrap(); + + let metric_name = report.row_groups[0] + .columns + .iter() + .find(|c| c.column_path == "metric_name") + .unwrap(); + assert!( + metric_name.num_pages > 1, + "expected multiple pages with small page size, got {}", + metric_name.num_pages + ); + // Per-page row counts (for all pages except the last) should sum + // to less than total rows; with the last page included we cover + // every row. 
+ let known_rows: usize = metric_name.pages.iter().filter_map(|p| p.num_rows).sum(); + assert!(known_rows > 0); + } + + #[test] + fn test_inspect_marker_round_trips() { + let dir = TempDir::new().unwrap(); + let kv = vec![ + KeyValue::new( + PARQUET_META_RG_PARTITION_PREFIX_LEN.to_string(), + "3".to_string(), + ), + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "metric_name|timeseries_id|timestamp_secs/V2".to_string(), + ), + ]; + let path = write_test_file( + dir.path(), + "marker.parquet", + 16, + ParquetWriterConfig::default(), + Some(kv), + ); + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + + assert_eq!(report.rg_partition_prefix_len, 3); + assert_eq!( + report.sort_fields.as_deref(), + Some("metric_name|timeseries_id|timestamp_secs/V2") + ); + } + + #[test] + fn test_inspect_marker_absent_reads_as_zero() { + let dir = TempDir::new().unwrap(); + let path = write_test_file( + dir.path(), + "no_marker.parquet", + 8, + ParquetWriterConfig::default(), + None, + ); + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + assert_eq!(report.rg_partition_prefix_len, 0); + } + + #[test] + fn test_verify_partition_prefix_no_op_for_zero() { + let dir = TempDir::new().unwrap(); + let path = write_test_file( + dir.path(), + "no_marker.parquet", + 8, + ParquetWriterConfig::default(), + None, + ); + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + verify_partition_prefix(&report).expect("prefix=0 should always verify"); + } + + #[test] + fn test_verify_partition_prefix_fails_when_min_ne_max() { + // A file with prefix_len = 1 but multiple metric_names in the + // same RG must fail verification. 
+ let dir = TempDir::new().unwrap(); + let kv = vec![ + KeyValue::new( + PARQUET_META_RG_PARTITION_PREFIX_LEN.to_string(), + "1".to_string(), + ), + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "metric_name|timeseries_id|timestamp_secs/V2".to_string(), + ), + ]; + let path = write_test_file( + dir.path(), + "violation.parquet", + 16, + ParquetWriterConfig::default(), + Some(kv), + ); + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + let err = verify_partition_prefix(&report) + .expect_err("two metric_names in one RG must fail prefix=1 verification"); + let msg = err.to_string(); + assert!( + msg.contains("rg_partition_prefix_len") && msg.contains("metric_name"), + "diagnostic should mention the marker and the column, got: {msg}" + ); + } + + #[test] + fn test_verify_partition_prefix_passes_for_single_metric_per_rg() { + // All rows have the same metric_name → prefix=1 should hold. + let dir = TempDir::new().unwrap(); + let metric_name_array: DictionaryArray = + (0..16).map(|_| Some("only.one")).collect(); + let timestamp_array = UInt64Array::from((0..16u64).collect::>()); + let value_array = Float64Array::from((0..16).map(|i| i as f64).collect::>()); + let tsid_array = Int64Array::from(vec![42i64; 16]); + let metric_type_array = UInt8Array::from(vec![0u8; 16]); + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "metric_name", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new("metric_type", DataType::UInt8, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("value", DataType::Float64, false), + Field::new("timeseries_id", DataType::Int64, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(metric_name_array), + Arc::new(metric_type_array), + Arc::new(timestamp_array), + Arc::new(value_array), + Arc::new(tsid_array), + ], + ) + .unwrap(); + + let path = dir.path().join("single_metric.parquet"); + let file = 
std::fs::File::create(&path).unwrap(); + let kv = vec![ + KeyValue::new( + PARQUET_META_RG_PARTITION_PREFIX_LEN.to_string(), + "1".to_string(), + ), + KeyValue::new( + PARQUET_META_SORT_FIELDS.to_string(), + "metric_name|timeseries_id|timestamp_secs/V2".to_string(), + ), + ]; + let props = ParquetWriterConfig::default().to_writer_properties_with_metadata( + &schema, + Vec::new(), + Some(kv), + &["metric_name".to_string()], + ); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let report = inspect_parquet_page_stats(&path, 100).unwrap(); + verify_partition_prefix(&report).expect("single-metric file should satisfy prefix=1"); + } + + #[test] + fn test_first_n_sort_field_names_strips_version() { + let names = first_n_sort_field_names("metric_name|host|timestamp_secs/V2", 3).unwrap(); + assert_eq!(names, vec!["metric_name", "host", "timestamp_secs"]); + } + + #[test] + fn test_first_n_sort_field_names_partial() { + let names = first_n_sort_field_names("metric_name|host|timestamp_secs/V2", 1).unwrap(); + assert_eq!(names, vec!["metric_name"]); + } + + #[test] + fn test_first_n_sort_field_names_overflow_errors() { + let err = first_n_sort_field_names("metric_name|timestamp_secs/V2", 5).unwrap_err(); + assert!(err.to_string().contains("exceeds sort schema length")); + } + + /// Footer-size delta test: page-level statistics inflate the footer + /// because every (RG, column) gets a Column Index + Offset Index entry. + /// This test pins the rough magnitude so a regression that doubles or + /// halves the footer size is visible to reviewers, and so that the + /// claim in the PR description ("a few percent on representative data") + /// is backed by a number a future change can re-derive. + /// + /// Representative shape: ~100K rows × ~7 columns, default page size, + /// default 128K-row RG → one row group, dozens of pages per column. 
+ #[test] + fn test_footer_size_delta_for_page_level_stats() { + let dir = TempDir::new().unwrap(); + let num_rows = 100_000; + + // Build a representative batch: a couple of dictionary-encoded + // string columns (cardinality matters for column index size), + // a timestamp column, a value column. + let metric_choices = ["cpu.user", "cpu.system", "mem.used", "disk.io"]; + let metric_names: Vec<&str> = (0..num_rows) + .map(|i| metric_choices[i % metric_choices.len()]) + .collect(); + let host_choices: Vec = (0..512).map(|i| format!("host-{i:03}")).collect(); + let host_names: Vec<&str> = (0..num_rows) + .map(|i| host_choices[i % host_choices.len()].as_str()) + .collect(); + + let metric_name_array: DictionaryArray = + metric_names.iter().map(|s| Some(*s)).collect(); + let host_array: DictionaryArray = host_names.iter().map(|s| Some(*s)).collect(); + let timestamp_array = UInt64Array::from((0..num_rows as u64).collect::>()); + let value_array = + Float64Array::from((0..num_rows).map(|i| i as f64 * 1.5).collect::>()); + let tsid_array = Int64Array::from( + (0..num_rows as i64) + .map(|i| (i % 4096) * 7919) + .collect::>(), + ); + let metric_type_array = UInt8Array::from(vec![0u8; num_rows]); + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "metric_name", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + false, + ), + Field::new("metric_type", DataType::UInt8, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("value", DataType::Float64, false), + Field::new("timeseries_id", DataType::Int64, false), + Field::new( + "host", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(metric_name_array), + Arc::new(metric_type_array), + Arc::new(timestamp_array), + Arc::new(value_array), + Arc::new(tsid_array), + Arc::new(host_array), + ], + ) + .unwrap(); + + // Build a baseline writer config 
with chunk-level stats only — the + // pre-PR behavior. We override `to_writer_properties_with_metadata` + // by constructing properties manually here. + use parquet::basic::Compression as ParquetCompression; + use parquet::file::properties::{EnabledStatistics, WriterProperties}; + + let chunk_only_props = WriterProperties::builder() + .set_max_row_group_row_count(Some(128 * 1024)) + .set_data_page_size_limit(1024 * 1024) + .set_write_batch_size(64 * 1024) + .set_column_index_truncate_length(Some(64)) + .set_compression(ParquetCompression::ZSTD( + parquet::basic::ZstdLevel::try_new(3).unwrap(), + )) + .set_statistics_enabled(EnabledStatistics::Chunk) + .build(); + + let chunk_path = dir.path().join("baseline_chunk.parquet"); + { + let f = std::fs::File::create(&chunk_path).unwrap(); + let mut w = ArrowWriter::try_new(f, batch.schema(), Some(chunk_only_props)).unwrap(); + w.write(&batch).unwrap(); + w.close().unwrap(); + } + + // The new on-disk format: page-level stats via the public config. + let page_path = dir.path().join("with_page_stats.parquet"); + let page_props = ParquetWriterConfig::default().to_writer_properties(&schema); + { + let f = std::fs::File::create(&page_path).unwrap(); + let mut w = ArrowWriter::try_new(f, batch.schema(), Some(page_props)).unwrap(); + w.write(&batch).unwrap(); + w.close().unwrap(); + } + + let chunk_size = std::fs::metadata(&chunk_path).unwrap().len(); + let page_size = std::fs::metadata(&page_path).unwrap().len(); + let delta = page_size as i64 - chunk_size as i64; + let pct = (delta as f64) / (chunk_size as f64) * 100.0; + + eprintln!( + "footer-size delta: chunk={chunk_size} bytes, page={page_size} bytes, delta={delta} \ + bytes ({pct:+.2}%)" + ); + + // The new file must not be smaller (page-level data is strictly + // additive). It also must not be more than 30% larger on this + // representative shape — that bound is generous; production data + // is observed at "a few percent" overhead. 
+ assert!( + page_size >= chunk_size, + "page-level stats must not shrink the file" + ); + assert!( + pct < 30.0, + "footer-size delta {pct:.2}% exceeds 30% sanity bound" + ); + + // Sanity: confirm the page-stats file actually has column index + + // offset index for every column (the whole point of the change). + let report = inspect_parquet_page_stats(&page_path, 100).unwrap(); + assert!(report.has_full_page_index_coverage()); + } +} diff --git a/quickwit/quickwit-parquet-engine/src/storage/mod.rs b/quickwit/quickwit-parquet-engine/src/storage/mod.rs index 8b87063c25e..83f933d4ce0 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/mod.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/mod.rs @@ -15,15 +15,20 @@ //! Storage layer for Parquet files. mod config; +pub mod inspect; pub(crate) mod split_writer; mod writer; pub use config::{Compression, ParquetWriterConfig}; +pub use inspect::{ + ColumnReport, PageReport, ParquetPageStatsReport, RowGroupReport, inspect_parquet_page_stats, + verify_partition_prefix, +}; pub use split_writer::ParquetSplitWriter; // Re-export metadata constants for use by the merge module and tests. 
pub(crate) use writer::{ - PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON, - PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, - PARQUET_META_ZONEMAP_REGEXES, + PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_ROW_KEYS, + PARQUET_META_ROW_KEYS_JSON, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, + PARQUET_META_WINDOW_START, PARQUET_META_ZONEMAP_REGEXES, }; pub use writer::{ParquetWriteError, ParquetWriter}; diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs index 838efd2e7d1..1eeb87992f4 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs @@ -57,6 +57,18 @@ pub(crate) const PARQUET_META_NUM_MERGE_OPS: &str = "qh.num_merge_ops"; pub(crate) const PARQUET_META_ROW_KEYS: &str = "qh.row_keys"; pub(crate) const PARQUET_META_ROW_KEYS_JSON: &str = "qh.row_keys_json"; pub(crate) const PARQUET_META_ZONEMAP_REGEXES: &str = "qh.zonemap_regexes"; +/// Number of leading sort schema columns whose transitions align with row +/// group boundaries. `0` (or absent) means no alignment is claimed — RG +/// boundaries are arbitrary (legacy default). `N` (where 1 ≤ N ≤ sort +/// schema length) means RG boundaries align with the first `N` sort +/// columns. A single-RG file vacuously satisfies any prefix; writers +/// producing single-RG files set `N` = sort schema length so single-RG +/// and metric-aligned multi-RG appear uniform to readers. +/// +/// The marker is part of the compaction scope: only splits with the +/// same `rg_partition_prefix_len` may merge. See +/// `quickwit-parquet-engine/src/merge/policy/scope.rs`. +pub(crate) const PARQUET_META_RG_PARTITION_PREFIX_LEN: &str = "qh.rg_partition_prefix_len"; /// Build Parquet key_value_metadata entries for compaction metadata. /// Returns Vec that can be added to WriterProperties. 
@@ -107,6 +119,13 @@ pub(crate) fn build_compaction_key_value_metadata( )); } + if metadata.rg_partition_prefix_len > 0 { + kvs.push(KeyValue::new( + PARQUET_META_RG_PARTITION_PREFIX_LEN.to_string(), + metadata.rg_partition_prefix_len.to_string(), + )); + } + if let Some(ref row_keys_bytes) = metadata.row_keys_proto { kvs.push(KeyValue::new( PARQUET_META_ROW_KEYS.to_string(), From 7121a2e30d112ce278072be3e02970506758f2a8 Mon Sep 17 00:00:00 2001 From: George Talbot Date: Tue, 5 May 2026 10:06:57 -0400 Subject: [PATCH 2/3] fix: preserve rg_partition_prefix_len on single-RG merge outputs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids a compaction-bucket leak that would otherwise appear once PR-3 ships single-RG ingest before PR-6 ships the streaming column-major merge engine. Previously, every merge unconditionally set the output's `rg_partition_prefix_len` to 0, even when the writer happened to produce a single-RG output that vacuously satisfies any alignment claim. With single-RG ingest active and merge demoting on every operation, post-PR-3 ingest splits would leak out of the `prefix = sort_len` bucket on their first merge and never rejoin it — newer ingests would not merge with merge outputs. New rule: predict the output's row group count via `num_rows.div_ceil(row_group_size)`. If ≤ 1 RG, propagate the inputs' prefix; otherwise demote to 0. Both the metastore split metadata (`merge_parquet_split_metadata`) and the file's KV metadata (`build_merge_kv_metadata`) follow the same rule, so they always agree about what's on disk. A `debug_assert!` checks that the prediction matches the actual row group count returned by `ArrowWriter::close()` — catches a future config change that adds a byte-based RG threshold and silently invalidates the KV claim. `MergeOutputFile` gains a `num_row_groups: usize` field so the metastore-side rule can be applied without re-parsing the file. 
Test changes: - Rename `test_output_prefix_len_demoted_to_zero` to `test_output_prefix_len_demoted_when_multi_rg`; pin the demotion to the `num_row_groups > 1` case. - New `test_output_prefix_len_preserved_when_single_rg` asserting the propagation case. - New `test_merge_demotes_prefix_when_output_is_multi_rg` exercising the real writer with `row_group_size = 2` and verifying the file's KV records 0 via the inspector. - Extend `test_merge_accepts_matching_rg_partition_prefix_len` to inspector-verify the single-RG output's KV preserves the prefix. Test count: 382 → 384. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/merge/metadata_aggregation.rs | 65 ++++++++++--- .../quickwit-parquet-engine/src/merge/mod.rs | 17 +++- .../src/merge/tests.rs | 69 ++++++++++++- .../src/merge/writer.rs | 96 ++++++++++++++++--- 4 files changed, 217 insertions(+), 30 deletions(-) diff --git a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs index dbd964bb196..ddf254f0865 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/metadata_aggregation.rs @@ -119,15 +119,25 @@ pub fn merge_parquet_split_metadata( let split_id = ParquetSplitId::generate(first.kind); let parquet_file = format!("{split_id}.parquet"); + // `rg_partition_prefix_len` propagation rule: a single-row-group + // output vacuously satisfies any prefix claim (no boundary to + // misalign), so we keep the inputs' prefix. Multi-RG output with + // arbitrary row-count-driven boundaries (the only kind the current + // merge writer can produce) cannot honor a non-zero claim and must + // reset to 0. PR-6 (streaming column-major merge engine) will + // produce sort-prefix-aligned multi-RG output and propagate the + // prefix unconditionally. 
+ // + // This must agree with the value the writer embeds in the file's + // `qh.rg_partition_prefix_len` KV — see `write_merge_outputs`. + let output_prefix_len = if output.num_row_groups <= 1 { + first.rg_partition_prefix_len + } else { + 0 + }; + // Data-dependent fields come from the MergeOutputFile (extracted from // this output's actual rows during the merge write pass). - // - // `rg_partition_prefix_len` is reset to 0: the current merge writer - // does not enforce row group boundary alignment with sort prefix - // transitions. A future PR (the streaming column-major merge engine) - // will produce aligned output and propagate the prefix from inputs. - // Until then, claiming alignment on output would be dishonest, so we - // demote to 0 even when inputs have a higher prefix. let mut metadata = ParquetSplitMetadata { kind: first.kind, partition_id: first.partition_id, @@ -146,7 +156,7 @@ pub fn merge_parquet_split_metadata( num_merge_ops, row_keys_proto: output.row_keys_proto.clone(), zonemap_regexes: output.zonemap_regexes.clone(), - rg_partition_prefix_len: 0, + rg_partition_prefix_len: output_prefix_len, }; // Finalize: tag sets may exceed the cardinality threshold. 
@@ -192,10 +202,21 @@ mod tests { size_bytes: u64, time_range: (u64, u64), metric_names: &[&str], + ) -> MergeOutputFile { + make_output_full(num_rows, size_bytes, 1, time_range, metric_names) + } + + fn make_output_full( + num_rows: usize, + size_bytes: u64, + num_row_groups: usize, + time_range: (u64, u64), + metric_names: &[&str], ) -> MergeOutputFile { MergeOutputFile { path: PathBuf::from("/tmp/merged.parquet"), num_rows, + num_row_groups, size_bytes, row_keys_proto: Some(vec![0x08, 0x01]), zonemap_regexes: HashMap::from([("metric_name".to_string(), "cpu\\..*".to_string())]), @@ -391,20 +412,38 @@ mod tests { } #[test] - fn test_output_prefix_len_demoted_to_zero() { - // Until the streaming column-major writer lands, the merge writer - // does not enforce alignment, so the output's prefix is 0 even when - // every input claims a higher value. This test pins that contract. + fn test_output_prefix_len_demoted_when_multi_rg() { + // The current merge writer rolls over RGs at row count, not at + // sort-prefix transitions. When the output ends up with > 1 RG, + // the boundaries are at arbitrary places and the inputs' prefix + // claim cannot be honored — the output's prefix must be 0. let mut s0 = make_test_split("s0", (1000, 2000), 0); let mut s1 = make_test_split("s1", (1000, 2000), 0); s0.rg_partition_prefix_len = 3; s1.rg_partition_prefix_len = 3; - let output = make_output(200, 9000); + let output = make_output_full(200, 9000, 2, (1000, 2000), &["cpu.usage"]); let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap(); assert_eq!(result.rg_partition_prefix_len, 0); } + #[test] + fn test_output_prefix_len_preserved_when_single_rg() { + // A single-RG output vacuously satisfies any prefix alignment + // claim (one RG, no boundary to misalign). Propagate the inputs' + // prefix so the merge output stays in the same compaction bucket + // as the inputs, instead of leaking into the prefix=0 bucket on + // every merge. 
+        let mut s0 = make_test_split("s0", (1000, 2000), 0);
+        let mut s1 = make_test_split("s1", (1000, 2000), 0);
+        s0.rg_partition_prefix_len = 3;
+        s1.rg_partition_prefix_len = 3;
+
+        let output = make_output_full(200, 9000, 1, (1000, 2000), &["cpu.usage"]);
+        let result = merge_parquet_split_metadata(&[s0, s1], &output).unwrap();
+        assert_eq!(result.rg_partition_prefix_len, 3);
+    }
+
     #[test]
     fn test_fresh_split_id_generated() {
         let inputs = vec![
diff --git a/quickwit/quickwit-parquet-engine/src/merge/mod.rs b/quickwit/quickwit-parquet-engine/src/merge/mod.rs
index 74b3dad52e2..008c456208a 100644
--- a/quickwit/quickwit-parquet-engine/src/merge/mod.rs
+++ b/quickwit/quickwit-parquet-engine/src/merge/mod.rs
@@ -71,9 +71,11 @@ struct InputMetadata {
     num_merge_ops: u32,
     /// Number of leading sort columns whose transitions align with row
     /// group boundaries. All input files must agree on this value (it's
-    /// part of the compaction scope key). The current merge writer does
-    /// not enforce alignment, so the *output* file is written with prefix
-    /// 0 regardless of this value.
+    /// part of the compaction scope key). Splitting row groups at the
+    /// claimed prefix boundary is not implemented by the current merge
+    /// writer — it lands in PR-6 (streaming column-major merge engine).
+    /// Until then, a multi-RG *output* is demoted to prefix 0, while a
+    /// single-RG output propagates this value (vacuously aligned).
     #[allow(dead_code)] // wired for PR-6 streaming engine; PR-1 only validates.
     rg_partition_prefix_len: u32,
 }
@@ -92,6 +94,15 @@ pub struct MergeOutputFile {
     /// Number of rows in this output file.
     pub num_rows: usize,
 
+    /// Number of row groups the writer produced for this file. 
Used by + /// `merge_parquet_split_metadata` to decide whether the input prefix + /// alignment claim (`rg_partition_prefix_len`) can be propagated to + /// the output: a single-RG file vacuously satisfies any claim, so + /// we keep the inputs' prefix; a multi-RG file with arbitrary + /// boundaries (the only kind the current writer can produce) must + /// reset the claim to 0. + pub num_row_groups: usize, + /// File size in bytes. pub size_bytes: u64, diff --git a/quickwit/quickwit-parquet-engine/src/merge/tests.rs b/quickwit/quickwit-parquet-engine/src/merge/tests.rs index c34845a5fb1..419057db1de 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/tests.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/tests.rs @@ -1293,9 +1293,10 @@ fn test_merge_rejects_mismatched_rg_partition_prefix_len() { #[test] fn test_merge_accepts_matching_rg_partition_prefix_len() { - // Sanity check: when both inputs declare the same prefix_len, the - // merge proceeds as usual. The output is demoted to prefix_len = 0 - // because the current writer does not enforce alignment. + // Sanity check: when both inputs declare the same prefix_len AND the + // merged output fits in a single row group, the output preserves the + // inputs' prefix (single-RG vacuously satisfies any alignment claim). + // Verify both at the MergeOutputFile struct level and on disk. let dir = TempDir::new().unwrap(); let input1 = write_test_split_with_prefix_len( @@ -1328,6 +1329,68 @@ fn test_merge_accepts_matching_rg_partition_prefix_len() { let outputs = merge_sorted_parquet_files(&[input1, input2], &output_dir, &config).unwrap(); assert_eq!(outputs.len(), 1); assert_eq!(outputs[0].num_rows, 4); + assert_eq!( + outputs[0].num_row_groups, 1, + "4 rows fit in one row group with the default 128K threshold" + ); + + // The output file's KV metadata must record the preserved prefix. 
+ let report = crate::storage::inspect_parquet_page_stats(&outputs[0].path, 0).unwrap(); + assert_eq!( + report.rg_partition_prefix_len, 2, + "single-RG merge output should preserve the inputs' prefix in its KV metadata" + ); +} + +#[test] +fn test_merge_demotes_prefix_when_output_is_multi_rg() { + // When the merged output spans more than one row group, the writer + // cannot guarantee the inputs' prefix alignment claim and must + // record prefix=0 in the output's KV. Force the multi-RG case by + // setting a tiny `row_group_size` in the writer config. + let dir = TempDir::new().unwrap(); + + let input1 = write_test_split_with_prefix_len( + dir.path(), + "input1.parquet", + &["cpu", "cpu", "cpu"], + &[100, 200, 300], + &[1.0, 2.0, 3.0], + &[42, 42, 42], + 2, + ); + + let input2 = write_test_split_with_prefix_len( + dir.path(), + "input2.parquet", + &["mem", "mem", "mem"], + &[100, 200, 300], + &[4.0, 5.0, 6.0], + &[99, 99, 99], + 2, + ); + + let output_dir = dir.path().join("output"); + std::fs::create_dir_all(&output_dir).unwrap(); + let config = MergeConfig { + num_outputs: 1, + writer_config: ParquetWriterConfig::default().with_row_group_size(2), + }; + + let outputs = merge_sorted_parquet_files(&[input1, input2], &output_dir, &config).unwrap(); + assert_eq!(outputs.len(), 1); + assert_eq!(outputs[0].num_rows, 6); + assert!( + outputs[0].num_row_groups > 1, + "row_group_size=2 with 6 rows should produce multiple row groups, got {}", + outputs[0].num_row_groups + ); + + let report = crate::storage::inspect_parquet_page_stats(&outputs[0].path, 0).unwrap(); + assert_eq!( + report.rg_partition_prefix_len, 0, + "multi-RG merge output cannot honor the inputs' prefix and must record 0" + ); } /// Build a test RecordBatch from raw column data (shared by test helpers). 
diff --git a/quickwit/quickwit-parquet-engine/src/merge/writer.rs b/quickwit/quickwit-parquet-engine/src/merge/writer.rs index 44cb85d37ec..3ac908dab3c 100644 --- a/quickwit/quickwit-parquet-engine/src/merge/writer.rs +++ b/quickwit/quickwit-parquet-engine/src/merge/writer.rs @@ -44,9 +44,9 @@ use crate::storage::split_writer::{ extract_metric_names, extract_service_names, extract_time_range, }; use crate::storage::{ - PARQUET_META_NUM_MERGE_OPS, PARQUET_META_ROW_KEYS, PARQUET_META_ROW_KEYS_JSON, - PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, PARQUET_META_WINDOW_START, - PARQUET_META_ZONEMAP_REGEXES, + PARQUET_META_NUM_MERGE_OPS, PARQUET_META_RG_PARTITION_PREFIX_LEN, PARQUET_META_ROW_KEYS, + PARQUET_META_ROW_KEYS_JSON, PARQUET_META_SORT_FIELDS, PARQUET_META_WINDOW_DURATION, + PARQUET_META_WINDOW_START, PARQUET_META_ZONEMAP_REGEXES, }; use crate::zonemap::{self, ZonemapOptions}; @@ -99,8 +99,29 @@ pub fn write_merge_outputs( zonemap::extract_zonemap_regexes(&input_meta.sort_fields, &sorted_batch, &zonemap_opts) .context("extracting zonemap regexes from merge output")?; + // Predict the output's row group count. ArrowWriter rolls over a + // new RG every `row_group_size` rows; we don't set a byte-based + // threshold so the row count is the only driver. This prediction + // determines the prefix alignment claim we embed in the file's KV + // metadata: single-RG output vacuously satisfies any prefix, so + // we propagate the input prefix; multi-RG output (with arbitrary + // row-count-driven boundaries) must claim 0. PR-6 will replace + // this with proper sort-prefix-aligned boundaries. + let predicted_num_rgs = + predict_num_row_groups(sorted_batch.num_rows(), config.writer_config.row_group_size); + let output_prefix_len = if predicted_num_rgs <= 1 { + input_meta.rg_partition_prefix_len + } else { + 0 + }; + // Build KV metadata. 
- let kv_entries = build_merge_kv_metadata(input_meta, &row_keys_proto, &zonemap_regexes); + let kv_entries = build_merge_kv_metadata( + input_meta, + &row_keys_proto, + &zonemap_regexes, + output_prefix_len, + ); // Build sorting_columns for Parquet metadata. let sorting_cols = build_sorting_columns(&sorted_batch, &input_meta.sort_fields)?; @@ -131,12 +152,24 @@ pub fn write_merge_outputs( low_cardinality_tags.insert(TAG_SERVICE.to_string(), service_names); } - let size_bytes = write_parquet_file(&sorted_batch, &output_path, props)?; + let written = write_parquet_file(&sorted_batch, &output_path, props)?; + + // Confirm prediction matches reality. If this ever fires, somebody + // enabled a byte-based RG threshold in the writer config and the + // KV's `rg_partition_prefix_len` will be inconsistent with the + // actual on-disk layout. + debug_assert_eq!( + predicted_num_rgs, written.num_row_groups, + "predicted RG count {} does not match actual {} — rg_partition_prefix_len in KV \ + metadata may be wrong", + predicted_num_rgs, written.num_row_groups, + ); outputs.push(MergeOutputFile { path: output_path, num_rows: sorted_batch.num_rows(), - size_bytes, + num_row_groups: written.num_row_groups, + size_bytes: written.size_bytes, row_keys_proto, zonemap_regexes, metric_names, @@ -204,11 +237,28 @@ fn apply_merge_permutation( Ok(result) } +/// Predict the number of row groups `ArrowWriter` will produce for a +/// batch of `num_rows` rows. Assumes `row_group_size` is the only RG +/// rollover threshold (which is the case as long as the writer config +/// does not set `set_max_row_group_bytes`). Returns at least 1. +fn predict_num_row_groups(num_rows: usize, row_group_size: usize) -> usize { + if num_rows == 0 || row_group_size == 0 { + return 1; + } + num_rows.div_ceil(row_group_size).max(1) +} + /// Build Parquet KV metadata entries for a merge output file. 
+///
+/// `output_prefix_len` is the alignment claim to embed in the output's
+/// `qh.rg_partition_prefix_len` KV — caller computes this based on
+/// whether the file is going to be single-RG (preserve input prefix)
+/// or multi-RG (must be 0).
 fn build_merge_kv_metadata(
     input_meta: &InputMetadata,
     row_keys_proto: &Option<Vec<u8>>,
     zonemap_regexes: &std::collections::HashMap<String, String>,
+    output_prefix_len: u32,
 ) -> Vec<KeyValue> {
     let mut kvs = Vec::new();
@@ -238,6 +288,13 @@ fn build_merge_kv_metadata(
         input_meta.num_merge_ops.to_string(),
     ));
 
+    if output_prefix_len > 0 {
+        kvs.push(KeyValue::new(
+            PARQUET_META_RG_PARTITION_PREFIX_LEN.to_string(),
+            output_prefix_len.to_string(),
+        ));
+    }
+
     if let Some(rk_bytes) = row_keys_proto {
         kvs.push(KeyValue::new(
             PARQUET_META_ROW_KEYS.to_string(),
@@ -393,8 +450,21 @@ fn verify_sort_order(batch: &RecordBatch, sort_fields_str: &str) {
     }
 }
 
-/// Write a RecordBatch to a Parquet file.
-fn write_parquet_file(batch: &RecordBatch, path: &Path, props: WriterProperties) -> Result<u64> {
+/// Result of writing a single Parquet output file.
+struct WrittenFile {
+    /// File size in bytes on disk after `ArrowWriter::close()`.
+    size_bytes: u64,
+    /// Number of row groups the writer produced.
+    num_row_groups: usize,
+}
+
+/// Write a RecordBatch to a Parquet file. Returns its on-disk size and
+/// the number of row groups produced by the writer. 
+fn write_parquet_file( + batch: &RecordBatch, + path: &Path, + props: WriterProperties, +) -> Result { let file = std::fs::File::create(path) .with_context(|| format!("creating output file: {}", path.display()))?; @@ -405,13 +475,17 @@ fn write_parquet_file(batch: &RecordBatch, path: &Path, props: WriterProperties) .write(batch) .with_context(|| format!("writing batch: {}", path.display()))?; - writer + let metadata = writer .close() .with_context(|| format!("closing parquet writer: {}", path.display()))?; + let num_row_groups = metadata.num_row_groups(); - let size = std::fs::metadata(path) + let size_bytes = std::fs::metadata(path) .with_context(|| format!("reading file size: {}", path.display()))? .len(); - Ok(size) + Ok(WrittenFile { + size_bytes, + num_row_groups, + }) } From a911d3eabd4572a3028957f1bdcd62e6bf35d00b Mon Sep 17 00:00:00 2001 From: George Talbot Date: Tue, 5 May 2026 12:08:08 -0400 Subject: [PATCH 3/3] test: end-to-end page-index pruning through the metrics query path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gate-A verification before PR-3 (single-RG ingest cutover): proves that page-level statistics written by PR-1 are actually consumed by the production query path for pruning, not just embedded inertly in the footer. Findings: - The metrics read path at `MetricsParquetTableProvider::scan` already calls `ParquetSource::with_enable_page_index(true)`, so DataFusion loads the column index + offset index when reading. No new wiring needed on the reader side. - DataFusion's `PruningMetrics` (`page_index_rows_pruned`) counter on `DataSourceExec` is the testable signal — pruned > 0 means pages were eliminated using their min/max from the column index. 
The new integration test (`quickwit-datafusion/tests/metrics.rs::test_page_index_pruning_via_query`) builds a single split with two metric_names interleaved, forces the metric_name column into ~16 pages within one row group, runs `WHERE metric_name = 'cpu.usage'`, walks the executed plan, and asserts `page_index_rows_pruned >= 4096` (the rows from the *other* metric) plus correctness of the returned rows. Plumbing change: `ParquetWriterConfig::with_data_page_row_count_limit` exposes Parquet's per-page row count rollover threshold. The size-based `data_page_size` knob alone can't force multi-page output when dictionary-encoded columns RLE-compress to a handful of bytes regardless of row count. Default 0 = unbounded; production behavior unchanged. Tests: 14/14 metrics integration tests pass (was 13). Co-Authored-By: Claude Opus 4.7 (1M context) --- quickwit/quickwit-datafusion/tests/metrics.rs | 130 +++++++++++++++++- .../tests/metrics_splits/mod.rs | 77 ++++++++++- .../src/storage/config.rs | 18 +++ 3 files changed, 222 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index 693abf09d0c..c36412305fc 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -32,7 +32,9 @@ mod common; mod metrics_splits; use common::{TestSandbox, create_metrics_index}; -use metrics_splits::{publish_split, publish_split_with_tag_metadata}; +use metrics_splits::{ + publish_split, publish_split_with_tag_metadata, publish_split_with_writer_config, +}; // ── Setup ────────────────────────────────────────────────────────── @@ -99,6 +101,132 @@ async fn test_select_all() { assert_eq!(batches[0].num_columns(), 5); } +/// Gate-A end-to-end test: page-level pruning fires through the +/// production read path. 
+/// +/// PR-1 made the writer emit `EnabledStatistics::Page` (column index + +/// offset index in the footer) and `MetricsParquetTableProvider::scan` +/// already wires `ParquetSource::with_enable_page_index(true)`. Together +/// they mean a query whose filter eliminates whole pages should report +/// a non-zero `page_index_rows_pruned` metric on the `DataSourceExec`. +/// +/// To make page-level pruning the *only* meaningful pruner, this test: +/// - puts both metric_names in a single split (no split-level pruning) +/// - forces a tiny `data_page_size` so each metric_name lives in its own pages within the same +/// row group (so RG-level pruning can't fully resolve the filter; page-level must contribute) +/// - asserts `page_index_rows_pruned`'s pruned count is greater than zero, and that the query +/// still returns the right rows +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_page_index_pruning_via_query() { + use datafusion::physical_plan::{collect, displayable}; + + let sandbox = start_sandbox().await; + let metastore = sandbox.metastore.clone(); + let data_dir = &sandbox.data_dir; + let builder = session_builder(&sandbox); + + let index_uid = create_metrics_index(&metastore, "test-page-prune", data_dir.path()).await; + + // Build a batch with both metric names. Rows are ordered by + // metric_name (because the writer sorts by sort schema before + // writing), so each metric ends up contiguous in the output — + // exactly the layout that lets page-level stats prune the + // unselected metric. 
+    let n_per = 4096_u32;
+    let timestamps_a: Vec<u64> = (0..n_per).map(|i| 1_000 + i as u64).collect();
+    let timestamps_b: Vec<u64> = (0..n_per).map(|i| 1_000 + (n_per + i) as u64).collect();
+    let values_a: Vec<f64> = (0..n_per).map(|i| (i as f64) * 0.1).collect();
+    let values_b: Vec<f64> = (0..n_per).map(|i| (i as f64) * 0.2).collect();
+    let batch_a = make_batch("cpu.usage", &timestamps_a, &values_a, Some("web"));
+    let batch_b = make_batch("memory.used", &timestamps_b, &values_b, Some("web"));
+    let combined = arrow::compute::concat_batches(&batch_a.schema(), &[batch_a, batch_b])
+        .expect("concat metric batches");
+
+    // Force the metric_name column into many pages. The size-based
+    // `data_page_size` knob alone isn't enough because metric_name with
+    // two distinct dictionary values RLE-compresses to a handful of
+    // bytes regardless of row count. Use the row-count limit instead;
+    // it's only checked at write-batch boundaries, so write_batch_size
+    // must be at most the row-count limit to actually take effect. 
+ let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default() + .with_data_page_row_count_limit(512) + .with_write_batch_size(128); + publish_split_with_writer_config( + &metastore, + &index_uid, + data_dir.path(), + "combined", + &combined, + writer_config, + ) + .await; + + let create_table = r#" + CREATE OR REPLACE EXTERNAL TABLE "test-page-prune" ( + metric_name VARCHAR NOT NULL, metric_type TINYINT, + timestamp_secs BIGINT NOT NULL, value DOUBLE NOT NULL, service VARCHAR + ) STORED AS metrics LOCATION 'test-page-prune'"#; + let query = r#"SELECT value FROM "test-page-prune" WHERE metric_name = 'cpu.usage'"#; + + let ctx = builder.build_session().unwrap(); + ctx.sql(create_table) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let df = ctx.sql(query).await.unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + let result = collect(plan.clone(), ctx.task_ctx()).await.unwrap(); + let total_rows: usize = result.iter().map(|b| b.num_rows()).sum(); + + let pruned = collect_pruned_count(plan.as_ref(), "page_index_rows_pruned"); + + // The query keeps the 4096 cpu.usage rows; the other 4096 + // memory.used rows belong to pages whose min/max is "memory.used" + // and are eliminable by page-level pruning. Asserting "≥ n_per" + // catches the degenerate case where pruning does nothing AND any + // future regression that loses pruning effectiveness. + if pruned < n_per as usize { + let plan_str = format!("{}", displayable(plan.as_ref()).indent(true)); + panic!( + "expected page-level pruning to skip ≥ {n_per} rows; got {pruned}.\nPlan:\n{plan_str}", + ); + } + assert_eq!( + total_rows, n_per as usize, + "query should return exactly the cpu.usage rows; got {total_rows}" + ); +} + +/// Walk an executed `ExecutionPlan` tree and sum the `pruned` value of +/// every `PruningMetrics` matching `metric_name`. 
+fn collect_pruned_count( + plan: &dyn datafusion::physical_plan::ExecutionPlan, + metric_name: &str, +) -> usize { + use datafusion::physical_plan::metrics::MetricValue; + + let mut total = 0; + if let Some(metrics) = plan.metrics() { + for metric in metrics.iter() { + if let MetricValue::PruningMetrics { + name, + pruning_metrics, + } = metric.value() + && name.as_ref() == metric_name + { + total += pruning_metrics.pruned(); + } + } + } + for child in plan.children() { + total += collect_pruned_count(child.as_ref(), metric_name); + } + total +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_metric_name_pruning() { let sandbox = start_sandbox().await; diff --git a/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs b/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs index 532af78722d..98ce0fa517a 100644 --- a/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs +++ b/quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs @@ -38,9 +38,28 @@ pub async fn publish_split( split_name: &str, batch: &RecordBatch, ) { - publish_split_with_tag_metadata(metastore, index_uid, data_dir, split_name, batch, true).await; + publish_split_with_options( + metastore, + index_uid, + data_dir, + split_name, + batch, + ParquetWriterConfig::default(), + true, + ) + .await; } +/// Same as [`publish_split`] but lets the caller suppress the +/// low-cardinality tag metadata (`service`, `env`, `datacenter`, +/// `region`, `host`). Tests use `false` to verify pruning paths that +/// rely on writer-generated `zonemap_regexes` rather than exact tag +/// sets. +/// +/// Only referenced by `metrics.rs`; dead-code from the perspective of +/// `null_columns.rs` / `distributed.rs`, both of which include this +/// module via `mod metrics_splits;`. 
+#[allow(dead_code)] pub(crate) async fn publish_split_with_tag_metadata( metastore: &MetastoreServiceClient, index_uid: &IndexUid, @@ -48,9 +67,63 @@ pub(crate) async fn publish_split_with_tag_metadata( split_name: &str, batch: &RecordBatch, include_low_cardinality_tags: bool, +) { + publish_split_with_options( + metastore, + index_uid, + data_dir, + split_name, + batch, + ParquetWriterConfig::default(), + include_low_cardinality_tags, + ) + .await; +} + +/// Same as [`publish_split`] but with a caller-supplied +/// `ParquetWriterConfig`. Used by tests that need a specific on-disk +/// layout — e.g., a small `data_page_row_count_limit` to force the +/// metric_name column into multiple pages so page-level pruning has +/// something to prune. +/// +/// Only referenced by `metrics.rs`; dead-code from the perspective of +/// `null_columns.rs` / `distributed.rs`. +#[allow(dead_code)] +pub async fn publish_split_with_writer_config( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + data_dir: &std::path::Path, + split_name: &str, + batch: &RecordBatch, + writer_config: ParquetWriterConfig, +) { + publish_split_with_options( + metastore, + index_uid, + data_dir, + split_name, + batch, + writer_config, + true, + ) + .await; +} + +/// Inner helper combining both knobs. Kept private; the named +/// entry points above (`publish_split`, `publish_split_with_tag_metadata`, +/// `publish_split_with_writer_config`) cover the call patterns +/// tests need. 
+async fn publish_split_with_options( + metastore: &MetastoreServiceClient, + index_uid: &IndexUid, + data_dir: &std::path::Path, + split_name: &str, + batch: &RecordBatch, + writer_config: ParquetWriterConfig, + include_low_cardinality_tags: bool, ) { let (parquet_bytes, (row_keys_proto, zonemap_regexes)) = - ParquetWriter::new(ParquetWriterConfig::default(), &TableConfig::default()) + ParquetWriter::new(writer_config, &TableConfig::default()) .unwrap() .write_to_bytes(batch, None) .expect("parquet encode"); diff --git a/quickwit/quickwit-parquet-engine/src/storage/config.rs b/quickwit/quickwit-parquet-engine/src/storage/config.rs index 2cea49926a4..67dd1544bd4 100644 --- a/quickwit/quickwit-parquet-engine/src/storage/config.rs +++ b/quickwit/quickwit-parquet-engine/src/storage/config.rs @@ -64,6 +64,11 @@ pub struct ParquetWriterConfig { pub row_group_size: usize, /// Target size in bytes for data pages. pub data_page_size: usize, + /// Maximum rows per data page (`0` = unbounded — let `data_page_size` + /// drive the rollover). Useful for tests that need to force the + /// metric_name column into multiple pages even when it compresses + /// to a handful of bytes. + pub data_page_row_count_limit: usize, /// Number of rows per write batch. pub write_batch_size: usize, /// Whether to emit page-level statistics (Parquet Column Index + @@ -86,6 +91,7 @@ impl Default for ParquetWriterConfig { compression_level: Some(DEFAULT_ZSTD_LEVEL), row_group_size: DEFAULT_ROW_GROUP_SIZE, data_page_size: DEFAULT_DATA_PAGE_SIZE, + data_page_row_count_limit: 0, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, page_statistics_enabled: true, } @@ -122,6 +128,14 @@ impl ParquetWriterConfig { self } + /// Set the per-page row count limit. `0` means unbounded (rely on + /// `data_page_size` for rollover). See + /// [`ParquetWriterConfig::data_page_row_count_limit`]. 
+ pub fn with_data_page_row_count_limit(mut self, limit: usize) -> Self { + self.data_page_row_count_limit = limit; + self + } + /// Set the write batch size. pub fn with_write_batch_size(mut self, size: usize) -> Self { self.write_batch_size = size; @@ -184,6 +198,10 @@ impl ParquetWriterConfig { .set_sorting_columns(Some(sorting_cols)) .set_statistics_enabled(stats_level); + if self.data_page_row_count_limit > 0 { + builder = builder.set_data_page_row_count_limit(self.data_page_row_count_limit); + } + if let Some(kvs) = kv_metadata && !kvs.is_empty() {