Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fs::read_to_string, net::IpAddr, path::PathBuf};
use std::{fs::read_to_string, net::IpAddr, path::PathBuf, time::Duration};

use clap::Parser;
use log::LevelFilter;
Expand All @@ -9,6 +9,10 @@ fn default_url() -> Url {
Url::parse("http://localhost:8080").unwrap()
}

fn default_adoption_timeout() -> u64 {
10
}

#[derive(Parser, Debug, Deserialize, Clone)]
#[command(version)]
pub struct EnvConfig {
Expand Down Expand Up @@ -88,6 +92,24 @@ pub struct EnvConfig {
/// Use Let's Encrypt staging environment for ACME issuance.
#[arg(long, env = "DEFGUARD_PROXY_ACME_STAGING", default_value_t = false)]
pub acme_staging: bool,

/// Time limit in minutes for the auto-adoption process.
/// After this time Edge will reject adoption attempts until restarted.
#[arg(
long,
short = 't',
env = "DEFGUARD_ADOPTION_TIMEOUT",
default_value = "10"
)]
#[serde(default = "default_adoption_timeout")]
pub adoption_timeout: u64,
}

impl EnvConfig {
#[must_use]
pub fn adoption_timeout(&self) -> Duration {
Duration::from_secs(self.adoption_timeout * 60)
}
}

#[derive(thiserror::Error, Debug)]
Expand Down
15 changes: 9 additions & 6 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,15 @@ pub async fn run_setup(
cert_dir.display()
);
let configuration = setup_server
.await_initial_setup(SocketAddr::new(
env_config
.grpc_bind_address
.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
env_config.grpc_port,
))
.await_initial_setup(
SocketAddr::new(
env_config
.grpc_bind_address
.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
env_config.grpc_port,
),
env_config,
)
.await?;
info!("Generated new gRPC TLS certificates and signed by Defguard Core");

Expand Down
69 changes: 54 additions & 15 deletions src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use std::{
net::SocketAddr,
sync::{Arc, LazyLock, Mutex},
sync::{
Arc, LazyLock, Mutex,
atomic::{AtomicBool, Ordering},
},
};

use defguard_version::{
DefguardComponent, Version,
server::{DefguardVersionLayer, grpc::DefguardVersionInterceptor},
};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status, transport::Server};

