feat(index): parallel segment search with rayon #115
📝 Walkthrough
This PR implements parallel BM25 document scoring in the search engine using Rayon, parallelizing scoring across document IDs while keeping source lookups sequential. It includes an ADR documenting the approach, workspace dependencies, a comprehensive Criterion benchmark suite, and the core parallel search implementation.
Changes: Parallel Segment Search Using Rayon
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Search as IndexHandle::search
participant Parallel as Rayon ParallelIterator
participant Storage as Document Store
Client->>Search: search(query)
Search->>Parallel: par_iter(doc_ids) filter and score
Parallel->>Parallel: BM25 scoring
Parallel->>Search: scores and doc_ids
Search->>Storage: Sequential source lookup
Storage->>Search: sources with scores
Search->>Search: Aggregations, sort, search_after
Search->>Client: SearchResponse
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 5 passed
Parallelizes the hot loop in search() that scores documents against the query. Only doc_id (String, Send+Sync) crosses thread boundaries; source lookups happen inside each parallel task via &self, avoiding the Send+Sync requirement on IndexDocument (whose serde_json::Value field contains RefCell). The scoring phase now uses into_par_iter() on doc_ids, filtering expired docs and MLT exclusions in parallel, then looks up sources sequentially after scoring (O(hits), not O(all_docs)).
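The two-phase shape described above (score ids in parallel, then fetch sources only for the hits) can be sketched roughly as follows. This is a minimal illustration, not the PR's code: it uses `std::thread::scope` as a dependency-free stand-in for Rayon's `into_par_iter()`, and `score_parallel`, `attach_sources`, and the `HashMap` store are hypothetical names.

```rust
use std::collections::HashMap;
use std::thread;

/// Phase 1 (hypothetical helper): score ids on several threads.
/// `score` returns `None` for documents to skip (expired, excluded, no match).
/// Only `&[String]` chunks and `&F` are shared with the workers.
fn score_parallel<F>(doc_ids: &[String], workers: usize, score: F) -> Vec<(f32, String)>
where
    F: Fn(&str) -> Option<f32> + Sync,
{
    let workers = workers.max(1);
    let chunk_len = ((doc_ids.len() + workers - 1) / workers).max(1);
    let score = &score;
    thread::scope(|s| {
        // Spawn every worker before joining any of them.
        let handles: Vec<_> = doc_ids
            .chunks(chunk_len)
            .map(|chunk| {
                s.spawn(move || {
                    chunk
                        .iter()
                        .filter_map(|id| score(id.as_str()).map(|sc| (sc, id.clone())))
                        .collect::<Vec<(f32, String)>>()
                })
            })
            .collect();
        // Joining in spawn order keeps hits in input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("scoring worker panicked"))
            .collect()
    })
}

/// Phase 2 (hypothetical helper): sequential source lookup, O(hits).
fn attach_sources<'a>(
    scored: Vec<(f32, String)>,
    store: &'a HashMap<String, String>,
) -> Vec<(f32, String, &'a str)> {
    scored
        .into_iter()
        .filter_map(|(sc, id)| {
            let src = store.get(&id)?;
            Some((sc, id, src.as_str()))
        })
        .collect()
}
```

Because phase 2 touches the store only for ids that survived scoring, its cost scales with the number of hits rather than with the size of the index.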
f4bc042 to 2f4361f
Benchmarks for search() across MatchAll, term, multi_match, and range queries at 1k and 10k document counts. Run with: cargo bench -p cloudsearch-index
ccd38de to 04689be
04689be to b3dcad0
b3dcad0 to 4f164da
4f164da to 6ecd339
Actionable comments posted: 2
🧹 Nitpick comments (1)
rust/crates/cloudsearch-index/src/lib.rs (1)
1029-1044: ⚡ Quick win
Avoid full `source` clones inside sort comparator.
Lines 1029-1044 clone full documents for each comparison; compare on borrowed sort field values and `doc_id` instead to keep sort overhead low.
Suggested refactor
-        scored.sort_by(|(_, l_id, l_src), (_, r_id, r_src)| {
-            let lh = SearchHit {
-                id: l_id.clone(),
-                source: l_src.clone(),
-                score: None,
-                highlight: None,
-                sort_values: None,
-            };
-            let rh = SearchHit {
-                id: r_id.clone(),
-                source: r_src.clone(),
-                score: None,
-                highlight: None,
-                sort_values: None,
-            };
-            compare_hits(&lh, &rh, sort)
-        });
+        scored.sort_by(|(_, l_id, l_src), (_, r_id, r_src)| {
+            let l_val = l_src.get(&sort.field).and_then(comparable_value);
+            let r_val = r_src.get(&sort.field).and_then(comparable_value);
+
+            let ordering = match (l_val, r_val) {
+                (Some(l), Some(r)) => compare_sort_values(&l, &r, sort),
+                (None, Some(_)) => std::cmp::Ordering::Greater,
+                (Some(_), None) => std::cmp::Ordering::Less,
+                (None, None) => l_id.cmp(r_id),
+            };
+
+            if ordering == std::cmp::Ordering::Equal {
+                l_id.cmp(r_id)
+            } else {
+                ordering
+            }
+        });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rust/crates/cloudsearch-index/src/lib.rs` around lines 1029 - 1044, The comparator passed to scored.sort_by is cloning entire SearchHit.source for every comparison (in the closure that builds lh/rh and calls compare_hits), which is expensive; change the comparator to compare only the doc_id and the specific sort-field values by borrowing them instead of cloning the whole source. Concretely, in the scored.sort_by closure avoid constructing full SearchHit with cloned source—either extract the needed sort values and ids from l_src/r_src and call a new helper like compare_hits_by_values(&l_id, &r_id, l_sort_vals, r_sort_vals, sort) or change compare_hits to accept borrowed inputs (e.g., &str id and &Value or &Vec<Value> sort_values) so you only clone the doc_id when necessary and borrow sort fields from l_src/r_src. Ensure references to the existing symbols: scored.sort_by closure, SearchHit construction site, compare_hits (or a new compare_hits_by_values), and the sort variable are updated accordingly.
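As a rough, self-contained illustration of the borrowed-comparator idea in this nitpick, the sketch below sorts `(score, doc_id, source)` tuples by a field read through references, with missing values last and `doc_id` as the tie-breaker. The `Source` alias (a flat map of integer fields) and `sort_by_field` are assumptions made for the example; they are not the crate's `compare_hits` or its real source type.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Hypothetical stand-in for a document source: flat map of integer fields.
type Source = HashMap<String, i64>;

/// Sort hits by `field` without cloning any source.
/// Missing values sort last; ties fall back to the document id.
fn sort_by_field(scored: &mut [(f32, String, Source)], field: &str, descending: bool) {
    scored.sort_by(|(_, l_id, l_src), (_, r_id, r_src)| {
        // `get` hands back borrowed values, so each comparison is allocation-free.
        let by_field = match (l_src.get(field), r_src.get(field)) {
            (Some(l), Some(r)) => {
                if descending {
                    r.cmp(l)
                } else {
                    l.cmp(r)
                }
            }
            (None, Some(_)) => Ordering::Greater,
            (Some(_), None) => Ordering::Less,
            (None, None) => Ordering::Equal,
        };
        by_field.then_with(|| l_id.cmp(r_id))
    });
}
```

A comparator runs O(n log n) times per sort, so anything it allocates is multiplied accordingly; borrowing keeps that cost at zero.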
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/adr/0005-parallel-segment-search.md`:
- Around line 13-14: Replace the incorrect rationale in the ADR: remove the
claim that serde_json::Value “uses RefCell internally” and is not Send/Sync, and
instead state that serde_json::Value does implement Send + Sync; correct the
Rayon bound by noting that into_par_iter() on a Vec<T> requires T: Send (not T:
Send + Sync). Update the discussion around IndexDocument::source to explain the
actual constraint driving the design (e.g., that IndexDocument::source is
intentionally not moved across threads in current architecture or that
cloning/moving the Value is avoided for performance/ownership reasons), and
ensure the text aligns with the later statement that serde_json::Value “never
crosses thread boundaries.”
In `@rust/crates/cloudsearch-index/benches/search.rs`:
- Around line 58-67: The benchmark setup currently ignores Results from async
calls (e.g., catalog.initialize(), catalog.create_index(...), and the later
indexing calls), allowing silent failures; change these to propagate or fail
fast by handling the Result (use the ? operator if the surrounding function
returns Result, or call .expect("...")/.unwrap_or_else(|e| panic!(...)) with a
clear message) so any error from catalog.initialize(),
catalog.create_index(...), and the subsequent indexing calls is surfaced
immediately and aborts the benchmark.
---
Nitpick comments:
In `@rust/crates/cloudsearch-index/src/lib.rs`:
- Around line 1029-1044: The comparator passed to scored.sort_by is cloning
entire SearchHit.source for every comparison (in the closure that builds lh/rh
and calls compare_hits), which is expensive; change the comparator to compare
only the doc_id and the specific sort-field values by borrowing them instead of
cloning the whole source. Concretely, in the scored.sort_by closure avoid
constructing full SearchHit with cloned source—either extract the needed sort
values and ids from l_src/r_src and call a new helper like
compare_hits_by_values(&l_id, &r_id, l_sort_vals, r_sort_vals, sort) or change
compare_hits to accept borrowed inputs (e.g., &str id and &Value or &Vec<Value>
sort_values) so you only clone the doc_id when necessary and borrow sort fields
from l_src/r_src. Ensure references to the existing symbols: scored.sort_by
closure, SearchHit construction site, compare_hits (or a new
compare_hits_by_values), and the sort variable are updated accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a68d3fee-9724-4a58-b2fa-bb23b7915587
⛔ Files ignored due to path filters (1)
rust/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
docs/adr/0005-parallel-segment-search.md
docs/roadmap.md
rust/Cargo.toml
rust/crates/cloudsearch-index/Cargo.toml
rust/crates/cloudsearch-index/benches/search.rs
rust/crates/cloudsearch-index/src/lib.rs
The naive fix — using `into_par_iter()` on a `Vec<IndexDocument>` — is blocked by `serde_json::Value`. The `IndexDocument::source` field is `serde_json::Value`, which uses `RefCell` internally and is therefore neither `Send` nor `Sync`. Rayon requires all data processed in parallel to be `Send + Sync`.
🧩 Analysis chain
🌐 Web query:
Is Rust serde_json::Value Send and Sync? Please provide official rustdoc/crate-source evidence.
💡 Result:
Yes. serde_json::Value is both Send and Sync. Rustdoc evidence: the serde_json::Value docs list auto-trait implementations “impl Send for Value” and “impl Sync for Value”. [1] Crate-source evidence (structure that supports these auto traits): Value is a recursive enum whose variants only contain bool, Number, String, Vec<Value>, and Map<String, Value> (where Map is the crate’s map type). [2] So the compiler can derive/implement Send and Sync for serde_json::Value, and the docs.rs page confirms this explicitly. [1]
Citations:
- 1: https://docs.smithy.rs/serde_json/enum.Value.html
- 2: https://docs.rs/serde_json/latest/serde_json/value/struct.Serializer.html
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '1,120p' docs/adr/0005-parallel-segment-search.md
Repository: poyrazK/cloudSearch
Length of output: 4842
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n "struct IndexDocument|IndexDocument|source: serde_json::Value|serde_json::Value" -S .
Repository: poyrazK/cloudSearch
Length of output: 44756
🌐 Web query:
serde_json Value impl Send Sync auto trait
💡 Result:
The serde_json::Value enum implements both Send and Sync auto traits [1][2]. These implementations allow Value to be safely transferred between threads (Send) and accessed concurrently by multiple threads through shared references (Sync). Because Value is composed of standard types that themselves implement these traits—such as String, BTreeMap, and Vec—it automatically satisfies the conditions for Send and Sync [3].
Citations:
- 1: https://docs.smithy.rs/serde_json/value/enum.Value.html
- 2: https://docs.smithy.rs/serde_json/enum.Value.html
- 3: https://github.com/serde-rs/json/blob/master/src/value/mod.rs
🌐 Web query:
docs.rs serde_json::Value Send Sync
💡 Result:
serde_json::Value is Send and Sync. Docs.rs’ rendered trait impls show “impl Send for Value” and “impl Sync for Value” for the serde_json::value::Value type [1]. The serde-rs/json source defines serde_json::Value as a pub enum Value with #[derive(Clone, Eq, PartialEq, Hash)], consistent with the documented Send/Sync implementations [2].
Citations:
- 1: https://docs.rs/serde_json/latest/serde_json/value/enum.Value.html
- 2: https://github.com/serde-rs/json/blob/master/src/value/mod.rs
🌐 Web query:
rayon Vec into_par_iter Send bound T: Send
💡 Result:
Yes, the Rayon trait IntoParallelIterator and its associated ParallelIterator trait require the items being processed to implement the Send trait [1][2]. When you call into_par_iter() on a collection such as a Vec<T>, the implementation of IntoParallelIterator for that type requires that T (the item type) satisfies T: Send [1][3]. This is because Rayon's parallel iterators work by splitting the collection and potentially moving items to different threads within the Rayon thread pool to process them in parallel [4][5]. If your type T does not implement Send, you will not be able to call into_par_iter() on a Vec<T>, and the compiler will typically report that the trait bound T: Send is not satisfied [6][5]. Key points regarding Rayon's thread safety requirements: - Send: Required for into_par_iter() (which consumes the collection) and par_iter_mut() (which iterates over mutable references) because items are moved or sent between threads [5]. - Sync: Required for par_iter() (which iterates over shared references) because items are shared across threads [5]. If you are encountering errors with custom types or trait objects, you must ensure they implement Send (and Sync if using par_iter()) to satisfy these bounds [6][5]. For trait objects, this is often done by using Box<dyn Trait + Send> or by adding Send and Sync as supertraits to your trait definition [6][5].
Citations:
- 1: https://docs.rs/rayon/latest/rayon/iter/trait.IntoParallelIterator.html
- 2: https://docs.rs/rayon/1.10.0/rayon/iter/trait.ParallelIterator.html
- 3: https://docs.rs/rayon/1.5.1/x86_64-pc-windows-msvc/rayon/iter/trait.IntoParallelIterator.html
- 4: https://developers.redhat.com/blog/2021/04/30/how-rust-makes-rayons-data-parallelism-magical
- 5: https://users.rust-lang.org/t/rayon-how-implement-into-par-iter-and-par-iter/38693
- 6: https://stackoverflow.com/questions/62374960/sync-and-send-trait-when-using-rayon-parallel-iterators
🏁 Script executed:
#!/bin/bash
set -euo pipefail
sed -n '90,160p' rust/crates/cloudsearch-common/src/lib.rs
Repository: poyrazK/cloudSearch
Length of output: 1785
🏁 Script executed:
#!/bin/bash
set -euo pipefail
nl -ba docs/adr/0005-parallel-segment-search.md | sed -n '1,90p'
Repository: poyrazK/cloudSearch
Length of output: 106
🏁 Script executed:
#!/bin/bash
set -euo pipefail
nl -ba docs/adr/0005-parallel-segment-search.md | sed -n '8,20p'
Repository: poyrazK/cloudSearch
Length of output: 106
🏁 Script executed:
#!/bin/bash
set -euo pipefail
nl -ba rust/crates/cloudsearch-common/src/lib.rs | sed -n '110,160p'
Repository: poyrazK/cloudSearch
Length of output: 106
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n "RefCell|Send\\b|Sync\\b|serde_json::Value" docs/adr/0005-parallel-segment-search.md
echo "---- ADR lines 1-30 ----"
cat -n docs/adr/0005-parallel-segment-search.md | sed -n '1,40p'
echo "---- ADR lines 30-80 ----"
cat -n docs/adr/0005-parallel-segment-search.md | sed -n '30,90p'
Repository: poyrazK/cloudSearch
Length of output: 6140
Fix the ADR’s incorrect serde_json::Value / Rayon thread-safety rationale.
Line 13 of docs/adr/0005-parallel-segment-search.md states that serde_json::Value “uses RefCell internally” and is neither Send nor Sync, but serde_json::Value implements both Send and Sync (per the serde-rs/json source and docs.rs). Line 13 also says “Rayon requires all data processed in parallel to be Send + Sync”; for into_par_iter() on a Vec<T>, the key bound is T: Send (not necessarily Sync). Update the ADR to remove or replace the incorrect RefCell/Send+Sync blocker and describe the actual constraint driving the design, consistent with the later claim that serde_json::Value “never crosses thread boundaries.”
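The auto-trait reasoning behind this finding can be checked at compile time. The sketch below uses a hypothetical `JsonLike` enum with the same general shape as a JSON value (it is not `serde_json::Value` itself) and two helper functions whose only job is to enforce a bound. It also shows that a `RefCell` wrapper would still be `Send`; only `Sync` is lost.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::thread;

/// Hypothetical stand-in shaped like a JSON value: only owned std types.
#[derive(Debug, Clone, PartialEq)]
enum JsonLike {
    Null,
    Bool(bool),
    Number(f64),
    String(String),
    Array(Vec<JsonLike>),
    Object(BTreeMap<String, JsonLike>),
}

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Compiles only because every listed bound actually holds.
fn thread_safety_checks() {
    assert_send::<JsonLike>();
    assert_sync::<JsonLike>();
    // RefCell<T> is Send whenever T: Send ...
    assert_send::<RefCell<JsonLike>>();
    // ... but never Sync; uncommenting the next line is a compile error.
    // assert_sync::<RefCell<JsonLike>>();
}

/// Moving owned values into worker threads needs `T: Send` only.
fn count_strings_in_threads(docs: Vec<JsonLike>) -> usize {
    let handles: Vec<_> = docs
        .into_iter()
        .map(|doc| thread::spawn(move || matches!(doc, JsonLike::String(_)) as usize))
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .sum()
}
```

The same `assert_send` / `assert_sync` pattern could be pointed at the real document type in a unit test, so a wrong thread-safety assumption fails the build instead of reaching an ADR.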
let _: cloudsearch_common::Result<()> = catalog.initialize().await;
let _: cloudsearch_common::Result<cloudsearch_common::IndexMetadata> = catalog
    .create_index(
        "test",
        CreateIndexRequest {
            settings: IndexSettings::default(),
            ..Default::default()
        },
    )
    .await;
Fail fast on setup/indexing errors in benchmark initialization.
Lines 58-67, 83, 86, and 87 ignore their Results, so setup failures can silently invalidate benchmark data and performance conclusions.
Suggested fix
-    let _: cloudsearch_common::Result<()> = catalog.initialize().await;
-    let _: cloudsearch_common::Result<cloudsearch_common::IndexMetadata> = catalog
+    catalog.initialize().await.expect("initialize catalog");
+    catalog
         .create_index(
             "test",
             CreateIndexRequest {
                 settings: IndexSettings::default(),
                 ..Default::default()
             },
         )
-        .await;
+        .await
+        .expect("create index");
@@
-    let _: cloudsearch_common::Result<u64> = handle.index_document(doc).await;
+    handle.index_document(doc).await.expect("index document");
@@
-    let _: cloudsearch_common::Result<usize> = handle.refresh().await;
-    let _: cloudsearch_common::Result<cloudsearch_common::FlushResponse> = handle.flush().await;
+    handle.refresh().await.expect("refresh index");
+    handle.flush().await.expect("flush index");
Also applies to: 83-87
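To make the risk concrete, here is a small synchronous sketch of why discarded Results matter in fixture setup. `Fixture`, `index_document`, and both builder functions are hypothetical stand-ins for the benchmark's async catalog and handle calls; the point is only the contrast between `let _ =` and propagating with `?` (or `.expect(...)` where the caller cannot return a Result).

```rust
/// Hypothetical benchmark fixture: counts successfully indexed documents.
#[derive(Debug, Default)]
struct Fixture {
    indexed: usize,
}

/// Hypothetical indexing step that rejects empty documents.
fn index_document(fixture: &mut Fixture, doc: &str) -> Result<(), String> {
    if doc.is_empty() {
        return Err("empty document".to_string());
    }
    fixture.indexed += 1;
    Ok(())
}

/// Anti-pattern: each Result is discarded, so a failed step leaves a
/// smaller fixture than intended without any signal.
fn build_ignoring_errors(docs: &[&str]) -> Fixture {
    let mut fixture = Fixture::default();
    for doc in docs {
        let _ = index_document(&mut fixture, doc);
    }
    fixture
}

/// Fail fast: the first error aborts setup and reaches the caller.
fn build_fail_fast(docs: &[&str]) -> Result<Fixture, String> {
    let mut fixture = Fixture::default();
    for doc in docs {
        index_document(&mut fixture, doc)?;
    }
    Ok(fixture)
}
```

With the first builder, a benchmark labeled "3 documents" may in fact measure 2; with the second, the run stops before any timing is recorded.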
6ecd339 to 0a363b8
0a363b8 to bb47405