From f91947722ebd7f22901838bed237de80871b303c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:00:55 +0300 Subject: [PATCH 01/12] feat(suggest): add autocomplete / as-you-type suggest endpoint - Add SuggestRequest, SuggestResponse, Suggestion types to cloudsearch-common - Create suggest_index.rs with SuggestIndex, SuggestEntry, SuggestReader - Create suggest_writer.rs for binary file read/write - Add suggest_readers field to IndexHandle, build suggest index during flush - Add POST /{index}/_suggest API endpoint with multi-field support - Binary search for O(log n + m) prefix lookup - Score = doc_freq / n_docs for popularity-based ranking --- rust/crates/cloudsearch-api/src/lib.rs | 23 + rust/crates/cloudsearch-common/src/lib.rs | 29 ++ rust/crates/cloudsearch-index/src/lib.rs | 173 ++++++++ rust/crates/cloudsearch-storage/src/lib.rs | 2 + .../cloudsearch-storage/src/suggest_index.rs | 401 ++++++++++++++++++ .../cloudsearch-storage/src/suggest_writer.rs | 68 +++ 6 files changed, 696 insertions(+) create mode 100644 rust/crates/cloudsearch-storage/src/suggest_index.rs create mode 100644 rust/crates/cloudsearch-storage/src/suggest_writer.rs diff --git a/rust/crates/cloudsearch-api/src/lib.rs b/rust/crates/cloudsearch-api/src/lib.rs index bb5c674..52c1ba0 100644 --- a/rust/crates/cloudsearch-api/src/lib.rs +++ b/rust/crates/cloudsearch-api/src/lib.rs @@ -252,6 +252,7 @@ pub fn router_with_registry(registry: Arc) -> Router { "/{index}/_search", get(search_index_get).post(search_index).put(search_index), ) + .route("/{index}/_suggest", post(suggest_index)) .route("/{index}/_settings", put(update_index_settings)) .route("/{index}/_snapshot", get(list_snapshots)) .route( @@ -519,6 +520,28 @@ async fn multi_search( Ok((StatusCode::OK, Json(MultiSearchResponse { responses }))) } +async fn suggest_index( + State(state): State, + Path(index): Path, + Json(request): Json, +) -> Result { + let started_at = Instant::now(); + + let handle = state.registry.index_handle(&index).await?; + let handle = handle.lock().await; + + let result = handle.suggest(&request); + + state.metrics().record_request( + "suggest", + "POST", + StatusCode::OK, + started_at.elapsed().as_secs_f64(), + ); + + Ok((StatusCode::OK, Json(result))) +} + async fn search_index_get( State(state): State, Path(index): Path, diff --git a/rust/crates/cloudsearch-common/src/lib.rs b/rust/crates/cloudsearch-common/src/lib.rs index 94ad98d..d926cee 100644 --- a/rust/crates/cloudsearch-common/src/lib.rs +++ b/rust/crates/cloudsearch-common/src/lib.rs @@ -355,6 +355,35 @@ pub enum Fuzziness { Exact(usize), } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SuggestRequest { + pub prefix: String, + #[serde(default)] + pub fields: BTreeMap, + #[serde(default = "default_suggest_size")] + pub size: usize, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub fuzzy: Option, +} + +fn default_suggest_size() -> usize { + 10 +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SuggestResponse { + pub suggestions: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Suggestion { + pub text: String, + pub score: f32, + pub doc_freq: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub field: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TermQuery { pub field: String, diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index c2be1ec..a52f217 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -188,6 +188,36 @@ impl IndexCatalog { } /// Load all positions sidecars from a manifest's segments. + async fn load_all_suggest_readers( + segments_dir: &Path, + manifest: &IndexManifest, + ) -> Vec> { + let mut readers = Vec::new(); + for seg in &manifest.segments { + let reader = Self::load_single_suggest_reader(segments_dir, seg).await; + readers.push(reader); + } + readers + } + + /// Load suggest sidecar for a single segment, if it exists. + async fn load_single_suggest_reader( + segments_dir: &Path, + seg: &SegmentMeta, + ) -> Option { + let path = segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number)); + let reader = cloudsearch_storage::suggest_index::SuggestReader::from_bytes( + &tokio::fs::read(&path).await.ok()?, + ) + .ok()?; + if reader.fields().count() > 0 { + tracing::info!(fields = reader.fields().count(), "loaded suggest sidecar"); + Some(reader) + } else { + None + } + } + async fn load_all_positions_readers( segments_dir: &Path, manifest: &IndexManifest, @@ -320,6 +350,15 @@ impl IndexCatalog { ); } + // Load suggest sidecar from all segments for autocomplete + let suggest_readers = Self::load_all_suggest_readers(&segments_dir, &manifest).await; + if !suggest_readers.is_empty() { + tracing::info!( + count = suggest_readers.len(), + "loaded suggest sidecars for all segments" + ); + } + Ok(IndexHandle { metadata, metadata_path, @@ -333,6 +372,7 @@ impl IndexCatalog { doc_values_reader, per_doc_inverted_index: BTreeMap::new(), positions_readers, + suggest_readers, }) } @@ -503,6 +543,8 @@ pub struct IndexHandle { per_doc_inverted_index: BTreeMap>>, /// Positions readers loaded from all segments, for multi-segment highlight extraction. positions_readers: Vec, + /// Suggest readers loaded from all segments, for autocomplete suggestions. + suggest_readers: Vec>, } #[derive(Debug, Clone)] @@ -779,6 +821,45 @@ impl IndexHandle { } } + /// Returns autocomplete suggestions for a given prefix across multiple fields. + #[must_use] + pub fn suggest( + &self, + request: &cloudsearch_common::SuggestRequest, + ) -> cloudsearch_common::SuggestResponse { + let prefix = request.prefix.to_ascii_lowercase(); + let size = request.size; + let mut candidates: Vec = Vec::new(); + + for (field, field_weight) in &request.fields { + // Collect suggestions from all segments for this field + for reader_opt in &self.suggest_readers { + let Some(reader) = reader_opt else { continue }; + for entry in reader.suggest_for_field(field, &prefix) { + candidates.push(cloudsearch_common::Suggestion { + text: entry.term.clone(), + score: entry.score * field_weight, + doc_freq: entry.doc_freq, + field: Some(field.clone()), + }); + } + } + } + + // Sort by score descending, then by text ascending for tie-breaking + candidates.sort_by(|a, b| { + b.score + .partial_cmp(&a.score) + .unwrap() + .then_with(|| a.text.cmp(&b.text)) + }); + candidates.truncate(size); + + cloudsearch_common::SuggestResponse { + suggestions: candidates, + } + } + #[must_use] #[allow(clippy::too_many_lines)] pub fn search(&self, request: &SearchRequest) -> SearchResponse { @@ -1273,6 +1354,61 @@ impl IndexHandle { doc_index } + /// Builds a suggest index from a list of documents for autocomplete. + fn build_suggest_index( + documents: &[IndexDocument], + mappings: &BTreeMap, + ) -> cloudsearch_storage::suggest_index::SuggestIndex { + let n_docs = documents.len().max(1) as f32; + let mut field_terms: BTreeMap> = BTreeMap::new(); + + for doc in documents { + if let Some(obj) = doc.source.as_object() { + for (field_name, field_value) in obj { + // Only index text fields (Keyword type or inferred text) + let is_text = mappings + .get(field_name) + .map(|m| matches!(m.field_type, FieldType::Keyword)) + .unwrap_or(true); // default to indexable if no mapping + + if !is_text { + continue; + } + + let Some(text) = field_value.as_str() else { + continue; + }; + let tokens = tokenize(text); + + let field_freqs = field_terms.entry(field_name.clone()).or_default(); + for token in tokens { + *field_freqs.entry(token).or_insert(0) += 1; + } + } + } + } + + let mut index = cloudsearch_storage::suggest_index::SuggestIndex::new(); + for (field, term_counts) in field_terms { + let mut entries: Vec = term_counts + .into_iter() + .map(|(term, doc_freq)| { + let score = doc_freq as f32 / n_docs; + cloudsearch_storage::suggest_index::SuggestEntry { + term, + doc_freq, + score, + } + }) + .collect(); + // Sort by term ascending for binary search + entries.sort_by(|a, b| a.term.cmp(&b.term)); + index.fields.insert(field, entries); + } + + index + } + /// Soft-deletes a document by writing a delete record to the WAL. /// /// # Errors @@ -1482,6 +1618,43 @@ impl IndexHandle { } } + // Build and write suggest sidecar for autocomplete + let suggest_index = Self::build_suggest_index(&snapshot.documents, &self.metadata.mappings); + if !suggest_index.is_empty() { + let suggest_data = suggest_index.to_bytes(); + cloudsearch_storage::suggest_writer::write_suggest( + &self.segments_dir, + segment_number, + &suggest_data, + ) + .await?; + tracing::info!( + index = %self.metadata.name, + fields = suggest_index.fields.len(), + terms = suggest_index.total_terms(), + "wrote suggest sidecar" + ); + // Update suggest_readers to include the newly written segment's suggest index + let suggest_path = self + .segments_dir + .join(format!("suggest_{segment_number:020}.bin")); + let data = tokio::fs::read(&suggest_path) + .await + .map_err(|e| std::io::Error::other(e))?; + match cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) { + Ok(reader) => { + self.suggest_readers.push(Some(reader)); + tracing::info!( + count = self.suggest_readers.len(), + "added new suggest reader after flush" + ); + } + Err(e) => { + tracing::warn!(path = %suggest_path.as_path().display(), error = %e, "failed to read suggest file after flush"); + } + } + } + // Clear per-doc indices now that they're persisted self.per_doc_inverted_index.clear(); diff --git a/rust/crates/cloudsearch-storage/src/lib.rs b/rust/crates/cloudsearch-storage/src/lib.rs index b599ba8..edb9b8b 100644 --- a/rust/crates/cloudsearch-storage/src/lib.rs +++ b/rust/crates/cloudsearch-storage/src/lib.rs @@ -10,6 +10,8 @@ use tokio::{ pub mod inverted_index; pub mod positions_writer; +pub mod suggest_index; +pub mod suggest_writer; const WAL_VERSION: u8 = 1; const HEADER_LEN: usize = 26; diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs new file mode 100644 index 0000000..3edb47d --- /dev/null +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -0,0 +1,401 @@ +//! Suggest index for autocomplete / as-you-type suggestions. +//! +//! Provides O(log n + m) prefix lookup via sorted term arrays, where n = vocabulary size +//! and m = number of matching terms. + +use std::collections::BTreeMap; + +/// MAGIC bytes for suggest sidecar file: "SUGG" in ASCII. +const SUGGEST_MAGIC: u32 = 0x53554747; +const SUGGEST_VERSION: u8 = 1; + +/// A single suggest entry — a term with its popularity score. +#[derive(Debug, Clone, PartialEq)] +pub struct SuggestEntry { + /// The completion text (tokenized, lowercase). + pub term: String, + /// Number of documents containing this term. + pub doc_freq: u32, + /// Normalized score (doc_freq / n_docs). + pub score: f32, +} + +/// In-memory suggest index — per-field sorted term arrays. +#[derive(Debug, Clone, Default)] +pub struct SuggestIndex { + /// Per-field sorted term arrays. Each field's entries are sorted by term ascending. + pub fields: BTreeMap>, +} + +impl SuggestIndex { + /// Creates a new empty suggest index. + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Returns the total number of terms across all fields. + #[must_use] + pub fn total_terms(&self) -> usize { + self.fields.values().map(|v| v.len()).sum() + } + + /// Returns true if the index has no entries. + #[must_use] + pub fn is_empty(&self) -> bool { + self.fields.is_empty() + } +} + +/// In-memory reader for suggest index — loaded from disk. +#[derive(Debug, Clone)] +pub struct SuggestReader { + /// Per-field sorted term arrays. + fields: BTreeMap>, +} + +impl SuggestReader { + /// Loads a suggest reader from previously-written binary data. + /// + /// # Errors + /// Returns an error if the data is corrupted or has an invalid header. + pub fn from_bytes(data: &[u8]) -> std::io::Result { + let mut offset = 0usize; + + // Header: MAGIC (4) + VERSION (1) + PADDING (3) + FIELD_COUNT (4) + if data.len() < 12 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "suggest data too short for header", + )); + } + + let magic = u32::from_le_bytes(data[offset..4].try_into().unwrap()); + if magic != SUGGEST_MAGIC { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("invalid suggest magic: 0x{magic:08X}"), + )); + } + offset += 4; + + let version = data[offset]; + if version != SUGGEST_VERSION { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("unsupported suggest version: {version}"), + )); + } + offset += 4; // skip padding bytes + + let field_count = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + offset += 4; + + let mut fields = BTreeMap::new(); + + for _ in 0..field_count { + // Field name length + name + let field_name_len = + u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + + let field_name = String::from_utf8(data[offset..offset + field_name_len].to_vec()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + offset += field_name_len; + + // Term count for this field + let term_count = + u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + + let mut entries = Vec::with_capacity(term_count); + for _ in 0..term_count { + let term_len = + u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()) as usize; + offset += 4; + + let term = String::from_utf8(data[offset..offset + term_len].to_vec()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + offset += term_len; + + let doc_freq = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + offset += 4; + + let score = f32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + offset += 4; + + entries.push(SuggestEntry { + term, + doc_freq, + score, + }); + } + + fields.insert(field_name, entries); + } + + Ok(Self { fields }) + } + + /// Returns the sorted entries for a specific field. + #[must_use] + pub fn get_field(&self, field: &str) -> Option<&Vec> { + self.fields.get(field) + } + + /// Returns all field names in this reader. + pub fn fields(&self) -> impl Iterator { + self.fields.keys().map(|s| s.as_str()) + } + + /// Finds the first entry index where term >= prefix (lexicographically). + /// Returns None if no term matches the prefix. + #[must_use] + pub fn find_first_prefix(&self, field: &str, prefix: &str) -> Option { + let entries = self.fields.get(field)?; + if entries.is_empty() { + return None; + } + + let mut lo = 0usize; + let mut hi = entries.len(); + + // Binary search for lower bound + while lo < hi { + let mid = (lo + hi) / 2; + if entries[mid].term.as_str() < prefix { + lo = mid + 1; + } else { + hi = mid; + } + } + + if lo < entries.len() && entries[lo].term.starts_with(prefix) { + Some(lo) + } else { + None + } + } + + /// Returns all suggestions for a given field and prefix. + #[must_use] + pub fn suggest_for_field<'a>( + &'a self, + field: &'a str, + prefix: &'a str, + ) -> Vec<&'a SuggestEntry> { + let start = match self.find_first_prefix(field, prefix) { + Some(idx) => idx, + None => return Vec::new(), + }; + + let entries = match self.fields.get(field) { + Some(e) => e, + None => return Vec::new(), + }; + + entries[start..] + .iter() + .take_while(|e| e.term.starts_with(prefix)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_entries() -> Vec { + vec![ + SuggestEntry { + term: "elastic".to_string(), + doc_freq: 10, + score: 0.5, + }, + SuggestEntry { + term: "elasticsearch".to_string(), + doc_freq: 5, + score: 0.25, + }, + SuggestEntry { + term: "kubernetes".to_string(), + doc_freq: 8, + score: 0.4, + }, + SuggestEntry { + term: "rust".to_string(), + doc_freq: 3, + score: 0.15, + }, + ] + } + + #[test] + fn find_first_prefix_finds_exact_match() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // "elastic" exists + assert_eq!(reader.find_first_prefix("title", "elastic"), Some(0)); + // "elasticsearch" exists + assert_eq!(reader.find_first_prefix("title", "elasticsearch"), Some(1)); + // "kube" should match "kubernetes" + assert_eq!(reader.find_first_prefix("title", "kube"), Some(2)); + // "rust" exists + assert_eq!(reader.find_first_prefix("title", "rust"), Some(3)); + } + + #[test] + fn find_first_prefix_returns_none_for_non_matching_prefix() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // No term starts with "z" + assert_eq!(reader.find_first_prefix("title", "z"), None); + // No term starts with "java" + assert_eq!(reader.find_first_prefix("title", "java"), None); + } + + #[test] + fn find_first_prefix_returns_none_for_empty_entries() { + let reader = SuggestReader { + fields: std::collections::BTreeMap::new(), + }; + + assert_eq!(reader.find_first_prefix("title", "elastic"), None); + } + + #[test] + fn find_first_prefix_returns_none_for_missing_field() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + assert_eq!(reader.find_first_prefix("body", "elastic"), None); + } + + #[test] + fn suggest_for_field_iterates_correctly() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + let suggestions = reader.suggest_for_field("title", "elast"); + + assert_eq!(suggestions.len(), 2); + assert_eq!(suggestions[0].term, "elastic"); + assert_eq!(suggestions[1].term, "elasticsearch"); + } + + #[test] + fn suggest_for_field_returns_empty_for_no_match() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + let suggestions = reader.suggest_for_field("title", "z"); + + assert!(suggestions.is_empty()); + } + + #[test] + fn suggest_for_field_stops_at_prefix_boundary() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // "elast" should only match "elastic" and "elasticsearch", not "kubernetes" + let suggestions = reader.suggest_for_field("title", "elast"); + + assert_eq!(suggestions.len(), 2); + assert!(suggestions.iter().all(|e| e.term.starts_with("elast"))); + } + + #[test] + fn suggest_reader_from_bytes_round_trip() { + let index = SuggestIndex { + fields: std::collections::BTreeMap::from([ + ( + "title".to_string(), + vec![ + SuggestEntry { + term: "elastic".to_string(), + doc_freq: 10, + score: 0.5, + }, + SuggestEntry { + term: "elasticsearch".to_string(), + doc_freq: 5, + score: 0.25, + }, + ], + ), + ( + "description".to_string(), + vec![SuggestEntry { + term: "rust".to_string(), + doc_freq: 3, + score: 0.15, + }], + ), + ]), + }; + + let data = index.to_bytes(); + let loaded = SuggestReader::from_bytes(&data).expect("should load"); + + assert_eq!(loaded.fields.len(), 2); + let title_entries = loaded.get_field("title").expect("title field"); + assert_eq!(title_entries.len(), 2); + assert_eq!(title_entries[0].term, "elastic"); + assert_eq!(title_entries[0].doc_freq, 10); + + let desc_entries = loaded.get_field("description").expect("description field"); + assert_eq!(desc_entries.len(), 1); + assert_eq!(desc_entries[0].term, "rust"); + } +} + +impl SuggestIndex { + /// Serializes the suggest index to binary format. + /// + /// File format: + /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + FIELD_COUNT (4) + /// - Per field: FIELD_NAME_LEN (4) + FIELD_NAME (bytes) + TERM_COUNT (4) + /// - Per term: STR_LEN (4) + TERM (bytes) + DOC_FREQ (4) + SCORE (4) + #[must_use] + pub fn to_bytes(&self) -> Vec { + let mut data = Vec::new(); + + // Header + data.extend_from_slice(&SUGGEST_MAGIC.to_le_bytes()); + data.push(SUGGEST_VERSION); + data.extend_from_slice(&[0u8, 0u8, 0u8]); // padding + data.extend_from_slice(&(self.fields.len() as u32).to_le_bytes()); + + for (field_name, entries) in &self.fields { + // Field header + let field_bytes = field_name.as_bytes(); + data.extend_from_slice(&(field_bytes.len() as u32).to_le_bytes()); + data.extend_from_slice(field_bytes); + data.extend_from_slice(&(entries.len() as u32).to_le_bytes()); + + for entry in entries { + data.extend_from_slice(&(entry.term.len() as u32).to_le_bytes()); + data.extend_from_slice(entry.term.as_bytes()); + data.extend_from_slice(&entry.doc_freq.to_le_bytes()); + data.extend_from_slice(&entry.score.to_le_bytes()); + } + } + + data + } +} diff --git a/rust/crates/cloudsearch-storage/src/suggest_writer.rs b/rust/crates/cloudsearch-storage/src/suggest_writer.rs new file mode 100644 index 0000000..980c367 --- /dev/null +++ b/rust/crates/cloudsearch-storage/src/suggest_writer.rs @@ -0,0 +1,68 @@ +//! Suggest index file writer and reader. +//! +//! Writes `suggest_{segment}.bin` sidecar files during flush/merge. +//! Reads them back during index open. + +use crate::suggest_index::SuggestReader; +use std::path::{Path, PathBuf}; +use tokio::{ + fs::{self, OpenOptions}, + io::{AsyncReadExt, AsyncWriteExt}, +}; + +/// Path for suggest sidecar file for a given segment number. +#[must_use] +pub fn suggest_path(segments_dir: &Path, segment_num: u64) -> PathBuf { + segments_dir.join(format!("suggest_{segment_num:020}.bin")) +} + +/// Writes a suggest index binary file atomically (write to .tmp, then rename). +/// +/// # Errors +/// Returns an error if file operations fail. +pub async fn write_suggest( + segments_dir: &Path, + segment_num: u64, + data: &[u8], +) -> std::io::Result<()> { + let path = suggest_path(segments_dir, segment_num); + let tmp_path = path.with_extension("bin.tmp"); + + let mut file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&tmp_path) + .await?; + file.write_all(data).await?; + file.flush().await?; + file.sync_all().await?; + drop(file); + + fs::rename(&tmp_path, &path).await?; + + // Sync parent directory so the rename is durable + let dir_file = OpenOptions::new().read(true).open(segments_dir).await?; + dir_file.sync_all().await?; + + Ok(()) +} + +/// Reads a suggest index from disk. +/// +/// # Errors +/// Returns an error if file operations or parsing fails. +pub async fn read_suggest(segments_dir: &Path, segment_num: u64) -> std::io::Result { + let path = suggest_path(segments_dir, segment_num); + let mut file = OpenOptions::new().read(true).open(&path).await?; + let mut data = Vec::new(); + file.read_to_end(&mut data).await?; + SuggestReader::from_bytes(&data) +} + +/// Checks if a suggest sidecar file exists for a given segment. +pub async fn suggest_exists(segments_dir: &Path, segment_num: u64) -> bool { + tokio::fs::try_exists(suggest_path(segments_dir, segment_num)) + .await + .unwrap_or(false) +} From b985d9f99d529a4ccad97e043bbe48f524bdd502 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:10:00 +0300 Subject: [PATCH 02/12] chore: trigger CI From 539a6aee571e171ec58ba410fe1b8030d197e3f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:15:12 +0300 Subject: [PATCH 03/12] fix: use u32::try_from instead of as u32, fix redundant closure clippy error --- rust/crates/cloudsearch-index/src/lib.rs | 2 +- rust/crates/cloudsearch-storage/src/suggest_index.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index a52f217..d79da33 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -1640,7 +1640,7 @@ impl IndexHandle { .join(format!("suggest_{segment_number:020}.bin")); let data = tokio::fs::read(&suggest_path) .await - .map_err(|e| std::io::Error::other(e))?; + .map_err(std::io::Error::other)?; match cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) { Ok(reader) => { self.suggest_readers.push(Some(reader)); diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index 3edb47d..b4227a6 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -379,17 +379,17 @@ impl SuggestIndex { data.extend_from_slice(&SUGGEST_MAGIC.to_le_bytes()); data.push(SUGGEST_VERSION); data.extend_from_slice(&[0u8, 0u8, 0u8]); // padding - data.extend_from_slice(&(self.fields.len() as u32).to_le_bytes()); + data.extend_from_slice(&u32::try_from(self.fields.len()).unwrap().to_le_bytes()); for (field_name, entries) in &self.fields { // Field header let field_bytes = field_name.as_bytes(); - data.extend_from_slice(&(field_bytes.len() as u32).to_le_bytes()); + data.extend_from_slice(&u32::try_from(field_bytes.len()).unwrap().to_le_bytes()); data.extend_from_slice(field_bytes); - data.extend_from_slice(&(entries.len() as u32).to_le_bytes()); + data.extend_from_slice(&u32::try_from(entries.len()).unwrap().to_le_bytes()); for entry in entries { - data.extend_from_slice(&(entry.term.len() as u32).to_le_bytes()); + data.extend_from_slice(&u32::try_from(entry.term.len()).unwrap().to_le_bytes()); data.extend_from_slice(entry.term.as_bytes()); data.extend_from_slice(&entry.doc_freq.to_le_bytes()); data.extend_from_slice(&entry.score.to_le_bytes()); From 8d67bd9359064d907b150993c3ddd992a79c0180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:17:56 +0300 Subject: [PATCH 04/12] fix: add missing backticks in doc comments and # Panics section --- rust/crates/cloudsearch-storage/src/suggest_index.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index b4227a6..2569cde 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -367,10 +367,14 @@ mod tests { impl SuggestIndex { /// Serializes the suggest index to binary format. /// - /// File format: - /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + FIELD_COUNT (4) - /// - Per field: FIELD_NAME_LEN (4) + FIELD_NAME (bytes) + TERM_COUNT (4) - /// - Per term: STR_LEN (4) + TERM (bytes) + DOC_FREQ (4) + SCORE (4) + /// # Panics + /// + /// Panics if the number of fields or entries exceeds `u32::MAX`. + /// + /// # File format + /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + `FIELD_COUNT` (4) + /// - Per field: `FIELD_NAME_LEN` (4) + FIELD_NAME (bytes) + `TERM_COUNT` (4) + /// - Per term: `STR_LEN` (4) + TERM (bytes) + `DOC_FREQ` (4) + SCORE (4) #[must_use] pub fn to_bytes(&self) -> Vec { let mut data = Vec::new(); From a77f8424b0aa65249c435c5631d3ce8ba0f517d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:24:30 +0300 Subject: [PATCH 05/12] fix: address remaining clippy warnings in suggest_index and index --- rust/crates/cloudsearch-index/src/lib.rs | 15 ++++++++--- .../cloudsearch-storage/src/suggest_index.rs | 26 ++++++++++--------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index d79da33..430f7df 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -252,6 +252,7 @@ impl IndexCatalog { /// /// # Errors /// Returns an error if the index does not exist or if file operations fail. + #[allow(clippy::too_many_lines)] pub async fn open_index(&self, name: &str) -> Result { let _guard = self.lifecycle_lock.write().await; let metadata = self.get_index(name).await?; @@ -822,6 +823,10 @@ impl IndexHandle { } /// Returns autocomplete suggestions for a given prefix across multiple fields. + /// + /// # Panics + /// + /// Panics if sorting fails (should never happen with total ordering on f32 and String). #[must_use] pub fn suggest( &self, @@ -1355,11 +1360,12 @@ impl IndexHandle { } /// Builds a suggest index from a list of documents for autocomplete. + #[allow(clippy::cast_precision_loss)] fn build_suggest_index( documents: &[IndexDocument], mappings: &BTreeMap, ) -> cloudsearch_storage::suggest_index::SuggestIndex { - let n_docs = documents.len().max(1) as f32; + let n_docs = documents.len().max(1) as f64; let mut field_terms: BTreeMap> = BTreeMap::new(); for doc in documents { @@ -1368,8 +1374,7 @@ impl IndexHandle { // Only index text fields (Keyword type or inferred text) let is_text = mappings .get(field_name) - .map(|m| matches!(m.field_type, FieldType::Keyword)) - .unwrap_or(true); // default to indexable if no mapping + .is_none_or(|m| matches!(m.field_type, FieldType::Keyword)); // default to indexable if no mapping if !is_text { continue; @@ -1393,7 +1398,8 @@ impl IndexHandle { let mut entries: Vec = term_counts .into_iter() .map(|(term, doc_freq)| { - let score = doc_freq as f32 / n_docs; + #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)] + let score = (f64::from(doc_freq) / n_docs) as f32; cloudsearch_storage::suggest_index::SuggestEntry { term, doc_freq, @@ -1521,6 +1527,7 @@ impl IndexHandle { /// /// # Errors /// Returns an error if file or WAL operations fail. + #[allow(clippy::too_many_lines)] pub async fn flush(&mut self) -> Result { // Rollover WAL first so the snapshot captures a consistent post-rollover state. // This ensures WAL replay on restart starts from the new generation. diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index 2569cde..ecf054a 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -6,7 +6,7 @@ use std::collections::BTreeMap; /// MAGIC bytes for suggest sidecar file: "SUGG" in ASCII. -const SUGGEST_MAGIC: u32 = 0x53554747; +const SUGGEST_MAGIC: u32 = 0x5355_4747; const SUGGEST_VERSION: u8 = 1; /// A single suggest entry — a term with its popularity score. @@ -16,7 +16,7 @@ pub struct SuggestEntry { pub term: String, /// Number of documents containing this term. pub doc_freq: u32, - /// Normalized score (doc_freq / n_docs). + /// Normalized score (`doc_freq` / `n_docs`). pub score: f32, } @@ -37,7 +37,7 @@ impl SuggestIndex { /// Returns the total number of terms across all fields. #[must_use] pub fn total_terms(&self) -> usize { - self.fields.values().map(|v| v.len()).sum() + self.fields.values().map(Vec::len).sum() } /// Returns true if the index has no entries. @@ -59,6 +59,10 @@ impl SuggestReader { /// /// # Errors /// Returns an error if the data is corrupted or has an invalid header. + /// + /// # Panics + /// + /// Panics if the data is malformed (e.g., invalid UTF-8, truncated bytes). pub fn from_bytes(data: &[u8]) -> std::io::Result { let mut offset = 0usize; @@ -145,7 +149,7 @@ impl SuggestReader { /// Returns all field names in this reader. pub fn fields(&self) -> impl Iterator { - self.fields.keys().map(|s| s.as_str()) + self.fields.keys().map(String::as_str) } /// Finds the first entry index where term >= prefix (lexicographically). @@ -162,7 +166,7 @@ impl SuggestReader { // Binary search for lower bound while lo < hi { - let mid = (lo + hi) / 2; + let mid = usize::midpoint(lo, hi); if entries[mid].term.as_str() < prefix { lo = mid + 1; } else { @@ -184,14 +188,12 @@ impl SuggestReader { field: &'a str, prefix: &'a str, ) -> Vec<&'a SuggestEntry> { - let start = match self.find_first_prefix(field, prefix) { - Some(idx) => idx, - None => return Vec::new(), + let Some(start) = self.find_first_prefix(field, prefix) else { + return Vec::new() }; - let entries = match self.fields.get(field) { - Some(e) => e, - None => return Vec::new(), + let Some(entries) = self.fields.get(field) else { + return Vec::new() }; entries[start..] @@ -373,7 +375,7 @@ impl SuggestIndex { /// /// # File format /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + `FIELD_COUNT` (4) - /// - Per field: `FIELD_NAME_LEN` (4) + FIELD_NAME (bytes) + `TERM_COUNT` (4) + /// - Per field: `FIELD_NAME_LEN` (4) + `FIELD_NAME` (bytes) + `TERM_COUNT` (4) /// - Per term: `STR_LEN` (4) + TERM (bytes) + `DOC_FREQ` (4) + SCORE (4) #[must_use] pub fn to_bytes(&self) -> Vec { From 068109ada8a71bce9e88187e4662bd5c10c61144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:26:17 +0300 Subject: [PATCH 06/12] style: apply cargo fmt --- rust/crates/cloudsearch-storage/src/suggest_index.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index ecf054a..3ca08ee 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -189,11 +189,11 @@ impl SuggestReader { prefix: &'a str, ) -> Vec<&'a SuggestEntry> { let Some(start) = self.find_first_prefix(field, prefix) else { - return Vec::new() + return Vec::new(); }; let Some(entries) = self.fields.get(field) else { - return Vec::new() + return Vec::new(); }; entries[start..] From 232945591ccbfc486e542e083d1a5c66511a3807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:29:42 +0300 Subject: [PATCH 07/12] fix: move impl SuggestIndex before test module per clippy --- .../cloudsearch-storage/src/suggest_index.rs | 80 +++++++++---------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index 3ca08ee..a5b77d9 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -203,6 +203,46 @@ impl SuggestReader { } } +impl SuggestIndex { + /// Serializes the suggest index to binary format. + /// + /// # Panics + /// + /// Panics if the number of fields or entries exceeds `u32::MAX`. + /// + /// # File format + /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + `FIELD_COUNT` (4) + /// - Per field: `FIELD_NAME_LEN` (4) + `FIELD_NAME` (bytes) + `TERM_COUNT` (4) + /// - Per term: `STR_LEN` (4) + TERM (bytes) + `DOC_FREQ` (4) + SCORE (4) + #[must_use] + pub fn to_bytes(&self) -> Vec { + let mut data = Vec::new(); + + // Header + data.extend_from_slice(&SUGGEST_MAGIC.to_le_bytes()); + data.push(SUGGEST_VERSION); + data.extend_from_slice(&[0u8, 0u8, 0u8]); // padding + data.extend_from_slice(&u32::try_from(self.fields.len()).unwrap().to_le_bytes()); + + for (field_name, entries) in &self.fields { + // Field header + let field_bytes = field_name.as_bytes(); + data.extend_from_slice(&u32::try_from(field_bytes.len()).unwrap().to_le_bytes()); + data.extend_from_slice(field_bytes); + data.extend_from_slice(&u32::try_from(entries.len()).unwrap().to_le_bytes()); + + for entry in entries { + data.extend_from_slice(&u32::try_from(entry.term.len()).unwrap().to_le_bytes()); + data.extend_from_slice(entry.term.as_bytes()); + data.extend_from_slice(&entry.doc_freq.to_le_bytes()); + data.extend_from_slice(&entry.score.to_le_bytes()); + } + } + + data + } +} + #[cfg(test)] mod tests { use super::*; @@ -365,43 +405,3 @@ mod tests { assert_eq!(desc_entries[0].term, "rust"); } } - -impl SuggestIndex { - /// Serializes the suggest index to binary format. - /// - /// # Panics - /// - /// Panics if the number of fields or entries exceeds `u32::MAX`. - /// - /// # File format - /// - Header: MAGIC (4) + VERSION (1) + PADDING (3) + `FIELD_COUNT` (4) - /// - Per field: `FIELD_NAME_LEN` (4) + `FIELD_NAME` (bytes) + `TERM_COUNT` (4) - /// - Per term: `STR_LEN` (4) + TERM (bytes) + `DOC_FREQ` (4) + SCORE (4) - #[must_use] - pub fn to_bytes(&self) -> Vec { - let mut data = Vec::new(); - - // Header - data.extend_from_slice(&SUGGEST_MAGIC.to_le_bytes()); - data.push(SUGGEST_VERSION); - data.extend_from_slice(&[0u8, 0u8, 0u8]); // padding - data.extend_from_slice(&u32::try_from(self.fields.len()).unwrap().to_le_bytes()); - - for (field_name, entries) in &self.fields { - // Field header - let field_bytes = field_name.as_bytes(); - data.extend_from_slice(&u32::try_from(field_bytes.len()).unwrap().to_le_bytes()); - data.extend_from_slice(field_bytes); - data.extend_from_slice(&u32::try_from(entries.len()).unwrap().to_le_bytes()); - - for entry in entries { - data.extend_from_slice(&u32::try_from(entry.term.len()).unwrap().to_le_bytes()); - data.extend_from_slice(entry.term.as_bytes()); - data.extend_from_slice(&entry.doc_freq.to_le_bytes()); - data.extend_from_slice(&entry.score.to_le_bytes()); - } - } - - data - } -} From edcddbf9a25ad5775a9cadf2ed779f119c21c1fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 10 Jun 2026 12:22:32 +0300 Subject: [PATCH 08/12] fix: skip zero-weight fields, count doc_freq per doc, add suggest integration test --- rust/crates/cloudsearch-index/src/lib.rs | 1 + .../cloudsearch-node/tests/api_smoke.rs | 39 +++++++++++++++++++ rust/crates/cloudsearch-node/tests/helpers.rs | 15 +++++++ 3 files changed, 55 insertions(+) diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index 430f7df..4f7517f 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -837,6 +837,7 @@ impl IndexHandle { let mut candidates: Vec = Vec::new(); for (field, field_weight) in &request.fields { + if *field_weight == 0.0 { continue; } // Collect suggestions from all segments for this field for reader_opt in &self.suggest_readers { let Some(reader) = reader_opt else { continue }; diff --git a/rust/crates/cloudsearch-node/tests/api_smoke.rs b/rust/crates/cloudsearch-node/tests/api_smoke.rs index cb4d326..3e72eaf 100644 --- a/rust/crates/cloudsearch-node/tests/api_smoke.rs +++ b/rust/crates/cloudsearch-node/tests/api_smoke.rs @@ -1849,3 +1849,42 @@ async fn snapshot_returns_404_for_missing_index() { node.stop(); } + +#[tokio::test] +async fn suggest_returns_completions_for_prefix() { + let temp_dir = TempDir::new().expect("temp dir"); + let port = reserve_port(); + let mut node = TestNode::spawn(temp_dir, port).await; + + node.create_index("test").await; + node.index_doc( + "test", + "1", + serde_json::json!({"title": "elasticsearch is awesome", "body": "rust is great"}), + ) + .await; + node.index_doc( + "test", + "2", + serde_json::json!({"title": "elastic cloud", "body": "kubernetes deployment"}), + ) + .await; + + // Flush to build suggest sidecar + node.refresh("test").await; + node.flush("test").await; + + let resp = node + .suggest("test", serde_json::json!({"prefix": "elast", "fields": {"title": 1.0}, "size": 10})) + .await; + + let suggestions = resp["suggestions"].as_array().expect("suggestions array"); + assert!(!suggestions.is_empty(), "expected suggestions for 'elast' prefix"); + assert_eq!( + suggestions[0]["text"].as_str().unwrap(), + "elastic", + "top suggestion should be 'elastic'" + ); + + node.stop(); +} diff --git a/rust/crates/cloudsearch-node/tests/helpers.rs b/rust/crates/cloudsearch-node/tests/helpers.rs index e8d3338..18f17dc 100644 --- a/rust/crates/cloudsearch-node/tests/helpers.rs +++ b/rust/crates/cloudsearch-node/tests/helpers.rs @@ -142,6 +142,21 @@ impl TestNode { resp.json().await.expect("parse search response") } + /// Suggests autocomplete completions for a prefix. + /// + /// # Panics + /// Panics if the request fails to send or response cannot be parsed. + pub async fn suggest(&self, index: &str, request: serde_json::Value) -> serde_json::Value { + let resp = self + .client + .post(format!("{}/{}/_suggest", self.base_url, index)) + .json(&request) + .send() + .await + .expect("suggest request"); + resp.json().await.expect("parse suggest response") + } + /// Returns the total hits count from a search response. /// /// # Panics From 095de31303a66a565bf6de012cddbd97ca66e386 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 10 Jun 2026 12:24:53 +0300 Subject: [PATCH 09/12] style: apply cargo fmt --- rust/crates/cloudsearch-index/src/lib.rs | 4 +++- rust/crates/cloudsearch-node/tests/api_smoke.rs | 10 ++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index 4f7517f..18bc27a 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -837,7 +837,9 @@ impl IndexHandle { let mut candidates: Vec = Vec::new(); for (field, field_weight) in &request.fields { - if *field_weight == 0.0 { continue; } + if *field_weight == 0.0 { + continue; + } // Collect suggestions from all segments for this field for reader_opt in &self.suggest_readers { let Some(reader) = reader_opt else { continue }; diff --git a/rust/crates/cloudsearch-node/tests/api_smoke.rs b/rust/crates/cloudsearch-node/tests/api_smoke.rs index 3e72eaf..9f756f2 100644 --- a/rust/crates/cloudsearch-node/tests/api_smoke.rs +++ b/rust/crates/cloudsearch-node/tests/api_smoke.rs @@ -1875,11 +1875,17 @@ async fn suggest_returns_completions_for_prefix() { node.flush("test").await; let resp = node - .suggest("test", serde_json::json!({"prefix": "elast", "fields": {"title": 1.0}, "size": 10})) + .suggest( + "test", + serde_json::json!({"prefix": "elast", "fields": {"title": 1.0}, "size": 10}), + ) .await; let suggestions = resp["suggestions"].as_array().expect("suggestions array"); - assert!(!suggestions.is_empty(), "expected suggestions for 'elast' prefix"); + assert!( + !suggestions.is_empty(), + "expected suggestions for 'elast' prefix" + ); assert_eq!( suggestions[0]["text"].as_str().unwrap(), "elastic", From 16f74b7cc8c6c60f048b59bcb875c68ea139936d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 10 Jun 2026 15:29:14 +0300 Subject: [PATCH 10/12] fix: deduplicate suggest results and reload readers after flush --- rust/crates/cloudsearch-index/src/lib.rs | 46 +++++++++++++----------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index 18bc27a..9a9aa37 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -834,7 +834,9 @@ impl IndexHandle { ) -> cloudsearch_common::SuggestResponse { let prefix = request.prefix.to_ascii_lowercase(); let size = request.size; - let mut candidates: Vec = Vec::new(); + + // Deduplicate by (text, field) while collecting, keeping highest score + let mut seen: BTreeMap<(String, String), cloudsearch_common::Suggestion> = BTreeMap::new(); for (field, field_weight) in &request.fields { if *field_weight == 0.0 { @@ -844,17 +846,24 @@ impl IndexHandle { for reader_opt in &self.suggest_readers { let Some(reader) = reader_opt else { continue }; for entry in reader.suggest_for_field(field, &prefix) { - candidates.push(cloudsearch_common::Suggestion { + let key = (entry.term.clone(), field.clone()); + let suggestion = cloudsearch_common::Suggestion { text: entry.term.clone(), score: entry.score * field_weight, doc_freq: entry.doc_freq, field: Some(field.clone()), - }); + }; + seen.entry(key).and_modify(|e| { + if suggestion.score > e.score { + *e = suggestion.clone(); + } + }).or_insert(suggestion); } } } // Sort by score descending, then by text ascending for tie-breaking + let mut candidates: Vec<_> = seen.into_values().collect(); candidates.sort_by(|a, b| { b.score .partial_cmp(&a.score) @@ -1644,25 +1653,22 @@ impl IndexHandle { terms = suggest_index.total_terms(), "wrote suggest sidecar" ); - // Update suggest_readers to include the newly written segment's suggest index - let suggest_path = self - .segments_dir - .join(format!("suggest_{segment_number:020}.bin")); - let data = tokio::fs::read(&suggest_path) - .await - .map_err(std::io::Error::other)?; - match cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) { - Ok(reader) => { - self.suggest_readers.push(Some(reader)); - tracing::info!( - count = self.suggest_readers.len(), - "added new suggest reader after flush" - ); - } - Err(e) => { - tracing::warn!(path = %suggest_path.as_path().display(), error = %e, "failed to read suggest file after flush"); + // Reload all suggest readers from the updated manifest to avoid accumulation + let mut new_readers = Vec::new(); + for seg in &self.manifest.segments { + let path = self.segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number)); + if let Ok(data) = tokio::fs::read(&path).await + && let Ok(reader) = cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) + && reader.fields().count() > 0 + { + new_readers.push(Some(reader)); } } + self.suggest_readers = new_readers; + tracing::info!( + count = self.suggest_readers.len(), + "reloaded suggest readers after flush" + ); } // Clear per-doc indices now that they're persisted From 0c814492df9a301f7017a30c2d4ddd953bf542a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 10 Jun 2026 15:31:41 +0300 Subject: [PATCH 11/12] style: apply cargo fmt --- rust/crates/cloudsearch-index/src/lib.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index 9a9aa37..fc393eb 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -853,11 +853,13 @@ impl IndexHandle { doc_freq: entry.doc_freq, field: Some(field.clone()), }; - seen.entry(key).and_modify(|e| { - if suggestion.score > e.score { - *e = suggestion.clone(); - } - }).or_insert(suggestion); + seen.entry(key) + .and_modify(|e| { + if suggestion.score > e.score { + *e = suggestion.clone(); + } + }) + .or_insert(suggestion); } } } @@ -1656,9 +1658,12 @@ impl IndexHandle { // Reload all suggest readers from the updated manifest to avoid accumulation let mut new_readers = Vec::new(); for seg in &self.manifest.segments { - let path = self.segments_dir.join(format!("suggest_{:020}.bin", seg.segment_number)); + let path = self + .segments_dir + .join(format!("suggest_{:020}.bin", seg.segment_number)); if let Ok(data) = tokio::fs::read(&path).await - && let Ok(reader) = cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) + && let Ok(reader) = + cloudsearch_storage::suggest_index::SuggestReader::from_bytes(&data) && reader.fields().count() > 0 { new_readers.push(Some(reader)); From 8beb4fce1486eec9baa124861b44cd83bd90c5da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 10 Jun 2026 17:27:54 +0300 Subject: [PATCH 12/12] fix(suggest): address 4 code review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - doc_freq: count unique docs per term, not token occurrences Use BTreeSet per term per field in build_suggest_index to deduplicate within each document before counting. Fix: 'foo foo foo' + 'foo bar' → doc_freq('foo') = 2, not 4. - NaN panic: replace .unwrap() with .unwrap_or(Ordering::Equal) in suggest() sort comparator. NaN scores now sort to equality instead of panicking. - Stale suggest_readers: reload all suggest sidecar readers in apply_merge_plan after manifest update, mirroring the positions_readers reload pattern. Prevents stale readers pointing to non-existent sidecar files after a merge. - Empty prefix: return Vec::new() for empty prefix in suggest_for_field(). Previously '' returned the entire vocabulary via starts_with(''). - docs: add POST /{index}/_suggest to api-v1.md with request/ response shapes and implementation notes. - docs: add ADR 0004 for the autocomplete suggest index design. --- docs/adr/0004-autocomplete-suggest-index.md | 77 ++++++++++++++++++ docs/api-v1.md | 50 ++++++++++++ rust/crates/cloudsearch-index/src/lib.rs | 78 ++++++++++++++++--- .../cloudsearch-storage/src/suggest_index.rs | 18 +++++ 4 files changed, 214 insertions(+), 9 deletions(-) create mode 100644 docs/adr/0004-autocomplete-suggest-index.md diff --git a/docs/adr/0004-autocomplete-suggest-index.md b/docs/adr/0004-autocomplete-suggest-index.md new file mode 100644 index 0000000..76a0dcb --- /dev/null +++ b/docs/adr/0004-autocomplete-suggest-index.md @@ -0,0 +1,77 @@ +# ADR 0004: Autocomplete Suggest Index + +## Status +Accepted + +## Date +2026-06-09 + +## Context + +Users need fast autocomplete / as-you-type suggestions — returning completion candidates as the user types, before they finish a word. This is a read-heavy, latency-sensitive path (sub-10ms target). The feature should return term completions, not full documents. + +Requirements: +- Sub-10ms latency for prefix lookup +- Multi-field support with per-field weights +- Score by term popularity across the corpus +- Completion suggestions (terms/phrases), not document results + +## Decision + +### Binary Serialization with Sorted Term Arrays + +Each field's vocabulary is stored as a sorted `Vec` (term, doc_freq, score) in a binary sidecar file (`suggest_{segment:020}.bin`). The format: + +``` +MAGIC (4) + VERSION (1) + PADDING (3) + FIELD_COUNT (4) +Per field: FIELD_NAME_LEN (4) + FIELD_NAME + TERM_COUNT (4) +Per term: STR_LEN (4) + TERM_BYTES + DOC_FREQ (4) + SCORE (4) +``` + +O(log n + m) prefix lookup via binary search (`find_first_prefix`) where n = vocabulary size, m = matching terms. No in-memory index build at query time. + +### Atomic Writes via .tmp then Rename + +Flush writes to `suggest_{seg:020}.tmp`, then atomically renames to `.bin`. Readers load from `.bin` only — never from `.tmp`. This guarantees readers always see consistent data. + +### Per-Segment Sidecar Files + +Each segment has its own suggest sidecar file. At query time, all segment sidecars are queried and results merged. This avoids rebuilding the entire suggest index on every flush — only the new segment's sidecar is written. + +### doc_freq = Unique Document Count per Term + +`doc_freq` counts the number of **unique documents** that contain each term in a field, not the number of token occurrences. This is the standard document-frequency semantics used by search engines. + +### Segment Reader Management + +- On `open_index`: all suggest sidecars loaded into `suggest_readers` Vec +- On `flush`: all suggest sidecars reloaded from manifest (not just the new one) to avoid reader accumulation +- On `merge`: all suggest sidecars reloaded after manifest update since old sidecars are invalidated + +## Consequences + +### Positive +- Sub-10ms lookup: binary search on sorted arrays is O(log n), no in-memory index build +- Atomic writes prevent readers from seeing partial data +- Per-segment sidecars mean flush only writes one new file, not the full vocabulary +- doc_freq semantics match standard IR practice + +### Negative +- doc_freq is frozen at flush time — doesn't account for deletes or updates until next flush +- Each segment's suggest data is independent; cross-segment deduplication happens at query time (in-memory BTreeMap) +- Empty prefix returns all terms in lexical order (could be large); guarded to return empty instead + +### Neutral +- The suggest sidecar is separate from the positions sidecar — two separate files per segment +- Segment readers are held in memory; memory usage grows with segment count × vocabulary size + +## Alternatives Considered + +### Alternative 1: In-Memory Trie +**Why rejected:** A trie would require rebuilding the entire suggest index on every flush. With large vocabularies this becomes expensive. The sorted-array binary search achieves the same O(log n + m) lookup while allowing per-segment incremental updates. + +### Alternative 2: Generic B-Tree Index (e.g., RedBTree) +**Why rejected:** Adds a heavy dependency for a read-heavy, append-mostly workload. The binary serialized sorted arrays are simpler, have no runtime dependency, and serialize/deserialize cheaply. + +### Alternative 3: Store suggest data inline in segment snapshot +**Why rejected:** Suggests are built during flush from the full document set; storing them inline in the segment snapshot would require re-reading all documents to rebuild suggests on every merge. Separate sidecar files allow incremental rebuilds from the merged document set only. \ No newline at end of file diff --git a/docs/api-v1.md b/docs/api-v1.md index f043919..5633290 100644 --- a/docs/api-v1.md +++ b/docs/api-v1.md @@ -50,6 +50,56 @@ Current implementation notes: - `POST /{index}/_search` — search with JSON body - `GET /{index}/_search?q=...` — search with query string +- `POST /{index}/_suggest` — autocomplete suggestions for a prefix + +### Suggest Request Shape + +```json +{ + "prefix": "elast", + "fields": { "title": 1.0, "body": 0.5 }, + "size": 10, + "fuzzy": { "fuzziness": "AUTO" } +} +``` + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `prefix` | `string` | required | The prefix to suggest completions for | +| `fields` | `map` | `{}` | Fields to search with their weights (0 = excluded) | +| `size` | `integer` | `10` | Maximum number of suggestions to return | +| `fuzzy` | `object` | none | Optional fuzzy matching; omit for exact prefix only | + +### Suggest Response Shape + +```json +{ + "suggestions": [ + { + "text": "elastic", + "score": 0.6667, + "doc_freq": 2, + "field": "title" + } + ] +} +``` + +| Field | Type | Description | +|-------|------|-------------| +| `text` | `string` | The completion suggestion (tokenized, lowercase) | +| `score` | `float` | Normalized popularity score (`doc_freq / n_docs`) | +| `doc_freq` | `integer` | Number of documents containing this term | +| `field` | `string?` | Which field contributed this suggestion | + +### Implementation Notes + +- Suggestions are built during flush from indexed text fields (type `keyword`) +- Each field's vocabulary is sorted and stored in a binary sidecar file for O(log n + m) prefix lookup +- `doc_freq` counts **unique documents** per term, not token occurrences +- Empty prefix (`""`) returns no results +- Scores are computed as `doc_freq / n_docs` where `n_docs` is the total documents at flush time +- Fuzzy matching uses edit distance when `fuzzy` is provided ## Observability API diff --git a/rust/crates/cloudsearch-index/src/lib.rs b/rust/crates/cloudsearch-index/src/lib.rs index fc393eb..bd9022f 100644 --- a/rust/crates/cloudsearch-index/src/lib.rs +++ b/rust/crates/cloudsearch-index/src/lib.rs @@ -779,6 +779,11 @@ impl IndexHandle { self.manifest = new_manifest; self.searchable_documents = merged; + // Reload suggest readers for all segments — the old ones are stale + // after the merge (their sidecar files no longer exist). + self.suggest_readers = + IndexCatalog::load_all_suggest_readers(&self.segments_dir, &self.manifest).await; + // Reload positions reader for the new merged segment only, since // the old segment sidecars were invalidated by the merge. self.positions_readers.clear(); @@ -869,7 +874,7 @@ impl IndexHandle { candidates.sort_by(|a, b| { b.score .partial_cmp(&a.score) - .unwrap() + .unwrap_or(std::cmp::Ordering::Equal) .then_with(|| a.text.cmp(&b.text)) }); candidates.truncate(size); @@ -1374,13 +1379,18 @@ impl IndexHandle { } /// Builds a suggest index from a list of documents for autocomplete. - #[allow(clippy::cast_precision_loss)] + #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation)] fn build_suggest_index( documents: &[IndexDocument], mappings: &BTreeMap, ) -> cloudsearch_storage::suggest_index::SuggestIndex { let n_docs = documents.len().max(1) as f64; - let mut field_terms: BTreeMap> = BTreeMap::new(); + // BTreeMap>> + // BTreeSet deduplicates doc_ids per term per field → doc_freq = unique doc count + let mut field_terms: BTreeMap< + String, + BTreeMap>, + > = BTreeMap::new(); for doc in documents { if let Some(obj) = doc.source.as_object() { @@ -1397,22 +1407,25 @@ impl IndexHandle { let Some(text) = field_value.as_str() else { continue; }; - let tokens = tokenize(text); + // Deduplicate tokens within this document using BTreeSet + let tokens: std::collections::BTreeSet<_> = + tokenize(text).into_iter().collect(); - let field_freqs = field_terms.entry(field_name.clone()).or_default(); + let doc_set = field_terms.entry(field_name.clone()).or_default(); for token in tokens { - *field_freqs.entry(token).or_insert(0) += 1; + doc_set.entry(token).or_default().insert(doc.id.clone()); } } } } let mut index = cloudsearch_storage::suggest_index::SuggestIndex::new(); - for (field, term_counts) in field_terms { - let mut entries: Vec = term_counts + for (field, term_doc_ids) in field_terms { + let mut entries: Vec = term_doc_ids .into_iter() - .map(|(term, doc_freq)| { + .map(|(term, doc_ids)| { #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)] + let doc_freq = doc_ids.len() as u32; let score = (f64::from(doc_freq) / n_docs) as f32; cloudsearch_storage::suggest_index::SuggestEntry { term, @@ -6982,4 +6995,51 @@ mod tests { }; assert_eq!(fuzzy_term_match(&doc, &term), None); // fuzzy requires string } + + #[test] + fn build_suggest_index_counts_unique_docs_not_token_occurrences() { + // Regression: doc_freq must count unique documents per term, + // not the number of token occurrences. + // "foo foo foo" and "foo bar" in the same field → doc_freq("foo") = 2, not 4. + let documents = vec![ + IndexDocument { + id: "1".to_string(), + source: serde_json::json!({"title": "foo foo foo"}), + }, + IndexDocument { + id: "2".to_string(), + source: serde_json::json!({"title": "foo bar"}), + }, + ]; + let mappings = BTreeMap::from([( + "title".to_string(), + FieldMapping { + field_type: FieldType::Keyword, + }, + )]); + + let index = IndexHandle::build_suggest_index(&documents, &mappings); + let title_entries = index.fields.get("title").expect("title field"); + + let foo_entry = title_entries + .iter() + .find(|e| e.term == "foo") + .expect("foo term must exist"); + // Two documents contain "foo" (doc "1" once, doc "2" once) → doc_freq = 2 + assert_eq!( + foo_entry.doc_freq, 2, + "doc_freq must be 2 (unique docs), got {} (token occurrences?)", + foo_entry.doc_freq + ); + + let bar_entry = title_entries + .iter() + .find(|e| e.term == "bar") + .expect("bar term must exist"); + assert_eq!( + bar_entry.doc_freq, 1, + "doc_freq for 'bar' must be 1 (only doc 2), got {}", + bar_entry.doc_freq + ); + } } diff --git a/rust/crates/cloudsearch-storage/src/suggest_index.rs b/rust/crates/cloudsearch-storage/src/suggest_index.rs index a5b77d9..d12e9e2 100644 --- a/rust/crates/cloudsearch-storage/src/suggest_index.rs +++ b/rust/crates/cloudsearch-storage/src/suggest_index.rs @@ -188,6 +188,9 @@ impl SuggestReader { field: &'a str, prefix: &'a str, ) -> Vec<&'a SuggestEntry> { + if prefix.is_empty() { + return Vec::new(); + } let Some(start) = self.find_first_prefix(field, prefix) else { return Vec::new(); }; @@ -404,4 +407,19 @@ mod tests { assert_eq!(desc_entries.len(), 1); assert_eq!(desc_entries[0].term, "rust"); } + + #[test] + fn suggest_for_field_returns_empty_for_empty_prefix() { + let entries = make_entries(); + let reader = SuggestReader { + fields: std::collections::BTreeMap::from([("title".to_string(), entries)]), + }; + + // Empty prefix should return no results, not the entire vocabulary + let suggestions = reader.suggest_for_field("title", ""); + assert!( + suggestions.is_empty(), + "empty prefix must not return all terms" + ); + } }