From cf8f369e407bd0b10ed63f6bea25a077dc63f739 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 22 Apr 2026 15:10:02 +0200 Subject: [PATCH] sort batch docs by partition to reduce mem_usage() lookups Sort the incoming ProcessedDocBatch by partition and group it with chunk_by so that the get_or_create_indexed_split lookup and the index_writer mem_usage() probes are performed once per partition group instead of once per doc. --- .../quickwit-indexing/src/actors/indexer.rs | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 3d272caf11a..60c5b1fd081 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -145,7 +145,7 @@ impl IndexerState { last_delete_opstamp: u64, splits: &'a mut FnvHashMap, other_split_opt: &'a mut Option, - counter: &'a mut IndexerCounters, + num_docs_in_workbench: u64, ctx: &ActorContext, ) -> anyhow::Result<(&'a mut IndexedSplitBuilder, bool)> { let num_splits = splits.len(); @@ -157,7 +157,7 @@ impl IndexerState { // `OTHER` special partition. 
if other_split_opt.is_none() { warn!( - num_docs_in_workbench = counter.num_docs_in_workbench, + num_docs_in_workbench = num_docs_in_workbench, max_num_partition = self.max_num_partitions.get(), "Exceeding max_num_partition" ); @@ -272,7 +272,7 @@ impl IndexerState { async fn index_batch( &self, - batch: ProcessedDocBatch, + mut batch: ProcessedDocBatch, indexing_workbench_opt: &mut Option, counters: &mut IndexerCounters, ctx: &ActorContext, @@ -297,22 +297,19 @@ impl IndexerState { .source_delta .extend(batch.checkpoint_delta) .context("batch delta does not follow indexer checkpoint")?; - let mut memory_usage_delta: i64 = 0; counters.num_doc_batches_in_workbench += 1; - for doc in batch.docs { - let ProcessedDoc { - doc, - timestamp_opt, - partition, - num_bytes, - } = doc; - counters.num_docs_in_workbench += 1; + batch.docs.sort_unstable_by_key(|doc| doc.partition); + // sort by partition to minimize the number of splits we need to write to for this batch. + + let docs_grouped_by_partition = batch.docs.into_iter().chunk_by(|doc| doc.partition); + for (partition, docs) in &docs_grouped_by_partition { + let mut memory_usage_delta: i64 = 0; let (indexed_split, split_created) = self.get_or_create_indexed_split( partition, *last_delete_opstamp, indexed_splits, other_indexed_split_opt, - counters, + counters.num_docs_in_workbench, ctx, )?; let mem_usage_before = indexed_split.index_writer.mem_usage() as u64; @@ -321,21 +318,31 @@ impl IndexerState { // memory usage. 
memory_usage_delta += mem_usage_before as i64; } - indexed_split.split_attrs.uncompressed_docs_size_in_bytes += num_bytes as u64; - indexed_split.split_attrs.num_docs += 1; - if let Some(timestamp) = timestamp_opt { - record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range); + + for doc in docs { + let ProcessedDoc { + doc, + timestamp_opt, + num_bytes, + partition: _, + } = doc; + counters.num_docs_in_workbench += 1; + indexed_split.split_attrs.uncompressed_docs_size_in_bytes += num_bytes as u64; + indexed_split.split_attrs.num_docs += 1; + if let Some(timestamp) = timestamp_opt { + record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range); + } + let _protect_guard = ctx.protect_zone(); + indexed_split + .index_writer + .add_document(doc) + .context("failed to add document")?; + ctx.record_progress(); } - let _protect_guard = ctx.protect_zone(); - indexed_split - .index_writer - .add_document(doc) - .context("failed to add document")?; let mem_usage_after = indexed_split.index_writer.mem_usage() as u64; memory_usage_delta += mem_usage_after as i64 - mem_usage_before as i64; - ctx.record_progress(); + memory_usage.add(memory_usage_delta); } - memory_usage.add(memory_usage_delta); Ok(()) } }