Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions common/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@ const DEFAULT_FLAME_CONF: &str = "flame-cluster.yaml";
const DEFAULT_CONTEXT_NAME: &str = "flame";
const DEFAULT_FLAME_ENDPOINT: &str = "http://127.0.0.1:8080";
/// Default policies to enable when none specified in config.
/// Available configurable policies: "priority", "drf", "gang"
/// Note: "shim" plugin is always enabled (required for executor matching)
pub const DEFAULT_POLICIES: &[&str] = &["priority", "drf", "gang"];
///
/// Effective default scheduler stack: `priority + gang + shim`
/// - "gang" is listed here so batch-size semantics and allocation guards are active
/// by default. Users who do not need gang scheduling may omit it; scheduling
/// will still work but allocates one executor per cycle with no batch constraint.
/// - "shim" is always-on (non-configurable) and runs regardless of this list.
///
/// Optional configurable policies (add to cluster.policies in your config to enable):
/// - "drf" — Dominant Resource Fairness; enables fair multi-resource sharing across sessions.
///
/// Note: listing "shim" in the config is harmless — a warning is logged and it is ignored.
pub const DEFAULT_POLICIES: &[&str] = &["priority", "gang"];
const DEFAULT_STORAGE: &str = "sqlite://flame.db";
const DEFAULT_MAX_EXECUTORS_PER_NODE: u32 = 128;
pub const DEFAULT_SESSION_RETRY_LIMITS: u32 = 5;
Expand Down
16 changes: 7 additions & 9 deletions executor_manager/src/shims/grpc_shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,13 @@ impl Future for WaitForSvcSocketFuture {

#[cfg(test)]
mod tests {
use super::super::SHIMS_TEST_LOCK;
use super::*;
use common::apis::{ApplicationContext, Shim as ShimType};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use tempfile::tempdir;

static TEST_LOCK: Mutex<()> = Mutex::new(());

fn setup_test_env(temp: &tempfile::TempDir) -> PathBuf {
let socket_dir = temp.path().join("sockets");
std::fs::create_dir_all(&socket_dir).unwrap();
Expand All @@ -237,7 +235,7 @@ mod tests {

#[test]
fn test_grpc_shim_new() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let work_dir = create_test_work_dir("exec-grpc-test", &temp);
Expand All @@ -251,7 +249,7 @@ mod tests {

#[test]
fn test_grpc_shim_endpoint() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let work_dir = create_test_work_dir("exec-endpoint-test", &temp);
Expand All @@ -263,7 +261,7 @@ mod tests {

#[test]
fn test_grpc_shim_close_without_connection() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let work_dir = create_test_work_dir("exec-close-test", &temp);
Expand All @@ -285,7 +283,7 @@ mod tests {
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn test_on_session_enter_without_connection() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let work_dir = create_test_work_dir("exec-session-test", &temp);
Expand Down Expand Up @@ -318,7 +316,7 @@ mod tests {
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn test_on_task_invoke_without_connection() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let work_dir = create_test_work_dir("exec-task-test", &temp);
Expand All @@ -341,7 +339,7 @@ mod tests {
#[tokio::test]
#[allow(clippy::await_holding_lock)]
async fn test_on_session_leave_without_connection() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let work_dir = create_test_work_dir("exec-leave-test", &temp);
Expand Down
26 changes: 17 additions & 9 deletions executor_manager/src/shims/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,24 @@ pub trait Shim: Send + 'static {
async fn on_session_leave(&mut self) -> Result<(), FlameError>;
}

/// Single process-wide lock shared by all shims submodule tests.
///
/// `shims::tests` and `shims::grpc_shim::tests` both mutate global process state
/// (`FLAME_SOCKET_DIR` env var, current directory). Using a single lock prevents
/// those tests from racing each other when the test binary runs them in parallel.
///
/// Use `lock().unwrap_or_else(|e| e.into_inner())` so that a panic in one test
/// does not permanently poison the lock and block all subsequent tests.
#[cfg(test)]
pub(crate) static SHIMS_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::fs::File;
use std::sync::Mutex;
use tempfile::tempdir;

static TEST_LOCK: Mutex<()> = Mutex::new(());

fn create_test_app(name: &str, working_directory: Option<String>) -> ApplicationContext {
ApplicationContext {
name: name.to_string(),
Expand All @@ -259,7 +267,7 @@ mod tests {

#[test]
fn test_executor_work_dir_with_auto_dir() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
let socket_dir = setup_test_env(&temp);

Expand All @@ -280,7 +288,7 @@ mod tests {

#[test]
fn test_executor_work_dir_with_custom_working_directory() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
let socket_dir = setup_test_env(&temp);
let custom_dir = temp.path().join("custom-workdir");
Expand All @@ -301,7 +309,7 @@ mod tests {

#[test]
fn test_executor_work_dir_socket_path_length() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);

Expand All @@ -321,7 +329,7 @@ mod tests {

#[test]
fn test_executor_work_dir_cleanup_on_drop() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);

Expand Down Expand Up @@ -355,7 +363,7 @@ mod tests {

#[test]
fn test_executor_work_dir_no_cleanup_custom_dir_on_drop() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
setup_test_env(&temp);
let custom_dir = temp.path().join("persistent-workdir");
Expand All @@ -382,7 +390,7 @@ mod tests {

#[test]
fn test_socket_path_is_fixed_location() {
let _guard = TEST_LOCK.lock().unwrap();
let _guard = SHIMS_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let temp = tempdir().unwrap();
let socket_dir = setup_test_env(&temp);

Expand Down
1 change: 0 additions & 1 deletion flmadm/src/managers/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ cluster:
resreq: "cpu=1,mem=2g"
policies:
- priority
- drf
- gang
storage: "fs://{prefix}/data"
executors:
Expand Down
24 changes: 15 additions & 9 deletions session_manager/src/scheduler/actions/allocate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ impl Action for AllocateAction {
// (see `Context`): Gang's counters include binds Dispatch committed via
// `Statement` in this same `Context`. If those already satisfy gang scheduling, do not
// create more executors here.
let fulfilled = ctx.is_fulfilled(&ssn)?;
let ready = ctx.is_ready(&ssn)?;
//
// `is_ready` / `is_fulfilled` return true once the batch is satisfied (gang) or
// after the first op this cycle (no gang). Skip only when one of them is already
// true before we begin.
let fulfilled = ctx.is_fulfilled(&ssn);
let ready = ctx.is_ready(&ssn);
if fulfilled || ready {
tracing::debug!(
"Skip allocate resources for session <{}>: is_fulfilled={}, is_ready={}",
Expand All @@ -132,28 +136,30 @@ impl Action for AllocateAction {

let mut stmt = Statement::new(ss.clone(), ctx.plugins.clone(), ctx.controller.clone());

let pipelineable = void_executors
let pipelineable: Vec<_> = void_executors
.values()
.chain(unbinding_executors.values())
.filter(|e| ctx.is_available(e, &ssn).unwrap_or(false));
.filter(|e| ctx.is_available(e, &ssn).unwrap_or(false))
.cloned()
.collect();

for exec in pipelineable {
for exec in &pipelineable {
stmt.pipeline(exec, &ssn)?;
if ctx.is_ready(&ssn)? {
if ctx.is_ready(&ssn) {
break;
}
}

for node in nodes.iter() {
if ctx.is_ready(&ssn)? {
if ctx.is_ready(&ssn) {
break;
}
while ctx.is_allocatable(node, &ssn)? && !ctx.is_ready(&ssn)? {
while ctx.is_allocatable(node, &ssn)? && !ctx.is_ready(&ssn) {
stmt.allocate(node, &ssn)?;
}
}

if ctx.is_ready(&ssn)? {
if ctx.is_ready(&ssn) {
let op_count = stmt.len();
let pipelined_ids = stmt.commit().await?;
for id in &pipelined_ids {
Expand Down
21 changes: 18 additions & 3 deletions session_manager/src/scheduler/actions/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ impl Action for DispatchAction {
continue;
}

// Skip sessions that already have enough executors committed (or in-flight from a
// previous cycle). Without this guard a session whose executor is still running
// `on_session_enter` (Binding state, counted in Gang's `allocated`) would receive
// additional bindings every scheduling cycle.
//
// `is_fulfilled` returns true once the batch is satisfied (gang) or after the
// first bind this cycle (no gang), so this guard works uniformly for both cases.
if ctx.is_fulfilled(&ssn) {
tracing::debug!(
"Session <{}> is already fulfilled (existing executors satisfy batch_size), skip dispatch.",
ssn.id
);
continue;
}

tracing::debug!(
"Session <{}> is underused, start to allocate resources.",
&ssn.id
Expand All @@ -75,13 +90,13 @@ impl Action for DispatchAction {
for (_, exec) in idle_executors.iter() {
if ctx.is_available(exec, &ssn)? {
stmt.bind(exec, &ssn)?;
if ctx.is_fulfilled(&ssn)? {
if ctx.is_fulfilled(&ssn) {
break;
}
}
}

if ctx.is_fulfilled(&ssn)? {
if ctx.is_fulfilled(&ssn) && !stmt.is_empty() {
tracing::debug!("Bind executor for session <{}>.", ssn.id);
let bound_ids = stmt.commit().await?;
for id in &bound_ids {
Expand All @@ -97,7 +112,7 @@ impl Action for DispatchAction {
continue;
} else if !stmt.is_empty() {
tracing::debug!(
"Discarding unfulfilled binding for session <{}>: no available idle executors",
"Discarding unfulfilled binding for session <{}>: gang batch incomplete",
ssn.id
);
stmt.discard()?;
Expand Down
14 changes: 11 additions & 3 deletions session_manager/src/scheduler/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,21 @@ impl Context {
/// Allocation-side batch readiness (e.g. Gang: pipelined + allocated executors form full
/// batches). Reflects in-memory plugin state, including `Statement` ops earlier in this
/// same cycle (typically pipeline/allocate before commit).
pub fn is_ready(&self, ssn: &SessionInfoPtr) -> Result<bool, FlameError> {
///
/// Returns `true` when the batch is complete (gang) or after the first pipeline/allocate
/// op in a no-gang cycle. Returns `false` before any op has been attempted, or when gang
/// reports the batch is still incomplete.
pub fn is_ready(&self, ssn: &SessionInfoPtr) -> bool {
self.plugins.is_ready(ssn)
}

/// Binding-side batch readiness (e.g. Gang: bound + on-session executors form full batches).
/// After Dispatch commits binds, this can be true so Allocate skips provisioning.
pub fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> Result<bool, FlameError> {
/// After Dispatch commits binds, this can be `true` so Allocate skips provisioning.
///
/// Returns `true` when the batch is fulfilled (gang) or after the first bind op in a
/// no-gang cycle. Returns `false` before any bind has been attempted, or when gang
/// reports the batch is still incomplete.
pub fn is_fulfilled(&self, ssn: &SessionInfoPtr) -> bool {
self.plugins.is_fulfilled(ssn)
}

Expand Down
Loading
Loading