From 228fe48339cc054e7243328a8eb7a023b2ad7a35 Mon Sep 17 00:00:00 2001 From: Venky Date: Wed, 1 Apr 2026 07:51:37 +0400 Subject: [PATCH] =?UTF-8?q?feat:=20TCP=20NAT=20traversal=20=E2=80=94=20Via?= =?UTF-8?q?=20alias,=20outbound=20proxy,=20connection=20reuse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes for reliable SIP over TCP behind NAT with load-balanced proxy clusters: 1. Via alias parameter (RFC 5923): Add ;alias to Via header for TCP/TLS connections, telling the proxy to reuse this connection for sending requests back. Essential for receiving INVITEs behind NAT. 2. Outbound proxy support on Registration: New outbound_proxy field (Option) that overrides transport destination while keeping the domain in SIP headers (Request-URI, From, To). Allows pinning to a DNS-resolved proxy IP for NAT consistency — prevents re-registration from resolving to a different proxy node. 3. TCP connection reuse in transport layer: Include local addresses from TCP/TLS client connections in the address list used for Via/Contact headers. Without this, only listener addresses are returned, and TCP client connections are invisible for header construction. Also fixes message inspector ordering: call before_send before computing destination so the inspector can modify headers that affect routing. Registration changes: - Outbound proxy inherits transport type from request URI (e.g., TCP) - Contact preservation across re-registrations: preserve URI params (transport=tcp) instead of clearing contact on 200 OK, preventing loss of transport parameter on re-register - Supported: path, outbound header in REGISTER (RFC 3327 / RFC 5626) so proxies that support Path-based routing record the correct edge node for the TCP connection --- src/dialog/registration.rs | 52 +++++++++++++++++++++++++++----- src/transaction/endpoint.rs | 29 ++++++++++-------- src/transport/transport_layer.rs | 23 ++++++++++++-- 3 files changed, 83 insertions(+), 21 deletions(-) diff --git a/src/dialog/registration.rs b/src/dialog/registration.rs index 20d0ca9..553a494 100644 --- a/src/dialog/registration.rs +++ b/src/dialog/registration.rs @@ -117,6 +117,10 @@ pub struct Registration { /// Public address detected by the server (IP and port) pub public_address: Option, pub call_id: rsip::headers::CallId, + /// Outbound proxy — override transport destination while keeping the + /// domain in SIP headers. Used for NAT traversal with load-balanced + /// proxy clusters where DNS may resolve to different IPs. + pub outbound_proxy: Option, } impl Registration { @@ -165,6 +169,7 @@ impl Registration { allow: Default::default(), public_address: None, call_id, + outbound_proxy: None, } } @@ -409,6 +414,13 @@ impl Registration { request.headers.unique_push(self.call_id.clone().into()); request.headers.unique_push(contact.into()); request.headers.unique_push(self.allow.clone().into()); + // RFC 3327 Path + RFC 5626 Outbound: tell the proxy to record + // a Path header so INVITEs route through the correct edge node + // (the one with our TCP connection). + request.headers.unique_push(rsip::Header::Other( + "Supported".into(), + "path, outbound".into(), + )); if let Some(expires) = expires { request .headers @@ -418,6 +430,24 @@ impl Registration { let key = TransactionKey::from_request(&request, TransactionRole::Client)?; let mut tx = Transaction::new_client(key, request, self.endpoint.clone(), None); + // Override transport destination if outbound proxy is configured. + // This keeps the domain in SIP headers (Request-URI, From, To) while + // sending all packets to the pinned proxy IP for NAT consistency. + if let Some(proxy) = &self.outbound_proxy { + let mut dest = SipAddr::from(*proxy); + // Inherit transport type from the request URI (e.g., TCP) + if let Some(rsip::Param::Transport(t)) = tx + .original + .uri() + .params + .iter() + .find(|p| matches!(p, rsip::Param::Transport(_))) + { + dest.r#type = Some(t.clone()); + } + tx.destination = Some(dest); + } + tx.send().await?; let mut auth_sent = false; @@ -436,7 +466,15 @@ impl Registration { "updated public address from 401 response" ); self.public_address = received; - self.contact = None; + // Update the Contact's host/port but preserve URI params + // (e.g., transport=tcp) so they persist across re-registrations. + if let Some(ref mut contact) = self.contact { + if let Some(ref pa) = self.public_address { + contact.uri.host_with_port = pa.clone(); + } + } else { + self.contact = None; + } } if auth_sent { @@ -458,7 +496,8 @@ impl Registration { auth: contact_for_retry.uri.auth.clone(), scheme: Some(rsip::Scheme::Sip), host_with_port: pa.clone(), - params: vec![], + // Preserve URI params (e.g., transport=tcp) from original Contact + params: contact_for_retry.uri.params.clone(), headers: vec![], }; let mut new_contact = contact_for_retry.clone(); @@ -484,11 +523,10 @@ impl Registration { // Do NOT adopt the Contact from the 200 OK response. // The response may contain Contact bindings from OTHER // devices sharing the same AOR (Address of Record). - // Blindly reusing it would corrupt our Contact in - // subsequent re-registrations, routing calls to the - // wrong host. Instead, always derive Contact from - // self.public_address (set from Via received parameter). - self.contact = None; + // Keep self.contact as-is — if explicitly set by the caller + // (e.g., with transport=tcp), it should persist across + // re-registrations. The Contact is rebuilt from + // public_address only when self.contact is None. if self.public_address != received { debug!( diff --git a/src/transaction/endpoint.rs b/src/transaction/endpoint.rs index e443f63..4e0333c 100644 --- a/src/transaction/endpoint.rs +++ b/src/transaction/endpoint.rs @@ -503,19 +503,17 @@ impl EndpointInner { rsip::StatusCode::CallTransactionDoesNotExist, None, ); + let resp = if let Some(ref inspector) = self.message_inspector { + inspector.before_send(resp.into(), None) + } else { + resp.into() + }; let dest = if !connection.is_reliable() { self.get_destination_from_request(&request).await } else { None }; - - let resp = if let Some(ref inspector) = self.message_inspector { - inspector.before_send(resp.into(), dest.as_ref()) - } else { - resp.into() - }; - connection.send(resp, dest.as_ref()).await?; return Ok(()); } @@ -594,14 +592,21 @@ impl EndpointInner { .cloned()?, }; + let transport = first_addr.r#type.unwrap_or_default(); + let mut params = vec![ + branch.unwrap_or_else(make_via_branch), + rsip::Param::Other("rport".into(), None), + ]; + // RFC 5923: alias parameter tells the proxy to reuse this TCP connection + // for sending requests back to us (essential for NAT traversal over TCP). + if transport == rsip::Transport::Tcp || transport == rsip::Transport::Tls { + params.push(rsip::Param::Other("alias".into(), None)); + } let via = rsip::typed::Via { version: rsip::Version::V2, - transport: first_addr.r#type.unwrap_or_default(), + transport, uri: first_addr.addr.into(), - params: vec![ - branch.unwrap_or_else(make_via_branch), - rsip::Param::Other("rport".into(), None), - ], + params, }; Ok(via) } diff --git a/src/transport/transport_layer.rs b/src/transport/transport_layer.rs index 859c49c..779982c 100644 --- a/src/transport/transport_layer.rs +++ b/src/transport/transport_layer.rs @@ -194,13 +194,29 @@ impl TransportLayer { } pub fn get_addrs(&self) -> Vec { - match self.inner.listens.read() { + let mut addrs: Vec = match self.inner.listens.read() { Ok(listens) => listens.iter().map(|t| t.get_addr().to_owned()).collect(), Err(e) => { warn!(error = ?e, "Failed to read listens"); Vec::new() } + }; + // Also include local addresses from TCP/TLS client connections. + // For connection-oriented transports, get_addr() returns the remote address + // (used for lookup), but Via/Contact headers need the local address. + if let Ok(connections) = self.inner.connections.read() { + for conn in connections.values() { + match conn { + SipConnection::Tcp(tcp) => { + addrs.push(tcp.inner.local_addr.clone()); + } + // TLS inner is private — skip for now + SipConnection::Tls(_) => {} + _ => {} + } + } } + addrs } /// Set an async whitelist callback invoked on incoming packets/connections. @@ -468,9 +484,12 @@ impl TransportLayerInner { pub fn serve_connection(&self, transport: SipConnection) { let sub_token = self.cancel_token.child_token(); let sender_clone = self.transport_tx.clone(); + info!(addr=%transport.get_addr(), "serve_connection: starting serve_loop"); tokio::spawn(async move { match sender_clone.send(TransportEvent::New(transport.clone())) { - Ok(()) => {} + Ok(()) => { + info!(addr=%transport.get_addr(), "serve_connection: New event sent"); + } Err(e) => { warn!(addr=%transport.get_addr(), error = ?e, "Error sending new connection event"); return;