Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/quickwit-doc-mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true

[dependencies]
anyhow = { workspace = true }
arrow = { workspace = true, optional = true }
base64 = { workspace = true }
fnv = { workspace = true }
hex = { workspace = true }
Expand Down Expand Up @@ -44,6 +45,7 @@ quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-query = { workspace = true }

[features]
metrics = ["dep:arrow"]
testsuite = []

[[bench]]
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-doc-mapper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub use doc_mapping::{DocMapping, Mode, ModeType};
pub use error::{DocParsingError, QueryParserError};
use quickwit_common::shared_consts::FIELD_PRESENCE_FIELD_NAME;
use quickwit_proto::types::DocMappingUid;
#[cfg(feature = "metrics")]
pub use routing_expression::ArrowRowContext;
pub use routing_expression::RoutingExpr;

/// Field name reserved for storing the source document.
Expand Down
144 changes: 144 additions & 0 deletions quickwit/quickwit-doc-mapper/src/routing_expression/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hasher;

use arrow::array::{Array, AsArray};
use arrow::datatypes::{DataType, Int32Type};
use arrow::record_batch::RecordBatch;
#[cfg(test)]
use serde_json::Value as JsonValue;

#[cfg(test)]
use super::RoutingExpr;
use super::RoutingExprContext;

/// Context for evaluating routing expressions against a single row of an Arrow `RecordBatch`.
///
/// Hashing is deliberately consistent with the JSON-backed `RoutingExprContext`
/// implementation so identical logical values produce the same `partition_id`
/// whether they arrive as JSON or Arrow IPC.
///
/// The context is a cheap, borrowed view: it owns nothing and never mutates
/// the batch, so one batch can back many row contexts.
pub struct ArrowRowContext<'a> {
// Borrowed batch the row is read from.
batch: &'a RecordBatch,
// Zero-based index of the row within `batch` that hashing reads.
row_idx: usize,
}

impl<'a> ArrowRowContext<'a> {
/// Creates an Arrow-backed routing context for one row in a `RecordBatch`.
pub fn new(batch: &'a RecordBatch, row_idx: usize) -> Self {
Self { batch, row_idx }
}
}

impl<'a> RoutingExprContext for ArrowRowContext<'a> {
    /// Hashes the value of the column named by `attr_name` for this row.
    ///
    /// Byte-for-byte compatible with the JSON-backed context: an absent or
    /// non-string value writes `0u8`; a present string writes
    /// `1u8, 3u8, len as u64, bytes`.
    fn hash_attribute<H: Hasher>(&self, attr_name: &[String], hasher: &mut H) {
        // Metrics/sketches have flat schemas, so a routing expression must
        // reference exactly one column. An empty or nested path can never
        // resolve against a flat schema: hash it as "absent" instead of
        // panicking on `attr_name[0]` or hashing the wrong (first) component.
        let [col_name] = attr_name else {
            hasher.write_u8(0u8);
            return;
        };
        let col_idx = match self.batch.schema().index_of(col_name) {
            Ok(idx) => idx,
            Err(_) => {
                // Unknown column: same "absent" marker the JSON context writes.
                hasher.write_u8(0u8);
                return;
            }
        };
        let column = self.batch.column(col_idx);
        if column.is_null(self.row_idx) {
            hasher.write_u8(0u8);
            return;
        }
        // Extract the string value. Routing expressions reference string tag
        // columns; non-string columns are treated as absent.
        let string_value = match column.data_type() {
            DataType::Dictionary(_, value_type) if value_type.as_ref() == &DataType::Utf8 => {
                // Only Int32-keyed dictionaries are expected here. If the key
                // width ever differs, `downcast_ref` yields `None` and the
                // value is treated as absent — the previous `expect` aborted
                // the process on any other key type.
                column
                    .as_any()
                    .downcast_ref::<arrow::array::DictionaryArray<Int32Type>>()
                    .map(|dict| {
                        let values = dict.values().as_string::<i32>();
                        let key = dict.keys().value(self.row_idx) as usize;
                        values.value(key)
                    })
            }
            DataType::Utf8 => Some(column.as_string::<i32>().value(self.row_idx)),
            _ => None,
        };
        match string_value {
            Some(s) => {
                // Match JSON impl: 1u8 (present) + hash_json_val for String
                // (3u8 + len + bytes).
                hasher.write_u8(1u8);
                hasher.write_u8(3u8);
                hasher.write_u64(s.len() as u64);
                hasher.write(s.as_bytes());
            }
            None => {
                hasher.write_u8(0u8);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::StringDictionaryBuilder;
    use arrow::datatypes::{Field, Schema as ArrowSchema};

    use super::*;

    /// Builds a single-row, Int32-keyed string dictionary column.
    fn dict_column(value: &str) -> Arc<dyn Array> {
        let mut builder = StringDictionaryBuilder::<Int32Type>::new();
        builder.append_value(value);
        Arc::new(builder.finish())
    }

    #[test]
    fn test_arrow_row_context_hash_matches_json() {
        let routing_expr = RoutingExpr::new("hash_mod((metric_name,host), 100)").unwrap();

        // Hash the logical row once through the JSON-backed context...
        let json_doc: serde_json::Map<String, JsonValue> = serde_json::from_str(
            r#"{"metric_name": "cpu.usage", "host": "server-01", "env": "prod"}"#,
        )
        .unwrap();
        let expected_hash = routing_expr.eval_hash(&json_doc);

        // ...and once through the Arrow-backed context over the same values.
        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("metric_name", dict_type.clone(), false),
            Field::new("host", dict_type.clone(), true),
            Field::new("env", dict_type, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                dict_column("cpu.usage"),
                dict_column("server-01"),
                dict_column("prod"),
            ],
        )
        .unwrap();
        let actual_hash = routing_expr.eval_hash(&ArrowRowContext::new(&batch, 0));

        assert_eq!(
            expected_hash, actual_hash,
            "Arrow and JSON contexts must produce identical partition hashes"
        );
    }
}
11 changes: 11 additions & 0 deletions quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ pub(crate) use expression_dsl::parse_field_name;
use serde_json::Value as JsonValue;
use siphasher::sip::SipHasher;

#[cfg(feature = "metrics")]
mod metrics;

#[cfg(feature = "metrics")]
pub use metrics::ArrowRowContext;

pub trait RoutingExprContext {
fn hash_attribute<H: Hasher>(&self, attr_name: &[String], hasher: &mut H);
}
Expand Down Expand Up @@ -134,6 +140,11 @@ impl RoutingExpr {
})
}

