Route DataFusion scan tasks by file affinity #6402
Open
alexanderbianchi wants to merge 2 commits into bianchi/parquet-byte-range-cache from
Summary
This stacks on the parquet footer/range cache work and uses the new datafusion-distributed task routing API to keep repeated parquet scan work sticky to the same searchers.

- Enables with_distributed_planner() and forwards TaskEstimator::route_tasks through the Arc TaskEstimator wrapper
- Pins datafusion-distributed to the merged routing API commit until the crate has a release with this API
- Routes DataSourceExec tasks by deterministic rendezvous hashing over each task's parquet file set and the available worker URLs
- Falls back to default routing when the plan does not contain exactly one FileScanConfig or when task/file grouping does not line up

Routing Details
The estimator already controls how a DataSourceExec leaf is split into distributed tasks: it asks for one task per output partition and uses PartitionIsolatorExec to group those partitions into task-local inputs.

With the new API, route_tasks receives the physical plan, the task count, and the currently available worker URLs. Quickwit inspects the plan for exactly one FileScanConfig, reconstructs the same contiguous partition groups that PartitionIsolatorExec will assign to each task, and derives an affinity key from the sorted list of parquet file paths in that task group. The object store URL is included in the key so identical paths in different stores do not collide.

Each task is then assigned with rendezvous hashing: for every (task affinity key, worker URL) pair we compute a stable hash and pick the highest-scoring worker. The returned Vec<Url> is ordered by task index, which is the shape expected by datafusion-distributed. This gives us stable cache affinity without requiring a central routing table or consistent worker ordering from cluster discovery.

This is intentionally naive for now: no file-size weighting, no task cost model, and no load cap. The goal is cache locality for repeated metrics parquet scans, not global load optimization. If the plan shape is ambiguous, Quickwit returns None and lets the distributed planner use its default routing.

Testing
cargo test -p quickwit-df-core
cargo test -p quickwit-datafusion
cargo check -p quickwit-serve --features datafusion
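The affinity-key derivation described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's actual code: the function name task_affinity_key and the newline separator are invented here, but the recipe matches the description — the object store URL plus the sorted parquet file paths of one task's partition group, so the key is independent of partition enumeration order and identical paths in different stores do not collide.

```rust
use std::collections::BTreeSet;

/// Hypothetical sketch: build a task affinity key from the object store
/// URL plus the sorted parquet file paths in one task's partition group.
/// BTreeSet both deduplicates and sorts, so enumeration order is irrelevant.
fn task_affinity_key(object_store_url: &str, files: &[&str]) -> String {
    let sorted: BTreeSet<&str> = files.iter().copied().collect();
    let mut key = String::from(object_store_url);
    for path in sorted {
        key.push('\n'); // separator that cannot appear in a path
        key.push_str(path);
    }
    key
}

fn main() {
    // Identical paths under different stores yield different keys.
    let a = task_affinity_key("s3://bucket-a", &["metrics/0001.parquet"]);
    let b = task_affinity_key("s3://bucket-b", &["metrics/0001.parquet"]);
    assert_ne!(a, b);
    // Path order within a task group does not change the key.
    let c = task_affinity_key("s3://bucket-a", &["x.parquet", "y.parquet"]);
    let d = task_affinity_key("s3://bucket-a", &["y.parquet", "x.parquet"]);
    assert_eq!(c, d);
    println!("ok");
}
```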
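The rendezvous hashing step can be sketched like this. A hypothetical, self-contained illustration: plain Strings stand in for Url, and std's DefaultHasher stands in for whatever stable hash the implementation uses. For each task's affinity key, every worker is scored and the highest-scoring worker wins, yielding a Vec ordered by task index as datafusion-distributed expects.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stable score for one (affinity key, worker) pair.
fn score(affinity_key: &str, worker: &str) -> u64 {
    let mut h = DefaultHasher::new();
    affinity_key.hash(&mut h);
    worker.hash(&mut h);
    h.finish()
}

/// Rendezvous (highest-random-weight) routing: each task goes to the
/// worker with the highest score for that task's affinity key.
/// The returned Vec is ordered by task index.
fn route_tasks(affinity_keys: &[String], workers: &[String]) -> Vec<String> {
    affinity_keys
        .iter()
        .map(|key| {
            workers
                .iter()
                .max_by_key(|w| score(key, w.as_str()))
                .expect("at least one worker")
                .clone()
        })
        .collect()
}

fn main() {
    // Hypothetical worker URLs and task affinity keys.
    let workers: Vec<String> =
        ["http://searcher-1:7280", "http://searcher-2:7280", "http://searcher-3:7280"]
            .iter().map(|s| s.to_string()).collect();
    let keys: Vec<String> =
        ["task-a", "task-b", "task-c"].iter().map(|s| s.to_string()).collect();

    let first = route_tasks(&keys, &workers);
    // Determinism: the same inputs always produce the same assignment.
    assert_eq!(first, route_tasks(&keys, &workers));

    // Stability: removing one worker only moves the tasks that were
    // assigned to it; every other task keeps its worker, because its
    // highest-scoring worker is unaffected by the removal.
    let reduced: Vec<String> =
        workers.iter().filter(|w| **w != first[0]).cloned().collect();
    let second = route_tasks(&keys, &reduced);
    for (a, b) in first.iter().zip(second.iter()) {
        if *a != first[0] {
            assert_eq!(a, b);
        }
    }
    println!("ok");
}
```

One caveat with this sketch: DefaultHasher's algorithm is not guaranteed stable across Rust releases, so a real implementation would pin a fixed-algorithm hash to keep assignments stable across restarts and upgrades.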