use crate::{
CommsChannel, LogsReceiver, MIN_CORE_VERSION, VERSION,
config::EnvConfig,
error::ApiError,
grpc::Configuration,
proto::{CertificateInfo, DerPayload, LogEntry, proxy_setup_server},
Expand All @@ -32,6 +36,7 @@ pub(crate) struct ProxySetupServer {
key_pair: Arc<Mutex<Option<defguard_certs::RcGenKeyPair>>>,
logs_rx: LogsReceiver,
current_session_token: Arc<Mutex<Option<String>>>,
adoption_expired: Arc<AtomicBool>,
}

impl Clone for ProxySetupServer {
Expand All @@ -40,6 +45,7 @@ impl Clone for ProxySetupServer {
key_pair: Arc::clone(&self.key_pair),
logs_rx: Arc::clone(&self.logs_rx),
current_session_token: Arc::clone(&self.current_session_token),
adoption_expired: Arc::clone(&self.adoption_expired),
}
}
}
Expand All @@ -50,6 +56,7 @@ impl ProxySetupServer {
key_pair: Arc::new(Mutex::new(None)),
logs_rx,
current_session_token: Arc::new(Mutex::new(None)),
adoption_expired: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -59,14 +66,37 @@ impl ProxySetupServer {
/// `GetCsr`, `SendCert`. The server shuts down as soon as `SendCert` deposits a
/// `Configuration` into `SETUP_CHANNEL`, after which this function returns the received
/// gRPC configuration (locally generated key pair and remotely signed certificate).
///
/// A timeout is started in the background using `config.adoption_timeout()`. If the timeout
/// elapses before setup completes, the `adoption_expired` flag is set and incoming `Start`
/// requests are rejected with `failed_precondition` until the Edge is restarted.
/// On successful adoption the timeout is cancelled.
pub(crate) async fn await_initial_setup(
&self,
addr: SocketAddr,
config: &EnvConfig,
) -> Result<Configuration, anyhow::Error> {
info!("gRPC waiting for setup connection from Core on {addr}");
let adoption_timeout = config.adoption_timeout();
info!(
"gRPC waiting for setup connection from Core on {addr} for {} min",
adoption_timeout.as_secs() / 60
);

let adoption_expired = Arc::clone(&self.adoption_expired);
let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
tokio::spawn(async move {
tokio::select! {
_ = tokio::time::sleep(adoption_timeout) => {
adoption_expired.store(true, Ordering::Relaxed);
error!(
"Edge adoption expired and is now blocked. Restart the Edge to enable adoption."
);
}
_ = cancel_rx => {}
}
});
let own_version = Version::parse(VERSION)?;
debug!("Proxy version: {}", VERSION);
debug!("Edge version: {}", VERSION);

let config_slot: Arc<tokio::sync::Mutex<Option<Configuration>>> =
Arc::new(tokio::sync::Mutex::new(None));
Expand Down Expand Up @@ -107,14 +137,17 @@ impl ProxySetupServer {
ApiError::Unexpected("No configuration received after setup".into())
})?;

// Skip blocking Edge adoption if adoption was already done
let _ = cancel_tx.send(());
Comment thread
jakub-tldr marked this conversation as resolved.

Ok(configuration)
}

fn is_setup_in_progress(&self) -> bool {
let in_progress = self
.current_session_token
.lock()
.expect("Failed to acquire lock on current session token during proxy setup")
.expect("Failed to acquire lock on current session token during Edge setup")
.is_some();
debug!("Setup in progress check: {}", in_progress);
in_progress
Expand All @@ -124,7 +157,7 @@ impl ProxySetupServer {
debug!("Terminating setup session");
self.current_session_token
.lock()
.expect("Failed to acquire lock on current session token during proxy setup")
.expect("Failed to acquire lock on current session token during Edge setup")
.take();
debug!("Setup session terminated");
}
Expand All @@ -133,7 +166,7 @@ impl ProxySetupServer {
debug!("Establishing new setup session with Core");
self.current_session_token
.lock()
.expect("Failed to acquire lock on current session token during proxy setup")
.expect("Failed to acquire lock on current session token during Edge setup")
.replace(token);
debug!("Setup session established");
}
Expand All @@ -143,7 +176,7 @@ impl ProxySetupServer {
let is_valid = (*self
.current_session_token
.lock()
.expect("Failed to acquire lock on current session token during proxy setup"))
.expect("Failed to acquire lock on current session token during Edge setup"))
.as_ref()
.is_some_and(|t| t == token);
debug!("Authorization validation result: {}", is_valid);
Expand All @@ -158,6 +191,12 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
#[instrument(skip(self, request))]
async fn start(&self, request: Request<()>) -> Result<Response<Self::StartStream>, Status> {
debug!("Core initiated setup process, preparing to stream logs");
if self.adoption_expired.load(Ordering::Relaxed) {
let error_message =
"Edge adoption expired and is now blocked. Restart the Edge to enable adoption.";
error!("{error_message}");
return Err(Status::failed_precondition(error_message));
}
if self.is_setup_in_progress() {
error!("Setup already in progress, rejecting new setup request");
return Err(Status::resource_exhausted("Setup already in progress"));
Expand All @@ -174,7 +213,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
debug!("Setup session authenticated successfully");
self.initialize_setup_session(token.to_string());

debug!("Preparing to forward Proxy logs to Core in real-time");
debug!("Preparing to forward Edge logs to Core in real-time");
let logs_rx = self.logs_rx.clone();

let (tx, rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -208,7 +247,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
self_clone.clear_setup_session();
});

debug!("Log stream established, Core will now receive real-time Proxy logs");
debug!("Log stream established, Core will now receive real-time Edge logs");
Ok(Response::new(UnboundedReceiverStream::new(rx)))
}

Expand Down Expand Up @@ -274,7 +313,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {

self.key_pair
.lock()
.expect("Failed to acquire lock on key pair during proxy setup when trying to store generated key pair")
.expect("Failed to acquire lock on key pair during Edge setup when trying to store generated key pair")
.replace(key_pair);

debug!("Encoding Certificate Signing Request for transmission");
Expand Down Expand Up @@ -331,17 +370,17 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {
let key_pair = self
.key_pair
.lock()
.expect("Failed to acquire lock on key pair during proxy setup when trying to receive certificate")
.expect("Failed to acquire lock on key pair during Edge setup when trying to receive certificate")
.take();
if let Some(kp) = key_pair {
kp
} else {
error!(
"Key pair not found during Proxy setup. Key pair generation step might have failed."
"Key pair not found during Edge setup. Key pair generation step might have failed."
);
self.clear_setup_session();
return Err(Status::internal(
"Key pair not found during Proxy setup. Key pair generation step might have failed.",
"Key pair not found during Edge setup. Key pair generation step might have failed.",
));
}
};
Expand All @@ -353,7 +392,7 @@ impl proxy_setup_server::ProxySetup for ProxySetupServer {

debug!("Passing configuration to gRPC server for finalization");
match SETUP_CHANNEL.0.lock().await.send(Some(configuration)).await {
Ok(()) => info!("Proxy configuration passed to gRPC server successfully"),
Ok(()) => info!("Edge configuration passed to gRPC server successfully"),
Err(err) => {
error!("Failed to send configuration to gRPC server: {err}");
self.clear_setup_session();
Expand Down
Loading