diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 5922b140e68..61aa6959f3b 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -7496,6 +7496,7 @@ dependencies = [
  "once_cell",
  "quickwit-actors",
  "quickwit-common",
+ "quickwit-compaction",
  "quickwit-config",
  "quickwit-doc-mapper",
  "quickwit-index-management",
diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs
index e07940c9706..8508a82ccaf 100644
--- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs
+++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs
@@ -57,7 +57,6 @@ pub struct CompactorSupervisor {
     // Shared resources distributed to pipelines when spawning actor chains.
     io_throughput_limiter: Option<Limiter>,
-    split_cache: Arc<IndexingSplitCache>,
     metastore: MetastoreServiceClient,
     storage_resolver: StorageResolver,
     max_concurrent_split_uploads: usize,
@@ -74,7 +73,6 @@ impl CompactorSupervisor {
         planner_client: CompactionPlannerServiceClient,
         num_pipeline_slots: usize,
         io_throughput_limiter: Option<Limiter>,
-        split_cache: Arc<IndexingSplitCache>,
         metastore: MetastoreServiceClient,
         storage_resolver: StorageResolver,
         max_concurrent_split_uploads: usize,
@@ -87,7 +85,6 @@ impl CompactorSupervisor {
             planner_client,
             pipelines,
             io_throughput_limiter,
-            split_cache,
             metastore,
             storage_resolver,
             max_concurrent_split_uploads,
@@ -185,7 +182,8 @@ impl CompactorSupervisor {
         let index_storage_uri = Uri::from_str(&assignment.index_storage_uri)?;
         let index_storage = self.storage_resolver.resolve(&index_storage_uri).await?;
-        let split_store = IndexingSplitStore::new(index_storage, self.split_cache.clone());
+        let split_cache = Arc::new(IndexingSplitCache::no_caching());
+        let split_store = IndexingSplitStore::new(index_storage, split_cache);
         let doc_mapper = build_doc_mapper(&doc_mapping, &search_settings)?;
         let merge_policy = merge_policy_from_settings(&indexing_settings);
@@ -311,8 +309,6 @@ impl Handler for CompactorSupervisor {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use quickwit_actors::Universe;
     use quickwit_common::temp_dir::TempDirectory;
     use quickwit_proto::compaction::{
@@ -334,7 +330,6 @@ mod tests {
             compaction_client,
             num_slots,
             None,
-            Arc::new(IndexingSplitCache::no_caching()),
             metastore,
             StorageResolver::for_test(),
             2,
@@ -544,7 +539,6 @@ mod tests {
             client,
             3,
             None,
-            Arc::new(IndexingSplitCache::no_caching()),
             metastore,
             StorageResolver::for_test(),
             2,
diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs
index 44948bc70ea..30c61587c41 100644
--- a/quickwit/quickwit-compaction/src/lib.rs
+++ b/quickwit/quickwit-compaction/src/lib.rs
@@ -23,15 +23,12 @@ pub mod planner;
 
 pub type TaskId = String;
 
-use std::sync::Arc;
-
 pub use compactor_supervisor::CompactorSupervisor;
 use quickwit_actors::{Mailbox, Universe};
 use quickwit_common::io;
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_config::CompactorConfig;
-use quickwit_indexing::IndexingSplitCache;
 use quickwit_proto::compaction::CompactionPlannerServiceClient;
 use quickwit_proto::metastore::MetastoreServiceClient;
 use quickwit_proto::types::{IndexUid, NodeId, SourceId};
@@ -48,7 +45,6 @@ pub async fn start_compactor_service(
     node_id: NodeId,
     compaction_client: CompactionPlannerServiceClient,
     compactor_config: &CompactorConfig,
-    split_cache: Arc<IndexingSplitCache>,
     metastore: MetastoreServiceClient,
     storage_resolver: StorageResolver,
     event_broker: EventBroker,
@@ -62,7 +58,6 @@
         compaction_client,
         compactor_config.max_concurrent_pipelines.get(),
         io_throughput_limiter,
-        split_cache,
         metastore,
         storage_resolver,
         compactor_config.max_concurrent_split_uploads,
diff --git a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
index 94bfebacb54..637fe72017b 100644
--- a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
+++ b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
@@ -36,21 +36,18 @@ use super::compaction_state::CompactionState;
 use super::index_config_metastore::{IndexConfigMetastore, IndexEntry};
 use crate::planner::metrics::COMPACTION_PLANNER_METRICS;
 
+/// Cap on splits fetched per tick. Every tick, the planner re-scans the immature published set,
+/// sorted by `maturity_timestamp` ASC so the most-urgent splits are processed first when a backlog
+/// exists. Splits beyond this cap aren't lost -- they bubble into range as the front of the queue
+/// is merged off.
+const SCAN_PAGE_SIZE: usize = 5_000;
+#[derive(Debug)]
 pub struct CompactionPlanner {
     state: CompactionState,
     index_config_metastore: IndexConfigMetastore,
-    cursor: i64,
     metastore: MetastoreServiceClient,
 }
 
-impl Debug for CompactionPlanner {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("CompactionPlanner")
-            .field("cursor", &self.cursor)
-            .finish()
-    }
-}
-
 const SCAN_AND_PLAN_INTERVAL: Duration = Duration::from_secs(5);
 /// On initialization, we want to wait for two intervals to allow any in-progress workers to report
 /// their progress, preventing us from frivolously rescheduling work.
@@ -112,15 +109,11 @@ impl Handler for CompactionPlanner {
     }
 }
 
-const STARTUP_LOOKBACK: Duration = Duration::from_secs(24 * 60 * 60);
-
 impl CompactionPlanner {
     pub fn new(metastore: MetastoreServiceClient) -> Self {
-        let cursor = OffsetDateTime::now_utc().unix_timestamp() - STARTUP_LOOKBACK.as_secs() as i64;
         CompactionPlanner {
             state: CompactionState::new(),
             index_config_metastore: IndexConfigMetastore::new(metastore.clone()),
-            cursor,
             metastore,
         }
     }
@@ -141,8 +134,6 @@
             if index_entry.is_split_mature(&split.split_metadata) {
                 continue;
             }
-            self.cursor = self.cursor.max(split.update_timestamp);
-            info!(max_timestamp=%self.cursor, "[compaction planner] update metastore cursor min_timestamp cursor");
             self.state.track_split(split.split_metadata);
         }
     }
@@ -151,7 +142,8 @@
         let query = ListSplitsQuery::for_all_indexes()
             .with_split_state(SplitState::Published)
             .retain_immature(OffsetDateTime::now_utc())
-            .with_update_timestamp_gte(self.cursor);
+            .sort_by_maturity_timestamp()
+            .with_limit(SCAN_PAGE_SIZE);
         let request = ListSplitsRequest::try_from_list_splits_query(&query)?;
         let splits = self
             .metastore
@@ -237,7 +229,7 @@ fn emit_metastore_scan_metrics(new_splits: &[Split]) {
         COMPACTION_PLANNER_METRICS
             .new_splits_scanned
             .with_label_values([&index_uid.to_string()])
-            .set(count as i64);
+            .inc_by(count as u64);
     }
 }
@@ -270,6 +262,7 @@ fn build_task_assignment(
 
 #[cfg(test)]
 mod tests {
+    use std::ops::Bound;
     use std::time::Duration;
 
     use quickwit_common::ServiceStream;
@@ -278,14 +271,15 @@
         ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig,
     };
     use quickwit_metastore::{
-        IndexMetadata, IndexMetadataResponseExt, ListSplitsResponseExt, Split, SplitMaturity,
-        SplitMetadata, SplitState,
+        IndexMetadata, IndexMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt,
+        SortBy, Split, SplitMaturity, SplitMetadata, SplitState,
     };
     use quickwit_proto::compaction::CompactionSuccess;
     use quickwit_proto::metastore::{
         IndexMetadataResponse, ListSplitsResponse, MetastoreError, MockMetastoreService,
     };
     use quickwit_proto::types::IndexUid;
+    use time::OffsetDateTime;
 
     use super::*;
@@ -334,17 +328,34 @@
     }
 
     #[tokio::test]
-    async fn test_scan_metastore() {
+    async fn test_scan_metastore_query_shape_and_passthrough() {
         let index_uid = IndexUid::for_test("test-index", 0);
-        let splits = vec![
-            test_split("split-1", &index_uid, 1000),
-            test_split("split-2", &index_uid, 2000),
+        let returned_splits = vec![
+            test_split("a", &index_uid, 1000),
+            test_split("b", &index_uid, 2000),
         ];
-        let splits_clone = splits.clone();
+        let returned_clone = returned_splits.clone();
+        let scan_started_at = OffsetDateTime::now_utc().unix_timestamp();
 
         let mut mock = MockMetastoreService::new();
-        mock.expect_list_splits().returning(move |_| {
-            let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap();
+        mock.expect_list_splits().returning(move |req| {
+            let query = req.deserialize_list_splits_query().unwrap();
+
+            assert_eq!(query.split_states, vec![SplitState::Published]);
+            assert_eq!(query.limit, Some(SCAN_PAGE_SIZE));
+            assert_eq!(query.sort_by, SortBy::MaturityTimestamp);
+
+            let Bound::Excluded(mature_at) = query.mature else {
+                panic!("expected Excluded mature bound, got {:?}", query.mature);
+            };
+            let now_secs = OffsetDateTime::now_utc().unix_timestamp();
+            assert!(mature_at.unix_timestamp() >= scan_started_at);
+            assert!(mature_at.unix_timestamp() <= now_secs);
+
+            assert_eq!(query.update_timestamp.start, Bound::Unbounded);
+            assert_eq!(query.update_timestamp.end, Bound::Unbounded);
+
+            let response = ListSplitsResponse::try_from_splits(returned_clone.clone()).unwrap();
             Ok(ServiceStream::from(vec![Ok(response)]))
         });
 
@@ -352,12 +363,12 @@
         let result = planner.scan_metastore().await.unwrap();
 
         assert_eq!(result.len(), 2);
-        assert_eq!(result[0].split_metadata.split_id, "split-1");
-        assert_eq!(result[1].split_metadata.split_id, "split-2");
+        assert_eq!(result[0].split_metadata.split_id, "a");
+        assert_eq!(result[1].split_metadata.split_id, "b");
     }
 
     #[tokio::test]
-    async fn test_ingest_splits_dedup_maturity_and_cursor() {
+    async fn test_ingest_splits_dedups_and_skips_mature() {
         let index_metadata = test_index_metadata();
         let response = test_index_metadata_response(&index_metadata);
         let index_uid = index_metadata.index_uid.clone();
@@ -367,7 +378,6 @@
             .returning(move |_| Ok(response.clone()));
 
         let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
-        planner.cursor = 0;
 
         // Pre-populate: "in-flight" is already being compacted.
         planner.state.track_split(SplitMetadata {
@@ -390,11 +400,10 @@
         assert!(planner.state.is_split_tracked("fresh"));
         assert!(planner.state.is_split_tracked("in-flight"));
         assert!(!planner.state.is_split_tracked("mature"));
-        assert_eq!(planner.cursor, 3000);
     }
 
     #[tokio::test]
-    async fn test_scan_and_plan_metastore_error() {
+    async fn test_scan_and_plan_propagates_metastore_error() {
         let mut mock = MockMetastoreService::new();
         mock.expect_list_splits().returning(|_| {
             Err(MetastoreError::Internal {
@@ -404,11 +413,7 @@
         });
 
         let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
-        let original_cursor = planner.cursor;
-
-        let result = planner.scan_and_plan().await;
-        assert!(result.is_err());
-        assert_eq!(planner.cursor, original_cursor);
+        assert!(planner.scan_and_plan().await.is_err());
     }
 
     #[tokio::test]
@@ -425,7 +430,6 @@
         });
 
         let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
-        planner.cursor = 0;
 
         planner.ingest_splits(splits).await;
         assert!(!planner.state.is_split_tracked("orphan"));
@@ -452,12 +456,60 @@
             .returning(move |_| Ok(index_metadata_response.clone()));
 
         let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
-        planner.cursor = 0;
 
         planner.scan_and_plan().await.unwrap();
 
         assert!(planner.state.is_split_tracked("s1"));
         assert!(planner.state.is_split_tracked("s2"));
-        assert_eq!(planner.cursor, 6000);
+    }
+
+    #[tokio::test]
+    async fn test_failed_task_is_retracked_on_next_scan() {
+        // After a worker reports failure (or times out), planner-local
+        // tracking is cleared. Because there is no cursor, the next scan
+        // rediscovers the still-Published, still-immature splits and
+        // re-tracks them.
+        let index_metadata = test_index_metadata_with_merge_factor_2();
+        let index_metadata_response = test_index_metadata_response(&index_metadata);
+        let index_uid = index_metadata.index_uid.clone();
+
+        let splits = vec![
+            test_split("s1", &index_uid, 1000),
+            test_split("s2", &index_uid, 2000),
+        ];
+        let splits_clone = splits.clone();
+
+        let mut mock = MockMetastoreService::new();
+        mock.expect_list_splits().returning(move |_| {
+            let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap();
+            Ok(ServiceStream::from(vec![Ok(response)]))
+        });
+        mock.expect_index_metadata()
+            .returning(move |_| Ok(index_metadata_response.clone()));
+
+        let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
+        let node_id = NodeId::from("worker-1");
+
+        planner.scan_and_plan().await.unwrap();
+        let assignments = planner.assign_tasks(&node_id, 10);
+        assert_eq!(assignments.len(), 1);
+        let task_id = assignments[0].task_id.clone();
+        assert!(planner.state.is_split_tracked("s1"));
+        assert!(planner.state.is_split_tracked("s2"));
+
+        // Worker reports failure; planner forgets the splits.
+        planner
+            .state
+            .process_failures(&[quickwit_proto::compaction::CompactionFailure {
+                task_id,
+                error_message: "boom".to_string(),
+            }]);
+        assert!(!planner.state.is_split_tracked("s1"));
+        assert!(!planner.state.is_split_tracked("s2"));
+
+        // Next scan rediscovers them and re-tracks them.
+        planner.scan_and_plan().await.unwrap();
+        assert!(planner.state.is_split_tracked("s1"));
+        assert!(planner.state.is_split_tracked("s2"));
     }
 
     /// Helper: creates a planner with merge_factor=2, ingests the given splits,
@@ -477,7 +529,6 @@
         });
 
         let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
-        planner.cursor = 0;
 
         let splits: Vec<Split> = split_ids
             .iter()
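The planner's cursorless design relies on the invariant called out in the `SCAN_PAGE_SIZE` comment above: once the most-urgent splits are merged they leave the Published set, so splits truncated by the page cap reach the front of the maturity-ordered page on a later tick. A toy model of that convergence, with a plain `Vec<i64>` of maturity timestamps standing in for split metadata (names invented, not part of the patch):

```rust
/// One simulated planner tick: take the `cap` most-urgent maturity timestamps,
/// mirroring `sort_by_maturity_timestamp()` + `with_limit(SCAN_PAGE_SIZE)`.
fn scan_page(mut maturities: Vec<i64>, cap: usize) -> Vec<i64> {
    maturities.sort_unstable(); // maturity_timestamp ASC
    maturities.truncate(cap); // LIMIT cap
    maturities
}

fn main() {
    // Tick 1: the split maturing at t=30 falls beyond the page cap.
    assert_eq!(scan_page(vec![30, 10, 20], 2), vec![10, 20]);
    // Tick 2: t=10 and t=20 were merged off the Published set; t=30 bubbles in.
    assert_eq!(scan_page(vec![30], 2), vec![30]);
}
```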
diff --git a/quickwit/quickwit-compaction/src/planner/compaction_state.rs b/quickwit/quickwit-compaction/src/planner/compaction_state.rs
index 9658e958b7f..58cf85a5bb3 100644
--- a/quickwit/quickwit-compaction/src/planner/compaction_state.rs
+++ b/quickwit/quickwit-compaction/src/planner/compaction_state.rs
@@ -49,6 +49,7 @@ impl CompactionPartitionKey {
     }
 }
 
+#[derive(Debug)]
 struct InFlightCompaction {
     task_id: TaskId,
     split_ids: Vec<SplitId>,
@@ -59,6 +60,7 @@ struct InFlightCompaction {
 /// Tracks all split-level state for the compaction planner:
 /// which splits need compaction, which are in-flight, and which
 /// operations are pending assignment to workers.
+#[derive(Debug)]
 pub struct CompactionState {
     needs_compaction: HashMap<CompactionPartitionKey, Vec<SplitMetadata>>,
     needs_compaction_split_ids: HashSet<SplitId>,
@@ -108,14 +110,22 @@ impl CompactionState {
         let Some(splits) = self.needs_compaction.get_mut(partition_key) else {
             return;
         };
-        for operation in merge_policy.operations(splits) {
-            for split in operation.splits_as_slice() {
-                self.needs_compaction_split_ids.remove(split.split_id());
-                self.in_flight_split_ids
-                    .insert(split.split_id().to_string());
+        // `MergePolicy::operations` emits at most one op per level per call, which under a backlog
+        // leaves the bulk of `splits` untouched per tick. Loop until no new operations are created.
+        loop {
+            let operations = merge_policy.operations(splits);
+            if operations.is_empty() {
+                break;
+            }
+            for operation in operations {
+                for split in operation.splits_as_slice() {
+                    self.needs_compaction_split_ids.remove(split.split_id());
+                    self.in_flight_split_ids
+                        .insert(split.split_id().to_string());
+                }
+                self.pending_operations
+                    .push(partition_key.clone(), operation);
             }
-            self.pending_operations
-                .push(partition_key.clone(), operation);
         }
         if splits.is_empty() {
             self.needs_compaction.remove(partition_key);
@@ -138,6 +148,7 @@ impl CompactionState {
         if let Some(inflight) = self.in_flight.remove(&failure.task_id) {
             warn!(task_id=%failure.task_id, error=%failure.error_message, "compaction task failed");
             for split_id in &inflight.split_ids {
+                // These splits will be picked up again on the next metastore scan.
                 self.in_flight_split_ids.remove(split_id.as_str());
             }
         }
@@ -183,6 +194,7 @@ impl CompactionState {
             error!(%task_id, node_id=%inflight.node_id, "compaction task timed out");
             COMPACTION_PLANNER_METRICS.timed_out_operations.inc();
             for split_id in &inflight.split_ids {
+                // These splits will be picked up again on the next metastore scan.
                 self.in_flight_split_ids.remove(split_id.as_str());
             }
         }
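The `loop` added to `update_pending` above is a drain-to-fixpoint pattern: a policy that emits at most one operation per level per call is simply re-invoked until it yields nothing. A standalone sketch under invented names (the real `MergePolicy::operations` works on `Vec<SplitMetadata>` and returns merge operations, not integers):

```rust
/// Re-invokes a one-operation-per-call policy until it has nothing left to
/// plan, so a single planner tick consumes the whole backlog.
fn plan_to_fixpoint(splits: &mut Vec<u64>, merge_factor: usize) -> Vec<Vec<u64>> {
    let mut operations = Vec::new();
    loop {
        if splits.len() < merge_factor {
            break; // the policy would emit no further operations
        }
        // One operation per call, like `MergePolicy::operations`.
        operations.push(splits.drain(..merge_factor).collect());
    }
    operations
}

fn main() {
    let mut backlog: Vec<u64> = (0..10).collect();
    // With merge_factor = 3, three operations are planned in one tick...
    assert_eq!(plan_to_fixpoint(&mut backlog, 3).len(), 3);
    // ...and only the sub-merge-factor remainder is left behind.
    assert_eq!(backlog.len(), 1);
}
```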
diff --git a/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs b/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs
index e2d42d99ba2..ba37b11c758 100644
--- a/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs
+++ b/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs
@@ -26,6 +26,7 @@ use tracing::error;
 use crate::planner::metrics::COMPACTION_PLANNER_METRICS;
 
 /// Everything the planner needs to know about a single index.
+#[derive(Debug)]
 pub struct IndexEntry {
     config: IndexConfig,
     merge_policy: Arc<dyn MergePolicy>,
@@ -76,6 +77,7 @@ impl IndexEntry {
 /// Caches per-index configuration, merge policies, and doc mappers.
 /// Fetches from the metastore on demand. All accessors panic if called
 /// for an index that hasn't been loaded.
+#[derive(Debug)]
 pub struct IndexConfigMetastore {
     indexes: HashMap<IndexUid, IndexEntry>,
     metastore_client: MetastoreServiceClient,
diff --git a/quickwit/quickwit-compaction/src/planner/metrics.rs b/quickwit/quickwit-compaction/src/planner/metrics.rs
index 2a4b2d9d4c7..4d8312d02c7 100644
--- a/quickwit/quickwit-compaction/src/planner/metrics.rs
+++ b/quickwit/quickwit-compaction/src/planner/metrics.rs
@@ -13,22 +13,24 @@
 // limitations under the License.
 
 use once_cell::sync::Lazy;
-use quickwit_common::metrics::{IntGauge, IntGaugeVec, new_gauge, new_gauge_vec};
+use quickwit_common::metrics::{
+    IntCounter, IntCounterVec, IntGaugeVec, new_counter, new_counter_vec, new_gauge_vec,
+};
 
 pub struct CompactionPlannerMetrics {
-    pub new_splits_scanned: IntGaugeVec<1>,
+    pub new_splits_scanned: IntCounterVec<1>,
     pub splits_needing_compaction: IntGaugeVec<1>,
     pub pending_merge_operations: IntGaugeVec<2>,
-    pub timed_out_operations: IntGauge,
-    pub metastore_errors: IntGaugeVec<1>,
+    pub timed_out_operations: IntCounter,
+    pub metastore_errors: IntCounterVec<1>,
 }
 
 impl Default for CompactionPlannerMetrics {
     fn default() -> Self {
         CompactionPlannerMetrics {
-            new_splits_scanned: new_gauge_vec(
+            new_splits_scanned: new_counter_vec(
                 "new_splits_scanned",
-                "number of new immature splits scanned from the metastore on the last tick",
+                "cumulative number of immature splits scanned from the metastore",
                 "compaction_planner",
                 &[],
                 ["source_uid"],
@@ -47,15 +49,16 @@
                 &[],
                 ["source_uid", "merge_level"],
             ),
-            timed_out_operations: new_gauge(
+            timed_out_operations: new_counter(
                 "timed_out_operations",
-                "number of merge operations that timed out waiting for a worker heartbeat",
+                "cumulative number of merge operations that timed out waiting for a worker \
+                 heartbeat",
                 "compaction_planner",
                 &[],
             ),
-            metastore_errors: new_gauge_vec(
+            metastore_errors: new_counter_vec(
                 "metastore_errors",
-                "number of metastore errors encountered by the compaction planner",
+                "cumulative number of metastore errors encountered by the compaction planner",
                 "compaction_planner",
                 &[],
                 ["operation"],
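The gauge-to-counter switch follows from the scan change: pages now overlap from tick to tick, so "splits scanned on the last tick" is no longer a meaningful level, while a monotonic total still supports Prometheus `rate()`. A sketch of the two semantics with a plain integer (not the real `quickwit_common::metrics` types):

```rust
/// Counter semantics: the value only ever accumulates.
struct ScanCounter {
    total: u64,
}

impl ScanCounter {
    fn inc_by(&mut self, scanned_this_tick: u64) {
        self.total += scanned_this_tick;
    }
}

fn main() {
    let mut counter = ScanCounter { total: 0 };
    for scanned in [5_000, 5_000, 1_200] {
        counter.inc_by(scanned); // a gauge `set()` would end at 1_200
    }
    // rate() over this series recovers per-tick scan throughput.
    assert_eq!(counter.total, 11_200);
}
```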
diff --git a/quickwit/quickwit-compaction/src/planner/mod.rs b/quickwit/quickwit-compaction/src/planner/mod.rs
index 1dc50d6ca46..0fe49bd190e 100644
--- a/quickwit/quickwit-compaction/src/planner/mod.rs
+++ b/quickwit/quickwit-compaction/src/planner/mod.rs
@@ -31,6 +31,7 @@ use crate::source_uid_metrics_label;
 /// Queue of merge operations awaiting assignment, with the
 /// `pending_merge_operations` gauge maintained inline. Push/pop are the only
 /// mutation paths so the metric stays consistent with `len()`.
+#[derive(Debug)]
 struct PendingOperations {
     inner: VecDeque<(CompactionPartitionKey, MergeOperation)>,
 }
diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index 4df0a4e102d..8d405d443ec 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -239,39 +239,23 @@ pub struct CompactorConfig {
     /// Limits the IO throughput of the split downloader and the merge executor.
     #[serde(default)]
     pub max_merge_write_throughput: Option<ByteSize>,
-    /// Maximum size of the local split store cache in bytes.
-    #[serde(default = "CompactorConfig::default_split_store_max_num_bytes")]
-    pub split_store_max_num_bytes: ByteSize,
-    /// Maximum number of splits in the local split store cache.
-    #[serde(default = "CompactorConfig::default_split_store_max_num_splits")]
-    pub split_store_max_num_splits: usize,
 }
 
 impl CompactorConfig {
     fn default_max_concurrent_pipelines() -> NonZeroUsize {
-        NonZeroUsize::new(quickwit_common::num_cpus() * 2 / 3).unwrap_or(NonZeroUsize::MIN)
+        NonZeroUsize::new(quickwit_common::num_cpus()).unwrap_or(NonZeroUsize::MIN)
     }
 
     fn default_max_concurrent_split_uploads() -> usize {
         12
     }
 
-    pub fn default_split_store_max_num_bytes() -> ByteSize {
-        ByteSize::gib(100)
-    }
-
-    pub fn default_split_store_max_num_splits() -> usize {
-        1_000
-    }
-
     #[cfg(any(test, feature = "testsuite"))]
     pub fn for_test() -> Self {
         CompactorConfig {
             max_concurrent_pipelines: NonZeroUsize::new(2).unwrap(),
             max_concurrent_split_uploads: 4,
             max_merge_write_throughput: None,
-            split_store_max_num_bytes: ByteSize::mb(1),
-            split_store_max_num_splits: 3,
         }
     }
 }
@@ -282,8 +266,6 @@ impl Default for CompactorConfig {
         CompactorConfig {
             max_concurrent_pipelines: Self::default_max_concurrent_pipelines(),
             max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
             max_merge_write_throughput: None,
-            split_store_max_num_bytes: Self::default_split_store_max_num_bytes(),
-            split_store_max_num_splits: Self::default_split_store_max_num_splits(),
         }
     }
 }
diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs
index 70f19410ecf..1a4270d48ee 100644
--- a/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs
+++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs
@@ -25,7 +25,7 @@ use quickwit_common::split_file;
 use quickwit_directories::BundleDirectory;
 use quickwit_storage::StorageResult;
 use tantivy::Directory;
-use tantivy::directory::MmapDirectory;
+use tantivy::directory::{Advice, MmapDirectory};
 use tokio::sync::Mutex;
 use tracing::{debug, error, warn};
 use ulid::Ulid;
@@ -38,12 +38,15 @@ const SPLIT_MAX_AGE: Duration = Duration::from_secs(2 * 24 * 3_600); // 2 days
 pub fn get_tantivy_directory_from_split_bundle(
     split_file: &Path,
 ) -> StorageResult<Box<dyn Directory>> {
-    let mmap_directory = MmapDirectory::open(split_file.parent().ok_or_else(|| {
-        io::Error::new(
-            io::ErrorKind::NotFound,
-            format!("couldn't find parent for {}", split_file.display()),
-        )
-    })?)?;
+    let mmap_directory = MmapDirectory::open_with_madvice(
+        split_file.parent().ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::NotFound,
+                format!("couldn't find parent for {}", split_file.display()),
+            )
+        })?,
+        Advice::Sequential,
+    )?;
     let split_fileslice = mmap_directory.open_read(Path::new(&split_file))?;
     Ok(Box::new(BundleDirectory::open_split(split_fileslice)?))
 }
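The `Advice::Sequential` hint fits the merge path because a compaction reads each split bundle front to back exactly once, so aggressive read-ahead helps and already-consumed pages can be evicted early; the default advice would remain the better choice for random-access search reads. A minimal sketch of the new open path, mirroring the call introduced above with error plumbing simplified to `anyhow`:

```rust
use std::path::Path;

use anyhow::Result;
use tantivy::directory::{Advice, MmapDirectory};

/// Opens the directory containing a split bundle with MADV_SEQUENTIAL,
/// as the merge path above now does.
fn open_bundle_parent_dir(parent_dir: &Path) -> Result<MmapDirectory> {
    let mmap_directory = MmapDirectory::open_with_madvice(parent_dir, Advice::Sequential)?;
    Ok(mmap_directory)
}
```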
diff --git a/quickwit/quickwit-janitor/Cargo.toml b/quickwit/quickwit-janitor/Cargo.toml
index 8d4cad3beb9..16a9a540806 100644
--- a/quickwit/quickwit-janitor/Cargo.toml
+++ b/quickwit/quickwit-janitor/Cargo.toml
@@ -27,6 +27,7 @@ utoipa = { workspace = true }
 
 quickwit-actors = { workspace = true }
 quickwit-common = { workspace = true }
+quickwit-compaction = { workspace = true }
 quickwit-config = { workspace = true }
 quickwit-doc-mapper = { workspace = true }
 quickwit-index-management = { workspace = true }
diff --git a/quickwit/quickwit-janitor/src/janitor_service.rs b/quickwit/quickwit-janitor/src/janitor_service.rs
index b8bfe5e54b0..712458ad805 100644
--- a/quickwit/quickwit-janitor/src/janitor_service.rs
+++ b/quickwit/quickwit-janitor/src/janitor_service.rs
@@ -16,6 +16,7 @@ use async_trait::async_trait;
 use quickwit_actors::{
     Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz,
 };
+use quickwit_compaction::planner::CompactionPlanner;
 use serde_json::{Value as JsonValue, json};
 
 use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};
@@ -24,6 +25,7 @@ pub struct JanitorService {
     delete_task_service_handle: Option<ActorHandle<DeleteTaskService>>,
     garbage_collector_handle: ActorHandle<GarbageCollector>,
     retention_policy_executor_handle: ActorHandle<RetentionPolicyExecutor>,
+    compaction_planner_handle: ActorHandle<CompactionPlanner>,
 }
 
 impl JanitorService {
@@ -31,11 +33,13 @@ impl JanitorService {
         delete_task_service_handle: Option<ActorHandle<DeleteTaskService>>,
         garbage_collector_handle: ActorHandle<GarbageCollector>,
         retention_policy_executor_handle: ActorHandle<RetentionPolicyExecutor>,
+        compaction_planner_handle: ActorHandle<CompactionPlanner>,
     ) -> Self {
         Self {
             delete_task_service_handle,
             garbage_collector_handle,
             retention_policy_executor_handle,
+            compaction_planner_handle,
         }
     }
@@ -49,6 +53,7 @@ impl JanitorService {
         delete_task_is_not_failure
             && self.garbage_collector_handle.state() != ActorState::Failure
             && self.retention_policy_executor_handle.state() != ActorState::Failure
+            && self.compaction_planner_handle.state() != ActorState::Failure
     }
 }
diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs
index bef73160377..8bac8c46c21 100644
--- a/quickwit/quickwit-janitor/src/lib.rs
+++ b/quickwit/quickwit-janitor/src/lib.rs
@@ -14,8 +14,9 @@
 
 #![deny(clippy::disallowed_methods)]
 
-use quickwit_actors::{Mailbox, Universe};
+use quickwit_actors::{ActorHandle, Mailbox, Universe};
 use quickwit_common::pubsub::EventBroker;
+use quickwit_compaction::planner::CompactionPlanner;
 use quickwit_config::NodeConfig;
 use quickwit_indexing::actors::MergeSchedulerService;
 use quickwit_metastore::SplitInfo;
@@ -39,6 +40,7 @@ use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};
 /// Schemas used for the OpenAPI generation that are part of this crate.
 pub struct JanitorApiSchemas;
 
+#[allow(clippy::too_many_arguments)]
 pub async fn start_janitor_service(
     universe: &Universe,
     config: &NodeConfig,
@@ -47,6 +49,7 @@
     storage_resolver: StorageResolver,
     event_broker: EventBroker,
     run_delete_task_service: bool,
+    compaction_planner_handle: ActorHandle<CompactionPlanner>,
 ) -> anyhow::Result<Mailbox<JanitorService>> {
     info!("starting janitor service");
     let garbage_collector = GarbageCollector::new(metastore.clone(), storage_resolver.clone());
@@ -77,6 +80,7 @@
         delete_task_service_handle,
         garbage_collector_handle,
         retention_policy_executor_handle,
+        compaction_planner_handle,
     );
     let (janitor_service_mailbox, _janitor_service_handle) =
         universe.spawn_builder().spawn(janitor_service);
diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql
new file mode 100644
index 00000000000..5aae02487bd
--- /dev/null
+++ b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql
@@ -0,0 +1 @@
+DROP INDEX IF EXISTS splits_published_maturity_timestamp_idx;
\ No newline at end of file
diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql
new file mode 100644
index 00000000000..ec26bb98298
--- /dev/null
+++ b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql
@@ -0,0 +1,19 @@
+-- Index for the compaction planner's scan, which on every tick reads up to
+-- LIMIT splits matching:
+--     split_state = 'Published'
+--     maturity_timestamp > now()
+-- ordered by maturity_timestamp ascending. The planner keeps a local set of
+-- already-tracked splits and dedups against it, so re-reading the immature
+-- set every tick is intentional -- it's how the planner recovers splits
+-- whose merge timed out or failed.
+--
+-- The btree on (maturity_timestamp, split_id) lets postgres seek to the live
+-- "still immature" range in index order, satisfying both the filter and the
+-- ORDER BY without an extra sort. The split_id column is included as a
+-- tiebreaker so postgres returns deterministic pages under LIMIT.
+--
+-- The partial predicate is restricted to split_state = 'Published' because
+-- partial-index predicates must be IMMUTABLE; "now()" cannot appear here.
+CREATE INDEX IF NOT EXISTS splits_published_maturity_timestamp_idx
+    ON splits (maturity_timestamp, split_id)
+    WHERE split_state = 'Published';
\ No newline at end of file
diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs
index e8fffeb7962..83c492e1ba0 100644
--- a/quickwit/quickwit-metastore/src/lib.rs
+++ b/quickwit/quickwit-metastore/src/lib.rs
@@ -48,7 +48,7 @@ pub use metastore::{
     ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt,
     ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
     MetastoreServiceStreamSplitsExt, PublishMetricsSplitsRequestExt, PublishSplitsRequestExt,
-    StageMetricsSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt,
+    SortBy, StageMetricsSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt,
     UpdateSourceRequestExt, file_backed,
 };
 pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs
index ddfee25afab..caaae7316a8 100644
--- a/quickwit/quickwit-metastore/src/metastore/mod.rs
+++ b/quickwit/quickwit-metastore/src/metastore/mod.rs
@@ -875,10 +875,21 @@ pub struct ListSplitsQuery {
 }
 
 #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
+/// Ordering applied to the result of a [`ListSplitsQuery`].
 pub enum SortBy {
+    /// No ordering -- the metastore may return splits in any order.
     None,
+    /// Order by `(delete_opstamp ASC, publish_timestamp ASC)`. Used by the
+    /// delete pipeline to process the splits with the most pending delete
+    /// work first.
     Staleness,
+    /// Order by `(index_uid ASC, split_id ASC)`, matching the splits-table
+    /// primary key. Used for stable pagination across all indexes.
     IndexUid,
+    /// Order by `(maturity_timestamp ASC, split_id ASC)`. Used by the
+    /// compaction planner so that under a backlog the splits closest to
+    /// becoming mature are processed first.
+    MaturityTimestamp,
 }
 
 impl SortBy {
@@ -904,6 +915,16 @@ impl SortBy {
                     .split_id
                     .cmp(&right_split.split_metadata.split_id)
             }),
+            SortBy::MaturityTimestamp => left_split
+                .split_metadata
+                .maturity_unix_timestamp()
+                .cmp(&right_split.split_metadata.maturity_unix_timestamp())
+                .then_with(|| {
+                    left_split
+                        .split_metadata
+                        .split_id
+                        .cmp(&right_split.split_metadata.split_id)
+                }),
         }
     }
 }
@@ -1154,6 +1175,12 @@ impl ListSplitsQuery {
         self
     }
 
+    /// Sorts the splits by `maturity_timestamp` ascending, with `split_id` as a tiebreaker.
+    pub fn sort_by_maturity_timestamp(mut self) -> Self {
+        self.sort_by = SortBy::MaturityTimestamp;
+        self
+    }
+
     /// Only return splits whose (index_uid, split_id) are lexicographically after this split.
     /// This is only useful if results are sorted by index_uid and split_id.
     pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self {
diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
index 28519b8a294..29c26e41fb2 100644
--- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
+++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
@@ -211,6 +211,10 @@ pub(super) fn append_query_filters_and_order_by(
             sql.order_by(Splits::IndexUid, Order::Asc)
                 .order_by(Splits::SplitId, Order::Asc);
         }
+        SortBy::MaturityTimestamp => {
+            sql.order_by(Splits::MaturityTimestamp, Order::Asc)
+                .order_by(Splits::SplitId, Order::Asc);
+        }
         SortBy::None => (),
     }
diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs
index fe88fe379d3..95066e87838 100644
--- a/quickwit/quickwit-metastore/src/split_metadata.rs
+++ b/quickwit/quickwit-metastore/src/split_metadata.rs
@@ -218,6 +218,18 @@ impl SplitMetadata {
         }
     }
 
+    /// Returns the unix timestamp at which the split becomes mature, or 0 if
+    /// the split is already mature (matching the metastore's stored
+    /// `maturity_timestamp` column).
+    pub fn maturity_unix_timestamp(&self) -> i64 {
+        match self.maturity {
+            SplitMaturity::Mature => 0,
+            SplitMaturity::Immature { maturation_period } => {
+                self.create_timestamp + maturation_period.as_secs() as i64
+            }
+        }
+    }
+
     #[cfg(any(test, feature = "testsuite"))]
     /// Returns an instance of `SplitMetadata` for testing.
     pub fn for_test(split_id: SplitId) -> SplitMetadata {
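A worked example of the sentinel convention documented on `maturity_unix_timestamp`: mature splits report 0, matching the stored `maturity_timestamp` column, so under `SortBy::MaturityTimestamp` they sort ahead of every immature split. The helper below restates the match arm with free-standing inputs (invented signature; the real method reads `self.maturity` and `self.create_timestamp`):

```rust
use std::time::Duration;

fn maturity_unix_timestamp(create_timestamp: i64, maturation_period: Option<Duration>) -> i64 {
    match maturation_period {
        // SplitMaturity::Mature -> the stored column's 0 sentinel.
        None => 0,
        // SplitMaturity::Immature -> creation time plus the maturation period.
        Some(period) => create_timestamp + period.as_secs() as i64,
    }
}

fn main() {
    assert_eq!(maturity_unix_timestamp(1_700_000_000, None), 0);
    // A split created at t=1_700_000_000 with a 48h maturation period:
    assert_eq!(
        maturity_unix_timestamp(1_700_000_000, Some(Duration::from_secs(48 * 3_600))),
        1_700_172_800
    );
}
```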
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 9be9b60971a..554820dfc80 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -57,7 +57,7 @@ pub use format::BodyFormat;
 use futures::StreamExt;
 use itertools::Itertools;
 use once_cell::sync::Lazy;
-use quickwit_actors::{ActorExitStatus, Mailbox, SpawnContext, Universe};
+use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, SpawnContext, Universe};
 use quickwit_cluster::{
     Cluster, ClusterChange, ClusterChangeStream, ClusterNode, ListenerHandle, start_cluster_service,
 };
@@ -275,22 +275,31 @@ async fn balance_channel_for_service(
 ///
 /// On janitor nodes, spawns a `CompactionPlanner` actor and builds the client from
 /// its mailbox. On compactor-only nodes, connects to a remote janitor via gRPC.
+///
+/// The second tuple element is the local planner's `ActorHandle`, returned only
+/// on janitor nodes so the caller can attach it to the janitor liveness probe.
 async fn get_compaction_planner_client_if_needed(
     node_config: &NodeConfig,
     cluster: &Cluster,
     universe: &Universe,
     metastore_client: &MetastoreServiceClient,
-) -> anyhow::Result<Option<CompactionPlannerServiceClient>> {
+) -> anyhow::Result<(
+    Option<CompactionPlannerServiceClient>,
+    Option<ActorHandle<CompactionPlanner>>,
+)> {
     let is_janitor = node_config.is_service_enabled(QuickwitService::Janitor);
     let is_compactor = node_config.is_service_enabled(QuickwitService::Compactor);
     if !is_janitor && !is_compactor {
-        return Ok(None);
+        return Ok((None, None));
     }
     if is_janitor {
         let planner = CompactionPlanner::new(metastore_client.clone());
-        let (mailbox, _handle) = universe.spawn_builder().spawn(planner);
+        let (mailbox, handle) = universe.spawn_builder().spawn(planner);
         info!("compaction planner actor started on janitor node");
-        return Ok(Some(CompactionPlannerServiceClient::from_mailbox(mailbox)));
+        return Ok((
+            Some(CompactionPlannerServiceClient::from_mailbox(mailbox)),
+            Some(handle),
+        ));
     }
     // Compactor-only node: connect to the planner on a remote janitor.
     let balance_channel = balance_channel_for_service(cluster, QuickwitService::Janitor).await;
@@ -303,11 +312,14 @@ async fn get_compaction_planner_client_if_needed(
         bail!("compactor is enabled but no janitor node was found in the cluster")
     }
     info!("remote compaction planner detected on janitor node");
-    Ok(Some(CompactionPlannerServiceClient::from_balance_channel(
-        balance_channel,
-        node_config.grpc_config.max_message_size,
+    Ok((
+        Some(CompactionPlannerServiceClient::from_balance_channel(
+            balance_channel,
+            node_config.grpc_config.max_message_size,
+            None,
+        )),
         None,
-    )))
+    ))
 }
 
 async fn start_ingest_client_if_needed(
@@ -588,14 +600,15 @@ pub async fn serve_quickwit(
         .await
         .context("failed to start ingest v1 service")?;
 
-    let compaction_service_client_opt = get_compaction_planner_client_if_needed(
-        &node_config,
-        &cluster,
-        &universe,
-        &metastore_client,
-    )
-    .await
-    .context("failed to initialize compaction service client")?;
+    let (compaction_service_client_opt, compaction_planner_handle_opt) =
+        get_compaction_planner_client_if_needed(
+            &node_config,
+            &cluster,
+            &universe,
+            &metastore_client,
+        )
+        .await
+        .context("failed to initialize compaction service client")?;
 
     let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) {
         let indexing_service = start_indexing_service(
@@ -756,6 +769,8 @@ pub async fn serve_quickwit(
     };
 
     let janitor_service_opt = if node_config.is_service_enabled(QuickwitService::Janitor) {
+        let compaction_planner_handle = compaction_planner_handle_opt
+            .expect("compaction planner handle must exist on janitor nodes");
         let janitor_service = start_janitor_service(
             &universe,
             &node_config,
@@ -764,6 +779,7 @@
             storage_resolver.clone(),
             event_broker.clone(),
             !get_bool_from_env(DISABLE_DELETE_TASK_SERVICE_ENV_KEY, false),
+            compaction_planner_handle,
         )
         .await
         .context("failed to start janitor service")?;
@@ -778,19 +794,6 @@
         let compaction_root_directory = quickwit_common::temp_dir::Builder::default()
             .tempdir_in(&compaction_dir)
             .context("failed to create compaction temp directory")?;
-        let split_store_quota = quickwit_indexing::SplitStoreQuota::try_new(
-            node_config.compactor_config.split_store_max_num_splits,
-            node_config.compactor_config.split_store_max_num_bytes,
-        )?;
-        let split_cache_dir = node_config
-            .data_dir_path
-            .join("compactor-split-cache")
-            .join("splits");
-        let split_cache = Arc::new(
-            quickwit_indexing::IndexingSplitCache::open(split_cache_dir, split_store_quota)
-                .await
-                .context("failed to open compactor split cache")?,
-        );
         let compaction_client = compaction_service_client_opt
             .clone()
             .expect("compactor service enabled but no compaction client available");
@@ -799,7 +802,6 @@
             cluster.self_node_id().into(),
             compaction_client,
             &node_config.compactor_config,
-            split_cache,
             metastore_client.clone(),
             storage_resolver.clone(),
             event_broker.clone(),
@@ -1925,11 +1927,12 @@ mod tests {
         let mut node_config = NodeConfig::for_test();
         node_config.enabled_services =
             HashSet::from([QuickwitService::Janitor, QuickwitService::Indexer]);
-        let result =
+        let (client_opt, handle_opt) =
             get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore)
                 .await
                 .unwrap();
-        assert!(result.is_some());
+        assert!(client_opt.is_some());
+        assert!(handle_opt.is_some());
 
         // With compactor + janitor enabled, planner client is also returned.
         node_config.enabled_services = HashSet::from([
             QuickwitService::Janitor,
             QuickwitService::Indexer,
             QuickwitService::Compactor,
         ]);
-        let result =
+        let (client_opt, handle_opt) =
             get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore)
                 .await
                 .unwrap();
-        assert!(result.is_some());
+        assert!(client_opt.is_some());
+        assert!(handle_opt.is_some());
 
-        // Neither janitor nor compactor: no client.
+        // Neither janitor nor compactor: no client, no handle.
         node_config.enabled_services = HashSet::from([QuickwitService::Indexer]);
-        let result =
+        let (client_opt, handle_opt) =
             get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore)
                 .await
                 .unwrap();
-        assert!(result.is_none());
+        assert!(client_opt.is_none());
+        assert!(handle_opt.is_none());
 
         universe.assert_quit().await;
     }
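Taken together, the serve-side plumbing exists so the janitor's health check can observe the planner. A reduced sketch of the liveness rule `JanitorService::is_healthy` now applies (toy `ActorState`, not the real `quickwit_actors` enum): only an actor that has actually entered `Failure` trips the probe, while a planner idling between ticks stays healthy.

```rust
#[derive(PartialEq)]
enum ActorState {
    Idle,
    Processing,
    Failure,
}

/// The probe fails as soon as any supervised actor has failed.
fn is_healthy(supervised: &[ActorState]) -> bool {
    supervised.iter().all(|state| *state != ActorState::Failure)
}

fn main() {
    assert!(is_healthy(&[ActorState::Processing, ActorState::Idle]));
    assert!(!is_healthy(&[ActorState::Processing, ActorState::Failure]));
}
```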