From c2572adae7f1e09f67e42f1393927a40e24f3f84 Mon Sep 17 00:00:00 2001 From: Florent Benoit Date: Tue, 12 May 2026 17:55:48 +0200 Subject: [PATCH] feat(exec): add bidirectional streaming for interactive TTY sessions The existing ExecSandbox RPC sends stdin upfront in the request body, making interactive programs (bash, top, vim) unusable with --tty. This adds an ExecSandboxInteractive bidirectional streaming RPC that forwards live keystrokes, terminal resize events, and EOF to the sandbox. - Add ExecSandboxInteractive RPC, ExecSandboxInput and ExecSandboxWindowResize messages to the proto definition - Implement server handler with relay bridging, PTY allocation via russh channel.split(), and timeout support - Implement CLI client with raw mode, spawn_blocking stdin reader, SIGWINCH resize forwarding, and process::exit for clean shutdown - Route to interactive path only when --tty is explicitly passed Signed-off-by: Florent Benoit --- crates/openshell-cli/src/run.rs | 137 ++++++ .../tests/ensure_providers_integration.rs | 26 +- .../openshell-cli/tests/mtls_integration.rs | 17 +- .../tests/provider_commands_integration.rs | 26 +- .../sandbox_create_lifecycle_integration.rs | 27 +- .../sandbox_name_fallback_integration.rs | 25 +- crates/openshell-server/src/grpc/mod.rs | 17 +- crates/openshell-server/src/grpc/sandbox.rs | 465 +++++++++++++++++- .../tests/auth_endpoint_integration.rs | 10 + .../tests/edge_tunnel_auth.rs | 12 +- .../tests/multiplex_integration.rs | 12 +- .../tests/multiplex_tls_integration.rs | 12 +- .../tests/supervisor_relay_integration.rs | 9 + .../tests/ws_tunnel_integration.rs | 12 +- proto/openshell.proto | 30 ++ 15 files changed, 787 insertions(+), 50 deletions(-) diff --git a/crates/openshell-cli/src/run.rs b/crates/openshell-cli/src/run.rs index 2797bd66c..8e5e3fea5 100644 --- a/crates/openshell-cli/src/run.rs +++ b/crates/openshell-cli/src/run.rs @@ -2396,6 +2396,11 @@ pub async fn sandbox_exec_grpc( let tty = tty_override .unwrap_or_else(|| std::io::stdin().is_terminal() && std::io::stdout().is_terminal()); + if tty_override == Some(true) && std::io::stdin().is_terminal() { + return sandbox_exec_interactive_grpc(client, &sandbox, command, workdir, timeout_seconds) + .await; + } + // Make the streaming gRPC call. let mut stream = client .exec_sandbox(ExecSandboxRequest { @@ -2406,6 +2411,7 @@ pub async fn sandbox_exec_grpc( timeout_seconds, stdin: stdin_payload, tty, + ..Default::default() }) .await .into_diagnostic()? @@ -2725,6 +2731,137 @@ async fn drain_and_shutdown_local_socket(mut socket: tokio::net::TcpStream) { let _ = socket.shutdown().await; } +struct RawModeGuard; + +impl Drop for RawModeGuard { + fn drop(&mut self) { + let _ = crossterm::terminal::disable_raw_mode(); + } +} + +async fn sandbox_exec_interactive_grpc( + mut client: crate::tls::GrpcClient, + sandbox: &Sandbox, + command: &[String], + workdir: Option<&str>, + timeout_seconds: u32, +) -> Result { + use futures::SinkExt; + use openshell_core::proto::{ExecSandboxInput, ExecSandboxWindowResize, exec_sandbox_input}; + + let (cols, rows) = crossterm::terminal::size().unwrap_or((80, 24)); + + let (mut input_tx, input_rx) = futures::channel::mpsc::channel::(4096); + + // Send the start message with exec metadata. + input_tx + .send(ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Start(ExecSandboxRequest { + sandbox_id: sandbox.object_id().to_string(), + command: command.to_vec(), + workdir: workdir.unwrap_or_default().to_string(), + environment: HashMap::new(), + timeout_seconds, + stdin: Vec::new(), + tty: true, + cols: u32::from(cols), + rows: u32::from(rows), + })), + }) + .await + .into_diagnostic()?; + + let mut stream = client + .exec_sandbox_interactive(input_rx) + .await + .into_diagnostic()? + .into_inner(); + + // Enable raw mode so keystrokes are forwarded immediately. + crossterm::terminal::enable_raw_mode().into_diagnostic()?; + let raw_guard = RawModeGuard; + + // Stdin reader: read raw bytes one at a time so single keystrokes + // (e.g. 'q' in top) are forwarded immediately. + let mut stdin_tx = input_tx.clone(); + tokio::task::spawn_blocking(move || { + let mut stdin = std::io::stdin().lock(); + let mut buf = [0u8; 4096]; + loop { + let n = match stdin.read(&mut buf) { + Ok(0) | Err(_) => break, + Ok(n) => n, + }; + if stdin_tx + .try_send(ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Stdin(buf[..n].to_vec())), + }) + .is_err() + { + break; + } + } + }); + + // SIGWINCH handler: forward terminal resize events. + #[cfg(unix)] + let mut resize_tx = input_tx.clone(); + #[cfg(unix)] + let resize_task = tokio::spawn(async move { + let mut sig = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::window_change()) + .expect("failed to register SIGWINCH handler"); + while sig.recv().await.is_some() { + if let Ok((c, r)) = crossterm::terminal::size() { + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Resize( + ExecSandboxWindowResize { + cols: u32::from(c), + rows: u32::from(r), + }, + )), + }; + if resize_tx.send(msg).await.is_err() { + break; + } + } + } + }); + + let mut exit_code = 0i32; + let stdout = std::io::stdout(); + + while let Some(event) = stream.next().await { + let event = event.into_diagnostic()?; + match event.payload { + Some(exec_sandbox_event::Payload::Stdout(out)) => { + let mut handle = stdout.lock(); + handle.write_all(&out.data).into_diagnostic()?; + handle.flush().into_diagnostic()?; + } + Some(exec_sandbox_event::Payload::Stderr(err)) => { + let mut handle = stdout.lock(); + handle.write_all(&err.data).into_diagnostic()?; + handle.flush().into_diagnostic()?; + } + Some(exec_sandbox_event::Payload::Exit(exit)) => { + exit_code = exit.exit_code; + break; + } + None => {} + } + } + + #[cfg(unix)] + resize_task.abort(); + + // Drop the raw mode guard to restore the terminal before exiting. + drop(raw_guard); + + // The spawn_blocking stdin reader is stuck on stdin.read() and cannot be + // cancelled. Force-exit so the tokio runtime doesn't hang waiting for it. + std::process::exit(exit_code) +} + /// Print a single YAML line with dimmed keys and regular values. fn print_yaml_line(line: &str) { // Find leading whitespace diff --git a/crates/openshell-cli/tests/ensure_providers_integration.rs b/crates/openshell-cli/tests/ensure_providers_integration.rs index f1a11e661..9d3e8d571 100644 --- a/crates/openshell-cli/tests/ensure_providers_integration.rs +++ b/crates/openshell-cli/tests/ensure_providers_integration.rs @@ -13,14 +13,15 @@ use openshell_core::proto::{ CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, DetachSandboxProviderRequest, DetachSandboxProviderResponse, ExecSandboxEvent, - ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, GetGatewayConfigResponse, - GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, - GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, - HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, - ListSandboxProvidersRequest, ListSandboxProvidersResponse, ListSandboxesRequest, - ListSandboxesResponse, Provider, ProviderResponse, RevokeSshSessionRequest, - RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent, ServiceStatus, - SupervisorMessage, UpdateProviderRequest, WatchSandboxRequest, + ExecSandboxInput, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, + GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, + GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, + ListProvidersRequest, ListProvidersResponse, ListSandboxProvidersRequest, + ListSandboxProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, Provider, + ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, + SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest, + WatchSandboxRequest, }; use openshell_core::{ObjectId, ObjectName}; use rcgen::{ @@ -395,6 +396,15 @@ impl OpenShell for TestOpenShell { ))) } + type ExecSandboxInteractiveStream = + tokio_stream::wrappers::ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/mtls_integration.rs b/crates/openshell-cli/tests/mtls_integration.rs index c95e2cf98..bf286119b 100644 --- a/crates/openshell-cli/tests/mtls_integration.rs +++ b/crates/openshell-cli/tests/mtls_integration.rs @@ -4,10 +4,10 @@ use openshell_cli::tls::{TlsOptions, grpc_client}; use openshell_core::proto::{ CreateProviderRequest, CreateSshSessionRequest, CreateSshSessionResponse, - DeleteProviderRequest, DeleteProviderResponse, ExecSandboxEvent, ExecSandboxRequest, - GetProviderRequest, HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, - ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, ServiceStatus, - UpdateProviderRequest, + DeleteProviderRequest, DeleteProviderResponse, ExecSandboxEvent, ExecSandboxInput, + ExecSandboxRequest, GetProviderRequest, HealthRequest, HealthResponse, ListProvidersRequest, + ListProvidersResponse, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, + ServiceStatus, UpdateProviderRequest, open_shell_server::{OpenShell, OpenShellServer}, }; use rcgen::{ @@ -286,6 +286,15 @@ impl OpenShell for TestOpenShell { ))) } + type ExecSandboxInteractiveStream = + tokio_stream::wrappers::ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/provider_commands_integration.rs b/crates/openshell-cli/tests/provider_commands_integration.rs index fbe824cbf..fd3bef643 100644 --- a/crates/openshell-cli/tests/provider_commands_integration.rs +++ b/crates/openshell-cli/tests/provider_commands_integration.rs @@ -9,14 +9,15 @@ use openshell_core::proto::{ CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, DetachSandboxProviderRequest, DetachSandboxProviderResponse, ExecSandboxEvent, - ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, GetGatewayConfigResponse, - GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, - GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, - HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, - ListSandboxProvidersRequest, ListSandboxProvidersResponse, ListSandboxesRequest, - ListSandboxesResponse, Provider, ProviderProfile, ProviderResponse, RevokeSshSessionRequest, - RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent, ServiceStatus, - SupervisorMessage, UpdateProviderRequest, WatchSandboxRequest, + ExecSandboxInput, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, + GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, + GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, + ListProvidersRequest, ListProvidersResponse, ListSandboxProvidersRequest, + ListSandboxProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, Provider, + ProviderProfile, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, + SandboxResponse, SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest, + WatchSandboxRequest, }; use openshell_core::{ObjectId, ObjectName}; use rcgen::{ @@ -504,6 +505,15 @@ impl OpenShell for TestOpenShell { ))) } + type ExecSandboxInteractiveStream = + tokio_stream::wrappers::ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs b/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs index a2fedab82..c1dff72db 100644 --- a/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs +++ b/crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs @@ -10,15 +10,15 @@ use openshell_core::proto::{ CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, DetachSandboxProviderRequest, DetachSandboxProviderResponse, ExecSandboxEvent, - ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, GetGatewayConfigResponse, - GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, - GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, - HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, - ListSandboxProvidersRequest, ListSandboxProvidersResponse, ListSandboxesRequest, - ListSandboxesResponse, PlatformEvent, ProviderResponse, RevokeSshSessionRequest, - RevokeSshSessionResponse, Sandbox, SandboxPhase, SandboxResponse, SandboxStreamEvent, - ServiceStatus, SupervisorMessage, UpdateProviderRequest, WatchSandboxRequest, - sandbox_stream_event, + ExecSandboxInput, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, + GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, + GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, + ListProvidersRequest, ListProvidersResponse, ListSandboxProvidersRequest, + ListSandboxProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, PlatformEvent, + ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, Sandbox, SandboxPhase, + SandboxResponse, SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest, + WatchSandboxRequest, sandbox_stream_event, }; use rcgen::{ BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair, @@ -369,6 +369,15 @@ impl OpenShell for TestOpenShell { ))) } + type ExecSandboxInteractiveStream = + tokio_stream::wrappers::ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs b/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs index 94d5b3cfa..3a1edc155 100644 --- a/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs +++ b/crates/openshell-cli/tests/sandbox_name_fallback_integration.rs @@ -10,14 +10,14 @@ use openshell_core::proto::{ CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, DetachSandboxProviderRequest, DetachSandboxProviderResponse, ExecSandboxEvent, - ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, GetGatewayConfigResponse, - GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, - GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, - HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, - ListSandboxProvidersRequest, ListSandboxProvidersResponse, ListSandboxesRequest, - ListSandboxesResponse, ProviderResponse, Sandbox, SandboxPolicy, SandboxResponse, - SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest, - WatchSandboxRequest, + ExecSandboxInput, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, + GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, + GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, + ListProvidersRequest, ListProvidersResponse, ListSandboxProvidersRequest, + ListSandboxProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, ProviderResponse, + Sandbox, SandboxPolicy, SandboxResponse, SandboxStreamEvent, ServiceStatus, SupervisorMessage, + UpdateProviderRequest, WatchSandboxRequest, }; use rcgen::{ BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair, @@ -307,6 +307,15 @@ impl OpenShell for TestOpenShell { ))) } + type ExecSandboxInteractiveStream = + tokio_stream::wrappers::ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-server/src/grpc/mod.rs b/crates/openshell-server/src/grpc/mod.rs index 16f016081..bb78e252e 100644 --- a/crates/openshell-server/src/grpc/mod.rs +++ b/crates/openshell-server/src/grpc/mod.rs @@ -16,10 +16,10 @@ use openshell_core::proto::{ DeleteProviderProfileResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, DetachSandboxProviderRequest, DetachSandboxProviderResponse, EditDraftChunkRequest, EditDraftChunkResponse, ExecSandboxEvent, - ExecSandboxRequest, GatewayMessage, GetDraftHistoryRequest, GetDraftHistoryResponse, - GetDraftPolicyRequest, GetDraftPolicyResponse, GetGatewayConfigRequest, - GetGatewayConfigResponse, GetProviderProfileRequest, GetProviderRequest, - GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxLogsRequest, + ExecSandboxInput, ExecSandboxRequest, GatewayMessage, GetDraftHistoryRequest, + GetDraftHistoryResponse, GetDraftPolicyRequest, GetDraftPolicyResponse, + GetGatewayConfigRequest, GetGatewayConfigResponse, GetProviderProfileRequest, + GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxLogsRequest, GetSandboxLogsResponse, GetSandboxPolicyStatusRequest, GetSandboxPolicyStatusResponse, GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, ImportProviderProfilesRequest, ImportProviderProfilesResponse, @@ -251,6 +251,15 @@ impl OpenShell for OpenShellService { sandbox::handle_forward_tcp(&self.state, request).await } + type ExecSandboxInteractiveStream = ReceiverStream>; + + async fn exec_sandbox_interactive( + &self, + request: Request>, + ) -> Result, Status> { + sandbox::handle_exec_sandbox_interactive(&self.state, request).await + } + // --- SSH sessions --- async fn create_ssh_session( diff --git a/crates/openshell-server/src/grpc/sandbox.rs b/crates/openshell-server/src/grpc/sandbox.rs index ad37a5482..f9f646ac9 100644 --- a/crates/openshell-server/src/grpc/sandbox.rs +++ b/crates/openshell-server/src/grpc/sandbox.rs @@ -17,7 +17,7 @@ use openshell_core::proto::{ AttachSandboxProviderRequest, AttachSandboxProviderResponse, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteSandboxRequest, DeleteSandboxResponse, DetachSandboxProviderRequest, DetachSandboxProviderResponse, ExecSandboxEvent, ExecSandboxExit, - ExecSandboxRequest, ExecSandboxStderr, ExecSandboxStdout, GetSandboxRequest, + ExecSandboxInput, ExecSandboxRequest, ExecSandboxStderr, ExecSandboxStdout, GetSandboxRequest, ListSandboxProvidersRequest, ListSandboxProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, Provider, RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent, SshRelayTarget, TcpForwardFrame, TcpForwardInit, @@ -1038,6 +1038,128 @@ async fn bridge_forward_tcp_stream( } } +// --------------------------------------------------------------------------- +// Interactive exec handler (bidirectional stdin streaming) +// --------------------------------------------------------------------------- + +fn validate_interactive_exec_start( + msg: Option, +) -> Result { + use openshell_core::proto::exec_sandbox_input::Payload; + + let msg = + msg.ok_or_else(|| Status::invalid_argument("empty stream: expected start message"))?; + + let Some(Payload::Start(req)) = msg.payload else { + return Err(Status::invalid_argument( + "first message must be a start payload", + )); + }; + + if req.sandbox_id.is_empty() { + return Err(Status::invalid_argument("sandbox_id is required")); + } + if req.command.is_empty() { + return Err(Status::invalid_argument("command is required")); + } + if req.environment.keys().any(|key| !is_valid_env_key(key)) { + return Err(Status::invalid_argument( + "environment keys must match ^[A-Za-z_][A-Za-z0-9_]*$", + )); + } + validate_exec_request_fields(&req)?; + + Ok(req) +} + +pub(super) async fn handle_exec_sandbox_interactive( + state: &Arc, + request: Request>, +) -> Result>>, Status> { + use openshell_core::ObjectId; + + let mut input_stream = request.into_inner(); + + let first_msg = input_stream + .message() + .await + .map_err(|e| Status::internal(format!("failed to read first message: {e}")))?; + + let req = validate_interactive_exec_start(first_msg)?; + + let sandbox = state + .store + .get_message::(&req.sandbox_id) + .await + .map_err(|e| Status::internal(format!("fetch sandbox failed: {e}")))? + .ok_or_else(|| Status::not_found("sandbox not found"))?; + + if SandboxPhase::try_from(sandbox.phase).ok() != Some(SandboxPhase::Ready) { + return Err(Status::failed_precondition("sandbox is not ready")); + } + + let (channel_id, relay_rx) = state + .supervisor_sessions + .open_relay(sandbox.object_id(), std::time::Duration::from_secs(15)) + .await + .map_err(|e| Status::unavailable(format!("supervisor relay failed: {e}")))?; + + let command_str = build_remote_exec_command(&req) + .map_err(|e| Status::invalid_argument(format!("command construction failed: {e}")))?; + let timeout_seconds = req.timeout_seconds; + let cols = req.cols; + let rows = req.rows; + + let sandbox_id = sandbox.object_id().to_string(); + + let (tx, rx) = mpsc::channel::>(256); + tokio::spawn(async move { + let relay_stream = match tokio::time::timeout(std::time::Duration::from_secs(10), relay_rx) + .await + { + Ok(Ok(Ok(stream))) => stream, + Ok(Ok(Err(status))) => { + warn!(sandbox_id = %sandbox_id, channel_id = %channel_id, error = %status.message(), "ExecSandboxInteractive: relay target open failed"); + let _ = tx.send(Err(status)).await; + return; + } + Ok(Err(_)) => { + warn!(sandbox_id = %sandbox_id, channel_id = %channel_id, "ExecSandboxInteractive: relay channel dropped"); + let _ = tx + .send(Err(Status::unavailable("relay channel dropped"))) + .await; + return; + } + Err(_) => { + warn!(sandbox_id = %sandbox_id, channel_id = %channel_id, "ExecSandboxInteractive: relay open timed out"); + let _ = tx + .send(Err(Status::deadline_exceeded("relay open timed out"))) + .await; + return; + } + }; + + if let Err(err) = stream_interactive_exec_over_relay( + tx.clone(), + &sandbox_id, + &channel_id, + relay_stream, + &command_str, + input_stream, + timeout_seconds, + cols, + rows, + ) + .await + { + warn!(sandbox_id = %sandbox_id, error = %err, "ExecSandboxInteractive failed"); + let _ = tx.send(Err(err)).await; + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) +} + // --------------------------------------------------------------------------- // SSH session handlers // --------------------------------------------------------------------------- @@ -1289,6 +1411,205 @@ async fn stream_exec_over_relay( Ok(()) } +#[allow(clippy::too_many_arguments)] +async fn stream_interactive_exec_over_relay( + tx: mpsc::Sender>, + sandbox_id: &str, + channel_id: &str, + relay_stream: tokio::io::DuplexStream, + command: &str, + input_stream: tonic::Streaming, + timeout_seconds: u32, + cols: u32, + rows: u32, +) -> Result<(), Status> { + let command_preview: String = command.chars().take(120).collect(); + info!( + sandbox_id = %sandbox_id, + channel_id = %channel_id, + command_len = command.len(), + command_preview = %command_preview, + "ExecSandboxInteractive (relay): command started" + ); + + let (local_proxy_port, proxy_task) = start_single_use_ssh_proxy_over_relay(relay_stream) + .await + .map_err(|e| Status::internal(format!("failed to start relay proxy: {e}")))?; + + let exec = run_interactive_exec_with_russh( + local_proxy_port, + command, + input_stream, + cols, + rows, + tx.clone(), + ); + + let exec_result = if timeout_seconds == 0 { + exec.await + } else if let Ok(r) = tokio::time::timeout( + std::time::Duration::from_secs(u64::from(timeout_seconds)), + exec, + ) + .await + { + r + } else { + let _ = tx + .send(Ok(ExecSandboxEvent { + payload: Some(openshell_core::proto::exec_sandbox_event::Payload::Exit( + ExecSandboxExit { exit_code: 124 }, + )), + })) + .await; + let _ = proxy_task.await; + return Ok(()); + }; + + let exit_code = match exec_result { + Ok(code) => code, + Err(status) => { + let _ = proxy_task.await; + return Err(status); + } + }; + + let _ = proxy_task.await; + + let _ = tx + .send(Ok(ExecSandboxEvent { + payload: Some(openshell_core::proto::exec_sandbox_event::Payload::Exit( + ExecSandboxExit { exit_code }, + )), + })) + .await; + + Ok(()) +} + +async fn run_interactive_exec_with_russh( + local_proxy_port: u16, + command: &str, + mut input_stream: tonic::Streaming, + cols: u32, + rows: u32, + tx: mpsc::Sender>, +) -> Result { + use openshell_core::proto::exec_sandbox_input::Payload; + use russh::ChannelMsg; + + if command.as_bytes().contains(&0) { + return Err(Status::invalid_argument( + "command contains null bytes at transport boundary", + )); + } + if command.len() > MAX_COMMAND_STRING_LEN { + return Err(Status::invalid_argument(format!( + "command exceeds {MAX_COMMAND_STRING_LEN} byte limit at transport boundary" + ))); + } + + let stream = TcpStream::connect(("127.0.0.1", local_proxy_port)) + .await + .map_err(|e| Status::internal(format!("failed to connect to ssh proxy: {e}")))?; + + let config = Arc::new(russh::client::Config::default()); + let mut client = russh::client::connect_stream(config, stream, SandboxSshClientHandler) + .await + .map_err(|e| Status::internal(format!("failed to establish ssh transport: {e}")))?; + + match client + .authenticate_none("sandbox") + .await + .map_err(|e| Status::internal(format!("failed to authenticate ssh session: {e}")))? + { + AuthResult::Success => {} + AuthResult::Failure { .. } => { + return Err(Status::permission_denied( + "ssh authentication rejected by sandbox", + )); + } + } + + let channel = client + .channel_open_session() + .await + .map_err(|e| Status::internal(format!("failed to open ssh channel: {e}")))?; + + channel + .request_pty(false, "xterm-256color", cols, rows, 0, 0, &[]) + .await + .map_err(|e| Status::internal(format!("failed to allocate PTY: {e}")))?; + + channel + .exec(true, command.as_bytes()) + .await + .map_err(|e| Status::internal(format!("failed to execute command over ssh: {e}")))?; + + let (mut read_half, write_half) = channel.split(); + + let stdin_task = tokio::spawn(async move { + while let Ok(Some(msg)) = input_stream.message().await { + match msg.payload { + Some(Payload::Stdin(data)) => { + if write_half.data(std::io::Cursor::new(data)).await.is_err() { + break; + } + } + Some(Payload::Resize(resize)) => { + let _ = write_half + .window_change(resize.cols, resize.rows, 0, 0) + .await; + } + Some(Payload::Start(_)) | None => {} + } + } + let _ = write_half.eof().await; + }); + + let mut exit_code: Option = None; + while let Some(msg) = read_half.wait().await { + match msg { + ChannelMsg::Data { data } => { + let _ = tx + .send(Ok(ExecSandboxEvent { + payload: Some(openshell_core::proto::exec_sandbox_event::Payload::Stdout( + ExecSandboxStdout { + data: data.to_vec(), + }, + )), + })) + .await; + } + ChannelMsg::ExtendedData { data, .. } => { + let _ = tx + .send(Ok(ExecSandboxEvent { + payload: Some(openshell_core::proto::exec_sandbox_event::Payload::Stderr( + ExecSandboxStderr { + data: data.to_vec(), + }, + )), + })) + .await; + } + ChannelMsg::ExitStatus { exit_status } => { + let converted = i32::try_from(exit_status).unwrap_or(i32::MAX); + exit_code = Some(converted); + } + ChannelMsg::Close => break, + _ => {} + } + } + + stdin_task.abort(); + + let _ = client + .disconnect(russh::Disconnect::ByApplication, "exec complete", "en") + .await; + + Ok(exit_code.unwrap_or(1)) +} + /// Create a localhost SSH proxy that bridges to a relay `DuplexStream`. /// /// The proxy forwards raw SSH bytes between the `russh` client and the relay. @@ -1899,4 +2220,146 @@ mod tests { assert_eq!(err.code(), tonic::Code::FailedPrecondition); } + + // ---- validate_interactive_exec_start ---- + + #[test] + fn interactive_exec_rejects_empty_stream() { + let err = validate_interactive_exec_start(None).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("expected start message")); + } + + #[test] + fn interactive_exec_rejects_stdin_as_first_message() { + use openshell_core::proto::exec_sandbox_input; + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Stdin(b"hello".to_vec())), + }; + let err = validate_interactive_exec_start(Some(msg)).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("start payload")); + } + + #[test] + fn interactive_exec_rejects_resize_as_first_message() { + use openshell_core::proto::{ExecSandboxWindowResize, exec_sandbox_input}; + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Resize( + ExecSandboxWindowResize { cols: 80, rows: 24 }, + )), + }; + let err = validate_interactive_exec_start(Some(msg)).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("start payload")); + } + + #[test] + fn interactive_exec_rejects_none_payload() { + let msg = ExecSandboxInput { payload: None }; + let err = validate_interactive_exec_start(Some(msg)).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + } + + #[test] + fn interactive_exec_rejects_missing_sandbox_id() { + use openshell_core::proto::exec_sandbox_input; + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Start(ExecSandboxRequest { + command: vec!["bash".to_string()], + ..Default::default() + })), + }; + let err = validate_interactive_exec_start(Some(msg)).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("sandbox_id")); + } + + #[test] + fn interactive_exec_rejects_missing_command() { + use openshell_core::proto::exec_sandbox_input; + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Start(ExecSandboxRequest { + sandbox_id: "test-id".to_string(), + ..Default::default() + })), + }; + let err = validate_interactive_exec_start(Some(msg)).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("command")); + } + + #[test] + fn interactive_exec_rejects_invalid_env_key() { + use openshell_core::proto::exec_sandbox_input; + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Start(ExecSandboxRequest { + sandbox_id: "test-id".to_string(), + command: vec!["bash".to_string()], + environment: std::iter::once(("bad key!".to_string(), "val".to_string())).collect(), + ..Default::default() + })), + }; + let err = validate_interactive_exec_start(Some(msg)).unwrap_err(); + assert_eq!(err.code(), tonic::Code::InvalidArgument); + assert!(err.message().contains("environment")); + } + + #[test] + fn interactive_exec_accepts_valid_start() { + use openshell_core::proto::exec_sandbox_input; + let msg = ExecSandboxInput { + payload: Some(exec_sandbox_input::Payload::Start(ExecSandboxRequest { + sandbox_id: "test-id".to_string(), + command: vec!["bash".to_string()], + tty: true, + cols: 120, + rows: 40, + ..Default::default() + })), + }; + let req = validate_interactive_exec_start(Some(msg)).unwrap(); + assert_eq!(req.sandbox_id, "test-id"); + assert_eq!(req.command, vec!["bash"]); + assert!(req.tty); + assert_eq!(req.cols, 120); + assert_eq!(req.rows, 40); + } + + #[tokio::test] + async fn interactive_exec_rejects_sandbox_not_found() { + let state = test_server_state().await; + + let req = ExecSandboxRequest { + sandbox_id: "nonexistent".to_string(), + command: vec!["bash".to_string()], + tty: true, + ..Default::default() + }; + let sandbox_result = state + .store + .get_message::(&req.sandbox_id) + .await + .unwrap(); + assert!(sandbox_result.is_none()); + } + + #[tokio::test] + async fn interactive_exec_rejects_sandbox_not_ready() { + let state = test_server_state().await; + let mut sandbox = test_sandbox("not-ready", Vec::new()); + sandbox.phase = SandboxPhase::Provisioning as i32; + state.store.put_message(&sandbox).await.unwrap(); + + let stored = state + .store + .get_message::("sandbox-not-ready") + .await + .unwrap() + .unwrap(); + assert_ne!( + SandboxPhase::try_from(stored.phase).ok(), + Some(SandboxPhase::Ready) + ); + } } diff --git a/crates/openshell-server/tests/auth_endpoint_integration.rs b/crates/openshell-server/tests/auth_endpoint_integration.rs index f160f98b8..78c127821 100644 --- a/crates/openshell-server/tests/auth_endpoint_integration.rs +++ b/crates/openshell-server/tests/auth_endpoint_integration.rs @@ -622,6 +622,16 @@ impl openshell_core::proto::open_shell_server::OpenShell for TestOpenShell { )) } + type ExecSandboxInteractiveStream = tokio_stream::wrappers::ReceiverStream< + Result, + >; + async fn exec_sandbox_interactive( + &self, + _: tonic::Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("test")) + } + async fn update_config( &self, _: tonic::Request, diff --git a/crates/openshell-server/tests/edge_tunnel_auth.rs b/crates/openshell-server/tests/edge_tunnel_auth.rs index 689cfcf59..8751200aa 100644 --- a/crates/openshell-server/tests/edge_tunnel_auth.rs +++ b/crates/openshell-server/tests/edge_tunnel_auth.rs @@ -37,8 +37,8 @@ use hyper_util::{ use openshell_core::proto::{ CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, - ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, - GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + ExecSandboxEvent, ExecSandboxInput, ExecSandboxRequest, GatewayMessage, + GetGatewayConfigRequest, GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, @@ -267,6 +267,14 @@ impl OpenShell for TestOpenShell { Ok(Response::new(ReceiverStream::new(rx))) } + type ExecSandboxInteractiveStream = ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-server/tests/multiplex_integration.rs b/crates/openshell-server/tests/multiplex_integration.rs index 9cab950db..712983ff5 100644 --- a/crates/openshell-server/tests/multiplex_integration.rs +++ b/crates/openshell-server/tests/multiplex_integration.rs @@ -11,8 +11,8 @@ use hyper_util::{ use openshell_core::proto::{ CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, - ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, - GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + ExecSandboxEvent, ExecSandboxInput, ExecSandboxRequest, GatewayMessage, + GetGatewayConfigRequest, GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, @@ -235,6 +235,14 @@ impl OpenShell for TestOpenShell { Ok(Response::new(ReceiverStream::new(rx))) } + type ExecSandboxInteractiveStream = ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-server/tests/multiplex_tls_integration.rs b/crates/openshell-server/tests/multiplex_tls_integration.rs index 21b75c12c..4073246e8 100644 --- a/crates/openshell-server/tests/multiplex_tls_integration.rs +++ b/crates/openshell-server/tests/multiplex_tls_integration.rs @@ -13,8 +13,8 @@ use hyper_util::{ use openshell_core::proto::{ CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, - ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, - GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + ExecSandboxEvent, ExecSandboxInput, ExecSandboxRequest, GatewayMessage, + GetGatewayConfigRequest, GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, @@ -248,6 +248,14 @@ impl OpenShell for TestOpenShell { Ok(Response::new(ReceiverStream::new(rx))) } + type ExecSandboxInteractiveStream = ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/crates/openshell-server/tests/supervisor_relay_integration.rs b/crates/openshell-server/tests/supervisor_relay_integration.rs index 2d722b051..3de258789 100644 --- a/crates/openshell-server/tests/supervisor_relay_integration.rs +++ b/crates/openshell-server/tests/supervisor_relay_integration.rs @@ -96,6 +96,15 @@ impl OpenShell for RelayGateway { Err(Status::unimplemented("unused")) } + type ExecSandboxInteractiveStream = + ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("unused")) + } + async fn health( &self, _: tonic::Request, diff --git a/crates/openshell-server/tests/ws_tunnel_integration.rs b/crates/openshell-server/tests/ws_tunnel_integration.rs index 14d5e9bb7..3c2349f47 100644 --- a/crates/openshell-server/tests/ws_tunnel_integration.rs +++ b/crates/openshell-server/tests/ws_tunnel_integration.rs @@ -40,8 +40,8 @@ use hyper_util::{ use openshell_core::proto::{ CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse, DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse, - ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest, - GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, + ExecSandboxEvent, ExecSandboxInput, ExecSandboxRequest, GatewayMessage, + GetGatewayConfigRequest, GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse, @@ -261,6 +261,14 @@ impl OpenShell for TestOpenShell { Ok(Response::new(ReceiverStream::new(rx))) } + type ExecSandboxInteractiveStream = ReceiverStream>; + async fn exec_sandbox_interactive( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in test")) + } + async fn update_config( &self, _request: tonic::Request, diff --git a/proto/openshell.proto b/proto/openshell.proto index 883c1576c..f13eb223b 100644 --- a/proto/openshell.proto +++ b/proto/openshell.proto @@ -57,6 +57,11 @@ service OpenShell { // Forward one CLI-side TCP connection to a loopback TCP target in a sandbox. rpc ForwardTcp(stream TcpForwardFrame) returns (stream TcpForwardFrame); + // Execute an interactive command with bidirectional stdin/stdout streaming. + // The first client message MUST carry an ExecSandboxInput with the start + // variant. Subsequent messages carry stdin bytes or window resize events. + rpc ExecSandboxInteractive(stream ExecSandboxInput) returns (stream ExecSandboxEvent); + // Create a provider. rpc CreateProvider(CreateProviderRequest) returns (ProviderResponse); @@ -491,6 +496,12 @@ message ExecSandboxRequest { // Request a pseudo-terminal for the remote command. bool tty = 7; + + // Initial terminal columns (used when tty=true, 0 = use default). + uint32 cols = 8; + + // Initial terminal rows (used when tty=true, 0 = use default). + uint32 rows = 9; } // One stdout chunk from a sandbox exec. @@ -541,6 +552,25 @@ message TcpForwardFrame { } } +// Client-to-server message for interactive exec. +message ExecSandboxInput { + oneof payload { + // First message: exec request metadata. + ExecSandboxRequest start = 1; + // Subsequent messages: raw stdin bytes. + bytes stdin = 2; + // Terminal window size change. + ExecSandboxWindowResize resize = 3; + } +} + +// Terminal window resize event for interactive exec. +message ExecSandboxWindowResize { + uint32 cols = 1; + uint32 rows = 2; +} + + // SSH session record stored in persistence. message SshSession { // Kubernetes-style metadata (id, name, labels, timestamps, resource version).