Route DataFusion scan tasks by file affinity#6402

Open
alexanderbianchi wants to merge 2 commits into bianchi/parquet-byte-range-cache from bianchi/parquet-rendezvous-routing
Conversation

@alexanderbianchi
Collaborator

Summary

This stacks on the parquet footer/range cache work and uses the new datafusion-distributed task routing API to keep repeated parquet scan work sticky to the same searchers, so their byte-range caches stay warm.

  • switches the DataFusion session setup to with_distributed_planner() and forwards TaskEstimator::route_tasks through the ArcTaskEstimator wrapper
  • pins datafusion-distributed to the merged routing API commit until the crate has a release with this API
  • routes single-file-scan DataSourceExec tasks by deterministic rendezvous hashing over each task's parquet file set and the available worker URLs
  • falls back to default routing when the plan does not contain exactly one FileScanConfig or when the task/file grouping does not line up

Routing Details

The estimator already controls how a DataSourceExec leaf is split into distributed tasks: it asks for one task per output partition and uses PartitionIsolatorExec to group those partitions into task-local inputs.
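To make the grouping concrete, here is a minimal sketch of splitting output partitions into contiguous per-task groups. This is an illustrative reconstruction, not the actual PartitionIsolatorExec code; the real operator may distribute the remainder differently.

```rust
/// Split partitions 0..num_partitions into `num_tasks` contiguous groups,
/// giving earlier tasks one extra partition when the split is uneven.
/// Illustrative only; mirrors the contiguous grouping described above.
fn contiguous_partition_groups(num_partitions: usize, num_tasks: usize) -> Vec<Vec<usize>> {
    let base = num_partitions / num_tasks;
    let rem = num_partitions % num_tasks;
    let mut groups = Vec::with_capacity(num_tasks);
    let mut next = 0;
    for t in 0..num_tasks {
        let len = base + usize::from(t < rem);
        groups.push((next..next + len).collect());
        next += len;
    }
    groups
}
```

With 7 partitions and 3 tasks this yields `[[0, 1, 2], [3, 4], [5, 6]]`; the estimator reconstructs these same groups so it knows which parquet files each task will touch.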

With the new API, route_tasks receives the physical plan, the task count, and the currently available worker URLs. Quickwit inspects the plan for exactly one FileScanConfig, reconstructs the same contiguous partition groups that PartitionIsolatorExec will assign to each task, and derives an affinity key from the sorted list of parquet file paths in that task group. The object store URL is included in the key so identical paths in different stores do not collide.
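A hypothetical sketch of the affinity key derivation described above (the function name and separator choice are illustrative, not the actual Quickwit implementation): sort the task's file paths so ordering does not change the key, and prefix the object store URL so identical paths in different stores hash differently.

```rust
/// Build a deterministic affinity key for one task from its object store
/// URL and the set of parquet file paths in its partition group.
/// Illustrative sketch; the real key derivation may differ.
fn affinity_key(object_store_url: &str, mut file_paths: Vec<String>) -> String {
    // Sorting makes the key independent of the order files appear in the plan.
    file_paths.sort();
    let mut key = String::from(object_store_url);
    for path in &file_paths {
        // NUL separator avoids ambiguity between adjacent path strings.
        key.push('\0');
        key.push_str(path);
    }
    key
}
```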

Each task is then assigned with rendezvous hashing: for every (task affinity key, worker URL) pair we compute a stable hash and pick the highest-scoring worker. The returned Vec<Url> is ordered by task index, which is the shape expected by datafusion-distributed. This gives us stable cache affinity without requiring a central routing table or consistent worker ordering from cluster discovery.
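The rendezvous (highest-random-weight) assignment can be sketched as follows. This uses `std`'s `DefaultHasher` for brevity; it is only stable within one build, so a production router would use a hash with fixed, documented output (e.g. SipHash with pinned keys) to keep assignments stable across restarts. Function names here are illustrative, not the actual API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Score one (task key, worker) pair with a stable-within-process hash.
fn score(task_key: &str, worker: &str) -> u64 {
    let mut h = DefaultHasher::new();
    task_key.hash(&mut h);
    worker.hash(&mut h);
    h.finish()
}

/// Rendezvous hashing: each task goes to the worker with the highest score
/// for its key. The result is ordered by task index, independent of the
/// order workers were discovered in.
fn route(task_keys: &[String], workers: &[String]) -> Vec<String> {
    task_keys
        .iter()
        .map(|key| {
            workers
                .iter()
                .max_by_key(|w| score(key, w))
                .expect("at least one worker available")
                .clone()
        })
        .collect()
}
```

Because the winner is a maximum over all workers, the assignment does not depend on worker ordering, and removing one worker only moves the tasks that were pinned to it.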

This is intentionally naive for now: no file-size weighting, no task cost model, and no load cap. The goal is cache locality for repeated metrics parquet scans, not global load optimization. If the plan shape is ambiguous, Quickwit returns None and lets the distributed planner use its default routing.

Testing

  • cargo test -p quickwit-df-core
  • cargo test -p quickwit-datafusion
  • cargo check -p quickwit-serve --features datafusion
