quickwit/Cargo.lock (4 changes: 2 additions & 2 deletions)

Some generated files are not rendered by default.

quickwit/quickwit-datafusion/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -36,7 +36,7 @@ datafusion-substrait = "53"
datafusion-datasource = "53"
datafusion-physical-plan = "53"
datafusion-datasource-parquet = "53"
-datafusion-distributed = "1.0"
+datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed.git", rev = "d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" }
object_store = "0.13"

[dev-dependencies]
quickwit/quickwit-df-core/Cargo.toml (3 changes: 2 additions & 1 deletion)
@@ -15,13 +15,14 @@ async-trait = { workspace = true }
futures = { workspace = true }
prost = { workspace = true } # substrait Plan::decode (runtime only)
serde_json = { workspace = true }
+siphasher = { workspace = true }
tracing = { workspace = true }
url = "2"

arrow = { workspace = true }
datafusion = "53"
datafusion-substrait = "53"
-datafusion-distributed = "1.0"
+datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed.git", rev = "d7de9c3fb99fd7fa99da925b460dc2d4529c04a2" }
object_store = "0.13"

# gRPC surface is opt-in. The extension traits + session builder do not need
quickwit/quickwit-df-core/src/session.rs (8 changes: 6 additions & 2 deletions)
@@ -45,7 +45,7 @@ use datafusion::execution::object_store::ObjectStoreRegistry;
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_distributed::{
-DistributedExt, DistributedPhysicalOptimizerRule, TaskEstimator, WorkerResolver,
+DistributedExt, SessionStateBuilderExt, TaskEstimator, TaskRoutingContext, WorkerResolver,
};

use crate::data_source::{
@@ -346,7 +346,7 @@ impl DataFusionSessionBuilder {
builder = builder
.with_distributed_worker_resolver(ArcWorkerResolver(Arc::clone(resolver)))
.with_distributed_task_estimator(ArcTaskEstimator(Arc::clone(&self.task_estimator)))
-.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule));
+.with_distributed_planner();
}

Ok(SessionContext::new_with_state(builder.build()))
@@ -386,6 +386,10 @@ impl TaskEstimator for ArcTaskEstimator {
) -> Option<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
self.0.scale_up_leaf_node(plan, task_count, cfg)
}
+
+fn route_tasks(&self, routing_ctx: &TaskRoutingContext<'_>) -> DFResult<Option<Vec<url::Url>>> {
+self.0.route_tasks(routing_ctx)
+}
}

#[cfg(test)]
quickwit/quickwit-df-core/src/task_estimator.rs (283 changes: 282 additions & 1 deletion)
@@ -19,12 +19,19 @@
//! count equals the file-group count; for tantivy sources it equals the total
//! segment count across splits.

+use std::hash::{Hash, Hasher};
use std::sync::Arc;

use datafusion::common::Result as DFResult;
use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::physical_plan::ExecutionPlan;
-use datafusion_distributed::{PartitionIsolatorExec, TaskEstimation, TaskEstimator};
+use datafusion_distributed::{
+PartitionIsolatorExec, TaskEstimation, TaskEstimator, TaskRoutingContext,
+};
+use siphasher::sip::SipHasher;
+use url::Url;

/// Estimates distributed task count from the partition count of a
/// `DataSourceExec` leaf. Returns `None` for any other plan node, letting the
@@ -61,4 +68,278 @@ impl TaskEstimator for DataSourceExecPartitionEstimator {
task_count,
)))
}

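/// Routes each task to a worker via rendezvous hashing over the files the
/// task scans, so a given file set keeps landing on the same worker as long
/// as the worker set is unchanged. Plans without exactly one
/// `FileScanConfig`, or with a task-affinity count mismatch, fall back to
/// the planner's default routing.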
fn route_tasks(&self, routing_ctx: &TaskRoutingContext<'_>) -> DFResult<Option<Vec<Url>>> {
if routing_ctx.available_urls.is_empty() {
log_routing_fallback(routing_ctx, "no_available_workers");
return Ok(None);
}
let mut file_scans = Vec::new();
collect_file_scan_configs(routing_ctx.plan, &mut file_scans);
let [file_scan] = file_scans.as_slice() else {
let reason = if file_scans.is_empty() {
"no_file_scan_config"
} else {
"multiple_file_scan_configs"
};
log_routing_fallback(routing_ctx, reason);
return Ok(None);
};
let task_affinities = file_scan_task_affinities(file_scan, routing_ctx.task_count);
if task_affinities.len() != routing_ctx.task_count {
tracing::info!(
strategy = "file_rendezvous",
reason = "task_affinity_count_mismatch",
task_count = routing_ctx.task_count,
task_affinity_count = task_affinities.len(),
available_worker_count = routing_ctx.available_urls.len(),
"datafusion task routing fell back to default routing"
);
return Ok(None);
}
let routes = route_by_rendezvous(&task_affinities, routing_ctx.available_urls);
log_rendezvous_routes(routing_ctx, &task_affinities, &routes);
Ok(Some(routes))
}
}

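/// A task's index in the task list paired with a deterministic key derived
/// from the files that task will scan.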
#[derive(Debug, Clone, Eq, PartialEq)]
struct TaskAffinity {
task_index: usize,
affinity_key: String,
}

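/// Recursively collects every `FileScanConfig` found inside a
/// `DataSourceExec` node anywhere in the plan tree.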
fn collect_file_scan_configs<'a>(
plan: &'a Arc<dyn ExecutionPlan>,
file_scans: &mut Vec<&'a FileScanConfig>,
) {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
&& let Some(file_scan) = data_source_exec
.data_source()
.as_any()
.downcast_ref::<FileScanConfig>()
{
file_scans.push(file_scan);
}

for child in plan.children() {
collect_file_scan_configs(child, file_scans);
}
}

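/// Builds one affinity key per task: the sorted object-store paths of the
/// files in that task's partition group, joined with newlines. Tasks with no
/// files get a synthetic per-index key so they still route deterministically.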
fn file_scan_task_affinities(file_scan: &FileScanConfig, task_count: usize) -> Vec<TaskAffinity> {
partition_groups(file_scan.file_groups.len(), task_count)
.into_iter()
.enumerate()
.map(|(task_index, partition_group)| {
let mut file_keys = Vec::new();
for partition_idx in partition_group {
if let Some(file_group) = file_scan.file_groups.get(partition_idx) {
for file in file_group.files() {
file_keys.push(format!(
"{}{}",
file_scan.object_store_url.as_str(),
file.object_meta.location.as_ref()
));
}
}
}
file_keys.sort_unstable();
let affinity_key = if file_keys.is_empty() {
format!("empty-task-{task_index}")
} else {
file_keys.join("\n")
};
TaskAffinity {
task_index,
affinity_key,
}
})
.collect()
}

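/// Splits `input_partitions` partition indices into `task_count` contiguous
/// groups, with the first `input_partitions % task_count` groups one
/// partition larger, matching the layout `PartitionIsolatorExec` assigns to
/// tasks: e.g. `partition_groups(10, 3)` groups partitions as `[0,1,2,3]`,
/// `[4,5,6]`, `[7,8,9]`.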
fn partition_groups(input_partitions: usize, task_count: usize) -> Vec<Vec<usize>> {
if task_count == 0 {
return Vec::new();
}
let q = input_partitions / task_count;
let r = input_partitions % task_count;
let mut off = 0;
(0..task_count)
.map(|i| q + usize::from(i < r))
.map(|n| {
let result = (off..(off + n)).collect();
off += n;
result
})
.collect()
}

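/// Rendezvous (highest-random-weight) hashing: each task is assigned the
/// worker whose score for the task's affinity key is highest, with the URL
/// string as a deterministic tiebreaker. The assignment is independent of
/// the order of `available_urls`, and removing a worker only reroutes the
/// tasks that were assigned to it.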
fn route_by_rendezvous(task_affinities: &[TaskAffinity], available_urls: &[Url]) -> Vec<Url> {
let mut routes: Vec<Option<Url>> = vec![None; task_affinities.len()];
let mut task_order: Vec<usize> = (0..task_affinities.len()).collect();
task_order.sort_unstable_by(|&left, &right| {
task_affinities[left]
.affinity_key
.cmp(&task_affinities[right].affinity_key)
.then_with(|| {
task_affinities[left]
.task_index
.cmp(&task_affinities[right].task_index)
})
});

for task_idx in task_order {
let task = &task_affinities[task_idx];
let mut candidates: Vec<usize> = (0..available_urls.len()).collect();
candidates.sort_unstable_by(|&left, &right| {
let left_score = rendezvous_affinity(&available_urls[left], &task.affinity_key);
let right_score = rendezvous_affinity(&available_urls[right], &task.affinity_key);
right_score.cmp(&left_score).then_with(|| {
available_urls[left]
.as_str()
.cmp(available_urls[right].as_str())
})
});
let chosen_idx = candidates[0];
routes[task.task_index] = Some(available_urls[chosen_idx].clone());
}

routes
.into_iter()
.map(|route| route.expect("every task should be routed"))
.collect()
}

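/// Rendezvous score for a (key, worker) pair: SipHash over the affinity key
/// followed by the worker URL.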
fn rendezvous_affinity(url: &Url, key: &str) -> u64 {
let mut state = SipHasher::new();
key.hash(&mut state);
url.as_str().hash(&mut state);
state.finish()
}

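/// Compact hash of an affinity key, used only to keep route-sample log
/// entries readable.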
fn affinity_fingerprint(key: &str) -> u64 {
let mut state = SipHasher::new();
key.hash(&mut state);
state.finish()
}

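/// Logs why rendezvous routing was skipped in favor of the default routing.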
fn log_routing_fallback(routing_ctx: &TaskRoutingContext<'_>, reason: &'static str) {
tracing::info!(
strategy = "file_rendezvous",
reason,
task_count = routing_ctx.task_count,
available_worker_count = routing_ctx.available_urls.len(),
"datafusion task routing fell back to default routing"
);
}

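/// Logs the chosen routes: a per-worker task-count distribution plus a
/// sample of up to `ROUTE_SAMPLE_SIZE` (task index, key fingerprint, worker)
/// triples.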
fn log_rendezvous_routes(
routing_ctx: &TaskRoutingContext<'_>,
task_affinities: &[TaskAffinity],
routes: &[Url],
) {
const ROUTE_SAMPLE_SIZE: usize = 16;

let mut route_distribution: Vec<(&str, usize)> = Vec::new();
for route in routes {
let route = route.as_str();
match route_distribution
.iter_mut()
.find(|(worker_url, _)| *worker_url == route)
{
Some((_, task_count)) => *task_count += 1,
None => route_distribution.push((route, 1)),
}
}
route_distribution.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));

let route_sample: Vec<(usize, u64, &str)> = task_affinities
.iter()
.take(ROUTE_SAMPLE_SIZE)
.filter_map(|task_affinity| {
routes.get(task_affinity.task_index).map(|route| {
(
task_affinity.task_index,
affinity_fingerprint(&task_affinity.affinity_key),
route.as_str(),
)
})
})
.collect();

tracing::info!(
strategy = "file_rendezvous",
task_count = routes.len(),
available_worker_count = routing_ctx.available_urls.len(),
route_sample_size = route_sample.len(),
route_distribution = ?route_distribution,
route_sample = ?route_sample,
"datafusion task routing selected workers"
);
}

#[cfg(test)]
mod tests {
use super::*;

fn url(value: &str) -> Url {
Url::parse(value).unwrap()
}

fn task(task_index: usize, affinity_key: &str) -> TaskAffinity {
TaskAffinity {
task_index,
affinity_key: affinity_key.to_string(),
}
}

#[test]
fn partition_groups_match_partition_isolator_layout() {
assert_eq!(partition_groups(2, 1), vec![vec![0, 1]]);
assert_eq!(partition_groups(6, 2), vec![vec![0, 1, 2], vec![3, 4, 5]]);
assert_eq!(
partition_groups(10, 3),
vec![vec![0, 1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]
);
}

#[test]
fn rendezvous_routing_is_stable_across_url_order() {
let tasks = vec![
task(0, "s3://bucket/metrics-0.parquet"),
task(1, "s3://bucket/metrics-1.parquet"),
task(2, "s3://bucket/metrics-2.parquet"),
task(3, "s3://bucket/metrics-3.parquet"),
task(4, "s3://bucket/metrics-4.parquet"),
task(5, "s3://bucket/metrics-5.parquet"),
];
let urls = vec![
url("http://node-0:7281"),
url("http://node-1:7281"),
url("http://node-2:7281"),
];
let mut reversed_urls = urls.clone();
reversed_urls.reverse();

let routes = route_by_rendezvous(&tasks, &urls);
let reversed_routes = route_by_rendezvous(&tasks, &reversed_urls);

assert_eq!(routes, reversed_routes);
}

#[test]
fn rendezvous_routing_uses_best_affinity_node_per_task() {
let tasks = vec![task(0, "s3://bucket/metrics.parquet")];
let urls = vec![url("http://node-0:7281"), url("http://node-1:7281")];

let routes = route_by_rendezvous(&tasks, &urls);
let expected = urls
.iter()
.max_by_key(|url| rendezvous_affinity(url, "s3://bucket/metrics.parquet"))
.unwrap()
.clone();

assert_eq!(routes, vec![expected]);
}
}