quickwit/quickwit-datafusion/tests/metrics.rs (129 additions, 1 deletion)
@@ -32,7 +32,9 @@ mod common;
mod metrics_splits;

use common::{TestSandbox, create_metrics_index};
-use metrics_splits::{publish_split, publish_split_with_tag_metadata};
+use metrics_splits::{
+    publish_split, publish_split_with_tag_metadata, publish_split_with_writer_config,
+};

// ── Setup ──────────────────────────────────────────────────────────

@@ -99,6 +101,132 @@ async fn test_select_all() {
assert_eq!(batches[0].num_columns(), 5);
}

/// Gate-A end-to-end test: page-level pruning fires through the
/// production read path.
///
/// PR-1 made the writer emit `EnabledStatistics::Page` (column index +
/// offset index in the footer) and `MetricsParquetTableProvider::scan`
/// already wires `ParquetSource::with_enable_page_index(true)`. Together
/// they mean a query whose filter eliminates whole pages should report
/// a non-zero `page_index_rows_pruned` metric on the `DataSourceExec`.
///
/// To make page-level pruning the *only* meaningful pruner, this test:
/// - puts both metric_names in a single split (no split-level pruning)
/// - forces a tiny `data_page_row_count_limit` so each metric_name lives in its own pages within
///   the same row group (so RG-level pruning can't fully resolve the filter; page-level must
///   contribute)
/// - asserts `page_index_rows_pruned` reports at least `n_per` pruned rows, and that the query
///   still returns exactly the expected rows
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_page_index_pruning_via_query() {
use datafusion::physical_plan::{collect, displayable};

let sandbox = start_sandbox().await;
let metastore = sandbox.metastore.clone();
let data_dir = &sandbox.data_dir;
let builder = session_builder(&sandbox);

let index_uid = create_metrics_index(&metastore, "test-page-prune", data_dir.path()).await;

// Build a batch with both metric names. Rows are ordered by
// metric_name (because the writer sorts by sort schema before
// writing), so each metric ends up contiguous in the output —
// exactly the layout that lets page-level stats prune the
// unselected metric.
let n_per = 4096_u32;
let timestamps_a: Vec<u64> = (0..n_per).map(|i| 1_000 + i as u64).collect();
let timestamps_b: Vec<u64> = (0..n_per).map(|i| 1_000 + (n_per + i) as u64).collect();
let values_a: Vec<f64> = (0..n_per).map(|i| (i as f64) * 0.1).collect();
let values_b: Vec<f64> = (0..n_per).map(|i| (i as f64) * 0.2).collect();
let batch_a = make_batch("cpu.usage", &timestamps_a, &values_a, Some("web"));
let batch_b = make_batch("memory.used", &timestamps_b, &values_b, Some("web"));
let combined = arrow::compute::concat_batches(&batch_a.schema(), &[batch_a, batch_b])
.expect("concat metric batches");

// Force the metric_name column into many pages. The size-based
// `data_page_size` knob alone isn't enough because metric_name with
// two distinct dictionary values RLE-compresses to a handful of
// bytes regardless of row count. Use the row-count limit instead;
// it's only checked at write-batch boundaries, so write_batch_size
// must be at most the row-count limit to actually take effect.
let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default()
.with_data_page_row_count_limit(512)
.with_write_batch_size(128);
publish_split_with_writer_config(
&metastore,
&index_uid,
data_dir.path(),
"combined",
&combined,
writer_config,
)
.await;

let create_table = r#"
CREATE OR REPLACE EXTERNAL TABLE "test-page-prune" (
metric_name VARCHAR NOT NULL, metric_type TINYINT,
timestamp_secs BIGINT NOT NULL, value DOUBLE NOT NULL, service VARCHAR
) STORED AS metrics LOCATION 'test-page-prune'"#;
let query = r#"SELECT value FROM "test-page-prune" WHERE metric_name = 'cpu.usage'"#;

let ctx = builder.build_session().unwrap();
ctx.sql(create_table)
.await
.unwrap()
.collect()
.await
.unwrap();

let df = ctx.sql(query).await.unwrap();
let plan = df.create_physical_plan().await.unwrap();
let result = collect(plan.clone(), ctx.task_ctx()).await.unwrap();
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();

let pruned = collect_pruned_count(plan.as_ref(), "page_index_rows_pruned");

// The query keeps the 4096 cpu.usage rows; the other 4096
// memory.used rows belong to pages whose min/max is "memory.used"
// and are eliminable by page-level pruning. Asserting "≥ n_per"
// catches both the degenerate case where pruning does nothing and
// any future regression that erodes pruning effectiveness.
if pruned < n_per as usize {
let plan_str = format!("{}", displayable(plan.as_ref()).indent(true));
panic!(
"expected page-level pruning to skip ≥ {n_per} rows; got {pruned}.\nPlan:\n{plan_str}",
);
}
assert_eq!(
total_rows, n_per as usize,
"query should return exactly the cpu.usage rows; got {total_rows}"
);
}
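
For reference, the writer knobs this test tunes correspond to settings on the `parquet` crate's `WriterProperties`, which `ParquetWriterConfig` presumably wraps. A sketch of the assumed mapping (builder names are the parquet crate's, not this PR's):

```rust
use parquet::file::properties::{EnabledStatistics, WriterProperties};

// Sketch: the on-disk layout the test depends on, expressed directly
// against the parquet crate (assumed to underlie ParquetWriterConfig).
fn page_pruning_friendly_props() -> WriterProperties {
    WriterProperties::builder()
        // Emit a column index (per-page min/max) and offset index in the
        // footer; this is what page-level pruning reads.
        .set_statistics_enabled(EnabledStatistics::Page)
        // Cap each data page at 512 rows so distinct metric_names land in
        // distinct pages even though RLE keeps each page tiny in bytes.
        .set_data_page_row_count_limit(512)
        // The row-count limit is only checked at write-batch boundaries,
        // so the batch size must not exceed the limit to take effect.
        .set_write_batch_size(128)
        .build()
}
```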

/// Walk an executed `ExecutionPlan` tree and sum the `pruned` value of
/// every `PruningMetrics` matching `metric_name`.
fn collect_pruned_count(
plan: &dyn datafusion::physical_plan::ExecutionPlan,
metric_name: &str,
) -> usize {
use datafusion::physical_plan::metrics::MetricValue;

let mut total = 0;
if let Some(metrics) = plan.metrics() {
for metric in metrics.iter() {
if let MetricValue::PruningMetrics {
name,
pruning_metrics,
} = metric.value()
&& name.as_ref() == metric_name
{
total += pruning_metrics.pruned();
}
}
}
for child in plan.children() {
total += collect_pruned_count(child.as_ref(), metric_name);
}
total
}
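
When the pruned count comes back zero, a quick sanity check is whether the split's footer even carries the page-level indexes. The sketch below opens a file with the plain `parquet` crate and requests the indexes, mirroring what `ParquetSource::with_enable_page_index(true)` loads inside the scan; the file path argument is hypothetical:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

// Sketch: report whether a split file's footer carries the column index
// that page-level pruning needs. The path is hypothetical.
fn has_page_index(path: &str) -> Result<bool, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    // Ask the reader to load the column index and offset index from the
    // footer instead of skipping them.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
    Ok(builder.metadata().column_index().is_some())
}
```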

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_metric_name_pruning() {
let sandbox = start_sandbox().await;
quickwit/quickwit-datafusion/tests/metrics_splits/mod.rs (75 additions, 2 deletions)
@@ -38,19 +38,92 @@ pub async fn publish_split(
split_name: &str,
batch: &RecordBatch,
) {
-    publish_split_with_tag_metadata(metastore, index_uid, data_dir, split_name, batch, true).await;
+    publish_split_with_options(
+        metastore,
+        index_uid,
+        data_dir,
+        split_name,
+        batch,
+        ParquetWriterConfig::default(),
+        true,
+    )
+    .await;
}

/// Same as [`publish_split`] but lets the caller suppress the
/// low-cardinality tag metadata (`service`, `env`, `datacenter`,
/// `region`, `host`). Tests use `false` to verify pruning paths that
/// rely on writer-generated `zonemap_regexes` rather than exact tag
/// sets.
///
/// Only referenced by `metrics.rs`; dead code from the perspective of
/// `null_columns.rs` / `distributed.rs`, both of which include this
/// module via `mod metrics_splits;`.
#[allow(dead_code)]
pub(crate) async fn publish_split_with_tag_metadata(
metastore: &MetastoreServiceClient,
index_uid: &IndexUid,
data_dir: &std::path::Path,
split_name: &str,
batch: &RecordBatch,
include_low_cardinality_tags: bool,
) {
publish_split_with_options(
metastore,
index_uid,
data_dir,
split_name,
batch,
ParquetWriterConfig::default(),
include_low_cardinality_tags,
)
.await;
}

/// Same as [`publish_split`] but with a caller-supplied
/// `ParquetWriterConfig`. Used by tests that need a specific on-disk
/// layout — e.g., a small `data_page_row_count_limit` to force the
/// metric_name column into multiple pages so page-level pruning has
/// something to prune.
///
/// Only referenced by `metrics.rs`; dead code from the perspective of
/// `null_columns.rs` / `distributed.rs`.
#[allow(dead_code)]
pub async fn publish_split_with_writer_config(
metastore: &MetastoreServiceClient,
index_uid: &IndexUid,
data_dir: &std::path::Path,
split_name: &str,
batch: &RecordBatch,
writer_config: ParquetWriterConfig,
) {
publish_split_with_options(
metastore,
index_uid,
data_dir,
split_name,
batch,
writer_config,
true,
)
.await;
}

/// Inner helper combining both knobs. Kept private; the named
/// entry points above (`publish_split`, `publish_split_with_tag_metadata`,
/// `publish_split_with_writer_config`) cover the call patterns
/// tests need.
async fn publish_split_with_options(
metastore: &MetastoreServiceClient,
index_uid: &IndexUid,
data_dir: &std::path::Path,
split_name: &str,
batch: &RecordBatch,
writer_config: ParquetWriterConfig,
include_low_cardinality_tags: bool,
) {
let (parquet_bytes, (row_keys_proto, zonemap_regexes)) =
-        ParquetWriter::new(ParquetWriterConfig::default(), &TableConfig::default())
+        ParquetWriter::new(writer_config, &TableConfig::default())
.unwrap()
.write_to_bytes(batch, None)
.expect("parquet encode");
Expand Down