1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

10 changes: 2 additions & 8 deletions quickwit/quickwit-compaction/src/compactor_supervisor.rs
@@ -57,7 +57,6 @@ pub struct CompactorSupervisor {

// Shared resources distributed to pipelines when spawning actor chains.
io_throughput_limiter: Option<Limiter>,
split_cache: Arc<IndexingSplitCache>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
max_concurrent_split_uploads: usize,
@@ -74,7 +73,6 @@ impl CompactorSupervisor {
planner_client: CompactionPlannerServiceClient,
num_pipeline_slots: usize,
io_throughput_limiter: Option<Limiter>,
split_cache: Arc<IndexingSplitCache>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
max_concurrent_split_uploads: usize,
@@ -87,7 +85,6 @@ impl CompactorSupervisor {
planner_client,
pipelines,
io_throughput_limiter,
split_cache,
metastore,
storage_resolver,
max_concurrent_split_uploads,
@@ -185,7 +182,8 @@ impl CompactorSupervisor {

let index_storage_uri = Uri::from_str(&assignment.index_storage_uri)?;
let index_storage = self.storage_resolver.resolve(&index_storage_uri).await?;
let split_store = IndexingSplitStore::new(index_storage, self.split_cache.clone());
let split_cache = Arc::new(IndexingSplitCache::no_caching());
let split_store = IndexingSplitStore::new(index_storage, split_cache);

let doc_mapper = build_doc_mapper(&doc_mapping, &search_settings)?;
let merge_policy = merge_policy_from_settings(&indexing_settings);
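
With the shared `split_cache` field gone from the supervisor, each pipeline spawn now builds its own pass-through cache at assignment time. A minimal sketch of the resulting shape, assuming the `IndexingSplitCache::no_caching()` and `IndexingSplitStore::new` signatures shown in this hunk and a `quickwit_storage::Storage` trait object for the resolved storage:

```rust
use std::sync::Arc;

use quickwit_indexing::{IndexingSplitCache, IndexingSplitStore};
use quickwit_storage::Storage;

// Sketch, not the PR's code: `index_storage` is the storage resolved from
// the assignment's index URI, as in the hunk above. `no_caching()` yields a
// pass-through cache, so each split read goes straight to index storage and
// no node-local cache directory is involved.
fn build_compaction_split_store(index_storage: Arc<dyn Storage>) -> IndexingSplitStore {
    let split_cache = Arc::new(IndexingSplitCache::no_caching());
    IndexingSplitStore::new(index_storage, split_cache)
}
```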
@@ -311,8 +309,6 @@ impl Handler<CheckPipelineStatuses> for CompactorSupervisor {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use quickwit_actors::Universe;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_proto::compaction::{
@@ -334,7 +330,6 @@ mod tests {
compaction_client,
num_slots,
None,
Arc::new(IndexingSplitCache::no_caching()),
metastore,
StorageResolver::for_test(),
2,
@@ -544,7 +539,6 @@ mod tests {
client,
3,
None,
Arc::new(IndexingSplitCache::no_caching()),
metastore,
StorageResolver::for_test(),
2,
5 changes: 0 additions & 5 deletions quickwit/quickwit-compaction/src/lib.rs
@@ -23,15 +23,12 @@ pub mod planner;

pub type TaskId = String;

use std::sync::Arc;

pub use compactor_supervisor::CompactorSupervisor;
use quickwit_actors::{Mailbox, Universe};
use quickwit_common::io;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_config::CompactorConfig;
use quickwit_indexing::IndexingSplitCache;
use quickwit_proto::compaction::CompactionPlannerServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::types::{IndexUid, NodeId, SourceId};
@@ -48,7 +45,6 @@ pub async fn start_compactor_service(
node_id: NodeId,
compaction_client: CompactionPlannerServiceClient,
compactor_config: &CompactorConfig,
split_cache: Arc<IndexingSplitCache>,
metastore: MetastoreServiceClient,
storage_resolver: StorageResolver,
event_broker: EventBroker,
@@ -62,7 +58,6 @@ pub async fn start_compactor_service(
compaction_client,
compactor_config.max_concurrent_pipelines.get(),
io_throughput_limiter,
split_cache,
metastore,
storage_resolver,
compactor_config.max_concurrent_split_uploads,
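
The same removal ripples through the public entry point: `start_compactor_service` loses its `split_cache` parameter. A hedged before/after sketch of a call site, showing only the parameters visible in the hunks above (any arguments outside them are elided):

```rust
// Before (sketch): the caller constructed and threaded a cache through.
let split_cache = Arc::new(IndexingSplitCache::no_caching());
start_compactor_service(
    node_id.clone(),
    compaction_client.clone(),
    &compactor_config,
    split_cache,
    metastore.clone(),
    storage_resolver.clone(),
    event_broker.clone(),
)
.await?;

// After (sketch): the argument is dropped; each compaction pipeline
// now builds its own no-op cache internally.
start_compactor_service(
    node_id,
    compaction_client,
    &compactor_config,
    metastore,
    storage_resolver,
    event_broker,
)
.await?;
```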
133 changes: 92 additions & 41 deletions quickwit/quickwit-compaction/src/planner/compaction_planner.rs
@@ -36,21 +36,18 @@ use super::compaction_state::CompactionState;
use super::index_config_metastore::{IndexConfigMetastore, IndexEntry};
use crate::planner::metrics::COMPACTION_PLANNER_METRICS;

/// Cap on splits fetched per tick. Every tick, the planner re-scans the immature published set,
/// sorted by `maturity_timestamp` ASC so the most-urgent splits are processed first when a backlog
/// exists. Splits beyond this cap aren't lost -- they bubble into range as the front of the queue
/// is merged off.
const SCAN_PAGE_SIZE: usize = 5_000;
#[derive(Debug)]
pub struct CompactionPlanner {
state: CompactionState,
index_config_metastore: IndexConfigMetastore,
cursor: i64,
metastore: MetastoreServiceClient,
}

impl Debug for CompactionPlanner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CompactionPlanner")
.field("cursor", &self.cursor)
.finish()
}
}

const SCAN_AND_PLAN_INTERVAL: Duration = Duration::from_secs(5);
/// On initialization, we want to wait for two intervals to allow any in-progress workers to report
/// their progress, preventing us from frivolously rescheduling work.
@@ -112,15 +109,11 @@ impl Handler<ReportStatusRequest> for CompactionPlanner {
}
}

const STARTUP_LOOKBACK: Duration = Duration::from_secs(24 * 60 * 60);

impl CompactionPlanner {
pub fn new(metastore: MetastoreServiceClient) -> Self {
let cursor = OffsetDateTime::now_utc().unix_timestamp() - STARTUP_LOOKBACK.as_secs() as i64;
CompactionPlanner {
state: CompactionState::new(),
index_config_metastore: IndexConfigMetastore::new(metastore.clone()),
cursor,
metastore,
}
}
@@ -141,8 +134,6 @@ impl CompactionPlanner {
if index_entry.is_split_mature(&split.split_metadata) {
continue;
}
self.cursor = self.cursor.max(split.update_timestamp);
info!(max_timestamp=%self.cursor, "[compaction planner] update metastore cursor min_timestamp cursor");
self.state.track_split(split.split_metadata);
}
}
@@ -151,7 +142,8 @@ impl CompactionPlanner {
let query = ListSplitsQuery::for_all_indexes()
.with_split_state(SplitState::Published)
.retain_immature(OffsetDateTime::now_utc())
.with_update_timestamp_gte(self.cursor);
.sort_by_maturity_timestamp()
.with_limit(SCAN_PAGE_SIZE);
let request = ListSplitsRequest::try_from_list_splits_query(&query)?;
let splits = self
.metastore
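
Together with the `SCAN_PAGE_SIZE` doc comment above, this query is the entire scan contract: no cursor survives between ticks, so planner restarts and failed tasks need no rewind logic. Restated as a standalone sketch with the intent spelled out (same builder calls as the hunk; the wrapper function and imports are assumed):

```rust
use quickwit_metastore::{ListSplitsQuery, SplitState};
use time::OffsetDateTime;

// Per-tick scan: stateless and bounded. Re-running it after a restart or a
// worker failure rediscovers any still-Published, still-immature splits
// (see the retracking test added further down).
fn build_scan_query() -> ListSplitsQuery {
    ListSplitsQuery::for_all_indexes()
        .with_split_state(SplitState::Published)     // only published splits merge
        .retain_immature(OffsetDateTime::now_utc())  // mature splits never compact
        .sort_by_maturity_timestamp()                // most-urgent first under backlog
        .with_limit(SCAN_PAGE_SIZE)                  // caps work per tick at 5_000
}
```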
@@ -237,7 +229,7 @@ fn emit_metastore_scan_metrics(new_splits: &[Split]) {
COMPACTION_PLANNER_METRICS
.new_splits_scanned
.with_label_values([&index_uid.to_string()])
.set(count as i64);
.inc_by(count as u64);
}
}
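
`set` becomes `inc_by`: with a capped page that is re-scanned every tick, overwriting would presumably report only the latest window, while accumulating tracks total scan volume. A self-contained sketch of the gauge-vs-counter distinction using the prometheus crate (the metric's own type change is not shown in this diff, so the counter type here is an assumption):

```rust
use prometheus::{IntCounter, IntGauge};

fn main() {
    // Counter: monotonically accumulates, so PromQL `rate()` is meaningful.
    let scanned = IntCounter::new("splits_scanned_total", "splits scanned").unwrap();
    // Gauge: snapshots a current value; it can move down as well as up.
    let tracked = IntGauge::new("splits_tracked", "splits tracked").unwrap();

    for page_len in [5_000u64, 5_000, 1_200] {
        scanned.inc_by(page_len);     // deltas add up: 11_200 after the loop
        tracked.set(page_len as i64); // only the last page size survives
    }
    assert_eq!(scanned.get(), 11_200);
    assert_eq!(tracked.get(), 1_200);
}
```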

@@ -270,6 +262,7 @@ fn build_task_assignment(

#[cfg(test)]
mod tests {
use std::ops::Bound;
use std::time::Duration;

use quickwit_common::ServiceStream;
@@ -278,14 +271,15 @@ mod tests {
ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig,
};
use quickwit_metastore::{
IndexMetadata, IndexMetadataResponseExt, ListSplitsResponseExt, Split, SplitMaturity,
SplitMetadata, SplitState,
IndexMetadata, IndexMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt,
SortBy, Split, SplitMaturity, SplitMetadata, SplitState,
};
use quickwit_proto::compaction::CompactionSuccess;
use quickwit_proto::metastore::{
IndexMetadataResponse, ListSplitsResponse, MetastoreError, MockMetastoreService,
};
use quickwit_proto::types::IndexUid;
use time::OffsetDateTime;

use super::*;

@@ -334,30 +328,47 @@ mod tests {
}

#[tokio::test]
async fn test_scan_metastore() {
async fn test_scan_metastore_query_shape_and_passthrough() {
let index_uid = IndexUid::for_test("test-index", 0);
let splits = vec![
test_split("split-1", &index_uid, 1000),
test_split("split-2", &index_uid, 2000),
let returned_splits = vec![
test_split("a", &index_uid, 1000),
test_split("b", &index_uid, 2000),
];
let splits_clone = splits.clone();
let returned_clone = returned_splits.clone();
let scan_started_at = OffsetDateTime::now_utc().unix_timestamp();

let mut mock = MockMetastoreService::new();
mock.expect_list_splits().returning(move |_| {
let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap();
mock.expect_list_splits().returning(move |req| {
let query = req.deserialize_list_splits_query().unwrap();

assert_eq!(query.split_states, vec![SplitState::Published]);
assert_eq!(query.limit, Some(SCAN_PAGE_SIZE));
assert_eq!(query.sort_by, SortBy::MaturityTimestamp);

let Bound::Excluded(mature_at) = query.mature else {
panic!("expected Excluded mature bound, got {:?}", query.mature);
};
let now_secs = OffsetDateTime::now_utc().unix_timestamp();
assert!(mature_at.unix_timestamp() >= scan_started_at);
assert!(mature_at.unix_timestamp() <= now_secs);

assert_eq!(query.update_timestamp.start, Bound::Unbounded);
assert_eq!(query.update_timestamp.end, Bound::Unbounded);

let response = ListSplitsResponse::try_from_splits(returned_clone.clone()).unwrap();
Ok(ServiceStream::from(vec![Ok(response)]))
});

let planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
let result = planner.scan_metastore().await.unwrap();

assert_eq!(result.len(), 2);
assert_eq!(result[0].split_metadata.split_id, "split-1");
assert_eq!(result[1].split_metadata.split_id, "split-2");
assert_eq!(result[0].split_metadata.split_id, "a");
assert_eq!(result[1].split_metadata.split_id, "b");
}

#[tokio::test]
async fn test_ingest_splits_dedup_maturity_and_cursor() {
async fn test_ingest_splits_dedups_and_skips_mature() {
let index_metadata = test_index_metadata();
let response = test_index_metadata_response(&index_metadata);
let index_uid = index_metadata.index_uid.clone();
@@ -367,7 +378,6 @@ mod tests {
.returning(move |_| Ok(response.clone()));

let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
planner.cursor = 0;

// Pre-populate: "in-flight" is already being compacted.
planner.state.track_split(SplitMetadata {
@@ -390,11 +400,10 @@ mod tests {
assert!(planner.state.is_split_tracked("fresh"));
assert!(planner.state.is_split_tracked("in-flight"));
assert!(!planner.state.is_split_tracked("mature"));
assert_eq!(planner.cursor, 3000);
}

#[tokio::test]
async fn test_scan_and_plan_metastore_error() {
async fn test_scan_and_plan_propagates_metastore_error() {
let mut mock = MockMetastoreService::new();
mock.expect_list_splits().returning(|_| {
Err(MetastoreError::Internal {
@@ -404,11 +413,7 @@ mod tests {
});

let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
let original_cursor = planner.cursor;

let result = planner.scan_and_plan().await;
assert!(result.is_err());
assert_eq!(planner.cursor, original_cursor);
assert!(planner.scan_and_plan().await.is_err());
}

#[tokio::test]
@@ -425,7 +430,6 @@ mod tests {
});

let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
planner.cursor = 0;
planner.ingest_splits(splits).await;

assert!(!planner.state.is_split_tracked("orphan"));
@@ -452,12 +456,60 @@ mod tests {
.returning(move |_| Ok(index_metadata_response.clone()));

let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
planner.cursor = 0;
planner.scan_and_plan().await.unwrap();

assert!(planner.state.is_split_tracked("s1"));
assert!(planner.state.is_split_tracked("s2"));
assert_eq!(planner.cursor, 6000);
}

#[tokio::test]
async fn test_failed_task_is_retracked_on_next_scan() {
// After a worker reports failure (or times out), planner-local
// tracking is cleared. Because there is no cursor, the next scan
// rediscovers the still-Published, still-immature splits and
// re-tracks them.
let index_metadata = test_index_metadata_with_merge_factor_2();
let index_metadata_response = test_index_metadata_response(&index_metadata);
let index_uid = index_metadata.index_uid.clone();

let splits = vec![
test_split("s1", &index_uid, 1000),
test_split("s2", &index_uid, 2000),
];
let splits_clone = splits.clone();

let mut mock = MockMetastoreService::new();
mock.expect_list_splits().returning(move |_| {
let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap();
Ok(ServiceStream::from(vec![Ok(response)]))
});
mock.expect_index_metadata()
.returning(move |_| Ok(index_metadata_response.clone()));

let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
let node_id = NodeId::from("worker-1");

planner.scan_and_plan().await.unwrap();
let assignments = planner.assign_tasks(&node_id, 10);
assert_eq!(assignments.len(), 1);
let task_id = assignments[0].task_id.clone();
assert!(planner.state.is_split_tracked("s1"));
assert!(planner.state.is_split_tracked("s2"));

// Worker reports failure; planner forgets the splits.
planner
.state
.process_failures(&[quickwit_proto::compaction::CompactionFailure {
task_id,
error_message: "boom".to_string(),
}]);
assert!(!planner.state.is_split_tracked("s1"));
assert!(!planner.state.is_split_tracked("s2"));

// Next scan rediscovers them and re-tracks them.
planner.scan_and_plan().await.unwrap();
assert!(planner.state.is_split_tracked("s1"));
assert!(planner.state.is_split_tracked("s2"));
}

/// Helper: creates a planner with merge_factor=2, ingests the given splits,
@@ -477,7 +529,6 @@ mod tests {
});

let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock));
planner.cursor = 0;

let splits: Vec<Split> = split_ids
.iter()