/// Returns `true` if no routing expression is configured.
///
/// NOTE(review): inferred from `inner_opt` being `None` — presumably an
/// empty expression routes all documents identically; confirm against
/// `eval_hash`'s handling of the `None` case.
pub fn is_empty(&self) -> bool {
self.inner_opt.is_none()
}

/// Evaluates the expression applied to the given
/// context and returns a u64 hash.
///
Expand Down
113 changes: 113 additions & 0 deletions quickwit/quickwit-dst/src/invariants/merge_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Shared merge policy invariant checks.
//!
//! These pure functions are the single source of truth for merge operation
//! validity, used by both Stateright models and production code.

/// MP-1: all splits in a merge operation must share the same `num_merge_ops`.
///
/// Mixing levels would stamp the merged output with `max(levels) + 1`,
/// prematurely maturing the lower-level inputs and voiding the bounded
/// write-amplification guarantee.
///
/// Empty and single-element inputs are vacuously uniform.
pub fn all_same_merge_level(num_merge_ops: &[u32]) -> bool {
    // Uniform iff every adjacent pair agrees (vacuously true for len < 2).
    num_merge_ops.windows(2).all(|pair| pair[0] == pair[1])
}

/// MP-2: every merge operation must have at least 2 input splits.
///
/// A zero-input merge is nonsensical and a single-input merge is a pure
/// waste of I/O, so both are rejected.
pub fn has_minimum_splits(count: usize) -> bool {
    matches!(count, 2..)
}

