diff --git a/docs/adr/0004-autocomplete-suggest-index.md b/docs/adr/0004-autocomplete-suggest-index.md new file mode 100644 index 0000000..76a0dcb --- /dev/null +++ b/docs/adr/0004-autocomplete-suggest-index.md @@ -0,0 +1,77 @@ +# ADR 0004: Autocomplete Suggest Index + +## Status +Accepted + +## Date +2026-06-09 + +## Context + +Users need fast autocomplete / as-you-type suggestions — returning completion candidates as the user types, before they finish a word. This is a read-heavy, latency-sensitive path (sub-10ms target). The feature should return term completions, not full documents. + +Requirements: +- Sub-10ms latency for prefix lookup +- Multi-field support with per-field weights +- Score by term popularity across the corpus +- Completion suggestions (terms/phrases), not document results + +## Decision + +### Binary Serialization with Sorted Term Arrays + +Each field's vocabulary is stored as a sorted `Vec` (term, doc_freq, score) in a binary sidecar file (`suggest_{segment:020}.bin`). The format: + +``` +MAGIC (4) + VERSION (1) + PADDING (3) + FIELD_COUNT (4) +Per field: FIELD_NAME_LEN (4) + FIELD_NAME + TERM_COUNT (4) +Per term: STR_LEN (4) + TERM_BYTES + DOC_FREQ (4) + SCORE (4) +``` + +O(log n + m) prefix lookup via binary search (`find_first_prefix`) where n = vocabulary size, m = matching terms. No in-memory index build at query time. + +### Atomic Writes via .tmp then Rename + +Flush writes to `suggest_{seg:020}.tmp`, then atomically renames to `.bin`. Readers load from `.bin` only — never from `.tmp`. This guarantees readers always see consistent data. + +### Per-Segment Sidecar Files + +Each segment has its own suggest sidecar file. At query time, all segment sidecars are queried and results merged. This avoids rebuilding the entire suggest index on every flush — only the new segment's sidecar is written. + +### doc_freq = Unique Document Count per Term + +`doc_freq` counts the number of **unique documents** that contain each term in a field, not the number of token occurrences. This is the standard document-frequency semantics used by search engines. + +### Segment Reader Management + +- On `open_index`: all suggest sidecars loaded into `suggest_readers` Vec +- On `flush`: all suggest sidecars reloaded from manifest (not just the new one) to avoid reader accumulation +- On `merge`: all suggest sidecars reloaded after manifest update since old sidecars are invalidated + +## Consequences + +### Positive +- Sub-10ms lookup: binary search on sorted arrays is O(log n), no in-memory index build +- Atomic writes prevent readers from seeing partial data +- Per-segment sidecars mean flush only writes one new file, not the full vocabulary +- doc_freq semantics match standard IR practice + +### Negative +- doc_freq is frozen at flush time — doesn't account for deletes or updates until next flush +- Each segment's suggest data is independent; cross-segment deduplication happens at query time (in-memory BTreeMap) +- Empty prefix returns all terms in lexical order (could be large); guarded to return empty instead + +### Neutral +- The suggest sidecar is separate from the positions sidecar — two separate files per segment +- Segment readers are held in memory; memory usage grows with segment count × vocabulary size + +## Alternatives Considered + +### Alternative 1: In-Memory Trie +**Why rejected:** A trie would require rebuilding the entire suggest index on every flush. With large vocabularies this becomes expensive. The sorted-array binary search achieves the same O(log n + m) lookup while allowing per-segment incremental updates. + +### Alternative 2: Generic B-Tree Index (e.g., RedBTree) +**Why rejected:** Adds a heavy dependency for a read-heavy, append-mostly workload. The binary serialized sorted arrays are simpler, have no runtime dependency, and serialize/deserialize cheaply. + +### Alternative 3: Store suggest data inline in segment snapshot +**Why rejected:** Suggests are built during flush from the full document set; storing them inline in the segment snapshot would require re-reading all documents to rebuild suggests on every merge. Separate sidecar files allow incremental rebuilds from the merged document set only. \ No newline at end of file diff --git a/docs/api-v1.md b/docs/api-v1.md index f043919..5633290 100644 --- a/docs/api-v1.md +++ b/docs/api-v1.md @@ -50,6 +50,56 @@ Current implementation notes: - `POST /{index}/_search` — search with JSON body - `GET /{index}/_search?q=...` — search with query string +- `POST /{index}/_suggest` — autocomplete suggestions for a prefix + +### Suggest Request Shape + +```json +{ + "prefix": "elast", + "fields": { "title": 1.0, "body": 0.5 }, + "size": 10, + "fuzzy": { "fuzziness": "AUTO" } +} +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `prefix` | `string` | required | The prefix to suggest completions for | +| `fields` | `map` | `{}` | Fields to search with their weights (0 = excluded) | +| `size` | `integer` | `10` | Maximum number of suggestions to return | +| `fuzzy` | `object` | none | Optional fuzzy matching; omit for exact prefix only | + +### Suggest Response Shape + +```json +{ + "suggestions": [ + { + "text": "elastic", + "score": 0.6667, + "doc_freq": 2, + "field": "title" + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `text` | `string` | The completion suggestion (tokenized, lowercase) | +| `score` | `float` | Normalized popularity score (`doc_freq / n_docs`) | +| `doc_freq` | `integer` | Number of documents containing this term | +| `field` | `string?` | Which field contributed this suggestion | + +### Implementation Notes + +- Suggestions are built during flush from indexed text fields (type `keyword`) +- Each field's vocabulary is sorted and stored in a binary sidecar file for O(log n + m) prefix lookup +- `doc_freq` counts **unique documents** per term, not token occurrences +- Empty prefix (`""`) returns no results +- Scores are computed as `doc_freq / n_docs` where `n_docs` is the total documents at flush time +- Fuzzy matching uses edit distance when `fuzzy` is provided ## Observability API diff --git a/rust/crates/cloudsearch-api/src/lib.rs b/rust/crates/cloudsearch-api/src/lib.rs index bb5c674..52c1ba0 100644 --- a/rust/crates/cloudsearch-api/src/lib.rs +++ b/rust/crates/cloudsearch-api/src/lib.rs @@ -252,6 +252,7 @@ pub fn router_with_registry(registry: Arc) -> Router { "/{index}/_search", get(search_index_get).post(search_index).put(search_index), ) + .route("/{index}/_suggest", post(suggest_index)) .route("/{index}/_settings", put(update_index_settings)) .route("/{index}/_snapshot", get(list_snapshots)) .route( @@ -519,6 +520,28 @@ async fn multi_search( Ok((StatusCode::OK, Json(MultiSearchResponse { responses }))) } +async fn suggest_index( + State(state): State, + Path(index): Path, + Json(request): Json, +) -> Result { + let started_at = Instant::now(); + + let handle = state.registry.index_handle(&index).await?; + let handle = handle.lock().await; + + let result = handle.suggest(&request); + + state.metrics().record_request( + "suggest", + "POST", + StatusCode::OK, + started_at.elapsed().as_secs_f64(), + ); + + Ok((StatusCode::OK, Json(result))) +} + async fn search_index_get( State(state): State, Path(index): Path, diff --git a/rust/crates/cloudsearch-common/src/lib.rs b/rust/crates/cloudsearch-common/src/lib.rs index 94ad98d..d926cee 100644 --- a/rust/crates/cloudsearch-common/src/lib.rs +++ b/rust/crates/cloudsearch-common/src/lib.rs @@ -355,6 +355,35 @@ pub enum Fuzziness { Exact(usize), } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SuggestRequest { + pub prefix: String, + #[serde(default)] + pub fields: BTreeMap, + #[serde(default = "default_suggest_size")] + pub size: usize, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub fuzzy: Option, +} + +fn default_suggest_size() -> usize { + 10 +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SuggestResponse { + pub suggestions: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Suggestion { + pub text: String, + pub score: f32, + pub doc_freq: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub field: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TermQuery { pub field: String, diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index c2be1ec..bd9022f 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -188,6 +188,36 @@ impl IndexCatalog { } /// Load all positions sidecars from a manifest's segments. + async fn load_all_suggest_readers( + segments_dir: &Path, + manifest: &IndexManifest, + ) -> Vec> { + let mut readers = Vec::new(); + for seg in &manifest.segments { + let reader = Self::load_single_suggest_reader(segments_dir, seg).await; + readers.push(reader); + } + readers + } + + /// Load suggest sidecar for a single segment, if it exists. + async fn load_single_suggest_reader( + segments_dir: &Path, + seg: &SegmentMeta, + ) -> Option { + let path = segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number)); + let reader = cloudsearch_storage::suggest_index::SuggestReader::from_bytes( + &tokio::fs::read(&path).await.ok()?, + ) + .ok()?; + if reader.fields().count() > 0 { + tracing::info!(fields = reader.fields().count(), "loaded suggest sidecar"); + Some(reader) + } else { + None + } + } + async fn load_all_positions_readers( segments_dir: &Path, manifest: &IndexManifest, @@ -222,6 +252,7 @@ impl IndexCatalog { /// /// # Errors /// Returns an error if the index does not exist or if file operations fail. + #[allow(clippy::too_many_lines)] pub async fn open_index(&self, name: &str) -> Result { let _guard = self.lifecycle_lock.write().await; let metadata = self.get_index(name).await?; @@ -320,6 +351,15 @@ impl IndexCatalog { ); } + // Load suggest sidecar from all segments for autocomplete + let suggest_readers = Self::load_all_suggest_readers(&segments_dir, &manifest).await; + if !suggest_readers.is_empty() { + tracing::info!( + count = suggest_readers.len(), + "loaded suggest sidecars for all segments" + ); + } + Ok(IndexHandle { metadata, metadata_path, @@ -333,6 +373,7 @@ impl IndexCatalog { doc_values_reader, per_doc_inverted_index: BTreeMap::new(), positions_readers, + suggest_readers, }) } @@ -503,6 +544,8 @@ pub struct IndexHandle { per_doc_inverted_index: BTreeMap>>, /// Positions readers loaded from all segments, for multi-segment highlight extraction. positions_readers: Vec, + /// Suggest readers loaded from all segments, for autocomplete suggestions. + suggest_readers: Vec>, } #[derive(Debug, Clone)] @@ -736,6 +779,11 @@ impl IndexHandle { self.manifest = new_manifest; self.searchable_documents = merged; + // Reload suggest readers for all segments — the old ones are stale + // after the merge (their sidecar files no longer exist). + self.suggest_readers = + IndexCatalog::load_all_suggest_readers(&self.segments_dir, &self.manifest).await; + // Reload positions reader for the new merged segment only, since // the old segment sidecars were invalidated by the merge. self.positions_readers.clear(); @@ -779,6 +827,63 @@ impl IndexHandle { } } + /// Returns autocomplete suggestions for a given prefix across multiple fields. + /// + /// # Panics + /// + /// Panics if sorting fails (should never happen with total ordering on f32 and String). + #[must_use] + pub fn suggest( + &self, + request: &cloudsearch_common::SuggestRequest, + ) -> cloudsearch_common::SuggestResponse { + let prefix = request.prefix.to_ascii_lowercase(); + let size = request.size; + + // Deduplicate by (text, field) while collecting, keeping highest score + let mut seen: BTreeMap<(String, String), cloudsearch_common::Suggestion> = BTreeMap::new(); + + for (field, field_weight) in &request.fields { + if *field_weight == 0.0 { + continue; + } + // Collect suggestions from all segments for this field + for reader_opt in &self.suggest_readers { + let Some(reader) = reader_opt else { continue }; + for entry in reader.suggest_for_field(field, &prefix) { + let key = (entry.term.clone(), field.clone()); + let suggestion = cloudsearch_common::Suggestion { + text: entry.term.clone(), + score: entry.score * field_weight, + doc_freq: entry.doc_freq, + field: Some(field.clone()), + }; + seen.entry(key) + .and_modify(|e| { + if suggestion.score > e.score { + *e = suggestion.clone(); + } + }) + .or_insert(suggestion); + } + } + } + + // Sort by score descending, then by text ascending for tie-breaking + let mut candidates: Vec<_> = seen.into_values().collect(); + candidates.sort_by(|a, b| { + b.score + .partial_cmp(&a.score) + .unwrap_or(std::cmp::Ordering::Equal) + .then_with(|| a.text.cmp(&b.text)) + }); + candidates.truncate(size); + + cloudsearch_common::SuggestResponse { + suggestions: candidates, + } + } + #[must_use] #[allow(clippy::too_many_lines)] pub fn search(&self, request: &SearchRequest) -> SearchResponse { @@ -1273,6 +1378,70 @@ impl IndexHandle { doc_index } + /// Builds a suggest index from a list of documents for autocomplete. + #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)] + fn build_suggest_index( + documents: &[IndexDocument], + mappings: &BTreeMap, + ) -> cloudsearch_storage::suggest_index::SuggestIndex { + let n_docs = documents.len().max(1) as f64; + // BTreeMap>> + // BTreeSet deduplicates doc_ids per term per field → doc_freq = unique doc count + let mut field_terms: BTreeMap< + String, + BTreeMap>, + > = BTreeMap::new(); + + for doc in documents { + if let Some(obj) = doc.source.as_object() { + for (field_name, field_value) in obj { + // Only index text fields (Keyword type or inferred text) + let is_text = mappings + .get(field_name) + .is_none_or(|m| matches!(m.field_type, FieldType::Keyword)); // default to indexable if no mapping + + if !is_text { + continue; + } + + let Some(text) = field_value.as_str() else { + continue; + }; + // Deduplicate tokens within this document using BTreeSet + let tokens: std::collections::BTreeSet<_> = + tokenize(text).into_iter().collect(); + + let doc_set = field_terms.entry(field_name.clone()).or_default(); + for token in tokens { + doc_set.entry(token).or_default().insert(doc.id.clone()); + } + } + } + } + + let mut index = cloudsearch_storage::suggest_index::SuggestIndex::new(); + for (field, term_doc_ids) in field_terms { + let mut entries: Vec = term_doc_ids + .into_iter() + .map(|(term, doc_ids)| { + #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)] + let doc_freq = doc_ids.len() as u32; + let score = (f64::from(doc_freq) / n_docs) as f32; + cloudsearch_storage::suggest_index::SuggestEntry { + term, + doc_freq, + score, + } + }) + .collect(); + // Sort by term ascending for binary search + entries.sort_by(|a, b| a.term.cmp(&b.term)); + index.fields.insert(field, entries); + } + + index + } + /// Soft-deletes a document by writing a delete record to the WAL. /// /// # Errors @@ -1385,6 +1554,7 @@ impl IndexHandle { /// /// # Errors /// Returns an error if file or WAL operations fail. + #[allow(clippy::too_many_lines)] pub async fn flush(&mut self) -> Result { // Rollover WAL first so the snapshot captures a consistent post-rollover state. // This ensures WAL replay on restart starts from the new generation. @@ -1482,6 +1652,43 @@ impl IndexHandle { } } + // Build and write suggest sidecar for autocomplete + let suggest_index = Self::build_suggest_index(&snapshot.documents, &self.metadata.mappings); + if !suggest_index.is_empty() { + let suggest_data = suggest_index.to_bytes(); + cloudsearch_storage::suggest_writer::write_suggest( + &self.segments_dir, + segment_number, + &suggest_data, + ) + .await?; + tracing::info!( + index = %self.metadata.name, + fields = suggest_index.fields.len(), + terms = suggest_index.total_terms(), + "wrote suggest sidecar" + ); + // Reload all suggest readers from the updated manifest to avoid accumulation + let mut new_readers = Vec::new(); + for seg in &self.manifest.segments { + let path = self + .segments_dir + .join(format!("suggest_{:020}.bin", seg.segment_number)); + if let Ok(data) = tokio::fs::read(&path).await + && let Ok(reader) = + cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) + && reader.fields().count() > 0 + { + new_readers.push(Some(reader)); + } + } + self.suggest_readers = new_readers; + tracing::info!( + count = self.suggest_readers.len(), + "reloaded suggest readers after flush" + ); + } + // Clear per-doc indices now that they're persisted self.per_doc_inverted_index.clear(); @@ -6788,4 +6995,51 @@ mod tests { }; assert_eq!(fuzzy_term_match(&doc, &term), None); // fuzzy requires string } + + #[test] + fn build_suggest_index_counts_unique_docs_not_token_occurrences() { + // Regression: doc_freq must count unique documents per term, + // not the number of token occurrences. + // "foo foo foo" and "foo bar" in the same field → doc_freq("foo") = 2, not 4. + let documents = vec![ + IndexDocument { + id: "1".to_string(), + source: serde_json::json!({"title": "foo foo foo"}), + }, + IndexDocument { + id: "2".to_string(), + source: serde_json::json!({"title": "foo bar"}), + }, + ]; + let mappings = BTreeMap::from([( + "title".to_string(), + FieldMapping { + field_type: FieldType::Keyword, + }, + )]); + + let index = IndexHandle::build_suggest_index(&documents, &mappings); + let title_entries = index.fields.get("title").expect("title field"); + + let foo_entry = title_entries + .iter() + .find(|e| e.term == "foo") + .expect("foo term must exist"); + // Two documents contain "foo" (doc "1" once, doc "2" once) → doc_freq = 2 + assert_eq!( + foo_entry.doc_freq, 2, + "doc_freq must be 2 (unique docs), got {} (token occurrences?)", + foo_entry.doc_freq + ); + + let bar_entry = title_entries + .iter() + .find(|e| e.term == "bar") + .expect("bar term must exist"); + assert_eq!( + bar_entry.doc_freq, 1, + "doc_freq for 'bar' must be 1 (only doc 2), got {}", + bar_entry.doc_freq + ); + } } diff --git a/rust/crates/cloudsearch-node/tests/api_smoke.rs b/rust/crates/cloudsearch-node/tests/api_smoke.rs index cb4d326..9f756f2 100644 --- a/rust/crates/cloudsearch-node/tests/api_smoke.rs +++ b/rust/crates/cloudsearch-node/tests/api_smoke.rs @@ -1849,3 +1849,48 @@ async fn snapshot_returns_404_for_missing_index() { node.stop(); } + +#[tokio::test] +async fn suggest_returns_completions_for_prefix() { + let temp_dir = TempDir::new().expect("temp dir"); + let port = reserve_port(); + let mut node = TestNode::spawn(temp_dir, port).await; + + node.create_index("test").await; + node.index_doc( + "test", + "1", + serde_json::json!({"title": "elasticsearch is awesome", "body": "rust is great"}), + ) + .await; + node.index_doc( + "test", + "2", + serde_json::json!({"title": "elastic cloud", "body": "kubernetes deployment"}), + ) + .await; + + // Flush to build suggest sidecar + node.refresh("test").await; + node.flush("test").await; + + let resp = node + .suggest( + "test", + serde_json::json!({"prefix": "elast", "fields": {"title": 1.0}, "size": 10}), + ) + .await; + + let suggestions = resp["suggestions"].as_array().expect("suggestions array"); + assert!( + !suggestions.is_empty(), + "expected suggestions for 'elast' prefix" + ); + assert_eq!( + suggestions[0]["text"].as_str().unwrap(), + "elastic", + "top suggestion should be 'elastic'" + ); + + node.stop(); +} diff --git a/rust/crates/cloudsearch-node/tests/helpers.rs b/rust/crates/cloudsearch-node/tests/helpers.rs index e8d3338..18f17dc 100644 --- a/rust/crates/cloudsearch-node/tests/helpers.rs +++ b/rust/crates/cloudsearch-node/tests/helpers.rs @@ -142,6 +142,21 @@ impl TestNode { resp.json().await.expect("parse search response") } + /// Suggests autocomplete completions for a prefix. + /// + /// # Panics + /// Panics if the request fails to send or response cannot be parsed. + pub async fn suggest(&self, index: &str, request: serde_json::Value) -> serde_json::Value { + let resp = self + .client + .post(format!("{}/{}/_suggest", self.base_url, index)) + .json(&request) + .send() + .await + .expect("suggest request"); + resp.json().await.expect("parse suggest response") + } + /// Returns the total hits count from a search response. /// /// # Panics diff --git a/rust/crates/cloudsearch-storage/src/lib.rs b/rust/crates/cloudsearch-storage/src/lib.rs index b599ba8..edb9b8b 100644 --- a/rust/crates/cloudsearch-storage/src/lib.rs +++ b/rust/crates/cloudsearch-storage/src/lib.rs @@ -10,6 +10,8 @@ use tokio::{ pub mod inverted_index; pub mod positions_writer; +pub mod suggest_index; +pub mod suggest_writer; const WAL_VERSION: u8 = 1; const HEADER_LEN: usize = 26; diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs new file mode 100644 index 0000000..d12e9e2 --- /dev/null +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -0,0 +1,425 @@ +//! Suggest index for autocomplete / as-you-type suggestions. +//! +//! Provides O(log n + m) prefix lookup via sorted term arrays, where n = vocabulary size +//! and m = number of matching terms. + +use std::collections::BTreeMap; + +/// MAGIC bytes for suggest sidecar file: "SUGG" in ASCII. +const SUGGEST_MAGIC: u32 = 0x5355_4747; +const SUGGEST_VERSION: u8 = 1; + +/// A single suggest entry — a term with its popularity score. +#[derive(Debug, Clone, PartialEq)] +pub struct SuggestEntry { + /// The completion text (tokenized, lowercase). + pub term: String, + /// Number of documents containing this term. + pub doc_freq: u32, + /// Normalized score (`doc_freq` / `n_docs`). + pub score: f32, +} + +/// In-memory suggest index — per-field sorted term arrays. +#[derive(Debug, Clone, Default)] +pub struct SuggestIndex { + /// Per-field sorted term arrays. Each field's entries are sorted by term ascending. + pub fields: BTreeMap>, +} + +impl SuggestIndex { + /// Creates a new empty suggest index. + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Returns the total number of terms across all fields. + #[must_use] + pub fn total_terms(&self) -> usize { + self.fields.values().map(Vec::len).sum() + } + + /// Returns true if the index has no entries. + #[must_use] + pub fn is_empty(&self) -> bool { + self.fields.is_empty() + } +} + +/// In-memory reader for suggest index — loaded from disk. +#[derive(Debug, Clone)] +pub struct SuggestReader { + /// Per-field sorted term arrays. + fields: BTreeMap>, +} + +impl SuggestReader { + /// Loads a suggest reader from previously-written binary data. + /// + /// # Errors + /// Returns an error if the data is corrupted or has an invalid header. + /// + /// # Panics + /// + /// Panics if the data is malformed (e.g., invalid UTF-8, truncated bytes). + pub fn from_bytes(data: &[u8]) -> std::io::Result { + let mut offset = 0usize; + + // Header: MAGIC (4) + VERSION (1) + PADDING (3) + FIELD_COUNT (4) + if data.len() < 12 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "suggest data too short for header", + )); + } + + let magic = u32::from_le_bytes(data[offset..4].try_into().unwrap()); + if magic != SUGGEST_MAGIC { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid suggest magic: 0x{magic:08X}"), + )); + } + offset += 4; + + let version = data[offset]; + if version != SUGGEST_VERSION { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("unsupported suggest version: {version}"), + )); + } + offset += 4; // skip padding bytes + + let field_count = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + offset += 4; + + let mut fields = BTreeMap::new(); + + for _ in 0..field_count { + // Field name length + name + let field_name_len = + u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + + let field_name = String::from_utf8(data[offset..offset + field_name_len].to_vec()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + offset += field_name_len; + + // Term count for this field + let term_count = + u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + + let mut entries = Vec::with_capacity(term_count); + for _ in 0..term_count { + let term_len = + u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + + let term = String::from_utf8(data[offset..offset + term_len].to_vec()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + offset += term_len; + + let doc_freq = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + offset += 4; + + let score = f32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + offset += 4; + + entries.push(SuggestEntry { + term, + doc_freq, + score, + }); + } + + fields.insert(field_name, entries); + } + + Ok(Self { fields }) + } + + /// Returns the sorted entries for a specific field. + #[must_use] + pub fn get_field(&self, field: &str) -> Option<&Vec> { + self.fields.get(field) + } + + /// Returns all field names in this reader. + pub fn fields(&self) -> impl Iterator { + self.fields.keys().map(String::as_str) + } + + /// Finds the first entry index where term >= prefix (lexicographically). + /// Returns None if no term matches the prefix. + #[must_use] + pub fn find_first_prefix(&self, field: &str, prefix: &str) -> Option { + let entries = self.fields.get(field)?; + if entries.is_empty() { + return None; + } + + let mut lo = 0usize; + let mut hi = entries.len(); + + // Binary search for lower bound + while lo < hi { + let mid = usize::midpoint(lo, hi); + if entries[mid].term.as_str() < prefix { + lo = mid + 1; + } else { + hi = mid; + } + } + + if lo < entries.len() && entries[lo].term.starts_with(prefix) { + Some(lo) + } else { + None + } + } + + /// Returns all suggestions for a given field and prefix. + #[must_use] + pub fn suggest_for_field<'a>( + &'a self, + field: &'a str, + prefix: &'a str, + ) -> Vec<&'a SuggestEntry> { + if prefix.is_empty() { + return Vec::new(); + } + let Some(start) = self.find_first_prefix(field, prefix) else { + return Vec::new(); + }; + + let Some(entries) = self.fields.get(field) else { + return Vec::new(); + }; + + entries[start..] + .iter() + .take_while(|e| e.term.starts_with(prefix)) + .collect() + } +} + +impl SuggestIndex { + /// Serializes the suggest index to binary format. + /// + /// # Panics + /// + /// Panics if the number of fields or entries exceeds `u32::MAX`. + /// + /// # File format + /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + `FIELD_COUNT` (4) + /// - Per field: `FIELD_NAME_LEN` (4) + `FIELD_NAME` (bytes) + `TERM_COUNT` (4) + /// - Per term: `STR_LEN` (4) + TERM (bytes) + `DOC_FREQ` (4) + SCORE (4) + #[must_use] + pub fn to_bytes(&self) -> Vec { + let mut data = Vec::new(); + + // Header + data.extend_from_slice(&SUGGEST_MAGIC.to_le_bytes()); + data.push(SUGGEST_VERSION); + data.extend_from_slice(&[0u8, 0u8, 0u8]); // padding + data.extend_from_slice(&u32::try_from(self.fields.len()).unwrap().to_le_bytes()); + + for (field_name, entries) in &self.fields { + // Field header + let field_bytes = field_name.as_bytes(); + data.extend_from_slice(&u32::try_from(field_bytes.len()).unwrap().to_le_bytes()); + data.extend_from_slice(field_bytes); + data.extend_from_slice(&u32::try_from(entries.len()).unwrap().to_le_bytes()); + + for entry in entries { + data.extend_from_slice(&u32::try_from(entry.term.len()).unwrap().to_le_bytes()); + data.extend_from_slice(entry.term.as_bytes()); + data.extend_from_slice(&entry.doc_freq.to_le_bytes()); + data.extend_from_slice(&entry.score.to_le_bytes()); + } + } + + data + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_entries() -> Vec { + vec![ + SuggestEntry { + term: "elastic".to_string(), + doc_freq: 10, + score: 0.5, + }, + SuggestEntry { + term: "elasticsearch".to_string(), + doc_freq: 5, + score: 0.25, + }, + SuggestEntry { + term: "kubernetes".to_string(), + doc_freq: 8, + score: 0.4, + }, + SuggestEntry { + term: "rust".to_string(), + doc_freq: 3, + score: 0.15, + }, + ] + } + + #[test] + fn find_first_prefix_finds_exact_match() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // "elastic" exists + assert_eq!(reader.find_first_prefix("title", "elastic"), Some(0)); + // "elasticsearch" exists + assert_eq!(reader.find_first_prefix("title", "elasticsearch"), Some(1)); + // "kube" should match "kubernetes" + assert_eq!(reader.find_first_prefix("title", "kube"), Some(2)); + // "rust" exists + assert_eq!(reader.find_first_prefix("title", "rust"), Some(3)); + } + + #[test] + fn find_first_prefix_returns_none_for_non_matching_prefix() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // No term starts with "z" + assert_eq!(reader.find_first_prefix("title", "z"), None); + // No term starts with "java" + assert_eq!(reader.find_first_prefix("title", "java"), None); + } + + #[test] + fn find_first_prefix_returns_none_for_empty_entries() { + let reader = SuggestReader { + fields: std::collections::BTreeMap::new(), + }; + + assert_eq!(reader.find_first_prefix("title", "elastic"), None); + } + + #[test] + fn find_first_prefix_returns_none_for_missing_field() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + assert_eq!(reader.find_first_prefix("body", "elastic"), None); + } + + #[test] + fn suggest_for_field_iterates_correctly() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + let suggestions = reader.suggest_for_field("title", "elast"); + + assert_eq!(suggestions.len(), 2); + assert_eq!(suggestions[0].term, "elastic"); + assert_eq!(suggestions[1].term, "elasticsearch"); + } + + #[test] + fn suggest_for_field_returns_empty_for_no_match() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + let suggestions = reader.suggest_for_field("title", "z"); + + assert!(suggestions.is_empty()); + } + + #[test] + fn suggest_for_field_stops_at_prefix_boundary() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // "elast" should only match "elastic" and "elasticsearch", not "kubernetes" + let suggestions = reader.suggest_for_field("title", "elast"); + + assert_eq!(suggestions.len(), 2); + assert!(suggestions.iter().all(|e| e.term.starts_with("elast"))); + } + + #[test] + fn suggest_reader_from_bytes_round_trip() { + let index = SuggestIndex { + fields: std::collections::BTreeMap::from([ + ( + "title".to_string(), + vec![ + SuggestEntry { + term: "elastic".to_string(), + doc_freq: 10, + score: 0.5, + }, + SuggestEntry { + term: "elasticsearch".to_string(), + doc_freq: 5, + score: 0.25, + }, + ], + ), + ( + "description".to_string(), + vec![SuggestEntry { + term: "rust".to_string(), + doc_freq: 3, + score: 0.15, + }], + ), + ]), + }; + + let data = index.to_bytes(); + let loaded = SuggestReader::from_bytes(&data).expect("should load"); + + assert_eq!(loaded.fields.len(), 2); + let title_entries = loaded.get_field("title").expect("title field"); + assert_eq!(title_entries.len(), 2); + assert_eq!(title_entries[0].term, "elastic"); + assert_eq!(title_entries[0].doc_freq, 10); + + let desc_entries = loaded.get_field("description").expect("description field"); + assert_eq!(desc_entries.len(), 1); + assert_eq!(desc_entries[0].term, "rust"); + } + + #[test] + fn suggest_for_field_returns_empty_for_empty_prefix() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // Empty prefix should return no results, not the entire vocabulary + let suggestions = reader.suggest_for_field("title", ""); + assert!( + suggestions.is_empty(), + "empty prefix must not return all terms" + ); + } +} diff --git a/rust/crates/cloudsearch-storage/src/suggest_writer.rs b/rust/crates/cloudsearch-storage/src/suggest_writer.rs new file mode 100644 index 0000000..980c367 --- /dev/null +++ b/rust/crates/cloudsearch-storage/src/suggest_writer.rs @@ -0,0 +1,68 @@ +//! Suggest index file writer and reader. +//! +//! Writes `suggest_{segment}.bin` sidecar files during flush/merge. +//! Reads them back during index open. + +use crate::suggest_index::SuggestReader; +use std::path::{Path, PathBuf}; +use tokio::{ + fs::{self, OpenOptions}, + io::{AsyncReadExt, AsyncWriteExt}, +}; + +/// Path for suggest sidecar file for a given segment number. +#[must_use] +pub fn suggest_path(segments_dir: &Path, segment_num: u64) -> PathBuf { + segments_dir.join(format!("suggest_{segment_num:020}.bin")) +} + +/// Writes a suggest index binary file atomically (write to .tmp, then rename). +/// +/// # Errors +/// Returns an error if file operations fail. +pub async fn write_suggest( + segments_dir: &Path, + segment_num: u64, + data: &[u8], +) -> std::io::Result<()> { + let path = suggest_path(segments_dir, segment_num); + let tmp_path = path.with_extension("bin.tmp"); + + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&tmp_path) + .await?; + file.write_all(data).await?; + file.flush().await?; + file.sync_all().await?; + drop(file); + + fs::rename(&tmp_path, &path).await?; + + // Sync parent directory so the rename is durable + let dir_file = OpenOptions::new().read(true).open(segments_dir).await?; + dir_file.sync_all().await?; + + Ok(()) +} + +/// Reads a suggest index from disk. +/// +/// # Errors +/// Returns an error if file operations or parsing fails. +pub async fn read_suggest(segments_dir: &Path, segment_num: u64) -> std::io::Result { + let path = suggest_path(segments_dir, segment_num); + let mut file = OpenOptions::new().read(true).open(&path).await?; + let mut data = Vec::new(); + file.read_to_end(&mut data).await?; + SuggestReader::from_bytes(&data) +} + +/// Checks if a suggest sidecar file exists for a given segment. +pub async fn suggest_exists(segments_dir: &Path, segment_num: u64) -> bool { + tokio::fs::try_exists(suggest_path(segments_dir, segment_num)) + .await + .unwrap_or(false) +}