Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions src/dialog/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub struct Registration {
/// Public address detected by the server (IP and port)
pub public_address: Option<crate::sip::HostWithPort>,
pub call_id: crate::sip::headers::CallId,
/// Outbound proxy — override transport destination while keeping the
/// domain in SIP headers. Used for NAT traversal with load-balanced
/// proxy clusters where DNS may resolve to different IPs.
pub outbound_proxy: Option<std::net::SocketAddr>,
}

impl Registration {
Expand Down Expand Up @@ -165,6 +169,7 @@ impl Registration {
allow: crate::sip::headers::Allow::new(""),
public_address: None,
call_id,
outbound_proxy: None,
}
}

Expand Down Expand Up @@ -412,6 +417,13 @@ impl Registration {
request.headers.unique_push(self.call_id.clone().into());
request.headers.unique_push(contact.into());
request.headers.unique_push(self.allow.clone().into());
// RFC 3327 Path + RFC 5626 Outbound: tell the proxy to record
// a Path header so INVITEs route through the correct edge node
// (the one with our TCP connection).
request.headers.unique_push(rsip::Header::Other(
"Supported".into(),
"path, outbound".into(),
));
if let Some(expires) = expires {
request
.headers
Expand All @@ -421,6 +433,24 @@ impl Registration {
let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
let mut tx = Transaction::new_client(key, request, self.endpoint.clone(), None);

// Override transport destination if outbound proxy is configured.
// This keeps the domain in SIP headers (Request-URI, From, To) while
// sending all packets to the pinned proxy IP for NAT consistency.
if let Some(proxy) = &self.outbound_proxy {
let mut dest = SipAddr::from(*proxy);
// Inherit transport type from the request URI (e.g., TCP)
if let Some(rsip::Param::Transport(t)) = tx
.original
.uri()
.params
.iter()
.find(|p| matches!(p, rsip::Param::Transport(_)))
{
dest.r#type = Some(t.clone());
}
tx.destination = Some(dest);
}

tx.send().await?;
let mut auth_sent = false;

Expand All @@ -443,7 +473,15 @@ impl Registration {
"updated public address from 401 response"
);
self.public_address = received;
self.contact = None;
// Update the Contact's host/port but preserve URI params
// (e.g., transport=tcp) so they persist across re-registrations.
if let Some(ref mut contact) = self.contact {
if let Some(ref pa) = self.public_address {
contact.uri.host_with_port = pa.clone();
}
} else {
self.contact = None;
}
}

if auth_sent {
Expand All @@ -465,7 +503,8 @@ impl Registration {
auth: contact_for_retry.uri.auth.clone(),
scheme: Some(crate::sip::Scheme::Sip),
host_with_port: pa.clone(),
params: vec![],
// Preserve URI params (e.g., transport=tcp) from original Contact
params: contact_for_retry.uri.params.clone(),
headers: vec![],
};
let mut new_contact = contact_for_retry.clone();
Expand Down Expand Up @@ -495,11 +534,10 @@ impl Registration {
// Do NOT adopt the Contact from the 200 OK response.
// The response may contain Contact bindings from OTHER
// devices sharing the same AOR (Address of Record).
// Blindly reusing it would corrupt our Contact in
// subsequent re-registrations, routing calls to the
// wrong host. Instead, always derive Contact from
// self.public_address (set from Via received parameter).
self.contact = None;
// Keep self.contact as-is — if explicitly set by the caller
// (e.g., with transport=tcp), it should persist across
// re-registrations. The Contact is rebuilt from
// public_address only when self.contact is None.

if self.public_address != received {
debug!(
Expand Down
29 changes: 17 additions & 12 deletions src/transaction/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,19 +477,17 @@ impl EndpointInner {
crate::sip::StatusCode::CallTransactionDoesNotExist,
None,
);
let resp = if let Some(ref inspector) = self.message_inspector {
inspector.before_send(resp.into(), None)
} else {
resp.into()
};

let dest = if !connection.is_reliable() {
self.get_destination_from_request(&request).await
} else {
None
};

let resp = if let Some(ref inspector) = self.message_inspector {
inspector.before_send(resp.into(), dest.as_ref())
} else {
resp.into()
};

connection.send(resp, dest.as_ref()).await?;
return Ok(());
}
Expand Down Expand Up @@ -558,14 +556,21 @@ impl EndpointInner {
.cloned()?,
};

let transport = first_addr.r#type.unwrap_or_default();
let mut params = vec![
branch.unwrap_or_else(make_via_branch),
crate::sip::Param::Rport(None),
];
// RFC 5923: alias parameter tells the proxy to reuse this TCP connection
// for sending requests back to us (essential for NAT traversal over TCP).
if transport == crate::sip::Transport::Tcp || transport == crate::sip::Transport::Tls {
params.push(crate::sip::Param::Other("alias".into(), None));
}
let via = crate::sip::typed::Via {
version: crate::sip::Version::V2,
transport: first_addr.r#type.unwrap_or_default(),
transport,
uri: first_addr.addr.into(),
params: vec![
branch.unwrap_or_else(make_via_branch),
crate::sip::Param::Rport(None),
],
params,
};
Ok(via)
}
Expand Down
29 changes: 27 additions & 2 deletions src/transport/transport_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,29 @@ impl TransportLayer {
}

pub fn get_addrs(&self) -> Vec<SipAddr> {
self.inner.listens.read().iter().map(|t| t.get_addr().to_owned()).collect()
let mut addrs: Vec<SipAddr> = match self.inner.listens.read() {
Ok(listens) => listens.iter().map(|t| t.get_addr().to_owned()).collect(),
Err(e) => {
warn!(error = ?e, "Failed to read listens");
Vec::new()
}
};
// Also include local addresses from TCP/TLS client connections.
// For connection-oriented transports, get_addr() returns the remote address
// (used for lookup), but Via/Contact headers need the local address.
if let Ok(connections) = self.inner.connections.read() {
for conn in connections.values() {
match conn {
SipConnection::Tcp(tcp) => {
addrs.push(tcp.inner.local_addr.clone());
}
// TLS inner is private — skip for now
SipConnection::Tls(_) => {}
_ => {}
}
}
}
addrs
}

/// Set an async whitelist callback invoked on incoming packets/connections.
Expand Down Expand Up @@ -386,9 +408,12 @@ impl TransportLayerInner {
pub fn serve_connection(&self, transport: SipConnection) {
let sub_token = self.cancel_token.child_token();
let sender_clone = self.transport_tx.clone();
info!(addr=%transport.get_addr(), "serve_connection: starting serve_loop");
tokio::spawn(async move {
match sender_clone.send(TransportEvent::New(transport.clone())) {
Ok(()) => {}
Ok(()) => {
info!(addr=%transport.get_addr(), "serve_connection: New event sent");
}
Err(e) => {
warn!(addr=%transport.get_addr(), error = ?e, "Error sending new connection event");
return;
Expand Down
Loading