From 5d7364f0e6247ae494dd9fe4242c018da15bbda7 Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Wed, 21 Jan 2026 22:21:24 +0100 Subject: [PATCH 1/3] refactor(runtime): move to single unified execution::App interface This marks the previous lib::App one as deprecated, moves everything over to the new interface from Vim as the default, renames that to be more similar to what we had before + a couple simplifications, and keeps the previous ExecutionBackend/ExecutionHandle names as deprecated traits so tower-runner sees guiding deprecation warnings on its next relink rather than unresolved-import errors. --- crates/tower-cmd/src/run.rs | 18 +- crates/tower-runtime/src/auto_cleanup.rs | 28 ++- crates/tower-runtime/src/errors.rs | 3 + crates/tower-runtime/src/execution.rs | 85 +++++-- crates/tower-runtime/src/lib.rs | 13 +- crates/tower-runtime/src/local.rs | 211 ++++++++++-------- crates/tower-runtime/src/subprocess.rs | 26 ++- crates/tower-runtime/tests/local_test.rs | 6 +- crates/tower-runtime/tests/subprocess_test.rs | 4 +- 9 files changed, 257 insertions(+), 137 deletions(-) diff --git a/crates/tower-cmd/src/run.rs b/crates/tower-cmd/src/run.rs index 6bf2f7cd..6841e942 100644 --- a/crates/tower-cmd/src/run.rs +++ b/crates/tower-cmd/src/run.rs @@ -18,9 +18,8 @@ use tokio::sync::{ Mutex, }; use tokio::time::{sleep, timeout, Duration}; -use tower_runtime::execution::ExecutionHandle; use tower_runtime::execution::{ - CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionSpec, ResourceLimits, + App as _, Backend, CacheBackend, CacheConfig, CacheIsolation, ExecutionSpec, ResourceLimits, RuntimeConfig as ExecRuntimeConfig, }; use tower_runtime::subprocess::SubprocessBackend; @@ -231,7 +230,7 @@ where // Monitor app status concurrently let handle = Arc::new(Mutex::new(handle)); - let status_task = tokio::spawn(monitor_cli_status(Arc::clone(&handle))); + let status_task = tokio::spawn(monitor_app_status(Arc::clone(&handle))); // Wait for app to complete or SIGTERM let status_result = tokio::select! { @@ -250,6 +249,7 @@ where // And if we crashed, err out match status_result { Status::Exited => output::success("Your local run exited cleanly."), + Status::Cancelled => output::success("Your local run was cancelled."), Status::Crashed { code } => { output::error(&format!("Your local run crashed with exit code: {}", code)); return Err(Error::AppCrashed); @@ -680,12 +680,10 @@ async fn monitor_output(mut output: OutputReceiver) { } } -/// monitor_local_status is a helper function that will monitor the status of a given app and waits for +/// monitor_app_status is a helper function that will monitor the status of a given app and waits for /// it to progress to a terminal state. -async fn monitor_cli_status( - handle: Arc>, -) -> Status { - use tower_runtime::execution::ExecutionHandle as _; +async fn monitor_app_status(handle: Arc>) -> Status { + use tower_runtime::execution::App as _; debug!("Starting status monitoring for CLI execution"); let mut check_count = 0; @@ -709,6 +707,10 @@ async fn monitor_cli_status( debug!("Run exited cleanly, stopping status monitoring"); return status; } + Status::Cancelled => { + debug!("Run was cancelled, stopping status monitoring"); + return status; + } Status::Crashed { .. } => { debug!("Run crashed, stopping status monitoring"); return status; diff --git a/crates/tower-runtime/src/auto_cleanup.rs b/crates/tower-runtime/src/auto_cleanup.rs index 9e892d55..32b46079 100644 --- a/crates/tower-runtime/src/auto_cleanup.rs +++ b/crates/tower-runtime/src/auto_cleanup.rs @@ -12,7 +12,7 @@ use std::time::Duration; use tmpdir::TmpDir; use tokio::sync::Mutex; -use crate::App; +use crate::execution::App; /// How often to poll the app status to check if it has reached terminal state const STATUS_POLL_INTERVAL: Duration = Duration::from_secs(5); @@ -79,7 +79,9 @@ pub fn spawn_cleanup_monitor( #[cfg(test)] mod tests { use super::*; - use crate::{errors::Error, StartOptions, Status}; + use crate::execution::ServiceEndpoint; + use crate::{errors::Error, OutputReceiver, Status}; + use async_trait::async_trait; /// Mock LocalApp for testing that allows controlled status transitions struct MockLocalApp { @@ -103,18 +105,32 @@ mod tests { } } - impl crate::App for MockLocalApp { - async fn start(_opts: StartOptions) -> Result { - unimplemented!("MockLocalApp doesn't support start") + #[async_trait] + impl App for MockLocalApp { + fn id(&self) -> &str { + "mock" + } + + async fn status(&self) -> Result { + Ok(self.status.lock().await.clone()) + } + + async fn logs(&self) -> Result { + let (_, rx) = tokio::sync::mpsc::unbounded_channel(); + Ok(rx) } async fn terminate(&mut self) -> Result<(), Error> { Ok(()) } - async fn status(&self) -> Result { + async fn wait_for_completion(&self) -> Result { Ok(self.status.lock().await.clone()) } + + async fn service_endpoint(&self) -> Result, Error> { + Ok(None) + } } /// Helper to create temp directories for testing diff --git a/crates/tower-runtime/src/errors.rs b/crates/tower-runtime/src/errors.rs index 0326cd27..e7c7eacd 100644 --- a/crates/tower-runtime/src/errors.rs +++ b/crates/tower-runtime/src/errors.rs @@ -76,6 +76,9 @@ pub enum Error { #[snafu(display("dependency installation failed"))] DependencyInstallationFailed, + + #[snafu(display("failed to wait for process: {message}"))] + ProcessWaitFailed { message: String }, } impl From for Error { diff --git a/crates/tower-runtime/src/execution.rs b/crates/tower-runtime/src/execution.rs index 47d6dd26..2c5aab67 100644 --- a/crates/tower-runtime/src/execution.rs +++ b/crates/tower-runtime/src/execution.rs @@ -149,17 +149,19 @@ pub struct NetworkingSpec { } // ============================================================================ -// Execution Backend Trait +// Backend Trait // ============================================================================ -/// ExecutionBackend abstracts the compute substrate +/// Backend creates App instances for a specific compute substrate. +/// +/// Implementations: SubprocessBackend (subprocess), K8sBackend (Kubernetes) #[async_trait] -pub trait ExecutionBackend: Send + Sync { - /// The handle type this backend returns - type Handle: ExecutionHandle; +pub trait Backend: Send + Sync { + /// The App type this backend creates + type App: App; - /// Create a new execution environment - async fn create(&self, spec: ExecutionSpec) -> Result; + /// Create a new app execution + async fn create(&self, spec: ExecutionSpec) -> Result; /// Get backend capabilities fn capabilities(&self) -> BackendCapabilities; @@ -195,13 +197,15 @@ pub struct BackendCapabilities { } // ============================================================================ -// Execution Handle Trait +// App Trait // ============================================================================ -/// ExecutionHandle represents a running execution +/// App represents a running Tower application instance. +/// +/// Implementations: LocalApp (subprocess), K8sApp (Kubernetes pod) #[async_trait] -pub trait ExecutionHandle: Send + Sync { - /// Get a unique identifier for this execution +pub trait App: Send + Sync { + /// Unique identifier for this execution fn id(&self) -> &str; /// Get current execution status @@ -210,20 +214,26 @@ pub trait ExecutionHandle: Send + Sync { /// Subscribe to log stream async fn logs(&self) -> Result; - /// Terminate execution gracefully + /// Terminate execution gracefully (SIGTERM equivalent) async fn terminate(&mut self) -> Result<(), Error>; - /// Force kill execution - async fn kill(&mut self) -> Result<(), Error>; + /// Force kill execution (SIGKILL equivalent) + async fn kill(&mut self) -> Result<(), Error> { + self.terminate().await // default: same as terminate + } /// Wait for execution to complete async fn wait_for_completion(&self) -> Result; - /// Get service endpoint - async fn service_endpoint(&self) -> Result, Error>; + /// Get service endpoint (for long-running apps) + async fn service_endpoint(&self) -> Result, Error> { + Ok(None) // default: no endpoint + } /// Cleanup resources - async fn cleanup(&mut self) -> Result<(), Error>; + async fn cleanup(&mut self) -> Result<(), Error> { + self.terminate().await // default: just terminate + } } /// ServiceEndpoint describes how to reach a running service @@ -241,3 +251,44 @@ pub struct ServiceEndpoint { /// Full URL if applicable (e.g., "http://app-run-123.default.svc.cluster.local:8080") pub url: Option, } + +// ============================================================================ +// Deprecated Legacy Traits (kept so tower-runner sees clear migration warnings) +// ============================================================================ + +/// Previous name for [`Backend`]. Kept so downstream builds emit a deprecation +/// warning pointing at the new name instead of an unresolved-import error. +#[deprecated(note = "use `Backend` instead; associated type `Handle` is now `App`")] +#[async_trait] +#[allow(deprecated)] +pub trait ExecutionBackend: Send + Sync { + type Handle: ExecutionHandle; + + async fn create(&self, spec: ExecutionSpec) -> Result; + + fn capabilities(&self) -> BackendCapabilities; + + async fn cleanup(&self) -> Result<(), Error>; +} + +/// Previous name for [`App`]. Kept so downstream builds emit a deprecation +/// warning pointing at the new name instead of an unresolved-import error. +#[deprecated(note = "use `App` instead")] +#[async_trait] +pub trait ExecutionHandle: Send + Sync { + fn id(&self) -> &str; + + async fn status(&self) -> Result; + + async fn logs(&self) -> Result; + + async fn terminate(&mut self) -> Result<(), Error>; + + async fn kill(&mut self) -> Result<(), Error>; + + async fn wait_for_completion(&self) -> Result; + + async fn service_endpoint(&self) -> Result, Error>; + + async fn cleanup(&mut self) -> Result<(), Error>; +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 6ebbb061..8129e7bc 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -12,6 +12,9 @@ pub mod execution; pub mod local; pub mod subprocess; +// Re-export SubprocessBackend from subprocess module +pub use subprocess::SubprocessBackend; + use errors::Error; #[derive(Copy, Clone)] @@ -41,6 +44,7 @@ pub struct Output { pub enum Status { None, Running, + Cancelled, Exited, Crashed { code: i32, @@ -56,7 +60,10 @@ pub enum Status { impl Status { /// Returns true if this status represents a terminal state (run is finished) pub fn is_terminal(&self) -> bool { - matches!(self, Status::Exited | Status::Crashed { .. } | Status::Failed { .. }) + matches!( + self, + Status::Exited | Status::Cancelled | Status::Crashed { .. } | Status::Failed { .. } + ) } } @@ -64,16 +71,14 @@ pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; +#[deprecated(note = "use `execution::App` instead")] pub trait App: Send + Sync { - // start will start the process fn start(opts: StartOptions) -> impl Future> + Send where Self: Sized; - // terminate will terminate the subprocess fn terminate(&mut self) -> impl Future> + Send; - // status checks the status of an app fn status(&self) -> impl Future> + Send; } diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 707d4dbb..397194fe 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use std::collections::HashMap; use std::env; use std::path::PathBuf; @@ -33,20 +34,19 @@ use tower_package::{Manifest, Package}; use tower_telemetry::debug; use tower_uv::Uv; -use crate::{App, Channel, Output, FD}; +use crate::execution::App; +use crate::{Channel, Output, OutputReceiver, FD}; + +type Completion = Result; pub struct LocalApp { + id: String, status: Mutex>, - - // waiter is what we use to communicate that the overall process is finished by the execution - // handle. - waiter: Mutex>, - - // terminator is what we use to flag that we want to terminate the child process. + completion_receiver: Mutex>, terminator: CancellationToken, - - // execute_handle keeps track of the current state of the execution lifecycle. - execute_handle: Option>>, + task: Option>>, + output_receiver: Mutex>, + _package: Option, } // Helper function to check if a file is executable @@ -101,7 +101,7 @@ async fn find_bash() -> Result { async fn execute_local_app( opts: StartOptions, - sx: oneshot::Sender, + tx: oneshot::Sender, cancel_token: CancellationToken, ) -> Result<(), Error> { let ctx = opts.ctx.clone(); @@ -159,7 +159,7 @@ async fn execute_local_app( if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have // to return something on the relevant channel. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -176,7 +176,7 @@ async fn execute_local_app( ) .await?; - let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + let _ = tx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } else { // we put Uv in to protected mode when there's no caching configured/enabled. let protected_mode = opts.cache_dir.is_none(); @@ -198,7 +198,7 @@ async fn execute_local_app( // ensure everything is in place. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -222,19 +222,19 @@ async fn execute_local_app( )); // Wait for venv to finish up. - let res = wait_for_process(ctx.clone(), &cancel_token, child).await; - - if res != 0 { - // If the venv process failed, we want to return an error. - let _ = sx.send(res); - return Err(Error::VirtualEnvCreationFailed); + match wait_for_process(ctx.clone(), &cancel_token, child).await { + Ok(Status::Exited) => {} + res => { + let _ = tx.send(res); + return Err(Error::VirtualEnvCreationFailed); + } } // Check once more if the process was cancelled before we do a uv sync. The sync itself, // once started, will take a while and we have logic for checking for cancellation. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -275,12 +275,13 @@ async fn execute_local_app( )); // Let's wait for the setup to finish. We don't care about the results. - let res = wait_for_process(ctx.clone(), &cancel_token, child).await; - - if res != 0 { + match wait_for_process(ctx.clone(), &cancel_token, child).await { + Ok(Status::Exited) => {} // If the sync process failed, we want to return an error. - let _ = sx.send(res); - return Err(Error::DependencyInstallationFailed); + res => { + let _ = tx.send(res); + return Err(Error::DependencyInstallationFailed); + } } } } @@ -289,7 +290,7 @@ async fn execute_local_app( if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have // to return something on the relevant channel. - let _ = sx.send(-1); + let _ = tx.send(Ok(Status::Cancelled)); return Err(Error::Cancelled); } @@ -312,7 +313,7 @@ async fn execute_local_app( BufReader::new(stderr), )); - let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + let _ = tx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } // Everything was properly executed I suppose. @@ -324,70 +325,105 @@ impl Drop for LocalApp { // CancellationToken::cancel() is not async self.terminator.cancel(); - // Optionally spawn a task to wait for the handle - if let Some(execute_handle) = self.execute_handle.take() { - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - let _ = execute_handle.await; + // Optionally spawn a task to wait for execution to complete + if let Some(task) = self.task.take() { + if let Ok(rt) = Handle::try_current() { + rt.spawn(async move { + let _ = task.await; }); } } } } -impl App for LocalApp { - async fn start(opts: StartOptions) -> Result { +impl LocalApp { + /// Create a new LocalApp with the given ID and StartOptions. + /// + /// The `output_receiver` parameter is optional - when provided (via Backend interface), + /// the `logs()` method will return this receiver. When None (legacy interface), + /// logs are sent to the output_sender in StartOptions and `logs()` returns an empty stream. + /// + /// The `package` parameter keeps the package (and its temp directory) alive for the + /// duration of the execution. + pub async fn new( + id: String, + opts: StartOptions, + output_receiver: Option, + package: Option, + ) -> Result { let terminator = CancellationToken::new(); - - let (sx, rx) = oneshot::channel::(); - let waiter = Mutex::new(rx); - - let handle = tokio::spawn(execute_local_app(opts, sx, terminator.clone())); - let execute_handle = Some(handle); + let (tx, rx) = oneshot::channel::(); + let task = tokio::spawn(execute_local_app(opts, tx, terminator.clone())); Ok(Self { - execute_handle, + id, + task: Some(task), terminator, - waiter, + completion_receiver: Mutex::new(rx), status: Mutex::new(None), + output_receiver: Mutex::new(output_receiver), + _package: package, }) } + /// Create a LocalApp using the legacy start() interface (for backward compatibility). + /// + /// Output is sent to the output_sender in StartOptions. The `logs()` method + /// will return an empty stream (use the output_sender's receiver directly). + pub async fn start(opts: StartOptions) -> Result { + Self::new("local".to_string(), opts, None, None).await + } +} + +#[async_trait] +impl App for LocalApp { + fn id(&self) -> &str { + &self.id + } + + async fn status(&self) -> Result { + let mut status = self.status.lock().await; + + if let Some(status) = status.as_ref() { + return Ok(status.clone()); + } + + match self.completion_receiver.lock().await.try_recv() { + Err(TryRecvError::Empty) => Ok(Status::Running), + Err(TryRecvError::Closed) => Err(Error::WaiterClosed), + Ok(completion) => { + let next_status = completion?; + *status = Some(next_status.clone()); + Ok(next_status) + } + } + } + + async fn logs(&self) -> Result { + // Take the receiver (can only be called once meaningfully) + // Returns empty channel if already taken or using legacy interface + let (_, empty) = tokio::sync::mpsc::unbounded_channel(); + Ok(self.output_receiver.lock().await.take().unwrap_or(empty)) + } + async fn terminate(&mut self) -> Result<(), Error> { self.terminator.cancel(); - // Now we should wait for the join handle to finish. - if let Some(execute_handle) = self.execute_handle.take() { - let _ = execute_handle.await; - self.execute_handle = None; + if let Some(task) = self.task.take() { + let _ = task.await; } Ok(()) } - async fn status(&self) -> Result { - let mut status = self.status.lock().await; - - if let Some(status) = status.clone() { - Ok(status) - } else { - let mut waiter = self.waiter.lock().await; - let res = waiter.try_recv(); - - match res { - Err(TryRecvError::Empty) => Ok(Status::Running), - Err(TryRecvError::Closed) => Err(Error::WaiterClosed), - Ok(t) => { - // We save this for the next time this gets called. - if t == 0 { - *status = Some(Status::Exited); - Ok(Status::Exited) - } else { - let next_status = Status::Crashed { code: t }; - *status = Some(next_status.clone()); - Ok(next_status) - } + async fn wait_for_completion(&self) -> Result { + loop { + let status = self.status().await?; + match status { + Status::None | Status::Running => { + tokio::time::sleep(Duration::from_millis(100)).await; } + _ => return Ok(status), } } } @@ -536,31 +572,32 @@ async fn wait_for_process( ctx: tower_telemetry::Context, cancel_token: &CancellationToken, mut child: Child, -) -> i32 { - let code = loop { +) -> Completion { + loop { if cancel_token.is_cancelled() { debug!(ctx: &ctx, "process cancelled, terminating child process"); kill_child_process(&ctx, child).await; - break -1; // return -1 to indicate that the process was cancelled. + return Ok(Status::Cancelled); } - let timeout = timeout(Duration::from_millis(25), child.wait()).await; - - if let Ok(res) = timeout { - if let Ok(status) = res { - break status.code().expect("no status code"); - } else { - // something went wrong. - debug!(ctx: &ctx, "failed to get status due to some kind of IO error: {}" , res.err().expect("no error somehow")); - break -1; + match timeout(Duration::from_millis(25), child.wait()).await { + Err(_) => continue, // timeout, check cancellation again + Ok(Err(e)) => { + debug!(ctx: &ctx, "IO error waiting on child process: {}", e); + return Err(Error::ProcessWaitFailed { + message: e.to_string(), + }); + } + Ok(Ok(status)) => { + let code = status.code().expect("process should have exit code"); + debug!(ctx: &ctx, "process exited with code {}", code); + return Ok(match code { + 0 => Status::Exited, + _ => Status::Crashed { code }, + }); } } - }; - - debug!(ctx: &ctx, "process exited with code {}", code); - - // this just shuts up the compiler about ignoring the results. - code + } } async fn drain_output( diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs index decdb1b0..b06ad9a9 100644 --- a/crates/tower-runtime/src/subprocess.rs +++ b/crates/tower-runtime/src/subprocess.rs @@ -1,13 +1,12 @@ -//! Subprocess execution backend +//! Local subprocess execution backend use crate::auto_cleanup; use crate::errors::Error; use crate::execution::{ - BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, - ServiceEndpoint, + App, Backend, BackendCapabilities, CacheBackend, ExecutionSpec, ServiceEndpoint, }; use crate::local::LocalApp; -use crate::{App, OutputReceiver, StartOptions, Status}; +use crate::{OutputReceiver, StartOptions, Status}; use async_trait::async_trait; use std::path::PathBuf; @@ -23,7 +22,7 @@ use tower_package::Package; /// Cleanup timeout after a run finishes (5 minutes) const CLEANUP_TIMEOUT: Duration = Duration::from_secs(5 * 60); -/// SubprocessBackend executes apps as a subprocess +/// SubprocessBackend executes apps as local subprocesses pub struct SubprocessBackend { /// Optional default cache directory to use cache_dir: Option, @@ -90,10 +89,10 @@ impl SubprocessBackend { } #[async_trait] -impl ExecutionBackend for SubprocessBackend { - type Handle = SubprocessHandle; +impl Backend for SubprocessBackend { + type App = SubprocessHandle; - async fn create(&self, spec: ExecutionSpec) -> Result { + async fn create(&self, spec: ExecutionSpec) -> Result { // Convert ExecutionSpec to StartOptions for LocalApp let (output_sender, output_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -146,12 +145,15 @@ impl ExecutionBackend for SubprocessBackend { secrets: spec.secrets, parameters: spec.parameters, env_vars, - output_sender: output_sender.clone(), + output_sender, cache_dir: final_cache_dir, // UV will use this via --cache-dir flag }; - // Start the LocalApp - let app = Arc::new(Mutex::new(LocalApp::start(opts).await?)); + // Start the LocalApp — the execution ID is used for monitoring; the package's + // tmp_dir is tracked separately below so auto_cleanup can reclaim it. + let app = Arc::new(Mutex::new( + LocalApp::new(spec.id.clone(), opts, None, None).await?, + )); let package_tmp_dir = Arc::new(Mutex::new(package_tmp_dir)); let uv_temp_dir = Arc::new(Mutex::new(uv_temp_dir)); @@ -207,7 +209,7 @@ pub struct SubprocessHandle { } #[async_trait] -impl ExecutionHandle for SubprocessHandle { +impl App for SubprocessHandle { fn id(&self) -> &str { &self.id } diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 5dda4d9d..2b891588 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::path::PathBuf; -use tower_runtime::{local::LocalApp, App, StartOptions, Status}; +use tower_runtime::execution::App as _; +use tower_runtime::{local::LocalApp, StartOptions, Status}; use config::Towerfile; use tower_package::{Package, PackageSpec}; @@ -373,6 +374,9 @@ async fn test_abort_on_dependency_installation_failure() { Status::None => { panic!("App should have a status"); } + Status::Cancelled => { + panic!("App should not have been cancelled"); + } Status::Failed { .. } => { panic!("App should have crashed, not failed with a platform error"); } diff --git a/crates/tower-runtime/tests/subprocess_test.rs b/crates/tower-runtime/tests/subprocess_test.rs index 8d7448c6..943f3f44 100644 --- a/crates/tower-runtime/tests/subprocess_test.rs +++ b/crates/tower-runtime/tests/subprocess_test.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::path::PathBuf; use tower_runtime::execution::{ - CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionHandle, ExecutionSpec, - ResourceLimits, RuntimeConfig, + App as _, Backend, CacheBackend, CacheConfig, CacheIsolation, ExecutionSpec, ResourceLimits, + RuntimeConfig, }; use tower_runtime::subprocess::SubprocessBackend; use tower_runtime::Status; From f10474f7657cdb7ec411a4737d5e90a3b1d35e02 Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Wed, 21 Jan 2026 22:23:14 +0100 Subject: [PATCH 2/3] chore: be more anal about .expect grammar --- crates/tower-runtime/src/local.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 397194fe..2afbf81e 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -205,7 +205,7 @@ async fn execute_local_app( let mut child = uv.venv(&working_dir, &env_vars).await?; // Drain the logs to the output channel. - let stdout = child.stdout.take().expect("no stdout"); + let stdout = child.stdout.take().expect("stdout should be available"); tokio::spawn(drain_output( FD::Stdout, Channel::Setup, @@ -213,7 +213,7 @@ async fn execute_local_app( BufReader::new(stdout), )); - let stderr = child.stderr.take().expect("no stderr"); + let stderr = child.stderr.take().expect("stderr should be available"); tokio::spawn(drain_output( FD::Stderr, Channel::Setup, @@ -258,7 +258,7 @@ async fn execute_local_app( } Ok(mut child) => { // Drain the logs to the output channel. - let stdout = child.stdout.take().expect("no stdout"); + let stdout = child.stdout.take().expect("stdout should be available"); tokio::spawn(drain_output( FD::Stdout, Channel::Setup, @@ -266,7 +266,7 @@ async fn execute_local_app( BufReader::new(stdout), )); - let stderr = child.stderr.take().expect("no stderr"); + let stderr = child.stderr.take().expect("stderr should be available"); tokio::spawn(drain_output( FD::Stderr, Channel::Setup, @@ -297,7 +297,7 @@ async fn execute_local_app( let mut child = uv.run(&working_dir, &program_path, &env_vars).await?; // Drain the logs to the output channel. - let stdout = child.stdout.take().expect("no stdout"); + let stdout = child.stdout.take().expect("stdout should be available"); tokio::spawn(drain_output( FD::Stdout, Channel::Program, @@ -305,7 +305,7 @@ async fn execute_local_app( BufReader::new(stdout), )); - let stderr = child.stderr.take().expect("no stderr"); + let stderr = child.stderr.take().expect("stderr should be available"); tokio::spawn(drain_output( FD::Stderr, Channel::Program, @@ -608,7 +608,7 @@ async fn drain_output( ) { let mut lines = input.lines(); - while let Some(line) = lines.next_line().await.expect("line iteration fialed") { + while let Some(line) = lines.next_line().await.expect("line iteration should succeed") { let _ = output.send(Output { channel, fd, From 5e6013700495ace268228d0c9c52a4103554fbdd Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Wed, 21 Jan 2026 22:48:52 +0100 Subject: [PATCH 3/3] fix: SIGKILL on process shouldn't panic the handle --- crates/tower-runtime/src/local.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 2afbf81e..dd1b8b85 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -589,11 +589,11 @@ async fn wait_for_process( }); } Ok(Ok(status)) => { - let code = status.code().expect("process should have exit code"); - debug!(ctx: &ctx, "process exited with code {}", code); - return Ok(match code { - 0 => Status::Exited, - _ => Status::Crashed { code }, + debug!(ctx: &ctx, "process exited: {:?}", status); + return Ok(match status.code() { + Some(0) => Status::Exited, + Some(code) => Status::Crashed { code }, + None => Status::Cancelled, // killed by signal }); } }