diff --git a/common/src/ctx.rs b/common/src/ctx.rs index 090872d6..ff383852 100644 --- a/common/src/ctx.rs +++ b/common/src/ctx.rs @@ -27,9 +27,18 @@ const DEFAULT_FLAME_CONF: &str = "flame-cluster.yaml"; const DEFAULT_CONTEXT_NAME: &str = "flame"; const DEFAULT_FLAME_ENDPOINT: &str = "http://127.0.0.1:8080"; /// Default policies to enable when none specified in config. -/// Available configurable policies: "priority", "drf", "gang" -/// Note: "shim" plugin is always enabled (required for executor matching) -pub const DEFAULT_POLICIES: &[&str] = &["priority", "drf", "gang"]; +/// +/// Effective default scheduler stack: `priority + gang + shim` +/// - "gang" is listed here so batch-size semantics and allocation guards are active +/// by default. Users who do not need gang scheduling may omit it; scheduling +/// will still work but allocates one executor per cycle with no batch constraint. +/// - "shim" is always-on (non-configurable) and runs regardless of this list. +/// +/// Optional configurable policies (add to cluster.policies in your config to enable): +/// - "drf" — Dominant Resource Fairness; enables fair multi-resource sharing across sessions. +/// +/// Note: listing "shim" in the config is harmless — a warning is logged and it is ignored. 
+pub const DEFAULT_POLICIES: &[&str] = &["priority", "gang"]; const DEFAULT_STORAGE: &str = "sqlite://flame.db"; const DEFAULT_MAX_EXECUTORS_PER_NODE: u32 = 128; pub const DEFAULT_SESSION_RETRY_LIMITS: u32 = 5; diff --git a/executor_manager/src/shims/grpc_shim.rs b/executor_manager/src/shims/grpc_shim.rs index f7334692..b27aed59 100644 --- a/executor_manager/src/shims/grpc_shim.rs +++ b/executor_manager/src/shims/grpc_shim.rs @@ -202,15 +202,13 @@ impl Future for WaitForSvcSocketFuture { #[cfg(test)] mod tests { + use super::super::SHIMS_TEST_LOCK; use super::*; use common::apis::{ApplicationContext, Shim as ShimType}; use std::collections::HashMap; use std::path::PathBuf; - use std::sync::Mutex; use tempfile::tempdir; - static TEST_LOCK: Mutex<()> = Mutex::new(()); - fn setup_test_env(temp: &tempfile::TempDir) -> PathBuf { let socket_dir = temp.path().join("sockets"); std::fs::create_dir_all(&socket_dir).unwrap(); @@ -237,7 +235,7 @@ mod tests { #[test] fn test_grpc_shim_new() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let work_dir = create_test_work_dir("exec-grpc-test", &temp); @@ -251,7 +249,7 @@ mod tests { #[test] fn test_grpc_shim_endpoint() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let work_dir = create_test_work_dir("exec-endpoint-test", &temp); @@ -263,7 +261,7 @@ mod tests { #[test] fn test_grpc_shim_close_without_connection() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let work_dir = create_test_work_dir("exec-close-test", &temp); @@ -285,7 +283,7 @@ mod tests { #[tokio::test] #[allow(clippy::await_holding_lock)] async fn test_on_session_enter_without_connection() { - let 
_guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let work_dir = create_test_work_dir("exec-session-test", &temp); @@ -318,7 +316,7 @@ mod tests { #[tokio::test] #[allow(clippy::await_holding_lock)] async fn test_on_task_invoke_without_connection() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let work_dir = create_test_work_dir("exec-task-test", &temp); @@ -341,7 +339,7 @@ mod tests { #[tokio::test] #[allow(clippy::await_holding_lock)] async fn test_on_session_leave_without_connection() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let work_dir = create_test_work_dir("exec-leave-test", &temp); diff --git a/executor_manager/src/shims/mod.rs b/executor_manager/src/shims/mod.rs index 9938bf2a..b77efef0 100644 --- a/executor_manager/src/shims/mod.rs +++ b/executor_manager/src/shims/mod.rs @@ -225,16 +225,24 @@ pub trait Shim: Send + 'static { async fn on_session_leave(&mut self) -> Result<(), FlameError>; } +/// Single process-wide lock shared by all shims submodule tests. +/// +/// `shims::tests` and `shims::grpc_shim::tests` both mutate global process state +/// (`FLAME_SOCKET_DIR` env var, current directory). Using a single lock prevents +/// those tests from racing each other when the test binary runs them in parallel. +/// +/// Use `lock().unwrap_or_else(|e| e.into_inner())` so that a panic in one test +/// does not permanently poison the lock and block all subsequent tests. 
+#[cfg(test)] +pub(crate) static SHIMS_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + #[cfg(test)] mod tests { use super::*; use std::collections::HashMap; use std::fs::File; - use std::sync::Mutex; use tempfile::tempdir; - static TEST_LOCK: Mutex<()> = Mutex::new(()); - fn create_test_app(name: &str, working_directory: Option) -> ApplicationContext { ApplicationContext { name: name.to_string(), @@ -259,7 +267,7 @@ mod tests { #[test] fn test_executor_work_dir_with_auto_dir() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); let socket_dir = setup_test_env(&temp); @@ -280,7 +288,7 @@ mod tests { #[test] fn test_executor_work_dir_with_custom_working_directory() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); let socket_dir = setup_test_env(&temp); let custom_dir = temp.path().join("custom-workdir"); @@ -301,7 +309,7 @@ mod tests { #[test] fn test_executor_work_dir_socket_path_length() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); @@ -321,7 +329,7 @@ mod tests { #[test] fn test_executor_work_dir_cleanup_on_drop() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); @@ -355,7 +363,7 @@ mod tests { #[test] fn test_executor_work_dir_no_cleanup_custom_dir_on_drop() { - let _guard = TEST_LOCK.lock().unwrap(); + let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); setup_test_env(&temp); let custom_dir = temp.path().join("persistent-workdir"); @@ -382,7 +390,7 @@ mod tests { #[test] fn test_socket_path_is_fixed_location() { - let _guard = TEST_LOCK.lock().unwrap(); + let 
_guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let temp = tempdir().unwrap(); let socket_dir = setup_test_env(&temp); diff --git a/flmadm/src/managers/config.rs b/flmadm/src/managers/config.rs index f3699149..95a98f08 100644 --- a/flmadm/src/managers/config.rs +++ b/flmadm/src/managers/config.rs @@ -44,7 +44,6 @@ cluster: resreq: "cpu=1,mem=2g" policies: - priority - - drf - gang storage: "fs://{prefix}/data" executors: diff --git a/session_manager/src/scheduler/actions/allocate.rs b/session_manager/src/scheduler/actions/allocate.rs index a0375ad8..bdc620b9 100644 --- a/session_manager/src/scheduler/actions/allocate.rs +++ b/session_manager/src/scheduler/actions/allocate.rs @@ -118,8 +118,12 @@ impl Action for AllocateAction { // (see `Context`): Gang's counters include binds Dispatch committed via // `Statement` in this same `Context`. If those already satisfy gang scheduling, do not // create more executors here. - let fulfilled = ctx.is_fulfilled(&ssn)?; - let ready = ctx.is_ready(&ssn)?; + // + // `is_ready` / `is_fulfilled` return true once the batch is satisfied (gang) or + // after the first op this cycle (no gang). Skip only when one of them is already + // true before we begin. + let fulfilled = ctx.is_fulfilled(&ssn); + let ready = ctx.is_ready(&ssn); if fulfilled || ready { tracing::debug!( "Skip allocate resources for session <{}>: is_fulfilled={}, is_ready={}", @@ -132,28 +136,30 @@ impl Action for AllocateAction { let mut stmt = Statement::new(ss.clone(), ctx.plugins.clone(), ctx.controller.clone()); - let pipelineable = void_executors + let pipelineable: Vec<_> = void_executors .values() .chain(unbinding_executors.values()) - .filter(|e| ctx.is_available(e, &ssn).unwrap_or(false)); + .filter(|e| ctx.is_available(e, &ssn).unwrap_or(false)) + .cloned() + .collect(); - for exec in pipelineable { + for exec in &pipelineable { stmt.pipeline(exec, &ssn)?; - if ctx.is_ready(&ssn)? 
{ + if ctx.is_ready(&ssn) { break; } } for node in nodes.iter() { - if ctx.is_ready(&ssn)? { + if ctx.is_ready(&ssn) { break; } - while ctx.is_allocatable(node, &ssn)? && !ctx.is_ready(&ssn)? { + while ctx.is_allocatable(node, &ssn)? && !ctx.is_ready(&ssn) { stmt.allocate(node, &ssn)?; } } - if ctx.is_ready(&ssn)? { + if ctx.is_ready(&ssn) { let op_count = stmt.len(); let pipelined_ids = stmt.commit().await?; for id in &pipelined_ids { diff --git a/session_manager/src/scheduler/actions/dispatch.rs b/session_manager/src/scheduler/actions/dispatch.rs index e7b11f60..91d3b918 100644 --- a/session_manager/src/scheduler/actions/dispatch.rs +++ b/session_manager/src/scheduler/actions/dispatch.rs @@ -65,6 +65,21 @@ impl Action for DispatchAction { continue; } + // Skip sessions that already have enough executors committed (or in-flight from a + // previous cycle). Without this guard a session whose executor is still running + // `on_session_enter` (Binding state, counted in Gang's `allocated`) would receive + // additional bindings every scheduling cycle. + // + // `is_fulfilled` returns true once the batch is satisfied (gang) or after the + // first bind this cycle (no gang), so this guard works uniformly for both cases. + if ctx.is_fulfilled(&ssn) { + tracing::debug!( + "Session <{}> is already fulfilled (existing executors satisfy batch_size), skip dispatch.", + ssn.id + ); + continue; + } + tracing::debug!( "Session <{}> is underused, start to allocate resources.", &ssn.id @@ -75,13 +90,13 @@ impl Action for DispatchAction { for (_, exec) in idle_executors.iter() { if ctx.is_available(exec, &ssn)? { stmt.bind(exec, &ssn)?; - if ctx.is_fulfilled(&ssn)? { + if ctx.is_fulfilled(&ssn) { break; } } } - if ctx.is_fulfilled(&ssn)? 
{ + if ctx.is_fulfilled(&ssn) && !stmt.is_empty() { tracing::debug!("Bind executor for session <{}>.", ssn.id); let bound_ids = stmt.commit().await?; for id in &bound_ids { @@ -97,7 +112,7 @@ impl Action for DispatchAction { continue; } else if !stmt.is_empty() { tracing::debug!( - "Discarding unfulfilled binding for session <{}>: no available idle executors", + "Discarding unfulfilled binding for session <{}>: gang batch incomplete", ssn.id ); stmt.discard()?; diff --git a/session_manager/src/scheduler/ctx.rs b/session_manager/src/scheduler/ctx.rs index a91dd563..960bc480 100644 --- a/session_manager/src/scheduler/ctx.rs +++ b/session_manager/src/scheduler/ctx.rs @@ -77,13 +77,21 @@ impl Context { /// Allocation-side batch readiness (e.g. Gang: pipelined + allocated executors form full /// batches). Reflects in-memory plugin state, including `Statement` ops earlier in this /// same cycle (typically pipeline/allocate before commit). - pub fn is_ready(&self, ssn: &SessionInfoPtr) -> Result { + /// + /// Returns `true` when the batch is complete (gang) or after the first pipeline/allocate + /// op in a no-gang cycle. Returns `false` before any op has been attempted, or when gang + /// reports the batch is still incomplete. + pub fn is_ready(&self, ssn: &SessionInfoPtr) -> bool { self.plugins.is_ready(ssn) } /// Binding-side batch readiness (e.g. Gang: bound + on-session executors form full batches). - /// After Dispatch commits binds, this can be true so Allocate skips provisioning. - pub fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> Result { + /// After Dispatch commits binds, this can be `true` so Allocate skips provisioning. + /// + /// Returns `true` when the batch is fulfilled (gang) or after the first bind op in a + /// no-gang cycle. Returns `false` before any bind has been attempted, or when gang + /// reports the batch is still incomplete. 
+ pub fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> bool { self.plugins.is_fulfilled(ssn) } diff --git a/session_manager/src/scheduler/mod.rs b/session_manager/src/scheduler/mod.rs index a10e7e18..52f6760d 100644 --- a/session_manager/src/scheduler/mod.rs +++ b/session_manager/src/scheduler/mod.rs @@ -175,7 +175,13 @@ mod tests { } } - /// Test the allocation of void executors to underused sessions. + /// With the default policy (priority + gang, batch_size=1), AllocateAction must create + /// exactly `task_num` executors — one per pending task — in the first scheduling cycle, + /// and must NOT create additional executors in subsequent cycles. + /// + /// The gang plugin ensures `needed = div_ceil(task_num, 1) * 1 = task_num`. Once all executors + /// are in snapshot (`allocated == task_num`), `is_ready` returns `true` and the pre-check + /// skips the session, so the executor count stays stable across all subsequent cycles. #[test] fn test_allocate_executors() -> Result<(), FlameError> { let env = TestEnv::new()?; @@ -244,12 +250,87 @@ mod tests { assert_eq!(node_list.values().next().unwrap().name, "node_1"); let exec_list = controller.list_executor()?; - assert_eq!(exec_list.len(), 1); + assert_eq!( + exec_list.len(), + task_num as usize, + "cycle {i}: expected {task_num} executors (one per task), got {}", + exec_list.len() + ); } Ok(()) } + /// Regression test: when only the DRF plugin is enabled (without Gang), tasks must not stay + /// pending forever. + /// + /// Previously, `is_fulfilled` and `is_ready` both defaulted to `true` when no plugin + /// implemented them — `AllocateAction`'s pre-check `if fulfilled || ready { continue }` + /// caused it to skip every session before any executor was created. + /// + /// The fix makes `is_ready`/`is_fulfilled` return a plain `bool`. When Gang is not + /// loaded, both return `false` until the first op of the cycle, so the pre-check no longer + /// skips the session; they return `true` afterwards, i.e. 1 executor per cycle is enough. 
+ #[test] + fn test_drf_only_policy_allocates_executor() -> Result<(), FlameError> { + let env = TestEnv::new()?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("drf-only-{}", Utc::now().timestamp_nanos_opt().unwrap_or(0)); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + + // Use DRF-only policy: "gang" is deliberately omitted, so no gang batch constraint applies. + let drf_only: Vec = vec!["drf".to_string()]; + + let snapshot = controller.snapshot()?; + let plugins = PluginManager::setup(&snapshot, &drf_only)?; + let mut ctx = Context { + snapshot: snapshot.clone(), + controller: controller.clone(), + plugins, + actions: vec![], + }; + + let dispatch = DispatchAction::new_ptr(); + tokio_test::block_on(dispatch.execute(&mut ctx))?; + + let alloc = AllocateAction::new_ptr(); + tokio_test::block_on(alloc.execute(&mut ctx))?; + + // An executor must have been created; without the fix it would be 0. + let exec_list = controller.list_executor()?; + assert_eq!( + exec_list.len(), + 1, + "DRF-only policy must create an executor for a pending task" + ); + + Ok(()) + } + /// One scheduling cycle must keep the same in-memory [`crate::scheduler::plugins::PluginManager`] /// so Gang (and similar) state from Dispatch is visible to Allocate. 
#[test] @@ -339,4 +420,327 @@ mod tests { Ok(()) } + + /// Regression: with only the priority plugin (no gang, no DRF), a session with a pending + /// task must still get an executor allocated. + /// + /// Previously, without gang, `is_ready` defaulted to `true` (unwrap_or) so AllocateAction's + /// pre-check always skipped every session. The fix makes `is_ready` / `is_fulfilled` return + /// `false` in a no-gang cycle until the first op is recorded, so the session is not skipped. + #[test] + fn test_priority_only_policy_allocates_executor() -> Result<(), FlameError> { + let env = TestEnv::new()?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("priority-only-{}", Uuid::new_v4()); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + + let priority_only: Vec = vec!["priority".to_string()]; + let snapshot = controller.snapshot()?; + let plugins = PluginManager::setup(&snapshot, &priority_only)?; + let mut ctx = Context { + snapshot: snapshot.clone(), + controller: controller.clone(), + plugins, + actions: vec![], + }; + + let dispatch = DispatchAction::new_ptr(); + tokio_test::block_on(dispatch.execute(&mut ctx))?; + + let alloc = AllocateAction::new_ptr(); + tokio_test::block_on(alloc.execute(&mut ctx))?; + + let exec_list = controller.list_executor()?; + assert_eq!( + exec_list.len(), + 1, + "priority-only policy must create exactly 1 executor for a pending task" + 
); + Ok(()) + } + + /// Regression: with gang plugin, batch_size=1, and multiple pending tasks, AllocateAction + /// must create one executor per pending task — not stop at the first one. + /// + /// Root cause: the old `is_ready` formula returned `Some(true)` as soon as `allocated=1` + /// (because `1 % 1 == 0`), so the pre-check in AllocateAction skipped the session for all + /// subsequent cycles, leaving tasks 2…N without executors forever. + /// + /// With the fixed formula (`needed = div_ceil(incomplete_tasks, batch_size) * batch_size`), a + /// session with 3 pending tasks has `needed=3`. The pre-check fires only when `total >= 3`, + /// so AllocateAction creates all 3 executors in a single scheduling cycle. + #[test] + fn test_gang_batch_size_1_allocates_for_each_task() -> Result<(), FlameError> { + let env = TestEnv::new()?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("gang-batch1-multi-{}", Uuid::new_v4()); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + // 3 pending tasks → gang plugin needs 3 executors (needed = div_ceil(3, 1) * 1 = 3) + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + + let gang_policies: Vec = vec!["priority".to_string(), "gang".to_string()]; + let snapshot = controller.snapshot()?; + let plugins = PluginManager::setup(&snapshot, 
&gang_policies)?; + let mut ctx = Context { + snapshot: snapshot.clone(), + controller: controller.clone(), + plugins, + actions: vec![], + }; + + let dispatch = DispatchAction::new_ptr(); + tokio_test::block_on(dispatch.execute(&mut ctx))?; + + let alloc = AllocateAction::new_ptr(); + tokio_test::block_on(alloc.execute(&mut ctx))?; + + let exec_list = controller.list_executor()?; + assert_eq!( + exec_list.len(), + 3, + "gang batch_size=1 with 3 pending tasks must create 3 executors in one cycle (got {})", + exec_list.len() + ); + Ok(()) + } + + /// With the gang plugin and batch_size=2, AllocateAction must create exactly 2 executors + /// in a single cycle (the full gang batch), not 1 and not more than 2. + #[test] + fn test_gang_batch_size_2_creates_two_executors_per_cycle() -> Result<(), FlameError> { + let env = TestEnv::new()?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("gang-batch2-{}", Uuid::new_v4()); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 2, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + // Create 2 pending tasks so the session is considered underused + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + + // Both priority (for is_underused) and gang (for batch scheduling) are required. 
+ let gang_policies: Vec = vec!["priority".to_string(), "gang".to_string()]; + let snapshot = controller.snapshot()?; + let plugins = PluginManager::setup(&snapshot, &gang_policies)?; + let mut ctx = Context { + snapshot: snapshot.clone(), + controller: controller.clone(), + plugins, + actions: vec![], + }; + + let dispatch = DispatchAction::new_ptr(); + tokio_test::block_on(dispatch.execute(&mut ctx))?; + + let alloc = AllocateAction::new_ptr(); + tokio_test::block_on(alloc.execute(&mut ctx))?; + + let exec_list = controller.list_executor()?; + assert_eq!( + exec_list.len(), + 2, + "gang batch_size=2 must create exactly 2 executors in one scheduling cycle" + ); + Ok(()) + } + + /// Without gang, AllocateAction must allocate at most 1 executor per cycle per session, + /// even when the node has plenty of capacity and there are many pending tasks. + /// + /// This guards against the "fill the node" regression that would occur if `is_ready` + /// stayed `false` after the first allocation in a no-gang cycle ("keep allocating"). + #[test] + fn test_no_gang_allocates_at_most_one_executor_per_cycle() -> Result<(), FlameError> { + let env = TestEnv::new()?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("no-gang-cap-{}", Uuid::new_v4()); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + // 5 pending tasks — node has 64 CPUs so without a cap we'd create 5+ executors + for _ in 0..5 { + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + } + + // 
Priority-only: no gang, no DRF + let priority_only: Vec = vec!["priority".to_string()]; + let snapshot = controller.snapshot()?; + let plugins = PluginManager::setup(&snapshot, &priority_only)?; + let mut ctx = Context { + snapshot: snapshot.clone(), + controller: controller.clone(), + plugins, + actions: vec![], + }; + + let alloc = AllocateAction::new_ptr(); + tokio_test::block_on(alloc.execute(&mut ctx))?; + + let exec_list = controller.list_executor()?; + assert_eq!( + exec_list.len(), + 1, + "without gang, AllocateAction must create at most 1 executor per cycle (got {})", + exec_list.len() + ); + Ok(()) + } + + /// DispatchAction must bind an idle executor to a session even when no gang plugin is + /// loaded (`is_fulfilled` returns `false` before the first bind of the cycle). + /// + /// Previously, when is_fulfilled defaulted to true (no gang), DispatchAction's pre-check + /// `if is_fulfilled { skip }` would skip every session. Now `is_fulfilled` is `false` until + /// the first bind of a no-gang cycle, so the pre-check lets binding proceed. + #[test] + fn test_dispatch_without_gang_binds_idle_executor() -> Result<(), FlameError> { + let env = TestEnv::new()?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("dispatch-no-gang-{}", Uuid::new_v4()); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + + // Pre-create an idle executor bound to the session (simulate what AllocateAction would do) + let executor = + 
tokio_test::block_on(controller.create_executor("node_1".to_string(), ssn_id.clone()))?; + tokio_test::block_on(controller.register_executor(&executor))?; + + // Priority-only (no gang) + let priority_only: Vec = vec!["priority".to_string()]; + let snapshot = controller.snapshot()?; + let plugins = PluginManager::setup(&snapshot, &priority_only)?; + let mut ctx = Context { + snapshot: snapshot.clone(), + controller: controller.clone(), + plugins, + actions: vec![], + }; + + let dispatch = DispatchAction::new_ptr(); + tokio_test::block_on(dispatch.execute(&mut ctx))?; + + // The executor must have transitioned from Idle → Binding + let executors = controller.list_executor()?; + assert_eq!(executors.len(), 1); + assert_eq!( + executors[0].state, + common::apis::ExecutorState::Binding, + "DispatchAction must bind the idle executor even without gang plugin" + ); + Ok(()) + } } diff --git a/session_manager/src/scheduler/plugins/gang.rs b/session_manager/src/scheduler/plugins/gang.rs index 51eec191..abb309b5 100644 --- a/session_manager/src/scheduler/plugins/gang.rs +++ b/session_manager/src/scheduler/plugins/gang.rs @@ -13,7 +13,7 @@ limitations under the License. use std::collections::HashMap; -use common::apis::SessionID; +use common::apis::{SessionID, TaskState}; use common::FlameError; use crate::model::{ExecutorInfoPtr, NodeInfoPtr, SessionInfoPtr, SnapShot}; @@ -21,6 +21,10 @@ use crate::scheduler::plugins::{Plugin, PluginPtr}; struct GangState { batch_size: u32, + /// Number of incomplete (Pending + Running) tasks for this session, sampled once + /// per scheduling cycle in `setup()`. Used to compute the allocation threshold: + /// `needed = div_ceil(incomplete_tasks, batch_size) * batch_size`. 
+ incomplete_tasks: u32, allocated: u32, pipelined: u32, bound: u32, @@ -53,10 +57,19 @@ impl Plugin for GangPlugin { .map_err(|e| FlameError::Internal(format!("failed to lock sessions: {}", e)))?; for ssn in sessions.values() { + // Count tasks that have not yet completed (Pending + Running). + let mut incomplete_tasks: u32 = 0; + for state in [TaskState::Pending, TaskState::Running] { + if let Some(c) = ssn.tasks_status.get(&state) { + incomplete_tasks = incomplete_tasks.saturating_add((*c).max(0) as u32); + } + } + self.ssn_state.insert( ssn.id.clone(), GangState { batch_size: ssn.batch_size.max(1), + incomplete_tasks, allocated: 0, pipelined: 0, bound: 0, @@ -77,16 +90,47 @@ impl Plugin for GangPlugin { Ok(()) } - fn is_ready(&self, ssn: &SessionInfoPtr) -> Option { - let state = self.ssn_state.get(&ssn.id)?; + /// Returns `true` when the cumulative executor count meets the allocation demand + /// for this session's incomplete tasks. + /// + /// ```text + /// needed = div_ceil(incomplete_tasks, batch_size) * batch_size + /// total = allocated + pipelined + /// ready = needed == 0 || total >= needed + /// ``` + /// + /// `needed` is the smallest multiple of `batch_size` ≥ `incomplete_tasks` (Pending + + /// Running), sampled once per cycle in `setup()`. Semantics: + /// + /// - `batch_size=1`, 5 tasks: `needed=5`. AllocateAction allocates until 5 executors + /// exist, saturating all task slots in one scheduling cycle. + /// - `batch_size=2`, 5 tasks: `needed=6` (ceil(5/2)×2). 6 executors created (3 gangs); + /// covers all 5 tasks with one executor left unbound until a new task arrives. + /// - Duplicate-prevention: `batch_size=1`, 1 task, 1 Binding executor in snapshot → + /// `needed=1, total=1+0=1 → true` → pre-check skips → no second executor. 
+ fn is_ready(&self, ssn: &SessionInfoPtr) -> bool { + let Some(state) = self.ssn_state.get(&ssn.id) else { + return false; + }; + let needed = state.incomplete_tasks.div_ceil(state.batch_size) * state.batch_size; let total = state.allocated + state.pipelined; - Some(state.pipelined > 0 && total % state.batch_size == 0) + needed == 0 || (needed > 0 && total >= needed) } - fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> Option { - let state = self.ssn_state.get(&ssn.id)?; + /// Mirrors `is_ready` for the Dispatch path (bind instead of pipeline/allocate). + /// + /// ```text + /// needed = div_ceil(incomplete_tasks, batch_size) * batch_size + /// total = allocated + bound + /// fulfilled = needed == 0 || total >= needed + /// ``` + fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> bool { + let Some(state) = self.ssn_state.get(&ssn.id) else { + return false; + }; + let needed = state.incomplete_tasks.div_ceil(state.batch_size) * state.batch_size; let total = state.allocated + state.bound; - Some(state.bound > 0 && total % state.batch_size == 0) + needed == 0 || (needed > 0 && total >= needed) } fn on_executor_allocate(&mut self, _node: NodeInfoPtr, ssn: SessionInfoPtr) { @@ -134,11 +178,18 @@ mod tests { use common::apis::{ExecutorState, ResourceRequirement, SessionState, Shim, TaskState}; use std::sync::Arc; + /// Build a test session with `batch_size.max(1)` pending tasks so that + /// `needed = div_ceil(pending, batch_size) * batch_size == batch_size.max(1)` for any + /// batch_size — i.e. exactly 1 complete batch is always the expected demand. 
fn create_test_session(id: &str, batch_size: u32) -> SessionInfoPtr { + create_test_session_with_pending(id, batch_size, batch_size.max(1) as i32) + } + + fn create_test_session_with_pending(id: &str, batch_size: u32, pending: i32) -> SessionInfoPtr { Arc::new(SessionInfo { id: id.to_string(), application: "test-app".to_string(), - tasks_status: HashMap::from([(TaskState::Pending, 1)]), + tasks_status: HashMap::from([(TaskState::Pending, pending)]), creation_time: Utc::now(), completion_time: None, state: SessionState::Open, @@ -196,12 +247,12 @@ mod tests { }; plugin.setup(&ss).unwrap(); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); let node = create_test_node("node-1"); plugin.on_session_bind(ssn.clone()); - assert!(plugin.is_fulfilled(&ssn).unwrap()); + assert!(plugin.is_fulfilled(&ssn)); } #[test] @@ -216,16 +267,16 @@ mod tests { }; plugin.setup(&ss).unwrap(); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); let node = create_test_node("node-1"); plugin.on_session_bind(ssn.clone()); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); plugin.on_session_bind(ssn.clone()); - assert!(plugin.is_fulfilled(&ssn).unwrap()); + assert!(plugin.is_fulfilled(&ssn)); } #[test] @@ -243,12 +294,12 @@ mod tests { }; plugin.setup(&ss).unwrap(); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); let node = create_test_node("node-1"); plugin.on_session_bind(ssn.clone()); - assert!(plugin.is_fulfilled(&ssn).unwrap()); + assert!(plugin.is_fulfilled(&ssn)); } #[test] @@ -263,12 +314,12 @@ mod tests { }; plugin.setup(&ss).unwrap(); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); let node = create_test_node("node-1"); plugin.on_executor_allocate(node, ssn.clone()); - assert!(plugin.is_ready(&ssn).unwrap()); + assert!(plugin.is_ready(&ssn)); } #[test] @@ -283,16 +334,16 @@ mod tests { }; 
plugin.setup(&ss).unwrap(); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); let node = create_test_node("node-1"); plugin.on_executor_allocate(node.clone(), ssn.clone()); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); plugin.on_executor_allocate(node, ssn.clone()); - assert!(plugin.is_ready(&ssn).unwrap()); + assert!(plugin.is_ready(&ssn)); } #[test] @@ -310,12 +361,12 @@ mod tests { }; plugin.setup(&ss).unwrap(); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); let node = create_test_node("node-1"); plugin.on_executor_allocate(node, ssn.clone()); - assert!(plugin.is_ready(&ssn).unwrap()); + assert!(plugin.is_ready(&ssn)); } #[test] @@ -333,19 +384,19 @@ mod tests { let exec = create_test_executor("exec-1", None); plugin.on_executor_pipeline(exec.clone(), ssn.clone()); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); plugin.on_executor_pipeline(exec.clone(), ssn.clone()); - assert!(plugin.is_ready(&ssn).unwrap()); + assert!(plugin.is_ready(&ssn)); plugin.on_executor_discard(exec.clone(), ssn.clone()); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); plugin.on_executor_discard(exec, ssn.clone()); - assert!(!plugin.is_ready(&ssn).unwrap()); + assert!(!plugin.is_ready(&ssn)); } #[test] @@ -362,18 +413,135 @@ mod tests { plugin.on_session_bind(ssn.clone()); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); plugin.on_session_bind(ssn.clone()); - assert!(plugin.is_fulfilled(&ssn).unwrap()); + assert!(plugin.is_fulfilled(&ssn)); plugin.on_session_unbind(ssn.clone()); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); plugin.on_session_unbind(ssn.clone()); - assert!(!plugin.is_fulfilled(&ssn).unwrap()); + assert!(!plugin.is_fulfilled(&ssn)); + } + + /// With batch_size=1 and 1 pending task, a Binding executor already in the snapshot + /// satisfies 
`needed=1`, so both `is_ready` and `is_fulfilled` return `true`. + /// This prevents AllocateAction / DispatchAction from creating a duplicate executor + /// while the existing one is still running `on_session_enter`. + #[test] + fn test_is_ready_and_fulfilled_with_binding_executor_batch_size_1() { + let ss = SnapShot::new(); + + // 1 pending task, batch_size=1 → needed = ceil(1/1)*1 = 1 + let ssn = create_test_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + // Simulate a Binding executor: ssn_id is set (the scheduler bound it). + let exec = create_test_executor("exec-1", Some("ssn-1")); + ss.add_executor(exec).unwrap(); + + let mut plugin = GangPlugin { + ssn_state: HashMap::new(), + }; + plugin.setup(&ss).unwrap(); + // incomplete_tasks=1, needed=1, allocated=1 + // is_ready: total = 1 + 0 = 1 >= needed(1) → true + // is_fulfilled: total = 1 + 0 = 1 >= needed(1) → true + + assert!( + plugin.is_ready(&ssn), + "is_ready must be true: 1 task, 1 snapshot executor → demand satisfied" + ); + assert!( + plugin.is_fulfilled(&ssn), + "is_fulfilled must be true: 1 task, 1 snapshot executor → demand satisfied" + ); + } + + /// With batch_size=2, a partial snapshot (1 of 2 needed executors) must not signal + /// "ready". AllocateAction must add one more executor to complete the gang. + #[test] + fn test_is_ready_partial_batch_still_allocates() { + let ss = SnapShot::new(); + + // 2 pending tasks, batch_size=2 → needed = ceil(2/2)*2 = 2 + let ssn = create_test_session("ssn-1", 2); + ss.add_session(ssn.clone()).unwrap(); + + // Only 1 of 2 required executors is already in snapshot. 
+ let exec = create_test_executor("exec-1", Some("ssn-1")); + ss.add_executor(exec).unwrap(); + + let mut plugin = GangPlugin { + ssn_state: HashMap::new(), + }; + plugin.setup(&ss).unwrap(); + // incomplete_tasks=2, needed=2, allocated=1 + // total = 1+0 = 1 → 1 < 2 → not ready + + assert!( + !plugin.is_ready(&ssn), + "is_ready must be false: only 1 of 2 needed executors allocated" + ); + assert!( + !plugin.is_fulfilled(&ssn), + "is_fulfilled must be false: only 1 of 2 needed executors in snapshot" + ); + + // Allocate one more → total = 1+1 = 2 >= needed(2) → ready + let node = create_test_node("node-1"); + plugin.on_executor_allocate(node, ssn.clone()); + assert!( + plugin.is_ready(&ssn), + "is_ready must be true once the 2nd executor completes the gang" + ); + } + + /// Regression for the "only 1 executor ever" bug with batch_size=1 and multiple tasks. + /// + /// With 3 pending tasks and 1 snapshot executor: `needed=3`, `total=1 < 3 → false`. + /// AllocateAction's pre-check does NOT fire. It then allocates executors until + /// `total >= needed`, creating all 3 needed executors in one scheduling cycle. + #[test] + fn test_is_ready_batch_size_1_allocates_all_needed_executors() { + let ss = SnapShot::new(); + + // 3 pending tasks, batch_size=1 → needed = ceil(3/1)*1 = 3 + let ssn = create_test_session_with_pending("ssn-multi", 1, 3); + ss.add_session(ssn.clone()).unwrap(); + + // 1 executor already in snapshot (e.g. Binding from previous cycle). + let exec = create_test_executor("exec-1", Some("ssn-multi")); + ss.add_executor(exec).unwrap(); + + let mut plugin = GangPlugin { + ssn_state: HashMap::new(), + }; + plugin.setup(&ss).unwrap(); + // incomplete_tasks=3, needed=3, allocated=1 → total=1 → 1 < 3 → false + + assert!( + !plugin.is_ready(&ssn), + "pre-check must not fire: need 3 executors, only 1 in snapshot" + ); + + // AllocateAction allocates executor 2 → total=1+1=2, still short. 
+ let node = create_test_node("node-1"); + plugin.on_executor_allocate(node.clone(), ssn.clone()); + assert!( + !plugin.is_ready(&ssn), + "still not ready after 1 new allocation: total=2 ≠ needed=3" + ); + + // AllocateAction allocates executor 3 → total=1+2=3 == needed(3) → ready. + plugin.on_executor_allocate(node, ssn.clone()); + assert!( + plugin.is_ready(&ssn), + "is_ready must be true once all 3 needed executors are allocated" + ); } } diff --git a/session_manager/src/scheduler/plugins/mod.rs b/session_manager/src/scheduler/plugins/mod.rs index 3edc78ca..7e868da1 100644 --- a/session_manager/src/scheduler/plugins/mod.rs +++ b/session_manager/src/scheduler/plugins/mod.rs @@ -13,7 +13,7 @@ limitations under the License. use std::cmp::Ordering; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use stdng::collections; use stdng::{lock_ptr, new_ptr, MutexPtr}; @@ -90,12 +90,12 @@ pub trait Plugin: Send + Sync + 'static { None } - fn is_ready(&self, ssn: &SessionInfoPtr) -> Option { - None + fn is_ready(&self, ssn: &SessionInfoPtr) -> bool { + true } - fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> Option { - None + fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> bool { + true } // Events callbacks @@ -153,6 +153,16 @@ pub fn configurable_policy_names() -> Vec<&'static str> { pub struct PluginManager { pub plugins: MutexPtr>, + /// True iff the gang plugin is in the loaded set for this cycle. + /// Controls which path is taken in `is_ready` / `is_fulfilled`. + gang_loaded: bool, + /// Per-session count of pipeline/allocate ops committed this cycle. + /// Only used when `gang_loaded == false` to implement the "1 op per + /// cycle is sufficient" rule without exposing this concern to callers. + no_gang_alloc_ops: Mutex>, + /// Per-session count of bind ops committed this cycle. + /// Only used when `gang_loaded == false`. 
+ no_gang_bind_ops: Mutex>, } impl PluginManager { @@ -161,13 +171,22 @@ impl PluginManager { enabled_policies: &[String], ) -> Result { let valid_names = configurable_policy_names(); + let all_plugin_names: Vec<&str> = PLUGIN_REGISTRY.iter().map(|p| p.name).collect(); for p in enabled_policies { if !valid_names.contains(&p.as_str()) { - return Err(FlameError::InvalidConfig(format!( - "unknown policy: {}. available: {:?}", - p, valid_names - ))); + if all_plugin_names.contains(&p.as_str()) { + // Plugin exists but is always-on (e.g. shim); listing it in policies is a no-op. + tracing::warn!( + "Policy '{}' is always enabled and does not need to be listed explicitly; ignoring", + p + ); + } else { + return Err(FlameError::InvalidConfig(format!( + "unknown policy: {}. configurable policies: {:?}", + p, valid_names + ))); + } } } @@ -186,8 +205,13 @@ impl PluginManager { plugin.setup(ss)?; } + let gang_loaded = plugins.iter().any(|(name, _)| name == "gang"); + Ok(Arc::new(PluginManager { plugins: new_ptr(plugins), + gang_loaded, + no_gang_alloc_ops: Mutex::new(HashMap::new()), + no_gang_bind_ops: Mutex::new(HashMap::new()), })) } @@ -290,12 +314,17 @@ impl PluginManager { node: NodeInfoPtr, ssn: SessionInfoPtr, ) -> Result<(), FlameError> { + if !self.gang_loaded { + let mut ops = self + .no_gang_alloc_ops + .lock() + .map_err(|e| FlameError::Internal(format!("no_gang_alloc_ops lock: {e}")))?; + *ops.entry(ssn.id.clone()).or_insert(0) += 1; + } let mut plugins = lock_ptr!(self.plugins)?; - for (_, plugin) in plugins.iter_mut() { plugin.on_executor_allocate(node.clone(), ssn.clone()); } - Ok(()) } @@ -314,12 +343,17 @@ impl PluginManager { } pub fn on_session_bind(&self, ssn: SessionInfoPtr) -> Result<(), FlameError> { + if !self.gang_loaded { + let mut ops = self + .no_gang_bind_ops + .lock() + .map_err(|e| FlameError::Internal(format!("no_gang_bind_ops lock: {e}")))?; + *ops.entry(ssn.id.clone()).or_insert(0) += 1; + } let mut plugins = lock_ptr!(self.plugins)?; - 
for (_, plugin) in plugins.iter_mut() { plugin.on_session_bind(ssn.clone()); } - Ok(()) } @@ -332,27 +366,56 @@ impl PluginManager { Ok(()) } - /// True if every plugin that implements [`Plugin::is_ready`] reports readiness (no opinion - /// defaults to true). Counters are in-memory and advance when [`crate::scheduler::Statement`] - /// records `allocate` / `pipeline` without `discard`. Dispatch and Allocate share one - /// `PluginManager` per scheduling cycle. - pub fn is_ready(&self, ssn: &SessionInfoPtr) -> Result { - let plugins = lock_ptr!(self.plugins)?; - - Ok(plugins - .iter() - .all(|(_, plugin)| plugin.is_ready(ssn).unwrap_or(true))) + /// Returns batch-allocation readiness for a session. + /// + /// **Without gang** (`gang_loaded == false`): returns `true` once at least one + /// pipeline/allocate op has been recorded for this session via `on_executor_pipeline` or + /// `on_executor_allocate`. Before any op, returns `false`. This lets callers use a + /// uniform `if is_ready() { break; }` in their allocation loops — the loop naturally + /// stops after the first op with no batch constraint. + /// + /// **With gang**: returns `true` only when ALL plugins return `true`; returns `false` + /// as soon as any plugin says the batch is incomplete. Plugins that do not override + /// `is_ready` default to `true` (no opinion / do not block), so only opinionated + /// plugins (e.g. GangPlugin) can veto readiness. + /// + /// Counters advance when [`crate::scheduler::Statement`] records `pipeline`/`allocate` + /// without `discard`. Dispatch and Allocate share one `PluginManager` per cycle. 
+ pub fn is_ready(&self, ssn: &SessionInfoPtr) -> bool { + if !self.gang_loaded { + let ops = self + .no_gang_alloc_ops + .lock() + .expect("no_gang_alloc_ops lock poisoned"); + return *ops.get(&ssn.id).unwrap_or(&0) > 0; + } + let plugins = self.plugins.lock().expect("plugins lock poisoned"); + plugins.iter().all(|(_, plugin)| plugin.is_ready(ssn)) } - /// True if every plugin that implements [`Plugin::is_fulfilled`] reports fulfillment (no opinion - /// defaults to true). Updates when [`crate::scheduler::Statement`] records `bind`; after - /// Dispatch commits, Allocate uses this to skip redundant provisioning. - pub fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> Result { - let plugins = lock_ptr!(self.plugins)?; - - Ok(plugins - .iter() - .all(|(_, plugin)| plugin.is_fulfilled(ssn).unwrap_or(true))) + /// Returns batch-binding fulfillment for a session. + /// + /// **Without gang** (`gang_loaded == false`): returns `true` once at least one bind op + /// has been recorded for this session via `on_session_bind`. Before any bind, returns + /// `false`. Callers can use `if is_fulfilled() { break/skip; }` uniformly. + /// + /// **With gang**: returns `true` only when ALL plugins return `true`; returns `false` + /// as soon as any plugin says the batch is incomplete. Plugins that do not override + /// `is_fulfilled` default to `true` (no opinion / do not block), so only opinionated + /// plugins (e.g. GangPlugin) can veto fulfillment. + /// + /// Updates when [`crate::scheduler::Statement`] records `bind`; after Dispatch commits, + /// Allocate uses this to skip redundant provisioning. 
+ pub fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> bool { + if !self.gang_loaded { + let ops = self + .no_gang_bind_ops + .lock() + .expect("no_gang_bind_ops lock poisoned"); + return *ops.get(&ssn.id).unwrap_or(&0) > 0; + } + let plugins = self.plugins.lock().expect("plugins lock poisoned"); + plugins.iter().all(|(_, plugin)| plugin.is_fulfilled(ssn)) } pub fn on_executor_pipeline( @@ -360,12 +423,17 @@ impl PluginManager { exec: ExecutorInfoPtr, ssn: SessionInfoPtr, ) -> Result<(), FlameError> { + if !self.gang_loaded { + let mut ops = self + .no_gang_alloc_ops + .lock() + .map_err(|e| FlameError::Internal(format!("no_gang_alloc_ops lock: {e}")))?; + *ops.entry(ssn.id.clone()).or_insert(0) += 1; + } let mut plugins = lock_ptr!(self.plugins)?; - for (_, plugin) in plugins.iter_mut() { plugin.on_executor_pipeline(exec.clone(), ssn.clone()); } - Ok(()) } @@ -496,9 +564,12 @@ impl collections::Cmp for SsnOrderFn { #[cfg(test)] mod tests { use super::*; - use crate::model::ExecutorInfo; + use crate::model::{ExecutorInfo, NodeInfo, SessionInfo, SnapShot}; use chrono::Utc; - use common::apis::{ExecutorState, ResourceRequirement, Shim}; + use common::apis::{ + ExecutorState, NodeState, ResourceRequirement, SessionState, Shim, TaskState, + }; + use std::collections::HashMap; /// Create a test executor sized by `n` units (1 unit = (cpu:1, memory:1024, gpu:0)). fn create_test_executor(id: &str, n: u32) -> ExecutorInfoPtr { @@ -581,4 +652,185 @@ mod tests { // // This is a known limitation documented in the Plugin trait. 
} + + fn make_policies(names: &[&str]) -> Vec { + names.iter().map(|s| s.to_string()).collect() + } + + fn make_session(id: &str, batch_size: u32) -> SessionInfoPtr { + Arc::new(SessionInfo { + id: id.to_string(), + application: "test-app".to_string(), + tasks_status: HashMap::from([(TaskState::Pending, batch_size.max(1) as i32)]), + creation_time: Utc::now(), + completion_time: None, + state: SessionState::Open, + min_instances: 0, + max_instances: None, + batch_size, + priority: 0, + resreq: Some(ResourceRequirement { + cpu: 1, + memory: 1024, + gpu: 0, + }), + retry_count: 0, + }) + } + + fn make_node(name: &str) -> Arc { + Arc::new(NodeInfo { + name: name.to_string(), + allocatable: ResourceRequirement { + cpu: 4, + memory: 8192, + gpu: 0, + }, + state: NodeState::Ready, + }) + } + + /// Without gang, is_ready returns false before any op (no batch constraint active yet). + #[test] + fn test_is_ready_returns_false_without_gang() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + let plugins = PluginManager::setup(&ss, &make_policies(&["priority"])).unwrap(); + assert!( + !plugins.is_ready(&ssn), + "is_ready must be false before any op when gang plugin is not loaded" + ); + } + + /// Without gang, is_fulfilled returns false before any bind op. + #[test] + fn test_is_fulfilled_returns_false_without_gang() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + let plugins = PluginManager::setup(&ss, &make_policies(&["priority"])).unwrap(); + assert!( + !plugins.is_fulfilled(&ssn), + "is_fulfilled must be false when gang plugin is not loaded" + ); + } + + /// When gang is loaded, is_ready returns false before any allocation event because the + /// batch counter starts at 0. 
+ #[test] + fn test_is_ready_returns_false_before_allocation_with_gang() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + let plugins = PluginManager::setup(&ss, &make_policies(&["gang"])).unwrap(); + assert!( + !plugins.is_ready(&ssn), + "is_ready must be false initially when gang is loaded" + ); + } + + /// After one on_executor_allocate event, gang batch_size=1 is satisfied → true. + #[test] + fn test_is_ready_returns_true_after_batch_filled_with_gang() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + let plugins = PluginManager::setup(&ss, &make_policies(&["gang"])).unwrap(); + + let node = make_node("node-1"); + plugins.on_executor_allocate(node, ssn.clone()).unwrap(); + + assert!( + plugins.is_ready(&ssn), + "is_ready must be true after one allocation with batch_size=1" + ); + } + + /// Gang batch_size=2: after one allocation is_ready is still false; true only after the + /// second allocation completes the batch. + #[test] + fn test_is_ready_batch_size_2_requires_two_allocations() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 2); + ss.add_session(ssn.clone()).unwrap(); + + let plugins = PluginManager::setup(&ss, &make_policies(&["gang"])).unwrap(); + let node = make_node("node-1"); + + assert!(!plugins.is_ready(&ssn)); + + plugins + .on_executor_allocate(node.clone(), ssn.clone()) + .unwrap(); + assert!( + !plugins.is_ready(&ssn), + "is_ready must still be false after only 1 of 2 required allocations" + ); + + plugins.on_executor_allocate(node, ssn.clone()).unwrap(); + assert!( + plugins.is_ready(&ssn), + "is_ready must be true once the full batch of 2 is allocated" + ); + } + + /// When gang is loaded, is_fulfilled starts as false and becomes true after the batch of + /// on_session_bind events. 
+ #[test] + fn test_is_fulfilled_false_then_true_with_gang() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + let plugins = PluginManager::setup(&ss, &make_policies(&["gang"])).unwrap(); + + assert!(!plugins.is_fulfilled(&ssn)); + + plugins.on_session_bind(ssn.clone()).unwrap(); + assert!( + plugins.is_fulfilled(&ssn), + "is_fulfilled must be true after one bind with batch_size=1" + ); + } + + /// With gang loaded, is_ready starts false (no ops yet). + /// Without gang, is_ready also starts false (no ops yet), and only becomes true after + /// the first pipeline/allocate op is recorded via on_executor_allocate. + #[test] + fn test_gang_loaded_iff_listed_in_policies() { + let ss = SnapShot::new(); + let ssn = make_session("ssn-1", 1); + ss.add_session(ssn.clone()).unwrap(); + + // With gang: is_ready is false before any op (batch counter starts at 0). + let with_gang = PluginManager::setup(&ss, &make_policies(&["gang"])).unwrap(); + assert!( + !with_gang.is_ready(&ssn), + "is_ready must be false before any op when gang is loaded" + ); + + // Without gang: is_ready is also false before any op. + let ss2 = SnapShot::new(); + ss2.add_session(ssn.clone()).unwrap(); + let without_gang = PluginManager::setup(&ss2, &make_policies(&["priority"])).unwrap(); + assert!( + !without_gang.is_ready(&ssn), + "is_ready must be false before any op when gang is not loaded" + ); + + // Without gang: is_ready becomes true after the first pipeline op. + let node = make_node("node-1"); + without_gang + .on_executor_allocate(node, ssn.clone()) + .unwrap(); + assert!( + without_gang.is_ready(&ssn), + "is_ready must be true after the first op with no gang constraint" + ); + } }