request: replace try_join_all with JoinSet in plan handlers #535
pingyu wants to merge 3 commits into tikv:master from
Conversation
Replace futures::future::try_join_all with tokio::task::JoinSet in single_plan_handler and multi_store_handler to join tasks as they complete rather than collecting all handles first. This provides better error handling with explicit JoinError propagation. Signed-off-by: Ping Yu <yuping@pingcap.com>
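For orientation, here is a standalone sketch of the pattern this change adopts; it is not the actual handler code, and the function and type names (run_shards, the String error) are placeholders. One task per shard goes into a JoinSet, results are gathered as tasks complete, and a JoinError (panic or cancellation) is surfaced explicitly instead of being folded into try_join_all's result.

```rust
use tokio::task::JoinSet;

// Hypothetical stand-in for the per-shard plan execution.
async fn run_shards(shards: Vec<u64>) -> Result<Vec<u64>, String> {
    let mut join_set = JoinSet::new();
    for shard in shards {
        // In the real handlers this would be the per-shard RPC future.
        join_set.spawn(async move { shard * 2 });
    }

    let mut results = Vec::new();
    // join_next() yields tasks in completion order, not spawn order.
    while let Some(join_result) = join_set.join_next().await {
        match join_result {
            Ok(val) => results.push(val),
            // A JoinError means the task panicked or was cancelled.
            Err(e) => return Err(format!("failed to join task: {e}")),
        }
    }
    Ok(results)
}

#[tokio::main]
async fn main() {
    let out = run_shards(vec![1, 2, 3]).await.unwrap();
    println!("{out:?}"); // order may differ from the input order
}
```

Note that, unlike try_join_all, this loop by itself keeps draining tasks after one of them has failed; that trade-off is exactly what the review comments below discuss.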
[APPROVALNOTIFIER] This PR is NOT APPROVED.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/request/plan.rs (3)
137-146: Consider logging which task/shard failed, not just the JoinError.
JoinError's Display typically only conveys "panic" / "cancelled" with little context. For multi-region debugging it's helpful to also log shard count / handler name so operators can correlate to a region. Optional polish:

```diff
- error!("failed to join task: {}", e);
+ error!("single_plan_handler: failed to join shard task ({} shards): {}", shards_len, e);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/request/plan.rs` around lines 137 - 146, The join loop currently logs only the JoinError message which lacks context; update the error handling around join_set.join_next() in plan.rs to log which shard/task failed (include shards_len or a shard identifier and the handler name) along with the JoinError and then return Error::JoinError containing the original error. If the join results don’t carry a shard id, modify the task spawn to return a tuple like (shard_id, result) or maintain a counter/index while collecting results so you can include that shard_id/handler name in the error log and in the returned Error::JoinError.
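To illustrate the suggestion above, a minimal sketch that tags each spawned task with its shard index so a failure can be attributed to a specific shard; process_shard, the String error type, and the log wording are stand-ins rather than the crate's actual code.

```rust
use tokio::task::JoinSet;

// Hypothetical stand-in for the real per-shard work (an RPC in the handler).
async fn process_shard(shard_idx: usize) -> Result<usize, String> {
    Ok(shard_idx)
}

async fn run_shards_with_context(
    shards_len: usize,
) -> Result<Vec<Result<usize, String>>, String> {
    let mut join_set = JoinSet::new();
    for shard_idx in 0..shards_len {
        // Tag each task with its shard index so failures can be attributed.
        join_set.spawn(async move { (shard_idx, process_shard(shard_idx).await) });
    }

    let mut results = Vec::with_capacity(shards_len);
    while let Some(join_result) = join_set.join_next().await {
        match join_result {
            Ok((shard_idx, res)) => {
                if let Err(err) = &res {
                    // Real code would use log::error! here.
                    eprintln!("shard {shard_idx} failed: {err}");
                }
                results.push(res);
            }
            Err(e) => {
                return Err(format!(
                    "single_plan_handler: failed to join shard task ({shards_len} shards): {e}"
                ));
            }
        }
    }
    Ok(results)
}
```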
137-164: Lost fail-fast/cancel-on-first-error behavior.

Previously, try_join_all would short-circuit and drop the remaining shard futures as soon as any one returned Err(_). The new loop drains every spawned task to completion before the collect::<Result<Vec<_>>>()? at line 160 surfaces the first inner Err. Net effect:

- More work performed (extra TiKV RPCs, more permits taken) after a shard has already failed.
- Externally observable side effects (retries, region-cache invalidations in handle_other_error) continue for sibling shards even though the overall call is doomed to fail.

If preserving the previous fail-fast semantics matters, break out of the join_next loop on the first inner Err (when !preserve_region_results) and drop the JoinSet to abort the rest:

♻️ Sketch

```diff
-        let mut results = Vec::with_capacity(shards_len);
-        while let Some(join_result) = join_set.join_next().await {
-            match join_result {
-                Ok(val) => results.push(val),
-                Err(e) => {
-                    error!("failed to join task: {}", e);
-                    return Err(Error::JoinError(e));
-                }
-            }
-        }
+        let mut results = Vec::with_capacity(shards_len);
+        while let Some(join_result) = join_set.join_next().await {
+            match join_result {
+                Ok(Ok(val)) => results.push(Ok(val)),
+                Ok(Err(e)) if !preserve_region_results => {
+                    // drop join_set to abort remaining tasks
+                    return Err(e);
+                }
+                Ok(other) => results.push(other),
+                Err(e) => {
+                    error!("failed to join task: {}", e);
+                    return Err(Error::JoinError(e));
+                }
+            }
+        }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/request/plan.rs` around lines 137 - 164, The loop that awaits join_set.join_next currently drains all shard tasks and only surfaces the first inner Err later, losing the previous fail-fast behavior; modify the loop inside the block using join_set.join_next() so that when preserve_region_results is false and you encounter the first task result that is Ok(Err(inner_err)) you immediately break out, drop the JoinSet to abort remaining tasks, and return that inner error (mapped to the appropriate error type) rather than continuing to collect all results; when preserve_region_results is true keep the existing behavior that gathers all results. Ensure you update the handling around results, join_set, and Error mapping (referencing join_set, join_next, preserve_region_results, results, and Error::JoinError) to implement the early-abort path safely.
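A self-contained sketch of the fail-fast variant described above, assuming that dropping the JoinSet is an acceptable way to cancel the in-flight sibling tasks; the u64/String result types and the function name are placeholders, not the crate's types.

```rust
use tokio::task::JoinSet;

async fn collect_fail_fast(
    mut join_set: JoinSet<Result<u64, String>>,
    preserve_region_results: bool,
) -> Result<Vec<Result<u64, String>>, String> {
    let mut results = Vec::new();
    while let Some(join_result) = join_set.join_next().await {
        match join_result {
            // First shard-level error: bail out early. Returning drops `join_set`,
            // which aborts every task still running in it.
            Ok(Err(e)) if !preserve_region_results => return Err(e),
            Ok(res) => results.push(res),
            // A JoinError means the task panicked or was cancelled.
            Err(join_err) => return Err(format!("failed to join task: {join_err}")),
        }
    }
    Ok(results)
}
```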
463-486: Same ordering caveat applies here; the trailing collect::<Vec<_>>() is a no-op.

- As in single_plan_handler, results is ordered by completion, not by stores iteration order. For RetryableAllStores this is usually fine (per-store independent results), but please confirm no caller assumes alignment with pd_client.all_stores() order.
- results is already Vec<Result<P::Result>>, so results.into_iter().collect::<Vec<_>>() is an identity transform; just Ok(results).

♻️ Minor cleanup

```diff
- Ok(results.into_iter().collect::<Vec<_>>())
+ Ok(results)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/request/plan.rs` around lines 463 - 486, The current loop collects task results in completion order (via JoinSet) and then does an unnecessary .into_iter().collect::<Vec<_>>(); fix by either (A) if callers must preserve pd_client.all_stores() order, map the per-store result back to the original store order (e.g., attach store id/index when spawning single_store_handler and reorder results accordingly) or (B) if completion-order is acceptable (common for RetryableAllStores), simply return Ok(results) instead of the redundant collect. Update the block that spawns single_store_handler (reference: JoinSet, single_store_handler, stores, results) to include an index/identifier when preserving order, or replace the final line with Ok(results) to remove the no-op collect.
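If some caller does turn out to depend on pd_client.all_stores() order, one way to keep it is to record the spawn index with each task and write results into index-addressed slots. The sketch below uses placeholder per-store work, not the real single_store_handler.

```rust
use tokio::task::JoinSet;

async fn run_per_store(store_ids: Vec<u64>) -> Result<Vec<u64>, String> {
    let mut join_set = JoinSet::new();
    for (idx, store_id) in store_ids.iter().copied().enumerate() {
        // Placeholder per-store work; `idx` records the original position.
        join_set.spawn(async move { (idx, store_id * 10) });
    }

    // One slot per store, filled by spawn index rather than completion order.
    let mut slots: Vec<Option<u64>> = vec![None; store_ids.len()];
    while let Some(join_result) = join_set.join_next().await {
        let (idx, val) = join_result.map_err(|e| format!("failed to join task: {e}"))?;
        slots[idx] = Some(val);
    }
    // Every spawned task filled exactly one slot, so unwrapping is safe here.
    Ok(slots.into_iter().map(|v| v.expect("slot filled")).collect())
}
```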
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/request/plan.rs`:
- Around line 137-146: The current loop uses JoinSet::join_next(), which yields
tasks in completion order and thus reorders results compared to the original
try_join_all behavior; also it waits for all tasks even after the first inner
error instead of cancelling remaining work. Fix by preserving spawn order: when
spawning into the JoinSet (the same place you create join_set), attach the shard
index (or keep a Vec of JoinHandles) and collect results into a
Vec<Option<Result<...>>> indexed by shard so you can fill by index as each join
returns, then after the first Err call join_set.abort_all() (or abort remaining
handles) and return the error early; alternatively replace the JoinSet logic
with awaiting a Vec of join handles using futures::try_join_all or
tokio::try_join semantics to preserve order and short-circuit on error. Also
remove the redundant Ok(results.into_iter().collect::<Vec<_>>()) in
RetryableAllStores::execute since results is already a Vec.
---
Nitpick comments:
In `@src/request/plan.rs`:
- Around line 137-146: The join loop currently logs only the JoinError message
which lacks context; update the error handling around join_set.join_next() in
plan.rs to log which shard/task failed (include shards_len or a shard identifier
and the handler name) along with the JoinError and then return Error::JoinError
containing the original error. If the join results don’t carry a shard id,
modify the task spawn to return a tuple like (shard_id, result) or maintain a
counter/index while collecting results so you can include that shard_id/handler
name in the error log and in the returned Error::JoinError.
- Around line 137-164: The loop that awaits join_set.join_next currently drains
all shard tasks and only surfaces the first inner Err later, losing the previous
fail-fast behavior; modify the loop inside the block using join_set.join_next()
so that when preserve_region_results is false and you encounter the first task
result that is Ok(Err(inner_err)) you immediately break out, drop the JoinSet to
abort remaining tasks, and return that inner error (mapped to the appropriate
error type) rather than continuing to collect all results; when
preserve_region_results is true keep the existing behavior that gathers all
results. Ensure you update the handling around results, join_set, and Error
mapping (referencing join_set, join_next, preserve_region_results, results, and
Error::JoinError) to implement the early-abort path safely.
- Around line 463-486: The current loop collects task results in completion
order (via JoinSet) and then does an unnecessary
.into_iter().collect::<Vec<_>>(); fix by either (A) if callers must preserve
pd_client.all_stores() order, map the per-store result back to the original
store order (e.g., attach store id/index when spawning single_store_handler and
reorder results accordingly) or (B) if completion-order is acceptable (common
for RetryableAllStores), simply return Ok(results) instead of the redundant
collect. Update the block that spawns single_store_handler (reference: JoinSet,
single_store_handler, stores, results) to include an index/identifier when
preserving order, or replace the final line with Ok(results) to remove the no-op
collect.
Pin indexmap (=2.13.1), icu_* (<2.2), and proptest (<1.11) to versions compatible with Rust 1.84.1. Upstream crates recently adopted edition 2024 or raised MSRV above 1.84.1, which breaks CI since Cargo.lock is gitignored and dependencies are resolved fresh on each run. Also fix rustfmt import grouping for JoinSet in plan.rs. Signed-off-by: Ping Yu <yuping@pingcap.com>
Signed-off-by: Ping Yu <yuping@pingcap.com>
🧹 Nitpick comments (1)
src/request/plan.rs (1)
1030-1045: Make the ordering test independent of wall-clock timing.

The test relies on sleep(10/20/30 ms) to force out-of-order completion. It's likely stable in practice on #[tokio::test]'s current-thread runtime, but a deterministic version makes the property under test (index-based ordering regardless of completion order) explicit and immune to CI jitter / future runtime changes. Two options:

- Use tokio::time::pause() + advance(); or
- Use tokio::sync::Notify / oneshot channels to gate task completion in a controlled order.

♻️ Sketch using paused time

```diff
 #[tokio::test]
 async fn test_join_set_results_keep_spawn_order() {
+    tokio::time::pause();
     let mut join_set = JoinSet::new();
     for (idx, delay_ms) in [(0, 30), (1, 10), (2, 20)] {
         join_set.spawn(async move {
             sleep(Duration::from_millis(delay_ms)).await;
             (idx, idx)
         });
     }
+    tokio::time::advance(Duration::from_millis(50)).await;
     let results = collect_join_set_results(join_set, 3, "test_handler")
         .await
         .unwrap();
     assert_eq!(results, vec![0, 1, 2]);
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/request/plan.rs` around lines 1030 - 1045, The test test_join_set_results_keep_spawn_order currently relies on real sleeps to induce out-of-order completion; make it deterministic by replacing the sleep-based timing with either tokio::time::pause()/advance() or explicit synchronization (e.g., tokio::sync::Notify or oneshot channels) so tasks complete in a controlled order; update the spawned tasks in the JoinSet (and the test harness around collect_join_set_results) to await a controllable signal instead of sleep, then drive completion order from the test (using advance() if you choose paused time or notify.send() / oneshot::send() for gated release) so the assertion on results remains independent of wall-clock timing.
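For completeness, a sketch of the second option (gating completion with Notify instead of sleeping). It assumes the collect_join_set_results helper added in this PR is in scope, with the signature used in the existing test; the test name is hypothetical.

```rust
use std::sync::Arc;

use tokio::sync::Notify;
use tokio::task::JoinSet;

#[tokio::test]
async fn test_join_set_results_keep_spawn_order_gated() {
    let mut join_set = JoinSet::new();
    let gates: Vec<Arc<Notify>> = (0..3).map(|_| Arc::new(Notify::new())).collect();

    for (idx, gate) in gates.iter().cloned().enumerate() {
        join_set.spawn(async move {
            // Each task completes only once the test releases its gate.
            gate.notified().await;
            (idx, idx)
        });
    }

    // Release the gates in reverse spawn order; no wall-clock timing involved.
    for gate in gates.iter().rev() {
        gate.notify_one();
    }

    let results = collect_join_set_results(join_set, 3, "test_handler")
        .await
        .unwrap();
    // Results should come back in spawn order regardless of completion order.
    assert_eq!(results, vec![0, 1, 2]);
}
```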
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/request/plan.rs`:
- Around line 1030-1045: The test test_join_set_results_keep_spawn_order
currently relies on real sleeps to induce out-of-order completion; make it
deterministic by replacing the sleep-based timing with either
tokio::time::pause()/advance() or explicit synchronization (e.g.,
tokio::sync::Notify or oneshot channels) so tasks complete in a controlled
order; update the spawned tasks in the JoinSet (and the test harness around
collect_join_set_results) to await a controllable signal instead of sleep, then
drive completion order from the test (using advance() if you choose paused time
or notify.send() / oneshot::send() for gated release) so the assertion on
results remains independent of wall-clock timing.
📒 Files selected for processing (3)
Cargo.toml, proto-build/Cargo.toml, src/request/plan.rs
Summary
- Replace futures::future::try_join_all with tokio::task::JoinSet in single_plan_handler and multi_store_handler
- Explicit JoinError logging and propagation instead of try_join_all's implicit join error

Changes

- Added the tokio::task::JoinSet import; removed the unused futures::future::try_join_all import
- Added the log::error import for join failure logging
- Replaced try_join_all with a JoinSet and a join_next() loop pattern
- On JoinError: logs the error and returns Error::JoinError (a sketch of this loop shape follows below)
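The changes above imply a small helper that drains the JoinSet, keeps results in spawn order, and turns a JoinError into the crate error. The following is only a sketch of that shape, with a placeholder String error standing in for Error::JoinError; it is not the PR's actual implementation.

```rust
use tokio::task::JoinSet;

// Sketch only: tasks carry their spawn index, results come back in spawn
// order, and a join failure is logged and surfaced explicitly.
async fn collect_join_set_results<T: 'static>(
    mut join_set: JoinSet<(usize, T)>,
    len: usize,
    handler_name: &str,
) -> Result<Vec<T>, String> {
    // Slots indexed by spawn order, filled as tasks complete in any order.
    let mut slots: Vec<Option<T>> = (0..len).map(|_| None).collect();
    while let Some(join_result) = join_set.join_next().await {
        match join_result {
            Ok((idx, val)) => slots[idx] = Some(val),
            Err(e) => {
                // Real code would use log::error! and return Error::JoinError(e).
                return Err(format!("{handler_name}: failed to join task: {e}"));
            }
        }
    }
    // Every task wrote its own slot, so flattening recovers spawn order.
    Ok(slots.into_iter().flatten().collect())
}
```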