request: replace try_join_all with JoinSet in plan handlers #535

Open

pingyu wants to merge 3 commits into tikv:master from pingyu:join-all-handles

Conversation


pingyu (Collaborator) commented Apr 27, 2026

Summary

  • Replace futures::future::try_join_all with tokio::task::JoinSet in single_plan_handler and multi_store_handler
  • Join tasks as they complete rather than collecting all handles first
  • Better error handling: explicit JoinError logging and propagation instead of try_join_all's implicit join error

Changes

  • Added tokio::task::JoinSet import, removed unused futures::future::try_join_all
  • Added log::error import for join failure logging
  • Both plan handlers now use JoinSet with a join_next() loop pattern, as sketched below
  • On JoinError: logs the error and returns Error::JoinError
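
A minimal sketch of that loop pattern, assuming per-shard work that already yields its result; the Error type below is a stand-in for the crate's error enum (the PR adds a JoinError variant), and the real code in src/request/plan.rs differs in detail:

```rust
use tokio::task::{JoinError, JoinSet};

// Stand-in for the crate's error type; the PR routes join failures
// through an Error::JoinError variant like this one.
#[derive(Debug)]
enum Error {
    JoinError(JoinError),
}

// Spawn one task per shard, then drain the set as tasks complete,
// rather than collecting all handles first as try_join_all does.
async fn run_shards(shards: Vec<u64>) -> Result<Vec<u64>, Error> {
    let mut join_set = JoinSet::new();
    for shard in shards {
        join_set.spawn(async move { shard * 2 }); // placeholder per-shard work
    }

    let mut results = Vec::with_capacity(join_set.len());
    while let Some(join_result) = join_set.join_next().await {
        match join_result {
            Ok(value) => results.push(value),
            Err(e) => {
                // A JoinError means the task panicked or was cancelled;
                // log it explicitly and propagate instead of swallowing it.
                log::error!("failed to join task: {e}");
                return Err(Error::JoinError(e));
            }
        }
    }
    Ok(results)
}
```

Note that results arrive in completion order here; the ordering concern this raises is discussed in the CodeRabbit review below.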

Summary by CodeRabbit

  • Refactor
    • Improved internal async task handling for multi-region and all-stores execution with stronger error handling and ordered result collection.
  • Tests
    • Added a unit test ensuring results are returned in the original task order even when tasks finish out of order.
  • Chores
    • Tightened several dependency version constraints for build and runtime stability.

Replace futures::future::try_join_all with tokio::task::JoinSet in
single_plan_handler and multi_store_handler to join tasks as they
complete rather than collecting all handles first. This provides
better error handling with explicit JoinError propagation.

Signed-off-by: Ping Yu <yuping@pingcap.com>
ti-chi-bot added the dco-signoff: yes label (indicates the PR's author has signed the DCO) on Apr 27, 2026

ti-chi-bot commented Apr 27, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign skyzh for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

ti-chi-bot added the size/M label (denotes a PR that changes 30-99 lines, ignoring generated files) on Apr 27, 2026

coderabbitai commented Apr 27, 2026

📝 Walkthrough

Replaces try_join_all with tokio::task::JoinSet for concurrent task aggregation in src/request/plan.rs; tasks are spawned with indices, results collected via join_next(), and join failures are logged and converted to Error::JoinError with early termination.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Task aggregation refactor (src/request/plan.rs) | Replaced try_join_all with tokio::task::JoinSet for multi-region shard and all-stores execution; introduces indexed task spawn, a collect_join_set_results helper (sketched below), ordered result assembly, and early return on join failures. |
| Dependency constraints, runtime (Cargo.toml) | Pin indexmap to an exact version, constrain several icu_* crates to <2.2, and restrict the proptest dev-dependency to <1.11. |
| Dependency constraints, build (proto-build/Cargo.toml) | Pin build dependency indexmap = "2.13.1" and a minor comment wording tweak. |
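
The collect_join_set_results helper itself does not appear in this thread; the following is a hypothetical sketch of an index-preserving collector, consistent with the call shape seen in the unit test later in this PR (join set, expected length, handler name), not the actual plan.rs code:

```rust
use tokio::task::{JoinError, JoinSet};

// Stand-in error type, as in the sketch above.
#[derive(Debug)]
enum Error {
    JoinError(JoinError),
}

// Tasks are assumed to be spawned returning (index, value) pairs, so the
// results can be reassembled in spawn order whatever the completion order.
async fn collect_join_set_results<T: 'static>(
    mut join_set: JoinSet<(usize, T)>,
    len: usize,
    handler_name: &str,
) -> Result<Vec<T>, Error> {
    let mut slots: Vec<Option<T>> = (0..len).map(|_| None).collect();
    while let Some(join_result) = join_set.join_next().await {
        match join_result {
            Ok((idx, value)) => slots[idx] = Some(value),
            Err(e) => {
                log::error!("{handler_name}: failed to join task: {e}");
                return Err(Error::JoinError(e));
            }
        }
    }
    // Every slot was filled exactly once if all tasks completed.
    Ok(slots
        .into_iter()
        .map(|slot| slot.expect("missing task result"))
        .collect())
}
```

Writing each result into a slot keyed by its spawn index is what makes the output order independent of completion order.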

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I hopped through code with nimble feet,
Swapped JoinSet in—no more try_join_all beat.
Indexed tasks finish fast or slow,
I gather results in tidy row.
Logs and errors kept neat and sweet.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.00%, which is insufficient; the required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The PR title directly and accurately describes the main change: replacing try_join_all with JoinSet in plan handlers, which matches the primary code modification in src/request/plan.rs. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



coderabbitai left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/request/plan.rs (3)

137-146: Consider logging which task/shard failed, not just the JoinError.

JoinError's Display typically only conveys "panic" / "cancelled" with little context. For multi-region debugging it's helpful to also log shard count / handler name so operators can correlate to a region. Optional polish:

-                    error!("failed to join task: {}", e);
+                    error!("single_plan_handler: failed to join shard task ({} shards): {}", shards_len, e);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/request/plan.rs` around lines 137-146: the join loop currently logs
only the JoinError message which lacks context; update the error handling around
join_set.join_next() in plan.rs to log which shard/task failed (include
shards_len or a shard identifier and the handler name) along with the JoinError
and then return Error::JoinError containing the original error. If the join
results don’t carry a shard id, modify the task spawn to return a tuple like
(shard_id, result) or maintain a counter/index while collecting results so you
can include that shard_id/handler name in the error log and in the returned
Error::JoinError.

137-164: Lost fail-fast/cancel-on-first-error behavior.

Previously, try_join_all would short-circuit and drop the remaining shard futures as soon as any one returned Err(_). The new loop drains every spawned task to completion before the collect::<Result<Vec<_>>>()? at line 160 surfaces the first inner Err. Net effect:

  • More work performed (extra TiKV RPCs, more permits taken) after a shard has already failed.
  • Externally observable side effects (retries, region-cache invalidations in handle_other_error) continue for sibling shards even though the overall call is doomed to fail.

If preserving the previous fail-fast semantics matters, break out of the join_next loop on the first inner Err (when !preserve_region_results) and drop the JoinSet to abort the rest:

♻️ Sketch
-        let mut results = Vec::with_capacity(shards_len);
-        while let Some(join_result) = join_set.join_next().await {
-            match join_result {
-                Ok(val) => results.push(val),
-                Err(e) => {
-                    error!("failed to join task: {}", e);
-                    return Err(Error::JoinError(e));
-                }
-            }
-        }
+        let mut results = Vec::with_capacity(shards_len);
+        while let Some(join_result) = join_set.join_next().await {
+            match join_result {
+                Ok(Ok(val)) => results.push(Ok(val)),
+                Ok(Err(e)) if !preserve_region_results => {
+                    // drop join_set to abort remaining tasks
+                    return Err(e);
+                }
+                Ok(other) => results.push(other),
+                Err(e) => {
+                    error!("failed to join task: {}", e);
+                    return Err(Error::JoinError(e));
+                }
+            }
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/request/plan.rs` around lines 137-164: the loop that awaits
join_set.join_next currently drains all shard tasks and only surfaces the first
inner Err later, losing the previous fail-fast behavior; modify the loop inside
the block using join_set.join_next() so that when preserve_region_results is
false and you encounter the first task result that is Ok(Err(inner_err)) you
immediately break out, drop the JoinSet to abort remaining tasks, and return
that inner error (mapped to the appropriate error type) rather than continuing
to collect all results; when preserve_region_results is true keep the existing
behavior that gathers all results. Ensure you update the handling around
results, join_set, and Error mapping (referencing join_set, join_next,
preserve_region_results, results, and Error::JoinError) to implement the
early-abort path safely.

463-486: Same ordering caveat applies here; trailing collect::<Vec<_>>() is a no-op.

  1. As in single_plan_handler, results is ordered by completion, not by stores iteration order. For RetryableAllStores this is usually fine (per-store independent results), but please confirm no caller assumes alignment with pd_client.all_stores() order.
  2. results is already Vec<Result<P::Result>>, so results.into_iter().collect::<Vec<_>>() is an identity transform — just Ok(results).
♻️ Minor cleanup
-        Ok(results.into_iter().collect::<Vec<_>>())
+        Ok(results)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/request/plan.rs` around lines 463-486: the current loop collects task
results in completion order (via JoinSet) and then does an unnecessary
.into_iter().collect::<Vec<_>>(); fix by either (A) if callers must preserve
pd_client.all_stores() order, map the per-store result back to the original
store order (e.g., attach store id/index when spawning single_store_handler and
reorder results accordingly) or (B) if completion-order is acceptable (common
for RetryableAllStores), simply return Ok(results) instead of the redundant
collect. Update the block that spawns single_store_handler (reference: JoinSet,
single_store_handler, stores, results) to include an index/identifier when
preserving order, or replace the final line with Ok(results) to remove the no-op
collect.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/request/plan.rs`:
- Around line 137-146: The current loop uses JoinSet::join_next(), which yields
tasks in completion order and thus reorders results compared to the original
try_join_all behavior; also it waits for all tasks even after the first inner
error instead of cancelling remaining work. Fix by preserving spawn order: when
spawning into the JoinSet (the same place you create join_set), attach the shard
index (or keep a Vec of JoinHandles) and collect results into a
Vec<Option<Result<...>>> indexed by shard so you can fill by index as each join
returns, then after the first Err call join_set.abort_all() (or abort remaining
handles) and return the error early; alternatively replace the JoinSet logic
with awaiting a Vec of join handles using futures::try_join_all or
tokio::try_join semantics to preserve order and short-circuit on error. Also
remove the redundant Ok(results.into_iter().collect::<Vec<_>>()) in
RetryableAllStores::execute since results is already a Vec.

---

Nitpick comments:
In `@src/request/plan.rs`:
- Around line 137-146: The join loop currently logs only the JoinError message
which lacks context; update the error handling around join_set.join_next() in
plan.rs to log which shard/task failed (include shards_len or a shard identifier
and the handler name) along with the JoinError and then return Error::JoinError
containing the original error. If the join results don’t carry a shard id,
modify the task spawn to return a tuple like (shard_id, result) or maintain a
counter/index while collecting results so you can include that shard_id/handler
name in the error log and in the returned Error::JoinError.
- Around line 137-164: The loop that awaits join_set.join_next currently drains
all shard tasks and only surfaces the first inner Err later, losing the previous
fail-fast behavior; modify the loop inside the block using join_set.join_next()
so that when preserve_region_results is false and you encounter the first task
result that is Ok(Err(inner_err)) you immediately break out, drop the JoinSet to
abort remaining tasks, and return that inner error (mapped to the appropriate
error type) rather than continuing to collect all results; when
preserve_region_results is true keep the existing behavior that gathers all
results. Ensure you update the handling around results, join_set, and Error
mapping (referencing join_set, join_next, preserve_region_results, results, and
Error::JoinError) to implement the early-abort path safely.
- Around line 463-486: The current loop collects task results in completion
order (via JoinSet) and then does an unnecessary
.into_iter().collect::<Vec<_>>(); fix by either (A) if callers must preserve
pd_client.all_stores() order, map the per-store result back to the original
store order (e.g., attach store id/index when spawning single_store_handler and
reorder results accordingly) or (B) if completion-order is acceptable (common
for RetryableAllStores), simply return Ok(results) instead of the redundant
collect. Update the block that spawns single_store_handler (reference: JoinSet,
single_store_handler, stores, results) to include an index/identifier when
preserving order, or replace the final line with Ok(results) to remove the no-op
collect.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1453bedd-0a5d-4b2b-a719-0ee8a9a72cb3

📥 Commits

Reviewing files that changed from the base of the PR and between 26d6c72 and 36f1d40.

📒 Files selected for processing (1)
  • src/request/plan.rs

pingyu added 2 commits April 27, 2026 16:26
Pin indexmap (=2.13.1), icu_* (<2.2), and proptest (<1.11) to versions
compatible with Rust 1.84.1. Upstream crates recently adopted edition 2024
or raised MSRV above 1.84.1, which breaks CI since Cargo.lock is gitignored
and dependencies are resolved fresh on each run.

Also fix rustfmt import grouping for JoinSet in plan.rs.

Signed-off-by: Ping Yu <yuping@pingcap.com>
Signed-off-by: Ping Yu <yuping@pingcap.com>
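
For reference, constraints of the shape described in that commit message would look roughly like this in Cargo.toml; the lower bounds and the icu_properties crate are illustrative stand-ins, not the PR's actual entries:

```toml
[dependencies]
# Exact pin: the resolver must use this release and nothing newer.
indexmap = "=2.13.1"
# Upper bound below the releases whose MSRV exceeds Rust 1.84.1.
icu_properties = ">=2.0, <2.2"

[dev-dependencies]
proptest = ">=1.0, <1.11"
```

Without a committed Cargo.lock, caps like these keep fresh CI resolutions from pulling in edition-2024 releases.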
ti-chi-bot added the size/L label (denotes a PR that changes 100-499 lines, ignoring generated files) and removed the size/M label on Apr 27, 2026

coderabbitai left a comment


🧹 Nitpick comments (1)
src/request/plan.rs (1)

1030-1045: Make the ordering test independent of wall-clock timing.

The test relies on sleep(10/20/30 ms) to force out-of-order completion. It's likely stable in practice on #[tokio::test]'s current-thread runtime, but a deterministic version makes the property under test (index-based ordering regardless of completion order) explicit and immune to CI jitter / future runtime changes. Two options:

  • Use tokio::time::pause() + advance(); or
  • Use tokio::sync::Notify/oneshot channels to gate task completion in a controlled order.
♻️ Sketch using paused time
     #[tokio::test]
     async fn test_join_set_results_keep_spawn_order() {
+        tokio::time::pause();
         let mut join_set = JoinSet::new();
         for (idx, delay_ms) in [(0, 30), (1, 10), (2, 20)] {
             join_set.spawn(async move {
                 sleep(Duration::from_millis(delay_ms)).await;
                 (idx, idx)
             });
         }
+        tokio::time::advance(Duration::from_millis(50)).await;

         let results = collect_join_set_results(join_set, 3, "test_handler")
             .await
             .unwrap();

         assert_eq!(results, vec![0, 1, 2]);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/request/plan.rs` around lines 1030-1045: the test
test_join_set_results_keep_spawn_order currently relies on real sleeps to induce
out-of-order completion; make it deterministic by replacing the sleep-based
timing with either tokio::time::pause()/advance() or explicit synchronization
(e.g., tokio::sync::Notify or oneshot channels) so tasks complete in a
controlled order; update the spawned tasks in the JoinSet (and the test harness
around collect_join_set_results) to await a controllable signal instead of
sleep, then drive completion order from the test (using advance() if you choose
paused time or notify.send() / oneshot::send() for gated release) so the
assertion on results remains independent of wall-clock timing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@src/request/plan.rs`:
- Around line 1030-1045: The test test_join_set_results_keep_spawn_order
currently relies on real sleeps to induce out-of-order completion; make it
deterministic by replacing the sleep-based timing with either
tokio::time::pause()/advance() or explicit synchronization (e.g.,
tokio::sync::Notify or oneshot channels) so tasks complete in a controlled
order; update the spawned tasks in the JoinSet (and the test harness around
collect_join_set_results) to await a controllable signal instead of sleep, then
drive completion order from the test (using advance() if you choose paused time
or notify.send() / oneshot::send() for gated release) so the assertion on
results remains independent of wall-clock timing.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1b2b5101-3fd6-4c13-9205-be1f2b22eba2

📥 Commits

Reviewing files that changed from the base of the PR and between 36f1d40 and fa90dcb.

📒 Files selected for processing (3)
  • Cargo.toml
  • proto-build/Cargo.toml
  • src/request/plan.rs
