diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d94e2457ee0..588a5628c76 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3016,8 +3016,7 @@ dependencies = [ [[package]] name = "datafusion-distributed" version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d75c8241c447c26614c9d235dd3d4d72e93112c8716b9509d2ff0d84e06b8e" +source = "git+https://github.com/datafusion-contrib/datafusion-distributed.git?rev=d7de9c3fb99fd7fa99da925b460dc2d4529c04a2#d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" dependencies = [ "arrow-flight", "arrow-ipc", @@ -8483,6 +8482,7 @@ dependencies = [ "prost 0.14.3", "prost-build 0.14.3", "serde_json", + "siphasher", "tokio", "tonic 0.14.5", "tonic-prost", diff --git a/quickwit/quickwit-datafusion/Cargo.toml b/quickwit/quickwit-datafusion/Cargo.toml index 553d1cf5b52..82f43e5a3a9 100644 --- a/quickwit/quickwit-datafusion/Cargo.toml +++ b/quickwit/quickwit-datafusion/Cargo.toml @@ -36,7 +36,7 @@ datafusion-substrait = "53" datafusion-datasource = "53" datafusion-physical-plan = "53" datafusion-datasource-parquet = "53" -datafusion-distributed = "1.0" +datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed.git", rev = "d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" } object_store = "0.13" [dev-dependencies] diff --git a/quickwit/quickwit-df-core/Cargo.toml b/quickwit/quickwit-df-core/Cargo.toml index abdc939e206..ce9144917c0 100644 --- a/quickwit/quickwit-df-core/Cargo.toml +++ b/quickwit/quickwit-df-core/Cargo.toml @@ -15,13 +15,14 @@ async-trait = { workspace = true } futures = { workspace = true } prost = { workspace = true } # substrait Plan::decode (runtime only) serde_json = { workspace = true } +siphasher = { workspace = true } tracing = { workspace = true } url = "2" arrow = { workspace = true } datafusion = "53" datafusion-substrait = "53" -datafusion-distributed = "1.0" +datafusion-distributed = { git = 
"https://github.com/datafusion-contrib/datafusion-distributed.git", rev = "d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" } object_store = "0.13" # gRPC surface is opt-in. The extension traits + session builder do not need diff --git a/quickwit/quickwit-df-core/src/session.rs b/quickwit/quickwit-df-core/src/session.rs index 245fdfa870c..d1f11a4c116 100644 --- a/quickwit/quickwit-df-core/src/session.rs +++ b/quickwit/quickwit-df-core/src/session.rs @@ -45,7 +45,7 @@ use datafusion::execution::object_store::ObjectStoreRegistry; use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_distributed::{ - DistributedExt, DistributedPhysicalOptimizerRule, TaskEstimator, WorkerResolver, + DistributedExt, SessionStateBuilderExt, TaskEstimator, TaskRoutingContext, WorkerResolver, }; use crate::data_source::{ @@ -346,7 +346,7 @@ impl DataFusionSessionBuilder { builder = builder .with_distributed_worker_resolver(ArcWorkerResolver(Arc::clone(resolver))) .with_distributed_task_estimator(ArcTaskEstimator(Arc::clone(&self.task_estimator))) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)); + .with_distributed_planner(); } Ok(SessionContext::new_with_state(builder.build())) @@ -386,6 +386,10 @@ impl TaskEstimator for ArcTaskEstimator { ) -> Option<DFResult<TaskEstimation>> { self.0.scale_up_leaf_node(plan, task_count, cfg) } + + fn route_tasks(&self, routing_ctx: &TaskRoutingContext<'_>) -> DFResult<Option<Vec<Url>>> { + self.0.route_tasks(routing_ctx) + } } #[cfg(test)] diff --git a/quickwit/quickwit-df-core/src/task_estimator.rs b/quickwit/quickwit-df-core/src/task_estimator.rs index c082840ec58..2a66ff1d8e6 100644 --- a/quickwit/quickwit-df-core/src/task_estimator.rs +++ b/quickwit/quickwit-df-core/src/task_estimator.rs @@ -19,12 +19,19 @@ //! count equals the file-group count; for tantivy sources it equals the total //! segment count across splits. 
+use std::hash::{Hash, Hasher}; use std::sync::Arc; +use datafusion::common::Result as DFResult; use datafusion::config::ConfigOptions; +use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::source::DataSourceExec; use datafusion::physical_plan::ExecutionPlan; -use datafusion_distributed::{PartitionIsolatorExec, TaskEstimation, TaskEstimator}; +use datafusion_distributed::{ + PartitionIsolatorExec, TaskEstimation, TaskEstimator, TaskRoutingContext, +}; +use siphasher::sip::SipHasher; +use url::Url; /// Estimates distributed task count from the partition count of a /// `DataSourceExec` leaf. Returns `None` for any other plan node, letting the @@ -61,4 +68,278 @@ impl TaskEstimator for DataSourceExecPartitionEstimator { task_count, ))) } + + fn route_tasks(&self, routing_ctx: &TaskRoutingContext<'_>) -> DFResult<Option<Vec<Url>>> { + if routing_ctx.available_urls.is_empty() { + log_routing_fallback(routing_ctx, "no_available_workers"); + return Ok(None); + } + let mut file_scans = Vec::new(); + collect_file_scan_configs(routing_ctx.plan, &mut file_scans); + let [file_scan] = file_scans.as_slice() else { + let reason = if file_scans.is_empty() { + "no_file_scan_config" + } else { + "multiple_file_scan_configs" + }; + log_routing_fallback(routing_ctx, reason); + return Ok(None); + }; + let task_affinities = file_scan_task_affinities(file_scan, routing_ctx.task_count); + if task_affinities.len() != routing_ctx.task_count { + tracing::info!( + strategy = "file_rendezvous", + reason = "task_affinity_count_mismatch", + task_count = routing_ctx.task_count, + task_affinity_count = task_affinities.len(), + available_worker_count = routing_ctx.available_urls.len(), + "datafusion task routing fell back to default routing" + ); + return Ok(None); + } + let routes = route_by_rendezvous(&task_affinities, routing_ctx.available_urls); + log_rendezvous_routes(routing_ctx, &task_affinities, &routes); + Ok(Some(routes)) + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] 
+struct TaskAffinity { + task_index: usize, + affinity_key: String, +} + +fn collect_file_scan_configs<'a>( + plan: &'a Arc<dyn ExecutionPlan>, + file_scans: &mut Vec<&'a FileScanConfig>, +) { + if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() + && let Some(file_scan) = data_source_exec + .data_source() + .as_any() + .downcast_ref::<FileScanConfig>() + { + file_scans.push(file_scan); + } + + for child in plan.children() { + collect_file_scan_configs(child, file_scans); + } +} + +fn file_scan_task_affinities(file_scan: &FileScanConfig, task_count: usize) -> Vec<TaskAffinity> { + partition_groups(file_scan.file_groups.len(), task_count) + .into_iter() + .enumerate() + .map(|(task_index, partition_group)| { + let mut file_keys = Vec::new(); + for partition_idx in partition_group { + if let Some(file_group) = file_scan.file_groups.get(partition_idx) { + for file in file_group.files() { + file_keys.push(format!( + "{}{}", + file_scan.object_store_url.as_str(), + file.object_meta.location.as_ref() + )); + } + } + } + file_keys.sort_unstable(); + let affinity_key = if file_keys.is_empty() { + format!("empty-task-{task_index}") + } else { + file_keys.join("\n") + }; + TaskAffinity { + task_index, + affinity_key, + } + }) + .collect() +} + +fn partition_groups(input_partitions: usize, task_count: usize) -> Vec<Vec<usize>> { + if task_count == 0 { + return Vec::new(); + } + let q = input_partitions / task_count; + let r = input_partitions % task_count; + let mut off = 0; + (0..task_count) + .map(|i| q + usize::from(i < r)) + .map(|n| { + let result = (off..(off + n)).collect(); + off += n; + result + }) + .collect() +} + +fn route_by_rendezvous(task_affinities: &[TaskAffinity], available_urls: &[Url]) -> Vec<Url> { + let mut routes: Vec<Option<Url>> = vec![None; task_affinities.len()]; + let mut task_order: Vec<usize> = (0..task_affinities.len()).collect(); + task_order.sort_unstable_by(|&left, &right| { + task_affinities[left] + .affinity_key + .cmp(&task_affinities[right].affinity_key) + .then_with(|| { + task_affinities[left] + .task_index 
+ .cmp(&task_affinities[right].task_index) + }) + }); + + for task_idx in task_order { + let task = &task_affinities[task_idx]; + let mut candidates: Vec<usize> = (0..available_urls.len()).collect(); + candidates.sort_unstable_by(|&left, &right| { + let left_score = rendezvous_affinity(&available_urls[left], &task.affinity_key); + let right_score = rendezvous_affinity(&available_urls[right], &task.affinity_key); + right_score.cmp(&left_score).then_with(|| { + available_urls[left] + .as_str() + .cmp(available_urls[right].as_str()) + }) + }); + let chosen_idx = candidates[0]; + routes[task.task_index] = Some(available_urls[chosen_idx].clone()); + } + + routes + .into_iter() + .map(|route| route.expect("every task should be routed")) + .collect() +} + +fn rendezvous_affinity(url: &Url, key: &str) -> u64 { + let mut state = SipHasher::new(); + key.hash(&mut state); + url.as_str().hash(&mut state); + state.finish() +} + +fn affinity_fingerprint(key: &str) -> u64 { + let mut state = SipHasher::new(); + key.hash(&mut state); + state.finish() +} + +fn log_routing_fallback(routing_ctx: &TaskRoutingContext<'_>, reason: &'static str) { + tracing::info!( + strategy = "file_rendezvous", + reason, + task_count = routing_ctx.task_count, + available_worker_count = routing_ctx.available_urls.len(), + "datafusion task routing fell back to default routing" + ); +} + +fn log_rendezvous_routes( + routing_ctx: &TaskRoutingContext<'_>, + task_affinities: &[TaskAffinity], + routes: &[Url], +) { + const ROUTE_SAMPLE_SIZE: usize = 16; + + let mut route_distribution: Vec<(&str, usize)> = Vec::new(); + for route in routes { + let route = route.as_str(); + match route_distribution + .iter_mut() + .find(|(worker_url, _)| *worker_url == route) + { + Some((_, task_count)) => *task_count += 1, + None => route_distribution.push((route, 1)), + } + } + route_distribution.sort_unstable_by(|(left, _), (right, _)| left.cmp(right)); + + let route_sample: Vec<(usize, u64, &str)> = task_affinities + .iter() + 
.take(ROUTE_SAMPLE_SIZE) + .filter_map(|task_affinity| { + routes.get(task_affinity.task_index).map(|route| { + ( + task_affinity.task_index, + affinity_fingerprint(&task_affinity.affinity_key), + route.as_str(), + ) + }) + }) + .collect(); + + tracing::info!( + strategy = "file_rendezvous", + task_count = routes.len(), + available_worker_count = routing_ctx.available_urls.len(), + route_sample_size = route_sample.len(), + route_distribution = ?route_distribution, + route_sample = ?route_sample, + "datafusion task routing selected workers" + ); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn url(value: &str) -> Url { + Url::parse(value).unwrap() + } + + fn task(task_index: usize, affinity_key: &str) -> TaskAffinity { + TaskAffinity { + task_index, + affinity_key: affinity_key.to_string(), + } + } + + #[test] + fn partition_groups_match_partition_isolator_layout() { + assert_eq!(partition_groups(2, 1), vec![vec![0, 1]]); + assert_eq!(partition_groups(6, 2), vec![vec![0, 1, 2], vec![3, 4, 5]]); + assert_eq!( + partition_groups(10, 3), + vec![vec![0, 1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]] + ); + } + + #[test] + fn rendezvous_routing_is_stable_across_url_order() { + let tasks = vec![ + task(0, "s3://bucket/metrics-0.parquet"), + task(1, "s3://bucket/metrics-1.parquet"), + task(2, "s3://bucket/metrics-2.parquet"), + task(3, "s3://bucket/metrics-3.parquet"), + task(4, "s3://bucket/metrics-4.parquet"), + task(5, "s3://bucket/metrics-5.parquet"), + ]; + let urls = vec![ + url("http://node-0:7281"), + url("http://node-1:7281"), + url("http://node-2:7281"), + ]; + let mut reversed_urls = urls.clone(); + reversed_urls.reverse(); + + let routes = route_by_rendezvous(&tasks, &urls); + let reversed_routes = route_by_rendezvous(&tasks, &reversed_urls); + + assert_eq!(routes, reversed_routes); + } + + #[test] + fn rendezvous_routing_uses_best_affinity_node_per_task() { + let tasks = vec![task(0, "s3://bucket/metrics.parquet")]; + let urls = vec![url("http://node-0:7281"), 
url("http://node-1:7281")]; + + let routes = route_by_rendezvous(&tasks, &urls); + let expected = urls + .iter() + .max_by_key(|url| rendezvous_affinity(url, "s3://bucket/metrics.parquet")) + .unwrap() + .clone(); + + assert_eq!(routes, vec![expected]); + } }