Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl IndexerState {
last_delete_opstamp: u64,
splits: &'a mut FnvHashMap<u64, IndexedSplitBuilder>,
other_split_opt: &'a mut Option<IndexedSplitBuilder>,
counter: &'a mut IndexerCounters,
num_docs_in_workbench: u64,
ctx: &ActorContext<Indexer>,
) -> anyhow::Result<(&'a mut IndexedSplitBuilder, bool)> {
let num_splits = splits.len();
Expand All @@ -157,7 +157,7 @@ impl IndexerState {
// `OTHER` special partition.
if other_split_opt.is_none() {
warn!(
num_docs_in_workbench = counter.num_docs_in_workbench,
num_docs_in_workbench = num_docs_in_workbench,
max_num_partition = self.max_num_partitions.get(),
"Exceeding max_num_partition"
);
Expand Down Expand Up @@ -272,7 +272,7 @@ impl IndexerState {

async fn index_batch(
&self,
batch: ProcessedDocBatch,
mut batch: ProcessedDocBatch,
indexing_workbench_opt: &mut Option<IndexingWorkbench>,
counters: &mut IndexerCounters,
ctx: &ActorContext<Indexer>,
Expand All @@ -297,22 +297,19 @@ impl IndexerState {
.source_delta
.extend(batch.checkpoint_delta)
.context("batch delta does not follow indexer checkpoint")?;
let mut memory_usage_delta: i64 = 0;
counters.num_doc_batches_in_workbench += 1;
for doc in batch.docs {
let ProcessedDoc {
doc,
timestamp_opt,
partition,
num_bytes,
} = doc;
counters.num_docs_in_workbench += 1;
batch.docs.sort_unstable_by_key(|doc| doc.partition);
// sort by partition to minimize the number of splits we need to write to for this batch.

let docs_grouped_by_partition = batch.docs.into_iter().chunk_by(|doc| doc.partition);
for (partition, docs) in &docs_grouped_by_partition {
let mut memory_usage_delta: i64 = 0;
let (indexed_split, split_created) = self.get_or_create_indexed_split(
partition,
*last_delete_opstamp,
indexed_splits,
other_indexed_split_opt,
counters,
counters.num_docs_in_workbench,
ctx,
)?;
let mem_usage_before = indexed_split.index_writer.mem_usage() as u64;
Expand All @@ -321,21 +318,31 @@ impl IndexerState {
// memory usage.
memory_usage_delta += mem_usage_before as i64;
}
indexed_split.split_attrs.uncompressed_docs_size_in_bytes += num_bytes as u64;
indexed_split.split_attrs.num_docs += 1;
if let Some(timestamp) = timestamp_opt {
record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range);

for doc in docs {
let ProcessedDoc {
doc,
timestamp_opt,
num_bytes,
partition: _,
} = doc;
counters.num_docs_in_workbench += 1;
indexed_split.split_attrs.uncompressed_docs_size_in_bytes += num_bytes as u64;
indexed_split.split_attrs.num_docs += 1;
if let Some(timestamp) = timestamp_opt {
record_timestamp(timestamp, &mut indexed_split.split_attrs.time_range);
}
let _protect_guard = ctx.protect_zone();
indexed_split
.index_writer
.add_document(doc)
.context("failed to add document")?;
ctx.record_progress();
}
let _protect_guard = ctx.protect_zone();
indexed_split
.index_writer
.add_document(doc)
.context("failed to add document")?;
let mem_usage_after = indexed_split.index_writer.mem_usage() as u64;
memory_usage_delta += mem_usage_after as i64 - mem_usage_before as i64;
ctx.record_progress();
memory_usage.add(memory_usage_delta);
}
memory_usage.add(memory_usage_delta);
Ok(())
}
}
Expand Down
Loading