From debb18bcc820c657a16af61275e3b63275f8bece Mon Sep 17 00:00:00 2001 From: Adrien LANGOU Date: Mon, 4 May 2026 18:07:10 +0200 Subject: [PATCH 1/2] feat(gateway): add readiness probe metrics and test-only store close Emit Prometheus readiness metrics for database probes (healthy gauge and outcome-labeled latency histogram) with coverage in health HTTP tests. Restrict Store::close behind test support cfg to prevent accidental runtime pool shutdown under live traffic. Signed-off-by: Adrien Langou --- architecture/gateway.md | 5 + crates/openshell-core/src/config.rs | 37 ++ crates/openshell-server/Cargo.toml | 1 + crates/openshell-server/src/cli.rs | 43 +- crates/openshell-server/src/http.rs | 471 +++++++++++++++++- crates/openshell-server/src/lib.rs | 16 +- crates/openshell-server/src/multiplex.rs | 10 +- .../openshell-server/src/persistence/mod.rs | 22 + .../src/persistence/postgres.rs | 17 +- .../src/persistence/sqlite.rs | 17 +- .../tests/auth_endpoint_integration.rs | 10 +- .../tests/edge_tunnel_auth.rs | 15 +- .../tests/multiplex_integration.rs | 17 +- .../tests/multiplex_tls_integration.rs | 15 +- .../tests/supervisor_relay_integration.rs | 14 +- .../tests/ws_tunnel_integration.rs | 17 +- deploy/helm/openshell/README.md | 7 + .../helm/openshell/templates/statefulset.yaml | 2 + deploy/helm/openshell/values.yaml | 6 +- deploy/man/openshell-gateway.8.md | 4 + deploy/man/openshell-gateway.env.5.md | 4 + deploy/rpm/CONFIGURATION.md | 1 + docs/sandboxes/manage-gateways.mdx | 2 + tasks/test.toml | 7 +- 24 files changed, 723 insertions(+), 37 deletions(-) diff --git a/architecture/gateway.md b/architecture/gateway.md index 383430503..d37c2c64e 100644 --- a/architecture/gateway.md +++ b/architecture/gateway.md @@ -33,6 +33,11 @@ health, metrics, or tunnel routes. The plaintext service router also rejects browser requests whose Fetch Metadata, Origin, or Referer headers indicate a cross-origin or sibling-subdomain request. +Dedicated health listeners expose `/healthz` (process liveness only) and +`/readyz` (dependency-aware readiness). Readiness probes include a bounded DB +connectivity check with a configurable timeout so operators can tune pod +eviction sensitivity without changing liveness behavior. + Supported auth modes: | Mode | Use | diff --git a/crates/openshell-core/src/config.rs b/crates/openshell-core/src/config.rs index 1ec0926fc..b00181db4 100644 --- a/crates/openshell-core/src/config.rs +++ b/crates/openshell-core/src/config.rs @@ -30,6 +30,9 @@ pub const DEFAULT_STOP_TIMEOUT_SECS: u32 = 10; /// Default allowed clock skew for SSH handshake validation, in seconds. pub const DEFAULT_SSH_HANDSHAKE_SKEW_SECS: u64 = 300; +/// Default timeout for gateway DB readiness probes, in seconds. +pub const DEFAULT_READINESS_DATABASE_TIMEOUT_SECS: u64 = 1; + /// Default Podman bridge network name. pub const DEFAULT_NETWORK_NAME: &str = "openshell"; @@ -197,6 +200,13 @@ pub struct Config { #[serde(default)] pub metrics_bind_address: Option, + /// Timeout for database readiness probes, in seconds. + /// + /// Used by `/readyz` (and `/health`, which aliases readiness) to bound + /// `Store::ping` checks before returning an unhealthy dependency status. + #[serde(default = "default_readiness_database_timeout_secs")] + pub readiness_database_timeout_secs: u64, + /// Additional bind addresses that serve the same multiplexed gRPC/HTTP /// surface as `bind_address`. /// @@ -415,6 +425,7 @@ impl Config { bind_address: default_bind_address(), health_bind_address: None, metrics_bind_address: None, + readiness_database_timeout_secs: default_readiness_database_timeout_secs(), extra_bind_addresses: Vec::new(), log_level: default_log_level(), tls, @@ -458,6 +469,13 @@ impl Config { self } + /// Set the database readiness probe timeout in seconds. + #[must_use] + pub const fn with_readiness_database_timeout_secs(mut self, secs: u64) -> Self { + self.readiness_database_timeout_secs = secs; + self + } + /// Append an extra listener address to the multiplex service. /// /// Duplicate entries (matching `bind_address` or any existing entry) are @@ -711,6 +729,10 @@ const fn default_ssh_handshake_skew_secs() -> u64 { DEFAULT_SSH_HANDSHAKE_SKEW_SECS } +const fn default_readiness_database_timeout_secs() -> u64 { + DEFAULT_READINESS_DATABASE_TIMEOUT_SECS +} + const fn default_ssh_session_ttl_secs() -> u64 { 86400 // 24 hours } @@ -764,6 +786,15 @@ mod tests { assert!(cfg.health_bind_address.is_none()); } + #[test] + fn config_new_sets_default_readiness_database_timeout() { + let cfg = Config::new(None); + assert_eq!( + cfg.readiness_database_timeout_secs, + super::DEFAULT_READINESS_DATABASE_TIMEOUT_SECS + ); + } + #[test] fn service_routing_allows_loopback_plaintext_http_by_default() { let cfg = Config::new(None); @@ -817,6 +848,12 @@ mod tests { assert_eq!(cfg.health_bind_address, Some(addr)); } + #[test] + fn config_with_readiness_database_timeout_sets_value() { + let cfg = Config::new(None).with_readiness_database_timeout_secs(9); + assert_eq!(cfg.readiness_database_timeout_secs, 9); + } + #[test] fn detect_driver_returns_none_without_k8s_env_or_binaries() { // When KUBERNETES_SERVICE_HOST is not set and no docker/podman binaries diff --git a/crates/openshell-server/Cargo.toml b/crates/openshell-server/Cargo.toml index 9cba99045..9edf51f78 100644 --- a/crates/openshell-server/Cargo.toml +++ b/crates/openshell-server/Cargo.toml @@ -91,6 +91,7 @@ nix = { workspace = true } [features] dev-settings = ["openshell-core/dev-settings"] +test-support = [] [dev-dependencies] hyper-rustls = { version = "0.27", default-features = false, features = ["native-tokio", "http1", "tls12", "logging", "ring", "webpki-tokio"] } diff --git a/crates/openshell-server/src/cli.rs b/crates/openshell-server/src/cli.rs index 3cd9e8c79..67e00d076 100644 --- a/crates/openshell-server/src/cli.rs +++ b/crates/openshell-server/src/cli.rs @@ -7,8 +7,8 @@ use clap::{ArgAction, Command, CommandFactory, FromArgMatches, Parser}; use miette::{IntoDiagnostic, Result}; use openshell_core::ComputeDriverKind; use openshell_core::config::{ - DEFAULT_DOCKER_NETWORK_NAME, DEFAULT_SERVER_PORT, DEFAULT_SSH_HANDSHAKE_SKEW_SECS, - DEFAULT_SSH_PORT, + DEFAULT_DOCKER_NETWORK_NAME, DEFAULT_READINESS_DATABASE_TIMEOUT_SECS, DEFAULT_SERVER_PORT, + DEFAULT_SSH_HANDSHAKE_SKEW_SECS, DEFAULT_SSH_PORT, }; use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; @@ -62,6 +62,15 @@ struct RunArgs { #[arg(long, default_value_t = 0, env = "OPENSHELL_METRICS_PORT")] metrics_port: u16, + /// Timeout in seconds for database readiness probes (`/readyz`, `/health`). + #[arg( + long, + default_value_t = DEFAULT_READINESS_DATABASE_TIMEOUT_SECS, + env = "OPENSHELL_READINESS_DB_TIMEOUT_SECS", + value_parser = clap::value_parser!(u64).range(1..) + )] + readiness_db_timeout_secs: u64, + /// Log level (trace, debug, info, warn, error). #[arg(long, default_value = "info", env = "OPENSHELL_LOG_LEVEL")] log_level: String, @@ -413,6 +422,7 @@ async fn run_from_args(args: RunArgs) -> Result<()> { .with_ssh_gateway_port(args.ssh_gateway_port) .with_sandbox_ssh_port(args.sandbox_ssh_port) .with_ssh_handshake_skew_secs(args.ssh_handshake_skew_secs) + .with_readiness_database_timeout_secs(args.readiness_db_timeout_secs) .with_server_sans(args.server_sans) .with_loopback_service_http(args.enable_loopback_service_http); @@ -579,6 +589,35 @@ mod tests { assert_eq!(cli.run.bind_address, IpAddr::V4(Ipv4Addr::LOCALHOST)); } + #[test] + fn command_parses_readiness_db_timeout_from_flag() { + let _lock = ENV_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let _guard = EnvVarGuard::remove("OPENSHELL_READINESS_DB_TIMEOUT_SECS"); + let cli = Cli::try_parse_from([ + "openshell-gateway", + "--db-url", + "sqlite::memory:", + "--readiness-db-timeout-secs", + "9", + ]) + .unwrap(); + assert_eq!(cli.run.readiness_db_timeout_secs, 9); + } + + #[test] + fn command_reads_readiness_db_timeout_from_env() { + let _lock = ENV_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let _guard = EnvVarGuard::set("OPENSHELL_READINESS_DB_TIMEOUT_SECS", "11"); + + let cli = + Cli::try_parse_from(["openshell-gateway", "--db-url", "sqlite::memory:"]).unwrap(); + assert_eq!(cli.run.readiness_db_timeout_secs, 11); + } + #[test] fn command_reads_bind_address_from_env() { let _lock = ENV_LOCK diff --git a/crates/openshell-server/src/http.rs b/crates/openshell-server/src/http.rs index 40f0b39d0..c8be2cab5 100644 --- a/crates/openshell-server/src/http.rs +++ b/crates/openshell-server/src/http.rs @@ -2,6 +2,17 @@ // SPDX-License-Identifier: Apache-2.0 //! HTTP health endpoints using Axum. +//! +//! Three endpoints with distinct semantics: +//! - `/healthz` — Kubernetes liveness probe. Returns `200 OK` whenever the +//! process is responsive. Intentionally does NOT depend on Postgres so a +//! transient DB outage does not cascade into a `CrashLoopBackOff`. +//! - `/readyz` — Kubernetes readiness probe. Pings the persistence store and +//! returns `503 Service Unavailable` when the database is unreachable, so +//! kube-proxy removes the pod from the Service while the dependency +//! recovers (no restart needed). +//! - `/health` — Alias of `/readyz` for external monitors +//! that conventionally probe `/health`. use axum::{ Json, Router, @@ -11,46 +22,211 @@ use axum::{ response::IntoResponse, routing::get, }; +use metrics::{gauge, histogram}; use metrics_exporter_prometheus::PrometheusHandle; +use openshell_core::config::DEFAULT_READINESS_DATABASE_TIMEOUT_SECS; use serde::Serialize; +use std::future::Future; use std::sync::Arc; +use std::time::{Duration, Instant}; +use tracing::warn; + +use crate::persistence::{PersistenceResult, Store}; + +const STATUS_HEALTHY: &str = "healthy"; +const STATUS_UNHEALTHY: &str = "unhealthy"; +const DATABASE_UNAVAILABLE_ERROR: &str = "database unavailable"; +const DATABASE_TIMEOUT_ERROR: &str = "database health check timed out"; +const METRIC_READINESS_DATABASE_HEALTHY: &str = "openshell_server_readiness_database_healthy"; +const METRIC_READINESS_DATABASE_PROBE_DURATION_SECONDS: &str = + "openshell_server_readiness_database_probe_duration_seconds"; +const METRIC_OUTCOME_LABEL: &str = "outcome"; + +#[derive(Clone, Debug)] +struct HealthRouterState { + store: Arc, + readiness_db_timeout: Duration, +} + +#[derive(Clone, Copy, Debug)] +enum ProbeOutcome { + Success, + DbError, + Timeout, +} + +impl ProbeOutcome { + const fn as_label(self) -> &'static str { + match self { + Self::Success => "success", + Self::DbError => "db_error", + Self::Timeout => "timeout", + } + } +} + +/// Per-dependency check entry exposed under `checks` in the JSON payload. +#[derive(Debug, Serialize)] +pub struct DependencyCheck { + /// `"healthy"` or `"unhealthy"`. + pub status: &'static str, + + /// Wall-clock time taken by the check, when measurable. + #[serde(skip_serializing_if = "Option::is_none")] + pub latency_ms: Option, + + /// Failure detail. Absent on success. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Aggregated dependency results. +#[derive(Debug, Serialize)] +pub struct HealthChecks { + pub database: DependencyCheck, +} -/// Health check response. +/// Readiness response payload. #[derive(Debug, Serialize)] pub struct HealthResponse { - /// Service status. + /// Overall status: `"healthy"` if every dependency is healthy. pub status: &'static str, /// Service version. pub version: &'static str, -} -/// Simple health check - returns 200 OK. -async fn health() -> impl IntoResponse { - StatusCode::OK + /// Per-dependency breakdown. + pub checks: HealthChecks, } -/// Kubernetes liveness probe. +/// Kubernetes liveness probe — process responsiveness only. async fn healthz() -> impl IntoResponse { StatusCode::OK } -/// Kubernetes readiness probe with detailed status. -async fn readyz() -> impl IntoResponse { +/// Kubernetes readiness probe — fails when a dependency is unreachable. +async fn readyz(State(state): State>) -> impl IntoResponse { + check_dependencies(&state.store, state.readiness_db_timeout).await +} + +/// Convenience alias of [`readyz`] for monitors that probe `/health`. +async fn health(State(state): State>) -> impl IntoResponse { + check_dependencies(&state.store, state.readiness_db_timeout).await +} + +async fn check_dependencies( + store: &Store, + readiness_db_timeout: Duration, +) -> (StatusCode, Json) { + let database = ping_database(store, readiness_db_timeout).await; + let healthy = database.status == STATUS_HEALTHY; let response = HealthResponse { - status: "healthy", + status: if healthy { + STATUS_HEALTHY + } else { + STATUS_UNHEALTHY + }, version: openshell_core::VERSION, + checks: HealthChecks { database }, + }; + let code = if healthy { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE }; + (code, Json(response)) +} + +async fn ping_database(store: &Store, readiness_db_timeout: Duration) -> DependencyCheck { + run_database_probe(store.ping(), readiness_db_timeout).await +} + +async fn run_database_probe(probe: F, timeout: Duration) -> DependencyCheck +where + F: Future>, +{ + let started = Instant::now(); + match tokio::time::timeout(timeout, probe).await { + Ok(Ok(())) => { + let elapsed = started.elapsed(); + record_database_probe_metrics(true, ProbeOutcome::Success, elapsed.as_secs_f64()); + DependencyCheck { + status: STATUS_HEALTHY, + latency_ms: Some(elapsed_ms(elapsed)), + error: None, + } + } + Ok(Err(err)) => { + let elapsed = started.elapsed(); + let latency_ms = elapsed_ms(elapsed); + record_database_probe_metrics(false, ProbeOutcome::DbError, elapsed.as_secs_f64()); + warn!(error = %err, latency_ms, "database health check failed"); + DependencyCheck { + status: STATUS_UNHEALTHY, + latency_ms: Some(latency_ms), + error: Some(DATABASE_UNAVAILABLE_ERROR.to_string()), + } + } + Err(_) => { + let elapsed = started.elapsed(); + record_database_probe_metrics(false, ProbeOutcome::Timeout, elapsed.as_secs_f64()); + warn!( + timeout_ms = u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX), + "database health check timed out" + ); + DependencyCheck { + status: STATUS_UNHEALTHY, + latency_ms: None, + error: Some(DATABASE_TIMEOUT_ERROR.to_string()), + } + } + } +} + +fn record_database_probe_metrics(healthy: bool, outcome: ProbeOutcome, latency_seconds: f64) { + gauge!(METRIC_READINESS_DATABASE_HEALTHY).set(if healthy { 1.0 } else { 0.0 }); + histogram!( + METRIC_READINESS_DATABASE_PROBE_DURATION_SECONDS, + METRIC_OUTCOME_LABEL => outcome.as_label() + ) + .record(latency_seconds); +} + +fn elapsed_ms(elapsed: Duration) -> u64 { + u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX) +} - (StatusCode::OK, Json(response)) +/// Create the health router. The store is required so `/readyz` and `/health` +/// can ping the persistence layer. +/// +/// This public wrapper preserves the existing library API by applying the +/// crate-level default timeout. The server runtime path uses +/// [`health_router_with_readiness_timeout`] to inject the configured value. +pub fn health_router(store: Arc) -> Router { + health_router_with_readiness_timeout( + store, + Duration::from_secs(DEFAULT_READINESS_DATABASE_TIMEOUT_SECS), + ) } -/// Create the health router. -pub fn health_router() -> Router { +/// Create the health router with an explicit readiness DB probe timeout. +/// +/// Used by the gateway runtime so `/readyz` and `/health` honor operator +/// configuration instead of relying on the crate default. +pub fn health_router_with_readiness_timeout( + store: Arc, + readiness_db_timeout: Duration, +) -> Router { + let state = Arc::new(HealthRouterState { + store, + readiness_db_timeout, + }); + Router::new() .route("/health", get(health)) .route("/healthz", get(healthz)) .route("/readyz", get(readyz)) + .with_state(state) } /// Create the metrics router for the dedicated metrics port. @@ -64,7 +240,7 @@ async fn render_metrics(State(handle): State) -> impl IntoResp handle.render() } -/// Create the HTTP router. +/// Create the HTTP router served on the multiplexed gateway port. pub fn http_router(state: Arc) -> Router { crate::ws_tunnel::router(state.clone()) .merge(crate::auth::router(state.clone())) @@ -305,3 +481,270 @@ mod tests { assert!(!browser_context_allows_plaintext_service_request(&req)); } } + +#[cfg(test)] +mod readiness_tests { + use super::*; + use axum::body::Body; + use http::Request; + use http_body_util::BodyExt; + use metrics::set_default_local_recorder; + use metrics_exporter_prometheus::PrometheusBuilder; + use tower::ServiceExt; + + async fn in_memory_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store"), + ) + } + + async fn get(router: Router, path: &str) -> (StatusCode, serde_json::Value) { + let response = router + .oneshot(Request::get(path).body(Body::empty()).unwrap()) + .await + .expect("router responds"); + let status = response.status(); + let bytes = response + .into_body() + .collect() + .await + .expect("collect body") + .to_bytes(); + let body = if bytes.is_empty() { + serde_json::Value::Null + } else { + serde_json::from_slice(&bytes).expect("response is valid JSON") + }; + (status, body) + } + + async fn get_text(router: Router, path: &str) -> (StatusCode, String) { + let response = router + .oneshot(Request::get(path).body(Body::empty()).unwrap()) + .await + .expect("router responds"); + let status = response.status(); + let bytes = response + .into_body() + .collect() + .await + .expect("collect body") + .to_bytes(); + let body = String::from_utf8(bytes.to_vec()).expect("metrics endpoint returns utf-8"); + (status, body) + } + + fn metric_sample_value(rendered: &str, metric_name: &str) -> Option { + rendered.lines().find_map(|line| { + line.strip_prefix(metric_name) + .and_then(|rest| rest.strip_prefix(' ')) + .and_then(|value| value.parse::().ok()) + }) + } + + fn metric_sample_value_with_outcome( + rendered: &str, + metric_name: &str, + outcome: ProbeOutcome, + ) -> Option { + let prefix = format!( + "{metric_name}{{{METRIC_OUTCOME_LABEL}=\"{}\"}} ", + outcome.as_label() + ); + rendered.lines().find_map(|line| { + line.strip_prefix(&prefix) + .and_then(|value| value.parse::().ok()) + }) + } + + #[tokio::test] + async fn healthz_is_minimal_and_does_not_touch_the_database() { + // Liveness must succeed even when the DB has been closed, otherwise a + // transient Postgres outage would CrashLoopBackOff the gateway. + let store = in_memory_store().await; + store.close().await; + let (status, body) = get(health_router(store), "/healthz").await; + assert_eq!(status, StatusCode::OK); + assert!(body.is_null(), "healthz must return an empty body"); + } + + #[tokio::test] + async fn readyz_returns_200_with_healthy_payload_when_db_is_reachable() { + let store = in_memory_store().await; + let (status, body) = get(health_router(store), "/readyz").await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + assert!( + body["checks"]["database"]["latency_ms"].is_number(), + "expected latency_ms in healthy payload" + ); + assert!( + body["checks"]["database"]["error"].is_null(), + "healthy payload must omit the error field" + ); + } + + #[tokio::test] + async fn health_alias_mirrors_readyz_when_db_is_reachable() { + let store = in_memory_store().await; + let (status, body) = get(health_router(store), "/health").await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + } + + #[tokio::test] + async fn readyz_returns_503_with_unhealthy_payload_when_db_is_unreachable() { + let store = in_memory_store().await; + store.close().await; + let (status, body) = get(health_router(store), "/readyz").await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + assert_eq!( + body["checks"]["database"]["error"], + DATABASE_UNAVAILABLE_ERROR + ); + } + + #[tokio::test] + async fn health_alias_returns_503_when_db_is_unreachable() { + let store = in_memory_store().await; + store.close().await; + let (status, body) = get(health_router(store), "/health").await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + } + + #[tokio::test] + async fn timeout_reports_unhealthy_database_check_without_internal_error_details() { + let check = run_database_probe( + async { + tokio::time::sleep(Duration::from_millis(25)).await; + Ok::<(), crate::persistence::PersistenceError>(()) + }, + Duration::from_millis(1), + ) + .await; + assert_eq!(check.status, STATUS_UNHEALTHY); + assert!( + check.latency_ms.is_none(), + "timeout should not report latency" + ); + assert_eq!(check.error.as_deref(), Some(DATABASE_TIMEOUT_ERROR)); + } + + #[tokio::test(flavor = "current_thread")] + async fn database_probe_emits_prometheus_readiness_metrics() { + let recorder = PrometheusBuilder::new().build_recorder(); + let handle = recorder.handle(); + let _local_recorder = set_default_local_recorder(&recorder); + let histogram_count_metric = + format!("{METRIC_READINESS_DATABASE_PROBE_DURATION_SECONDS}_count"); + + let healthy_check = run_database_probe( + async { Ok::<(), crate::persistence::PersistenceError>(()) }, + Duration::from_millis(50), + ) + .await; + assert_eq!(healthy_check.status, STATUS_HEALTHY); + let rendered = handle.render(); + assert_eq!( + metric_sample_value(&rendered, METRIC_READINESS_DATABASE_HEALTHY), + Some(1.0) + ); + assert_eq!( + metric_sample_value_with_outcome( + &rendered, + &histogram_count_metric, + ProbeOutcome::Success + ), + Some(1.0) + ); + + let store = in_memory_store().await; + store.close().await; + let failed_check = run_database_probe(store.ping(), Duration::from_millis(50)).await; + assert_eq!(failed_check.status, STATUS_UNHEALTHY); + let rendered = handle.render(); + assert_eq!( + metric_sample_value(&rendered, METRIC_READINESS_DATABASE_HEALTHY), + Some(0.0) + ); + assert_eq!( + metric_sample_value_with_outcome( + &rendered, + &histogram_count_metric, + ProbeOutcome::Success + ), + Some(1.0) + ); + assert_eq!( + metric_sample_value_with_outcome( + &rendered, + &histogram_count_metric, + ProbeOutcome::DbError + ), + Some(1.0) + ); + + let timeout_check = run_database_probe( + async { + tokio::time::sleep(Duration::from_millis(25)).await; + Ok::<(), crate::persistence::PersistenceError>(()) + }, + Duration::from_millis(1), + ) + .await; + assert_eq!(timeout_check.status, STATUS_UNHEALTHY); + let rendered = handle.render(); + assert_eq!( + metric_sample_value(&rendered, METRIC_READINESS_DATABASE_HEALTHY), + Some(0.0) + ); + assert_eq!( + metric_sample_value_with_outcome( + &rendered, + &histogram_count_metric, + ProbeOutcome::Success + ), + Some(1.0) + ); + assert_eq!( + metric_sample_value_with_outcome( + &rendered, + &histogram_count_metric, + ProbeOutcome::DbError + ), + Some(1.0) + ); + assert_eq!( + metric_sample_value_with_outcome( + &rendered, + &histogram_count_metric, + ProbeOutcome::Timeout + ), + Some(1.0) + ); + + let (status, metrics_payload) = get_text(metrics_router(handle.clone()), "/metrics").await; + assert_eq!(status, StatusCode::OK); + assert!(metrics_payload.contains(METRIC_READINESS_DATABASE_HEALTHY)); + assert!(metrics_payload.contains(&format!( + "{histogram_count_metric}{{{METRIC_OUTCOME_LABEL}=\"{}\"}}", + ProbeOutcome::Success.as_label() + ))); + assert!(metrics_payload.contains(&format!( + "{histogram_count_metric}{{{METRIC_OUTCOME_LABEL}=\"{}\"}}", + ProbeOutcome::DbError.as_label() + ))); + assert!(metrics_payload.contains(&format!( + "{histogram_count_metric}{{{METRIC_OUTCOME_LABEL}=\"{}\"}}", + ProbeOutcome::Timeout.as_label() + ))); + } +} diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index 93ccdc9dc..074f7b4b9 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -54,7 +54,7 @@ pub use grpc::OpenShellService; pub use http::{health_router, http_router, metrics_router, service_http_router}; pub use multiplex::{MultiplexService, MultiplexedService}; use openshell_driver_kubernetes::KubernetesComputeConfig; -use persistence::Store; +pub use persistence::Store; use sandbox_index::SandboxIndex; use sandbox_watch::SandboxWatchBus; pub use tls::TlsAcceptor; @@ -158,6 +158,11 @@ pub async fn run_server( if database_url.is_empty() { return Err(Error::config("database_url is required")); } + if config.readiness_database_timeout_secs == 0 { + return Err(Error::config( + "readiness_database_timeout_secs must be greater than 0", + )); + } let driver = configured_compute_driver(&config)?; if config.ssh_handshake_secret.is_empty() && !matches!(driver, ComputeDriverKind::Docker | ComputeDriverKind::Vm) @@ -249,8 +254,15 @@ pub async fn run_server( )) })?; info!(address = %health_bind_address, "Health server listening"); + let health_store = store.clone(); + let readiness_db_timeout = Duration::from_secs(config.readiness_database_timeout_secs); tokio::spawn(async move { - if let Err(e) = axum::serve(health_listener, health_router().into_make_service()).await + if let Err(e) = axum::serve( + health_listener, + http::health_router_with_readiness_timeout(health_store, readiness_db_timeout) + .into_make_service(), + ) + .await { error!("Health server error: {e}"); } diff --git a/crates/openshell-server/src/multiplex.rs b/crates/openshell-server/src/multiplex.rs index 9f74723eb..86e47605d 100644 --- a/crates/openshell-server/src/multiplex.rs +++ b/crates/openshell-server/src/multiplex.rs @@ -546,11 +546,19 @@ mod tests { assert_ne!(id1.header_value(), id2.header_value()); } + async fn test_health_store() -> Arc { + Arc::new( + crate::Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) + } + async fn start_http_server_with_middleware() -> std::net::SocketAddr { let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); - let http_service = crate::http::health_router(); + let http_service = crate::http::health_router(test_health_store().await); let http_service = request_id_middleware!(http_service); let service = MultiplexedService::new(http_service.clone(), http_service); diff --git a/crates/openshell-server/src/persistence/mod.rs b/crates/openshell-server/src/persistence/mod.rs index 1c926bd4a..dfd9c3a5e 100644 --- a/crates/openshell-server/src/persistence/mod.rs +++ b/crates/openshell-server/src/persistence/mod.rs @@ -132,6 +132,28 @@ impl Store { } } + /// Verify connectivity to the underlying database. + pub async fn ping(&self) -> PersistenceResult<()> { + match self { + Self::Postgres(store) => store.ping().await, + Self::Sqlite(store) => store.ping().await, + } + } + + /// Test support only: close the underlying connection pool. + /// + /// There is no runtime shutdown path yet. If we add graceful shutdown, + /// this API can be made public for that explicit shutdown flow. + /// + /// Do not call from runtime code today; this tears down the active pool. + #[cfg(any(test, feature = "test-support"))] + pub async fn close(&self) { + match self { + Self::Postgres(store) => store.close().await, + Self::Sqlite(store) => store.close().await, + } + } + /// Insert or update a generic named object. pub async fn put( &self, diff --git a/crates/openshell-server/src/persistence/postgres.rs b/crates/openshell-server/src/persistence/postgres.rs index 2cd6a046f..7d34a59b4 100644 --- a/crates/openshell-server/src/persistence/postgres.rs +++ b/crates/openshell-server/src/persistence/postgres.rs @@ -10,7 +10,7 @@ use crate::policy_store::{ policy_record_from_parts, }; use sqlx::postgres::PgPoolOptions; -use sqlx::{PgPool, Row}; +use sqlx::{Connection, PgPool, Row}; static POSTGRES_MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/postgres"); @@ -40,6 +40,21 @@ impl PostgresStore { .map_err(|e| map_migrate_error(&e)) } + /// Verify the database is reachable by acquiring a pooled connection + /// and issuing a protocol-level ping. + pub async fn ping(&self) -> PersistenceResult<()> { + let mut conn = self.pool.acquire().await.map_err(|e| map_db_error(&e))?; + conn.ping().await.map_err(|e| map_db_error(&e)) + } + + /// Test support only: close the underlying connection pool. + /// + /// Do not call from runtime code; this tears down the active pool. + #[cfg(any(test, feature = "test-support"))] + pub async fn close(&self) { + self.pool.close().await; + } + pub async fn put( &self, object_type: &str, diff --git a/crates/openshell-server/src/persistence/sqlite.rs b/crates/openshell-server/src/persistence/sqlite.rs index fafb07597..be5b38e62 100644 --- a/crates/openshell-server/src/persistence/sqlite.rs +++ b/crates/openshell-server/src/persistence/sqlite.rs @@ -10,7 +10,7 @@ use crate::policy_store::{ policy_record_from_parts, }; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use sqlx::{Row, SqlitePool}; +use sqlx::{Connection, Row, SqlitePool}; use std::str::FromStr; static SQLITE_MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations/sqlite"); @@ -55,6 +55,21 @@ impl SqliteStore { .map_err(|e| map_migrate_error(&e)) } + /// Verify the database is reachable by acquiring a pooled connection + /// and issuing a ping. + pub async fn ping(&self) -> PersistenceResult<()> { + let mut conn = self.pool.acquire().await.map_err(|e| map_db_error(&e))?; + conn.ping().await.map_err(|e| map_db_error(&e)) + } + + /// Test support only: close the underlying connection pool. + /// + /// Do not call from runtime code; this tears down the active pool. + #[cfg(any(test, feature = "test-support"))] + pub async fn close(&self) { + self.pool.close().await; + } + pub async fn put( &self, object_type: &str, diff --git a/crates/openshell-server/tests/auth_endpoint_integration.rs b/crates/openshell-server/tests/auth_endpoint_integration.rs index a74fb0f38..47439b8bc 100644 --- a/crates/openshell-server/tests/auth_endpoint_integration.rs +++ b/crates/openshell-server/tests/auth_endpoint_integration.rs @@ -815,13 +815,19 @@ async fn plaintext_server_accepts_grpc_and_http() { HealthRequest, ServiceStatus, open_shell_client::OpenShellClient, open_shell_server::OpenShellServer, }; - use openshell_server::{MultiplexedService, health_router}; + use openshell_server::{MultiplexedService, Store, health_router}; + use std::sync::Arc; let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); + let store = Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(store); let service = MultiplexedService::new(grpc_service, http_service); let server = tokio::spawn(async move { diff --git a/crates/openshell-server/tests/edge_tunnel_auth.rs b/crates/openshell-server/tests/edge_tunnel_auth.rs index a4676232e..97cfa6ea3 100644 --- a/crates/openshell-server/tests/edge_tunnel_auth.rs +++ b/crates/openshell-server/tests/edge_tunnel_auth.rs @@ -48,12 +48,13 @@ use openshell_core::proto::{ open_shell_client::OpenShellClient, open_shell_server::{OpenShell, OpenShellServer}, }; -use openshell_server::{MultiplexedService, TlsAcceptor, health_router}; +use openshell_server::{MultiplexedService, Store, TlsAcceptor, health_router}; use rcgen::{CertificateParams, IsCa, KeyPair}; use rustls::RootCertStore; use rustls::pki_types::CertificateDer; use rustls_pemfile::certs; use std::io::Write; +use std::sync::Arc; use tempfile::tempdir; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -507,7 +508,7 @@ async fn start_test_server( let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let handle = tokio::spawn(async move { @@ -943,3 +944,13 @@ async fn tunnel_mode_rogue_cert_still_rejected() { server.abort(); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/multiplex_integration.rs b/crates/openshell-server/tests/multiplex_integration.rs index 086a1d307..eaba3d70b 100644 --- a/crates/openshell-server/tests/multiplex_integration.rs +++ b/crates/openshell-server/tests/multiplex_integration.rs @@ -22,7 +22,8 @@ use openshell_core::proto::{ open_shell_client::OpenShellClient, open_shell_server::{OpenShell, OpenShellServer}, }; -use openshell_server::{MultiplexedService, health_router}; +use openshell_server::{MultiplexedService, Store, health_router}; +use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -403,7 +404,7 @@ async fn serves_grpc_and_http_on_same_port() { let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let server = tokio::spawn(async move { @@ -480,7 +481,7 @@ async fn grpc_response_propagates_request_id() { )) .layer(PropagateRequestIdLayer::new(x_request_id)) .service(OpenShellServer::new(TestOpenShell)); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); tokio::spawn(async move { @@ -518,3 +519,13 @@ async fn grpc_response_propagates_request_id() { let echoed = response.metadata().get("x-request-id").unwrap(); assert_eq!(echoed.to_str().unwrap(), "grpc-corr-id"); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/multiplex_tls_integration.rs b/crates/openshell-server/tests/multiplex_tls_integration.rs index 289f608f1..c5a22cb8f 100644 --- a/crates/openshell-server/tests/multiplex_tls_integration.rs +++ b/crates/openshell-server/tests/multiplex_tls_integration.rs @@ -24,12 +24,13 @@ use openshell_core::proto::{ open_shell_client::OpenShellClient, open_shell_server::{OpenShell, OpenShellServer}, }; -use openshell_server::{MultiplexedService, TlsAcceptor, health_router}; +use openshell_server::{MultiplexedService, Store, TlsAcceptor, health_router}; use rcgen::{CertificateParams, IsCa, KeyPair}; use rustls::RootCertStore; use rustls::pki_types::CertificateDer; use rustls_pemfile::certs; use std::io::Write; +use std::sync::Arc; use tempfile::tempdir; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -487,7 +488,7 @@ async fn start_test_server( let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let handle = tokio::spawn(async move { @@ -749,3 +750,13 @@ async fn mtls_wrong_ca_client_cert_rejected() { server.abort(); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/supervisor_relay_integration.rs b/crates/openshell-server/tests/supervisor_relay_integration.rs index cf4fa3985..429547631 100644 --- a/crates/openshell-server/tests/supervisor_relay_integration.rs +++ b/crates/openshell-server/tests/supervisor_relay_integration.rs @@ -28,7 +28,7 @@ use openshell_core::proto::{ open_shell_server::{OpenShell, OpenShellServer}, }; use openshell_server::supervisor_session::SupervisorSessionRegistry; -use openshell_server::{MultiplexedService, health_router}; +use openshell_server::{MultiplexedService, Store, health_router}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::sync::mpsc; @@ -369,7 +369,7 @@ async fn spawn_gateway(registry: Arc) -> Channel { let addr = listener.local_addr().unwrap(); let grpc = OpenShellServer::new(RelayGateway { registry }); - let service = MultiplexedService::new(grpc, health_router()); + let service = MultiplexedService::new(grpc, health_router(test_health_store().await)); tokio::spawn(async move { loop { @@ -660,3 +660,13 @@ async fn open_relay_enforces_per_sandbox_cap_under_concurrent_burst() { .await .expect("other sandbox should not be affected by sbx cap"); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/crates/openshell-server/tests/ws_tunnel_integration.rs b/crates/openshell-server/tests/ws_tunnel_integration.rs index c385b8af3..e2b5828d6 100644 --- a/crates/openshell-server/tests/ws_tunnel_integration.rs +++ b/crates/openshell-server/tests/ws_tunnel_integration.rs @@ -51,8 +51,9 @@ use openshell_core::proto::{ open_shell_client::OpenShellClient, open_shell_server::{OpenShell, OpenShellServer}, }; -use openshell_server::{MultiplexedService, health_router}; +use openshell_server::{MultiplexedService, Store, health_router}; use std::net::SocketAddr; +use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; @@ -662,7 +663,7 @@ async fn start_grpc_server() -> (SocketAddr, tokio::task::JoinHandle<()>) { let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let service = MultiplexedService::new(grpc_service, http_service); let handle = tokio::spawn(async move { @@ -690,7 +691,7 @@ async fn start_ws_tunnel_server() -> (SocketAddr, tokio::task::JoinHandle<()>) { let addr = listener.local_addr().unwrap(); let grpc_service = OpenShellServer::new(TestOpenShell); - let http_service = health_router(); + let http_service = health_router(test_health_store().await); let app = test_ws_tunnel_router(MultiplexedService::new(grpc_service, http_service)); let handle = tokio::spawn(async move { @@ -920,3 +921,13 @@ async fn ws_tunnel_graceful_close() { stack.abort(); } + +/// Build an in-memory store sufficient for wiring `health_router` in tests +/// where the persistence layer itself is not under test. +async fn test_health_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for tests"), + ) +} diff --git a/deploy/helm/openshell/README.md b/deploy/helm/openshell/README.md index cc856731d..c919587ab 100644 --- a/deploy/helm/openshell/README.md +++ b/deploy/helm/openshell/README.md @@ -53,6 +53,13 @@ See [`values.yaml`](values.yaml) for configurable values. Selected overlays: - [`ci/values-cert-manager.yaml`](ci/values-cert-manager.yaml) — cert-manager integration - [`ci/values-keycloak.yaml`](ci/values-keycloak.yaml) — Keycloak OIDC integration +Readiness timeout tuning is split across two values: + +- `server.readinessDbTimeoutSecs` configures the gateway's internal DB readiness timeout for `/readyz`. +- `probes.readiness.timeoutSeconds` configures the Kubernetes probe request timeout. + +Keep `server.readinessDbTimeoutSecs` lower than `probes.readiness.timeoutSeconds`. + ## PKI bootstrap By default, a pre-install/pre-upgrade hook Job runs `openshell-gateway generate-certs` diff --git a/deploy/helm/openshell/templates/statefulset.yaml b/deploy/helm/openshell/templates/statefulset.yaml index 69140a70c..e7a4fc4fd 100644 --- a/deploy/helm/openshell/templates/statefulset.yaml +++ b/deploy/helm/openshell/templates/statefulset.yaml @@ -61,6 +61,8 @@ spec: - {{ .Values.server.logLevel }} - --db-url - {{ .Values.server.dbUrl | quote }} + - --readiness-db-timeout-secs + - {{ .Values.server.readinessDbTimeoutSecs | quote }} env: - name: OPENSHELL_SANDBOX_NAMESPACE value: {{ include "openshell.sandboxNamespace" . | quote }} diff --git a/deploy/helm/openshell/values.yaml b/deploy/helm/openshell/values.yaml index 17c0fedd9..880186095 100644 --- a/deploy/helm/openshell/values.yaml +++ b/deploy/helm/openshell/values.yaml @@ -72,7 +72,7 @@ probes: readiness: initialDelaySeconds: 1 periodSeconds: 2 - timeoutSeconds: 1 + timeoutSeconds: 2 failureThreshold: 3 resources: {} @@ -90,6 +90,10 @@ server: # namespace (.Release.Namespace) when left empty. sandboxNamespace: "" dbUrl: "sqlite:/var/openshell/openshell.db" + # Database readiness timeout in seconds used by /readyz and /health. + # Keep this lower than probes.readiness.timeoutSeconds so the gateway can + # return a readiness decision before Kubernetes times out the probe. + readinessDbTimeoutSecs: 1 sandboxImage: "ghcr.io/nvidia/openshell-community/sandboxes/base:latest" # Kubernetes imagePullPolicy for sandbox pods. Empty = Kubernetes default # (Always for :latest, IfNotPresent otherwise). Set to "Always" for dev diff --git a/deploy/man/openshell-gateway.8.md b/deploy/man/openshell-gateway.8.md index b38bef8a9..e7327940e 100644 --- a/deploy/man/openshell-gateway.8.md +++ b/deploy/man/openshell-gateway.8.md @@ -48,6 +48,10 @@ gRPC and HTTP, secured by mutual TLS (mTLS) by default. : Port for Prometheus metrics (/metrics). Set to 0 to disable. Default: **0**. Environment: **OPENSHELL_METRICS_PORT**. +**--readiness-db-timeout-secs** *SECONDS* +: Timeout for database readiness probes on **/readyz** and **/health**. + Default: **1**. Environment: **OPENSHELL_READINESS_DB_TIMEOUT_SECS**. + **--log-level** *LEVEL* : Log level: trace, debug, info, warn, error. Default: **info**. Environment: **OPENSHELL_LOG_LEVEL**. diff --git a/deploy/man/openshell-gateway.env.5.md b/deploy/man/openshell-gateway.env.5.md index 5bbf3b2c8..0363fed22 100644 --- a/deploy/man/openshell-gateway.env.5.md +++ b/deploy/man/openshell-gateway.env.5.md @@ -64,6 +64,10 @@ the SSH handshake secret). : Port for Prometheus metrics endpoint (/metrics). Set to a non-zero value to enable a dedicated metrics listener. +**OPENSHELL_READINESS_DB_TIMEOUT_SECS** (default: 1) +: Timeout in seconds for database readiness probes used by **/readyz** + and **/health**. + **OPENSHELL_LOG_LEVEL** (default: info) : Log verbosity: **trace**, **debug**, **info**, **warn**, **error**. diff --git a/deploy/rpm/CONFIGURATION.md b/deploy/rpm/CONFIGURATION.md index 04caaaae5..75e5b7d72 100644 --- a/deploy/rpm/CONFIGURATION.md +++ b/deploy/rpm/CONFIGURATION.md @@ -157,6 +157,7 @@ across package upgrades. | `OPENSHELL_SERVER_PORT` | `8080` | Port for the gRPC/HTTP API | | `OPENSHELL_HEALTH_PORT` | `0` (disabled) | Port for unauthenticated health endpoints (`/healthz`, `/readyz`). Set to a non-zero value to enable. | | `OPENSHELL_METRICS_PORT` | `0` (disabled) | Port for Prometheus metrics (`/metrics`). Set to a non-zero value to enable. | +| `OPENSHELL_READINESS_DB_TIMEOUT_SECS` | `1` | Timeout in seconds for database readiness probes used by `/readyz` and `/health`. | | `OPENSHELL_LOG_LEVEL` | `info` | Log level: `trace`, `debug`, `info`, `warn`, `error` | | `OPENSHELL_DRIVERS` | `podman` | Compute driver (`podman`, `docker`, `kubernetes`) | | `OPENSHELL_DB_URL` | `sqlite://$XDG_STATE_HOME/openshell/gateway.db` | SQLite database URL for state persistence | diff --git a/docs/sandboxes/manage-gateways.mdx b/docs/sandboxes/manage-gateways.mdx index 6cfa39121..da936f856 100644 --- a/docs/sandboxes/manage-gateways.mdx +++ b/docs/sandboxes/manage-gateways.mdx @@ -130,6 +130,8 @@ openshell status openshell gateway info ``` +Gateways expose `/healthz` as a liveness signal and `/readyz` as a dependency-aware readiness signal. `/readyz` checks database connectivity and reports unhealthy when the probe exceeds the configured timeout. Tune that timeout with `--readiness-db-timeout-secs` or `OPENSHELL_READINESS_DB_TIMEOUT_SECS` when you need a slower or stricter readiness window. + For Docker-backed local gateways, inspect Docker and the gateway process or container started by your local workflow: ```shell diff --git a/tasks/test.toml b/tasks/test.toml index 91d2c44f6..fc57defd9 100644 --- a/tasks/test.toml +++ b/tasks/test.toml @@ -21,7 +21,12 @@ depends = ["e2e:python:gpu"] ["test:rust"] description = "Run Rust tests" -run = "cargo test --workspace" +run = [ + # Run the workspace once without openshell-server so we can run that crate + # with test-only helpers enabled. + "cargo test --workspace --exclude openshell-server", + "cargo test -p openshell-server --features test-support", +] hide = true ["test:python"] From 05b69985403e361b2b64f55bf2cfd7a21bc7c5ec Mon Sep 17 00:00:00 2001 From: Adrien Langou Date: Tue, 12 May 2026 16:25:20 +0200 Subject: [PATCH 2/2] test(e2e): add simple e2e test with docker to test /readyz Signed-off-by: Adrien Langou --- .../tests/health_endpoint_integration.rs | 104 ++++++++++++++++ e2e/rust/Cargo.lock | 113 ++++++++++++++++++ e2e/rust/Cargo.toml | 9 ++ e2e/rust/tests/readyz_health.rs | 95 +++++++++++++++ e2e/with-docker-gateway.sh | 8 +- 5 files changed, 328 insertions(+), 1 deletion(-) create mode 100644 crates/openshell-server/tests/health_endpoint_integration.rs create mode 100644 e2e/rust/tests/readyz_health.rs diff --git a/crates/openshell-server/tests/health_endpoint_integration.rs b/crates/openshell-server/tests/health_endpoint_integration.rs new file mode 100644 index 000000000..97f732297 --- /dev/null +++ b/crates/openshell-server/tests/health_endpoint_integration.rs @@ -0,0 +1,104 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::{Request, StatusCode}; +use hyper_util::rt::TokioIo; +use openshell_server::{Store, health_router}; +use serde_json::Value; +use std::sync::Arc; +use tokio::net::TcpListener; + +async fn start_health_server( + store: Arc, +) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind ephemeral health test listener"); + let addr = listener + .local_addr() + .expect("resolve local address for health test listener"); + + let server = tokio::spawn(async move { + let _ = axum::serve(listener, health_router(store).into_make_service()).await; + }); + + (addr, server) +} + +async fn http_get_json(addr: std::net::SocketAddr, path: &str) -> (StatusCode, Value) { + let stream = tokio::net::TcpStream::connect(addr) + .await + .expect("connect test HTTP client"); + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .handshake(TokioIo::new(stream)) + .await + .expect("handshake HTTP/1 test client"); + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .method("GET") + .uri(format!("http://{addr}{path}")) + .body(Empty::::new()) + .expect("build HTTP request"); + let resp = sender.send_request(req).await.expect("send HTTP request"); + let status = resp.status(); + let bytes = resp + .into_body() + .collect() + .await + .expect("collect response body") + .to_bytes(); + let body = if bytes.is_empty() { + Value::Null + } else { + serde_json::from_slice(&bytes).expect("response body must be valid JSON") + }; + (status, body) +} + +#[tokio::test] +async fn readyz_reports_healthy_when_database_is_reachable() { + let store = Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for health integration test"), + ); + let (addr, server) = start_health_server(store.clone()).await; + + let (status, body) = http_get_json(addr, "/readyz").await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + + server.abort(); +} + +#[cfg(feature = "test-support")] +#[tokio::test] +async fn readyz_reports_database_health_transition_after_close() { + let store = Arc::new( + Store::connect("sqlite::memory:") + .await + .expect("connect in-memory sqlite store for health integration test"), + ); + let (addr, server) = start_health_server(store.clone()).await; + + let (status, body) = http_get_json(addr, "/readyz").await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["status"], "healthy"); + assert_eq!(body["checks"]["database"]["status"], "healthy"); + + store.close().await; + + let (status, body) = http_get_json(addr, "/readyz").await; + assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE); + assert_eq!(body["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["status"], "unhealthy"); + assert_eq!(body["checks"]["database"]["error"], "database unavailable"); + + server.abort(); +} diff --git a/e2e/rust/Cargo.lock b/e2e/rust/Cargo.lock index 61f15866b..990aa5c46 100644 --- a/e2e/rust/Cargo.lock +++ b/e2e/rust/Cargo.lock @@ -8,6 +8,12 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "base64" version = "0.22.1" @@ -98,6 +104,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + [[package]] name = "generic-array" version = "0.14.7" @@ -160,6 +181,79 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "http" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +dependencies = [ + "bytes", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "hyper" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", +] + [[package]] name = "id-arena" version = "2.3.0" @@ -245,7 +339,11 @@ name = "openshell-e2e" version = "0.1.0" dependencies = [ "base64", + "bytes", "hex", + "http-body-util", + "hyper", + "hyper-util", "rand", "serde_json", "sha1", @@ -537,6 +635,12 @@ dependencies = [ "syn", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.19.0" @@ -561,6 +665,15 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" diff --git a/e2e/rust/Cargo.toml b/e2e/rust/Cargo.toml index 89a75967a..069c26261 100644 --- a/e2e/rust/Cargo.toml +++ b/e2e/rust/Cargo.toml @@ -45,6 +45,11 @@ name = "gateway_resume" path = "tests/gateway_resume.rs" required-features = ["e2e-docker"] +[[test]] +name = "readyz_health" +path = "tests/readyz_health.rs" +required-features = ["e2e-docker"] + [[test]] name = "websocket_conformance" path = "tests/websocket_conformance.rs" @@ -77,6 +82,10 @@ required-features = ["e2e-gpu"] [dependencies] base64 = "0.22" +bytes = "1" +http-body-util = "0.1" +hyper = { version = "1", features = ["client", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } tokio = { version = "1.43", features = ["full"] } tempfile = "3" sha1 = "0.10" diff --git a/e2e/rust/tests/readyz_health.rs b/e2e/rust/tests/readyz_health.rs new file mode 100644 index 000000000..42b9b3c27 --- /dev/null +++ b/e2e/rust/tests/readyz_health.rs @@ -0,0 +1,95 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +#![cfg(feature = "e2e-docker")] + +use bytes::Bytes; +use http_body_util::{BodyExt, Empty}; +use hyper::Request; +use hyper_util::rt::TokioIo; +use serde_json::Value; +use std::time::{Duration, Instant}; +use tokio::net::TcpStream; + +fn health_port_from_env() -> u16 { + let raw = std::env::var("OPENSHELL_E2E_HEALTH_PORT").unwrap_or_else(|_| { + panic!( + "OPENSHELL_E2E_HEALTH_PORT is not set. The Docker e2e wrapper \ + (e2e/with-docker-gateway.sh) must export this variable so the \ + /readyz test can reach the gateway health listener." + ) + }); + raw.parse::().unwrap_or_else(|err| { + panic!("OPENSHELL_E2E_HEALTH_PORT=\"{raw}\" is not a valid u16 port: {err}") + }) +} + +async fn http_get_json(port: u16, path: &str) -> Result<(u16, Value), String> { + let stream = TcpStream::connect(("127.0.0.1", port)) + .await + .map_err(|err| format!("connect health endpoint :{port}: {err}"))?; + let (mut sender, conn) = hyper::client::conn::http1::Builder::new() + .handshake(TokioIo::new(stream)) + .await + .map_err(|err| format!("handshake health HTTP/1 client :{port}: {err}"))?; + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .method("GET") + .uri(format!("http://127.0.0.1:{port}{path}")) + .body(Empty::::new()) + .map_err(|err| format!("build health request {path}: {err}"))?; + let resp = sender + .send_request(req) + .await + .map_err(|err| format!("send health request {path} to :{port}: {err}"))?; + let status_code = resp.status().as_u16(); + let bytes = resp + .into_body() + .collect() + .await + .map_err(|err| format!("read health response body {path}: {err}"))? + .to_bytes(); + let json = serde_json::from_slice::(&bytes) + .map_err(|err| format!("health endpoint {path} did not return valid JSON: {err}"))?; + + Ok((status_code, json)) +} + +#[tokio::test] +async fn readyz_reports_healthy_database_check() { + let port = health_port_from_env(); + + let deadline = Instant::now() + Duration::from_secs(20); + let timeout_detail = loop { + let observation = match http_get_json(port, "/readyz").await { + Ok((status, payload)) => { + let ready = status == 200 + && payload["status"] == "healthy" + && payload["checks"]["database"]["status"] == "healthy"; + if ready { + assert!( + payload["checks"]["database"]["latency_ms"].is_number(), + "readyz payload should include checks.database.latency_ms: {payload}" + ); + assert!( + payload["checks"]["database"]["error"].is_null(), + "readyz payload should not include checks.database.error when healthy: {payload}" + ); + return; + } + format!("unexpected /readyz response status={status} payload={payload}") + } + Err(err) => err, + }; + + if Instant::now() >= deadline { + break observation; + } + + tokio::time::sleep(Duration::from_secs(1)).await; + }; + panic!("timed out waiting for /readyz healthy response after 20s: {timeout_detail}"); +} diff --git a/e2e/with-docker-gateway.sh b/e2e/with-docker-gateway.sh index f6f34fbb5..2658ab998 100755 --- a/e2e/with-docker-gateway.sh +++ b/e2e/with-docker-gateway.sh @@ -427,6 +427,10 @@ openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial \ cd "${ROOT}" HOST_PORT=$(e2e_pick_port) +HEALTH_PORT=$(e2e_pick_port) +while [ "${HEALTH_PORT}" = "${HOST_PORT}" ]; do + HEALTH_PORT=$(e2e_pick_port) +done STATE_DIR="${WORKDIR}/state" mkdir -p "${STATE_DIR}" @@ -441,16 +445,18 @@ export OPENSHELL_E2E_NETWORK_NAME="${DOCKER_NETWORK_NAME}" export OPENSHELL_E2E_SANDBOX_NAMESPACE="${E2E_NAMESPACE}" export OPENSHELL_E2E_DRIVER="docker" export OPENSHELL_E2E_CONTAINER_ENGINE="docker" +export OPENSHELL_E2E_HEALTH_PORT="${HEALTH_PORT}" if connect_current_container_to_docker_network "${DOCKER_NETWORK_NAME}"; then echo "Connected CI job container to Docker network ${DOCKER_NETWORK_NAME} (${GATEWAY_HOST_ALIAS_IP})." else GATEWAY_HOST_ALIAS_IP="" fi -echo "Starting openshell-gateway on port ${HOST_PORT} (namespace: ${E2E_NAMESPACE})..." +echo "Starting openshell-gateway on port ${HOST_PORT} (health ${HEALTH_PORT}, namespace: ${E2E_NAMESPACE})..." GATEWAY_ARGS=( --bind-address 0.0.0.0 \ --port "${HOST_PORT}" \ + --health-port "${HEALTH_PORT}" \ --drivers docker \ --sandbox-namespace "${E2E_NAMESPACE}" \ --docker-network-name "${DOCKER_NETWORK_NAME}" \