diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 68d9ad60f96..5922b140e68 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7150,6 +7150,8 @@ version = "0.8.0" dependencies = [ "anyhow", "async-trait", + "itertools 0.14.0", + "once_cell", "quickwit-actors", "quickwit-common", "quickwit-config", diff --git a/quickwit/quickwit-compaction/Cargo.toml b/quickwit/quickwit-compaction/Cargo.toml index c018327c8c4..556fe491a5b 100644 --- a/quickwit/quickwit-compaction/Cargo.toml +++ b/quickwit/quickwit-compaction/Cargo.toml @@ -26,6 +26,8 @@ time = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } ulid = { workspace = true } +itertools = "0.14.0" +once_cell = "1.21.4" [dev-dependencies] quickwit-actors = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs index 270cb5312d9..d481c66b62a 100644 --- a/quickwit/quickwit-compaction/src/compaction_pipeline.rs +++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs @@ -30,9 +30,10 @@ use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::types::{IndexUid, SourceId, SplitId}; -use tracing::{debug, error, info}; +use tracing::{error, info}; -use crate::TaskId; +use crate::metrics::COMPACTOR_METRICS; +use crate::{TaskId, source_uid_metrics_label}; #[derive(Clone, Debug, PartialEq)] pub enum PipelineStatus { @@ -46,6 +47,7 @@ pub struct PipelineStatusUpdate { pub index_uid: IndexUid, pub source_id: SourceId, pub split_ids: Vec<SplitId>, + pub merge_level: u64, pub status: PipelineStatus, } @@ -94,6 +96,7 @@ pub struct CompactionPipeline { io_throughput_limiter: Option, max_concurrent_split_uploads: usize, event_broker: EventBroker, + pipeline_start: Option<Instant>, } impl CompactionPipeline { @@ -128,6 +131,7 @@ impl
CompactionPipeline { io_throughput_limiter, max_concurrent_split_uploads, event_broker, + pipeline_start: None, } } @@ -193,17 +197,32 @@ impl CompactionPipeline { } if !failure_actor_names.is_empty() { + self.record_pipeline_duration(); let error_msg = format!("failed actors: {:?}", failure_actor_names); error!(task_id=%self.task_id, "{error_msg}"); self.status = PipelineStatus::Failed { error: error_msg }; return; } if !has_healthy { - debug!(task_id=%self.task_id, "all compaction pipeline actors completed"); + self.record_pipeline_duration(); + info!(task_id=%self.task_id, "all compaction pipeline actors completed"); self.status = PipelineStatus::Completed; } } + fn record_pipeline_duration(&self) { + if let Some(pipeline_start_time) = self.pipeline_start { + let elapsed = pipeline_start_time.elapsed().as_secs_f64(); + let merge_level = self.merge_operation.merge_level(); + let index_label = + source_uid_metrics_label(&self.pipeline_id.index_uid, &self.pipeline_id.source_id); + COMPACTOR_METRICS + .compaction_duration + .with_label_values([index_label.as_str(), &merge_level.to_string()]) + .observe(elapsed); + } + } + fn build_status_update(&self) -> PipelineStatusUpdate { PipelineStatusUpdate { task_id: self.task_id.clone(), @@ -216,6 +235,7 @@ impl CompactionPipeline { .map(|split| split.split_id().to_string()) .collect(), status: self.status.clone(), + merge_level: self.merge_operation.merge_level() as u64, } } @@ -295,6 +315,8 @@ impl CompactionPipeline { .set_kill_switch(self.kill_switch.child()) .spawn(merge_split_downloader); + let now = Instant::now(); + self.pipeline_start = Some(now); // Kick off the pipeline. 
merge_split_downloader_mailbox .try_send_message(self.merge_operation.clone()) diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs index 188125689aa..5b813cc5b98 100644 --- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs +++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs @@ -35,9 +35,11 @@ use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::types::NodeId; use quickwit_storage::StorageResolver; -use tracing::{error, info, warn}; +use tracing::{error, info}; use crate::compaction_pipeline::{CompactionPipeline, PipelineStatus, PipelineStatusUpdate}; +use crate::metrics::COMPACTOR_METRICS; +use crate::source_uid_metrics_label; const CHECK_PIPELINE_STATUSES_INTERVAL: Duration = Duration::from_secs(1); @@ -207,15 +209,24 @@ impl CompactorSupervisor { .iter() .filter(|s| matches!(s.status, PipelineStatus::InProgress)) .count(); - let available_slots = (self.pipelines.len() - in_progress_count) as u32; + let available_slots = (self.pipelines.len() - in_progress_count) as i64; + COMPACTOR_METRICS.available_slots.set(available_slots); let mut in_progress = Vec::new(); let mut successes = Vec::new(); let mut failures = Vec::new(); for update in statuses { + let merge_level = update.merge_level; + let index_label = source_uid_metrics_label(&update.index_uid, &update.source_id); + let label_values = [index_label.as_str(), &merge_level.to_string()]; + match &update.status { PipelineStatus::InProgress => { + COMPACTOR_METRICS + .compactions_in_progress + .with_label_values(label_values) + .inc(); in_progress.push(CompactionInProgress { task_id: update.task_id.clone(), index_uid: Some(update.index_uid.clone()), @@ -224,11 +235,27 @@ impl CompactorSupervisor { }); } PipelineStatus::Completed => { + COMPACTOR_METRICS + .compactions_in_progress + .with_label_values(label_values) + .dec(); + COMPACTOR_METRICS + 
.compactions_succeeded + .with_label_values(label_values) + .inc(); successes.push(CompactionSuccess { task_id: update.task_id.clone(), }); } PipelineStatus::Failed { error } => { + COMPACTOR_METRICS + .compactions_in_progress + .with_label_values(label_values) + .dec(); + COMPACTOR_METRICS + .compactions_failed + .with_label_values(label_values) + .inc(); failures.push(CompactionFailure { task_id: update.task_id.clone(), error_message: error.clone(), @@ -239,7 +266,7 @@ impl CompactorSupervisor { ReportStatusRequest { node_id: self.node_id.to_string(), - available_slots, + available_slots: available_slots as u32, in_progress, successes, failures, @@ -287,7 +314,7 @@ impl Handler for CompactorSupervisor { .await; } Err(error) => { - warn!(%error, "failed to report status to compaction planner"); + error!(%error, "failed to report status to compaction planner"); } } ctx.schedule_self_msg(CHECK_PIPELINE_STATUSES_INTERVAL, CheckPipelineStatuses); @@ -413,6 +440,7 @@ mod tests { index_uid: Some(index_metadata.index_uid.clone()), source_id: "test-source".to_string(), index_storage_uri: config.index_uri.to_string(), + merge_level: 1, } } @@ -572,6 +600,7 @@ mod tests { source_id: "src".to_string(), split_ids: vec!["s1".to_string(), "s2".to_string()], status: PipelineStatus::InProgress, + merge_level: 1, }, PipelineStatusUpdate { task_id: "task-2".to_string(), @@ -579,6 +608,7 @@ mod tests { source_id: "src".to_string(), split_ids: vec!["s3".to_string()], status: PipelineStatus::Completed, + merge_level: 1, }, PipelineStatusUpdate { task_id: "task-3".to_string(), @@ -588,6 +618,7 @@ mod tests { status: PipelineStatus::Failed { error: "boom".to_string(), }, + merge_level: 1, }, ]; diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs index 352f153b4fe..44948bc70ea 100644 --- a/quickwit/quickwit-compaction/src/lib.rs +++ b/quickwit/quickwit-compaction/src/lib.rs @@ -18,6 +18,7 @@ mod compaction_pipeline; #[allow(dead_code)] mod 
compactor_supervisor; +mod metrics; pub mod planner; pub type TaskId = String; @@ -33,10 +34,14 @@ use quickwit_config::CompactorConfig; use quickwit_indexing::IndexingSplitCache; use quickwit_proto::compaction::CompactionPlannerServiceClient; use quickwit_proto::metastore::MetastoreServiceClient; -use quickwit_proto::types::NodeId; +use quickwit_proto::types::{IndexUid, NodeId, SourceId}; use quickwit_storage::StorageResolver; use tracing::info; +pub fn source_uid_metrics_label(index_uid: &IndexUid, source_id: &SourceId) -> String { + format!("{}-{}", index_uid, source_id) +} + #[allow(clippy::too_many_arguments)] pub async fn start_compactor_service( universe: &Universe, diff --git a/quickwit/quickwit-compaction/src/metrics.rs b/quickwit/quickwit-compaction/src/metrics.rs new file mode 100644 index 00000000000..1d6bb234065 --- /dev/null +++ b/quickwit/quickwit-compaction/src/metrics.rs @@ -0,0 +1,75 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use once_cell::sync::Lazy; +use quickwit_common::metrics::{ + HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, exponential_buckets, new_counter_vec, + new_gauge, new_gauge_vec, new_histogram_vec, +}; + +pub struct CompactorMetrics { + pub compactions_in_progress: IntGaugeVec<2>, + pub compactions_failed: IntCounterVec<2>, + pub compactions_succeeded: IntCounterVec<2>, + pub available_slots: IntGauge, + pub compaction_duration: HistogramVec<2>, +} + +fn compaction_duration_buckets() -> Vec<f64> { + exponential_buckets(0.5, 2.0, 14).expect("compaction duration buckets should be valid") +} + +impl Default for CompactorMetrics { + fn default() -> Self { + CompactorMetrics { + compactions_in_progress: new_gauge_vec( + "compactions_in_progress", + "number of compaction merge operations currently running on this compactor", + "compactor", + &[], + ["source_uid", "merge_level"], + ), + compactions_failed: new_counter_vec( + "compactions_failed", + "total number of compaction merge operations that have failed", + "compactor", + &[], + ["source_uid", "merge_level"], + ), + compactions_succeeded: new_counter_vec( + "compactions_succeeded", + "total number of compaction merge operations that have completed successfully", + "compactor", + &[], + ["source_uid", "merge_level"], + ), + available_slots: new_gauge( + "available_slots", + "number of compaction slots currently available on this compactor", + "compactor", + &[], + ), + compaction_duration: new_histogram_vec( + "compaction_duration_seconds", + "duration of compaction merge operations in seconds", + "compactor", + &[], + ["source_uid", "merge_level"], + compaction_duration_buckets(), + ), + } + } +} + +pub static COMPACTOR_METRICS: Lazy<CompactorMetrics> = Lazy::new(CompactorMetrics::default); diff --git a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs index 57144df0088..94bfebacb54 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
+++ b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs @@ -17,6 +17,7 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; +use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler}; use quickwit_indexing::merge_policy::MergeOperation; use quickwit_metastore::{ @@ -28,11 +29,12 @@ use quickwit_proto::compaction::{ use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId, SourceId}; use time::OffsetDateTime; -use tracing::error; +use tracing::{error, info}; use ulid::Ulid; use super::compaction_state::CompactionState; use super::index_config_metastore::{IndexConfigMetastore, IndexEntry}; +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; pub struct CompactionPlanner { state: CompactionState, @@ -68,7 +70,7 @@ impl Actor for CompactionPlanner { fn observable_state(&self) -> Self::ObservableState {} async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { - tracing::info!("compaction planner starting, scanning metastore for immature splits"); + info!("compaction planner starting, scanning metastore for immature splits"); ctx.schedule_self_msg(INITIAL_SCAN_AND_PLAN_INTERVAL, ScanAndPlan); Ok(()) } @@ -140,6 +142,7 @@ impl CompactionPlanner { continue; } self.cursor = self.cursor.max(split.update_timestamp); + info!(max_timestamp=%self.cursor, "[compaction planner] update metastore cursor min_timestamp cursor"); self.state.track_split(split.split_metadata); } } @@ -153,9 +156,24 @@ impl CompactionPlanner { let splits = self .metastore .list_splits(request) - .await? + .await + .inspect_err(|error| { + error!(%error, "[compaction-planner] error calling metastore list_splits"); + COMPACTION_PLANNER_METRICS + .metastore_errors + .with_label_values(["scan"]) + .inc(); + })? 
.collect_splits() - .await?; + .await + .inspect_err(|error| { + error!(%error, "[compaction-planner] error collecting metastore splits"); + COMPACTION_PLANNER_METRICS + .metastore_errors + .with_label_values(["collect_splits"]) + .inc(); + })?; + emit_metastore_scan_metrics(&splits); Ok(splits) } @@ -163,6 +181,7 @@ impl CompactionPlanner { let splits = self.scan_metastore().await?; self.ingest_splits(splits).await; self.run_merge_policies(); + self.state.emit_metrics(); Ok(()) } @@ -208,6 +227,20 @@ impl CompactionPlanner { } } +fn emit_metastore_scan_metrics(new_splits: &[Split]) { + let size = new_splits.len(); + info!(%size, "[compaction planner] new splits scanned from metastore"); + let counts = new_splits + .iter() + .counts_by(|split| &split.split_metadata.index_uid); + for (&index_uid, &count) in counts.iter() { + COMPACTION_PLANNER_METRICS + .new_splits_scanned + .with_label_values([&index_uid.to_string()]) + .set(count as i64); + } +} + fn build_task_assignment( task_id: &str, index_entry: &IndexEntry, @@ -231,6 +264,7 @@ fn build_task_assignment( index_uid: Some(index_uid.clone()), source_id: source_id.to_string(), index_storage_uri: index_entry.index_storage_uri(), + merge_level: operation.merge_level() as u64, } } diff --git a/quickwit/quickwit-compaction/src/planner/compaction_state.rs b/quickwit/quickwit-compaction/src/planner/compaction_state.rs index 1bc1fb91cc8..c9a5d104c19 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_state.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_state.rs @@ -13,16 +13,19 @@ // limitations under the License. 
use std::collections::{HashMap, HashSet, VecDeque}; +use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, Instant}; +use itertools::Itertools; use quickwit_indexing::merge_policy::{MergeOperation, MergePolicy}; use quickwit_metastore::SplitMetadata; use quickwit_proto::compaction::{CompactionFailure, CompactionInProgress, CompactionSuccess}; use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId, SourceId, SplitId}; use tracing::{error, info, warn}; -use crate::TaskId; +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; +use crate::{TaskId, source_uid_metrics_label}; const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60); @@ -177,6 +180,7 @@ impl CompactionState { for task_id in timed_out_task_ids { if let Some(inflight) = self.in_flight.remove(&task_id) { error!(%task_id, node_id=%inflight.node_id, "compaction task timed out"); + COMPACTION_PLANNER_METRICS.timed_out_operations.inc(); for split_id in &inflight.split_ids { self.in_flight_split_ids.remove(split_id.as_str()); } @@ -206,6 +210,56 @@ impl CompactionState { }, ); } + + pub fn emit_metrics(&self) { + // total number of splits that need to be merged by compaction partition key + self.needs_compaction + .iter() + .map(|(compaction_partition_key, splits)| { + ( + source_uid_metrics_label( + &compaction_partition_key.index_uid, + &compaction_partition_key.source_id, + ), + splits.len() as i64, + ) + }) + .into_grouping_map() + .sum() + .iter() + .for_each(|(partition_key, &total_splits)| { + COMPACTION_PLANNER_METRICS + .splits_needing_compaction + .with_label_values([partition_key.as_str()]) + .set(total_splits) + }); + + // merge operations by index_uid/merge level + self.pending_operations + .iter() + .map(|(compaction_partition_key, merge_operation)| { + // The 1s get summed up to give the total number of operations per index per level. 
+ ( + ( + source_uid_metrics_label( + &compaction_partition_key.index_uid, + &compaction_partition_key.source_id, + ), + merge_operation.merge_level() as i64, + ), + 1, + ) + }) + .into_grouping_map() + .sum() + .iter() + .for_each(|((partition_key, merge_level), &count)| { + COMPACTION_PLANNER_METRICS + .pending_merge_operations + .with_label_values([partition_key.as_str(), &merge_level.to_string()]) + .set(count); + }); + } } #[cfg(test)] diff --git a/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs b/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs index 5fe2a9a6bdc..e2d42d99ba2 100644 --- a/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs +++ b/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs @@ -21,6 +21,9 @@ use quickwit_indexing::merge_policy::{MergePolicy, merge_policy_from_settings}; use quickwit_metastore::{IndexMetadataResponseExt, SplitMaturity, SplitMetadata}; use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{DocMappingUid, IndexUid}; +use tracing::error; + +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; /// Everything the planner needs to know about a single index. 
pub struct IndexEntry { @@ -99,7 +102,15 @@ impl IndexConfigMetastore { index_uid: Some(index_uid.clone()), index_id: None, }) - .await?; + .await + .inspect_err(|error| { + error!(%error, "[compaction-planner] error getting index metadata from metastore"); + COMPACTION_PLANNER_METRICS + .metastore_errors + .with_label_values(["index_metadata"]) + .inc(); + })?; + let index_metadata = response.deserialize_index_metadata()?; let doc_mapper = build_doc_mapper( diff --git a/quickwit/quickwit-compaction/src/planner/metrics.rs b/quickwit/quickwit-compaction/src/planner/metrics.rs new file mode 100644 index 00000000000..2a4b2d9d4c7 --- /dev/null +++ b/quickwit/quickwit-compaction/src/planner/metrics.rs @@ -0,0 +1,68 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use once_cell::sync::Lazy; +use quickwit_common::metrics::{IntGauge, IntGaugeVec, new_gauge, new_gauge_vec}; + +pub struct CompactionPlannerMetrics { + pub new_splits_scanned: IntGaugeVec<1>, + pub splits_needing_compaction: IntGaugeVec<1>, + pub pending_merge_operations: IntGaugeVec<2>, + pub timed_out_operations: IntGauge, + pub metastore_errors: IntGaugeVec<1>, +} + +impl Default for CompactionPlannerMetrics { + fn default() -> Self { + CompactionPlannerMetrics { + new_splits_scanned: new_gauge_vec( + "new_splits_scanned", + "number of new immature splits scanned from the metastore on the last tick", + "compaction_planner", + &[], + ["source_uid"], + ), + splits_needing_compaction: new_gauge_vec( + "splits_needing_compaction", + "number of splits currently tracked as needing compaction", + "compaction_planner", + &[], + ["source_uid"], + ), + pending_merge_operations: new_gauge_vec( + "pending_merge_operations", + "number of pending merge operations awaiting assignment", + "compaction_planner", + &[], + ["source_uid", "merge_level"], + ), + timed_out_operations: new_gauge( + "timed_out_operations", + "number of merge operations that timed out waiting for a worker heartbeat", + "compaction_planner", + &[], + ), + metastore_errors: new_gauge_vec( + "metastore_errors", + "number of metastore errors encountered by the compaction planner", + "compaction_planner", + &[], + ["operation"], + ), + } + } +} + +pub static COMPACTION_PLANNER_METRICS: Lazy<CompactionPlannerMetrics> = + Lazy::new(CompactionPlannerMetrics::default); diff --git a/quickwit/quickwit-compaction/src/planner/mod.rs b/quickwit/quickwit-compaction/src/planner/mod.rs index bd075ab900d..888253baa4e 100644 --- a/quickwit/quickwit-compaction/src/planner/mod.rs +++ b/quickwit/quickwit-compaction/src/planner/mod.rs @@ -17,5 +17,6 @@ mod compaction_planner; mod compaction_state; #[allow(dead_code)] mod index_config_metastore; +pub(crate) mod metrics; pub use compaction_planner::CompactionPlanner; diff --git
a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 7269b37ae22..996d682b6e8 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -52,9 +52,7 @@ "enable_otlp_endpoint": true, "split_store_max_num_bytes": "1T", "split_store_max_num_splits": 10000, - "max_concurrent_split_uploads": 8, - "max_merge_write_throughput": "100mb", - "merge_concurrency": 2 + "max_concurrent_split_uploads": 8 }, "ingest_api": { "replication_factor": 2 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index ea715dcffe0..6a63ff31883 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -43,8 +43,6 @@ enable_otlp_endpoint = true split_store_max_num_bytes = "1T" split_store_max_num_splits = 10_000 max_concurrent_split_uploads = 8 -max_merge_write_throughput = "100mb" -merge_concurrency = 2 [ingest_api] replication_factor = 2 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index face0852972..49e418ed924 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -47,8 +47,6 @@ indexer: split_store_max_num_bytes: 1T split_store_max_num_splits: 10000 max_concurrent_split_uploads: 8 - max_merge_write_throughput: 100mb - merge_concurrency: 2 ingest_api: replication_factor: 2 diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index ff981fcf31c..4df0a4e102d 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ 
b/quickwit/quickwit-config/src/node_config/mod.rs @@ -146,15 +146,6 @@ pub struct IndexerConfig { pub split_store_max_num_splits: usize, #[serde(default = "IndexerConfig::default_max_concurrent_split_uploads")] pub max_concurrent_split_uploads: usize, - /// Limits the IO throughput of the `SplitDownloader` and the `MergeExecutor`. - /// On hardware where IO is constrained, it makes sure that Merges (a batch operation) - /// does not starve indexing itself (as it is a latency sensitive operation). - #[serde(default)] - pub max_merge_write_throughput: Option, - /// Maximum number of merge or delete operation that can be executed concurrently. - /// (defaults to num_cpu / 2). - #[serde(default = "IndexerConfig::default_merge_concurrency")] - pub merge_concurrency: NonZeroUsize, /// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry /// Protocol (OTLP). #[serde(default = "IndexerConfig::default_enable_otlp_endpoint")] @@ -202,10 +193,6 @@ impl IndexerConfig { 1_000 } - pub fn default_merge_concurrency() -> NonZeroUsize { - NonZeroUsize::new(quickwit_common::num_cpus() * 2 / 3).unwrap_or(NonZeroUsize::MIN) - } - fn default_cpu_capacity() -> CpuCapacity { CpuCapacity::one_cpu_thread() * (quickwit_common::num_cpus() as u32) } @@ -220,8 +207,6 @@ impl IndexerConfig { split_store_max_num_splits: 3, max_concurrent_split_uploads: 4, cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32, - max_merge_write_throughput: None, - merge_concurrency: NonZeroUsize::new(3).unwrap(), enable_standalone_compactors: false, }; Ok(indexer_config) @@ -237,8 +222,6 @@ impl Default for IndexerConfig { split_store_max_num_splits: Self::default_split_store_max_num_splits(), max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(), cpu_capacity: Self::default_cpu_capacity(), - merge_concurrency: Self::default_merge_concurrency(), - max_merge_write_throughput: None, enable_standalone_compactors: Self::default_enable_standalone_compactors(), } } @@ 
-949,23 +932,6 @@ mod tests { "1500m" ); } - { - let indexer_config: IndexerConfig = - serde_yaml::from_str(r#"merge_concurrency: 5"#).unwrap(); - assert_eq!( - indexer_config.merge_concurrency, - NonZeroUsize::new(5).unwrap() - ); - let indexer_config_json = serde_json::to_value(&indexer_config).unwrap(); - assert_eq!( - indexer_config_json - .get("merge_concurrency") - .unwrap() - .as_u64() - .unwrap(), - 5 - ); - } { let indexer_config: IndexerConfig = serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap(); diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 0bf765df123..1f9dc30d6d3 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -673,10 +673,8 @@ mod tests { split_store_max_num_bytes: ByteSize::tb(1), split_store_max_num_splits: 10_000, max_concurrent_split_uploads: 8, - merge_concurrency: NonZeroUsize::new(2).unwrap(), cpu_capacity: IndexerConfig::default_cpu_capacity(), enable_cooperative_indexing: false, - max_merge_write_throughput: Some(ByteSize::mb(100)), enable_standalone_compactors: false, } ); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 6cfe1d6ea5a..0acd20706cb 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -462,7 +462,7 @@ impl IndexingPipeline { None, self.params.split_store.clone(), SplitsUpdateMailbox::Sequencer(sequencer_mailbox), - self.params.max_concurrent_split_uploads_index, + self.params.max_concurrent_split_uploads, self.params.event_broker.clone(), ); let (uploader_mailbox, uploader_handle) = ctx @@ -628,7 +628,7 @@ impl IndexingPipeline { self.params.metastore.clone(), self.params.storage.clone(), SplitsUpdateMailbox::Sequencer(parquet_sequencer_mailbox), - 
self.params.max_concurrent_split_uploads_index, + self.params.max_concurrent_split_uploads, ); let (parquet_uploader_mailbox, parquet_uploader_handle) = ctx .spawn_actor() @@ -847,7 +847,7 @@ pub struct IndexingPipelineParams { pub indexing_directory: TempDirectory, pub indexing_settings: IndexingSettings, pub split_store: IndexingSplitStore, - pub max_concurrent_split_uploads_index: usize, + pub max_concurrent_split_uploads: usize, pub cooperative_indexing_permits: Option>, // Merge-related parameters @@ -980,7 +980,7 @@ mod tests { split_store, merge_policy: default_merge_policy(), queues_dir_path: PathBuf::from("./queues"), - max_concurrent_split_uploads_index: 4, + max_concurrent_split_uploads: 4, cooperative_indexing_permits: None, event_broker: EventBroker::default(), params_fingerprint: 42u64, @@ -1100,7 +1100,7 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - max_concurrent_split_uploads_index: 4, + max_concurrent_split_uploads: 4, cooperative_indexing_permits: None, event_broker: Default::default(), params_fingerprint: 42u64, @@ -1219,7 +1219,7 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - max_concurrent_split_uploads_index: 4, + max_concurrent_split_uploads: 4, cooperative_indexing_permits: None, params_fingerprint: 42u64, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 1aafbc01dec..9c27a06f450 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -250,11 +250,6 @@ impl IndexingService { let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) .map_err(|error| IndexingError::Internal(error.to_string()))?; - // The concurrent uploads budget is split in 2: 1/2 for the indexing pipeline, 1/2 for the - // merge pipeline. 
When there is no local merge pipeline, the indexing pipeline gets the - // full budget. - let max_concurrent_split_uploads_index = self.max_concurrent_split_uploads; - let params_fingerprint = indexing_pipeline_params_fingerprint(&index_config, &source_config); if let Some(expected_params_fingerprint) = expected_params_fingerprint { @@ -283,7 +278,7 @@ impl IndexingService { indexing_directory, indexing_settings: index_config.indexing_settings.clone(), split_store, - max_concurrent_split_uploads_index, + max_concurrent_split_uploads: self.max_concurrent_split_uploads, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), // The merge policy is needed in the uploader for determining split maturity merge_policy, diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index df0b8339c02..1c78783089e 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -31,7 +31,8 @@ pub use crate::actors::{ pub use crate::controlled_directory::ControlledDirectory; use crate::models::IndexingStatistics; pub use crate::split_store::{ - IndexingSplitCache, IndexingSplitStore, get_tantivy_directory_from_split_bundle, + IndexingSplitCache, IndexingSplitStore, SplitStoreQuota, + get_tantivy_directory_from_split_bundle, }; pub mod actors; diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 9319f8d8498..37bbf15edbc 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -121,6 +121,14 @@ impl MergeOperation { pub fn splits_as_slice(&self) -> &[SplitMetadata] { self.splits.as_slice() } + + pub fn merge_level(&self) -> usize { + self.splits + .iter() + .map(|s| s.num_merge_ops) + .max() + .unwrap_or(0) + } } impl fmt::Debug for MergeOperation { diff --git a/quickwit/quickwit-proto/protos/quickwit/compaction.proto 
b/quickwit/quickwit-proto/protos/quickwit/compaction.proto index 6f6e4bc6f75..3c82d6b34c7 100644 --- a/quickwit/quickwit-proto/protos/quickwit/compaction.proto +++ b/quickwit/quickwit-proto/protos/quickwit/compaction.proto @@ -60,4 +60,5 @@ message MergeTaskAssignment { quickwit.common.IndexUid index_uid = 7; string source_id = 8; string index_storage_uri = 9; + uint64 merge_level = 10; } \ No newline at end of file diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs index 67b2f24fe15..1404816f276 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs @@ -66,6 +66,8 @@ pub struct MergeTaskAssignment { pub source_id: ::prost::alloc::string::String, #[prost(string, tag = "9")] pub index_storage_uri: ::prost::alloc::string::String, + #[prost(uint64, tag = "10")] + pub merge_level: u64, } /// BEGIN quickwit-codegen #[allow(unused_imports)] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 3d5a8cd2428..9be9b60971a 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -778,7 +778,19 @@ pub async fn serve_quickwit( let compaction_root_directory = quickwit_common::temp_dir::Builder::default() .tempdir_in(&compaction_dir) .context("failed to create compaction temp directory")?; - let split_cache = Arc::new(quickwit_indexing::IndexingSplitCache::no_caching()); + let split_store_quota = quickwit_indexing::SplitStoreQuota::try_new( + node_config.compactor_config.split_store_max_num_splits, + node_config.compactor_config.split_store_max_num_bytes, + )?; + let split_cache_dir = node_config + .data_dir_path + .join("compactor-split-cache") + .join("splits"); + let split_cache = Arc::new( + quickwit_indexing::IndexingSplitCache::open(split_cache_dir, split_store_quota) + .await + .context("failed to open 
compactor split cache")?, + ); let compaction_client = compaction_service_client_opt .clone() .expect("compactor service enabled but no compaction client available");