/// MP-3: all splits in a merge operation must share the same compaction scope.
///
/// The scope is `(sort_fields, window_start, window_duration)`. The merge
/// engine rejects inputs that disagree on any of these, so a policy bug that
/// groups incompatible splits will surface as a failed merge.
///
/// `index_uid` and `partition_id` also belong to the scope but are normally
/// enforced by the grouping layer before the policy runs.
pub fn all_same_compaction_scope(
    sort_fields: &[&str],
    windows: &[(i64, i64)], // (start, duration) pairs
) -> bool {
    // A slice is scope-uniform when every adjacent pair agrees;
    // empty/singleton slices are vacuously uniform.
    fn uniform<T: PartialEq>(items: &[T]) -> bool {
        items.windows(2).all(|pair| pair[0] == pair[1])
    }
    uniform(sort_fields) && uniform(windows)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_all_same_merge_level() {
        // Vacuous and singleton inputs are uniform by definition.
        assert!(all_same_merge_level(&[]));
        assert!(all_same_merge_level(&[0]));
        // A run of identical levels passes.
        assert!(all_same_merge_level(&[2, 2, 2]));
        // Any mismatch fails, wherever it appears.
        assert!(!all_same_merge_level(&[0, 1]));
        assert!(!all_same_merge_level(&[0, 0, 1]));
    }

    #[test]
    fn test_has_minimum_splits() {
        // Below the threshold.
        assert!(!has_minimum_splits(0));
        assert!(!has_minimum_splits(1));
        // At and above the threshold.
        assert!(has_minimum_splits(2));
        assert!(has_minimum_splits(100));
    }

    #[test]
    fn test_all_same_compaction_scope() {
        const SORT: &str = "a|b|ts/V2";

        // Empty is vacuously true.
        assert!(all_same_compaction_scope(&[], &[]));

        // Identical scope across both inputs.
        assert!(all_same_compaction_scope(
            &[SORT, SORT],
            &[(0, 3600), (0, 3600)],
        ));

        // Sort-field mismatch.
        assert!(!all_same_compaction_scope(
            &[SORT, "a|ts/V2"],
            &[(0, 3600), (0, 3600)],
        ));

        // Same window start, different duration.
        assert!(!all_same_compaction_scope(&[SORT, SORT], &[(0, 900), (0, 1800)]));

        // Different window start.
        assert!(!all_same_compaction_scope(
            &[SORT, SORT],
            &[(0, 3600), (3600, 3600)],
        ));
    }
}
1 change: 1 addition & 0 deletions quickwit/quickwit-dst/src/invariants/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! No external dependencies — only `std`.

mod check;
pub mod merge_policy;
pub mod recorder;
pub mod registry;
pub mod sort;
Expand Down
19 changes: 19 additions & 0 deletions quickwit/quickwit-dst/src/invariants/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ pub enum InvariantId {
DM4,
/// DM-5: timeseries_id persists through compaction without recomputation
DM5,

/// MP-1: all splits in a merge operation have the same num_merge_ops level
MP1,
/// MP-2: every merge operation has at least 2 input splits
MP2,
/// MP-3: all splits in a merge operation share the same compaction scope
MP3,
}

impl InvariantId {
Expand Down Expand Up @@ -106,6 +113,10 @@ impl InvariantId {
Self::DM3 => "DM-3",
Self::DM4 => "DM-4",
Self::DM5 => "DM-5",

Self::MP1 => "MP-1",
Self::MP2 => "MP-2",
Self::MP3 => "MP-3",
}
}

Expand Down Expand Up @@ -136,6 +147,10 @@ impl InvariantId {
Self::DM3 => "no interpolation — only ingested points",
Self::DM4 => "deterministic TSID from tags",
Self::DM5 => "TSID persists through compaction",

Self::MP1 => "merge op splits share num_merge_ops level",
Self::MP2 => "merge op has at least 2 splits",
Self::MP3 => "merge op splits share compaction scope",
}
}
}
Expand All @@ -157,6 +172,7 @@ mod tests {
assert_eq!(InvariantId::CS3.to_string(), "CS-3");
assert_eq!(InvariantId::MC4.to_string(), "MC-4");
assert_eq!(InvariantId::DM5.to_string(), "DM-5");
assert_eq!(InvariantId::MP1.to_string(), "MP-1");
}

#[test]
Expand All @@ -182,6 +198,9 @@ mod tests {
InvariantId::DM3,
InvariantId::DM4,
InvariantId::DM5,
InvariantId::MP1,
InvariantId::MP2,
InvariantId::MP3,
];
for id in all {
assert!(!id.description().is_empty(), "{} has empty description", id);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ testsuite = [
"quickwit-proto/testsuite",
"quickwit-storage/testsuite"
]
metrics = ["dep:arrow", "dep:quickwit-parquet-engine"]
metrics = ["dep:arrow", "dep:quickwit-parquet-engine", "quickwit-doc-mapper/metrics"]
vrl = ["dep:vrl", "quickwit-config/vrl"]
postgres = ["quickwit-metastore/postgres"]
ci-test = []
Expand Down
Loading
Loading