diff --git a/src/config.rs b/src/config.rs index 7f53d25..23b0813 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use std::{fs::read_to_string, net::IpAddr, path::PathBuf}; +use std::{fs::read_to_string, net::IpAddr, path::PathBuf, time::Duration}; use clap::Parser; use log::LevelFilter; @@ -9,6 +9,10 @@ fn default_url() -> Url { Url::parse("http://localhost:8080").unwrap() } +fn default_adoption_timeout() -> u64 { + 10 +} + #[derive(Parser, Debug, Deserialize, Clone)] #[command(version)] pub struct EnvConfig { @@ -88,6 +92,24 @@ pub struct EnvConfig { /// Use Let's Encrypt staging environment for ACME issuance. #[arg(long, env = "DEFGUARD_PROXY_ACME_STAGING", default_value_t = false)] pub acme_staging: bool, + + /// Time limit in minutes for the auto-adoption process. + /// After this time Edge will reject adoption attempts until restarted. + #[arg( + long, + short = 't', + env = "DEFGUARD_ADOPTION_TIMEOUT", + default_value = "10" + )] + #[serde(default = "default_adoption_timeout")] + pub adoption_timeout: u64, +} + +impl EnvConfig { + #[must_use] + pub fn adoption_timeout(&self) -> Duration { + Duration::from_secs(self.adoption_timeout * 60) + } } #[derive(thiserror::Error, Debug)] diff --git a/src/http.rs b/src/http.rs index 9085d54..10d34e4 100644 --- a/src/http.rs +++ b/src/http.rs @@ -205,12 +205,15 @@ pub async fn run_setup( cert_dir.display() ); let configuration = setup_server - .await_initial_setup(SocketAddr::new( - env_config - .grpc_bind_address - .unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), - env_config.grpc_port, - )) + .await_initial_setup( + SocketAddr::new( + env_config + .grpc_bind_address + .unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), + env_config.grpc_port, + ), + env_config, + ) .await?; info!("Generated new gRPC TLS certificates and signed by Defguard Core"); diff --git a/src/setup.rs b/src/setup.rs index 684639b..344ee34 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -1,18 +1,22 @@ use std::{ net::SocketAddr, - sync::{Arc, LazyLock, Mutex}, + sync::{ + Arc, LazyLock, Mutex, + atomic::{AtomicBool, Ordering}, + }, }; use defguard_version::{ DefguardComponent, Version, server::{DefguardVersionLayer, grpc::DefguardVersionInterceptor}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{Request, Response, Status, transport::Server}; use crate::{ CommsChannel, LogsReceiver, MIN_CORE_VERSION, VERSION, + config::EnvConfig, error::ApiError, grpc::Configuration, proto::{CertificateInfo, DerPayload, LogEntry, proxy_setup_server}, @@ -32,6 +36,7 @@ pub(crate) struct ProxySetupServer { key_pair: Arc>>, logs_rx: LogsReceiver, current_session_token: Arc>>, + adoption_expired: Arc, } impl Clone for ProxySetupServer { @@ -40,6 +45,7 @@ impl Clone for ProxySetupServer { key_pair: Arc::clone(&self.key_pair), logs_rx: Arc::clone(&self.logs_rx), current_session_token: Arc::clone(&self.current_session_token), + adoption_expired: Arc::clone(&self.adoption_expired), } } } @@ -50,6 +56,7 @@ impl ProxySetupServer { key_pair: Arc::new(Mutex::new(None)), logs_rx, current_session_token: Arc::new(Mutex::new(None)), + adoption_expired: Arc::new(AtomicBool::new(false)), } } @@ -59,14 +66,37 @@ impl ProxySetupServer { /// `GetCsr`, `SendCert`. The server shuts down as soon as `SendCert` deposits a /// `Configuration` into `SETUP_CHANNEL`, after which this function returns the received /// gRPC configuration (locally generated key pair and remotely signed certificate). + /// + /// A timeout is started in the background using `config.adoption_timeout()`. If the timeout + /// elapses before setup completes, the `adoption_expired` flag is set and incoming `Start` + /// requests are rejected with `failed_precondition` until the Edge is restarted. + /// On successful adoption the timeout is cancelled. pub(crate) async fn await_initial_setup( &self, addr: SocketAddr, + config: &EnvConfig, ) -> Result { - info!("gRPC waiting for setup connection from Core on {addr}"); + let adoption_timeout = config.adoption_timeout(); + info!( + "gRPC waiting for setup connection from Core on {addr} for {} min", + adoption_timeout.as_secs() / 60 + ); + let adoption_expired = Arc::clone(&self.adoption_expired); + let (cancel_tx, cancel_rx) = oneshot::channel::<()>(); + tokio::spawn(async move { + tokio::select! { + _ = tokio::time::sleep(adoption_timeout) => { + adoption_expired.store(true, Ordering::Relaxed); + error!( + "Edge adoption expired and is now blocked. Restart the Edge to enable adoption." + ); + } + _ = cancel_rx => {} + } + }); let own_version = Version::parse(VERSION)?; - debug!("Proxy version: {}", VERSION); + debug!("Edge version: {}", VERSION); let config_slot: Arc>> = Arc::new(tokio::sync::Mutex::new(None)); @@ -107,6 +137,9 @@ impl ProxySetupServer { ApiError::Unexpected("No configuration received after setup".into()) })?; + // Skip blocking Edge adoption if adoption was already done + let _ = cancel_tx.send(()); + Ok(configuration) } @@ -114,7 +147,7 @@ impl ProxySetupServer { let in_progress = self .current_session_token .lock() - .expect("Failed to acquire lock on current session token during proxy setup") + .expect("Failed to acquire lock on current session token during Edge setup") .is_some(); debug!("Setup in progress check: {}", in_progress); in_progress @@ -124,7 +157,7 @@ impl ProxySetupServer { debug!("Terminating setup session"); self.current_session_token .lock() - .expect("Failed to acquire lock on current session token during proxy setup") + .expect("Failed to acquire lock on current session token during Edge setup") .take(); debug!("Setup session terminated"); } @@ -133,7 +166,7 @@ impl ProxySetupServer { debug!("Establishing new setup session with Core"); self.current_session_token .lock() - .expect("Failed to acquire lock on current session token during proxy setup") + .expect("Failed to acquire lock on current session token during Edge setup") .replace(token); debug!("Setup session established"); } @@ -143,7 +176,7 @@ impl ProxySetupServer { let is_valid = (*self .current_session_token .lock() - .expect("Failed to acquire lock on current session token during proxy setup")) + .expect("Failed to acquire lock on current session token during Edge setup")) .as_ref() .is_some_and(|t| t == token); debug!("Authorization validation result: {}", is_valid); @@ -158,6 +191,12 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { #[instrument(skip(self, request))] async fn start(&self, request: Request<()>) -> Result, Status> { debug!("Core initiated setup process, preparing to stream logs"); + if self.adoption_expired.load(Ordering::Relaxed) { + let error_message = + "Edge adoption expired and is now blocked. Restart the Edge to enable adoption."; + error!("{error_message}"); + return Err(Status::failed_precondition(error_message)); + } if self.is_setup_in_progress() { error!("Setup already in progress, rejecting new setup request"); return Err(Status::resource_exhausted("Setup already in progress")); @@ -174,7 +213,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { debug!("Setup session authenticated successfully"); self.initialize_setup_session(token.to_string()); - debug!("Preparing to forward Proxy logs to Core in real-time"); + debug!("Preparing to forward Edge logs to Core in real-time"); let logs_rx = self.logs_rx.clone(); let (tx, rx) = mpsc::unbounded_channel(); @@ -208,7 +247,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { self_clone.clear_setup_session(); }); - debug!("Log stream established, Core will now receive real-time Proxy logs"); + debug!("Log stream established, Core will now receive real-time Edge logs"); Ok(Response::new(UnboundedReceiverStream::new(rx))) } @@ -274,7 +313,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { self.key_pair .lock() - .expect("Failed to acquire lock on key pair during proxy setup when trying to store generated key pair") + .expect("Failed to acquire lock on key pair during Edge setup when trying to store generated key pair") .replace(key_pair); debug!("Encoding Certificate Signing Request for transmission"); @@ -331,17 +370,17 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { let key_pair = self .key_pair .lock() - .expect("Failed to acquire lock on key pair during proxy setup when trying to receive certificate") + .expect("Failed to acquire lock on key pair during Edge setup when trying to receive certificate") .take(); if let Some(kp) = key_pair { kp } else { error!( - "Key pair not found during Proxy setup. Key pair generation step might have failed." + "Key pair not found during Edge setup. Key pair generation step might have failed." ); self.clear_setup_session(); return Err(Status::internal( - "Key pair not found during Proxy setup. Key pair generation step might have failed.", + "Key pair not found during Edge setup. Key pair generation step might have failed.", )); } }; @@ -353,7 +392,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer { debug!("Passing configuration to gRPC server for finalization"); match SETUP_CHANNEL.0.lock().await.send(Some(configuration)).await { - Ok(()) => info!("Proxy configuration passed to gRPC server successfully"), + Ok(()) => info!("Edge configuration passed to gRPC server successfully"), Err(err) => { error!("Failed to send configuration to gRPC server: {err}"); self.clear_setup_session();