Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions src/proxy_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UdpSocket};
use tokio::sync::{mpsc, Mutex};
Expand Down Expand Up @@ -965,7 +966,7 @@ struct SocksUdpTarget {
/// to abort mid-await.
struct UdpRelaySession {
sid: String,
uplink: mpsc::Sender<Vec<u8>>,
uplink: mpsc::Sender<Bytes>,
}

/// All per-ASSOCIATE UDP relay state behind a single mutex so insertion
Expand All @@ -991,7 +992,7 @@ impl UdpRelayState {
}
}

fn get_uplink(&self, target: &SocksUdpTarget) -> Option<mpsc::Sender<Vec<u8>>> {
fn get_uplink(&self, target: &SocksUdpTarget) -> Option<mpsc::Sender<Bytes>> {
self.sessions.get(target).map(|s| s.uplink.clone())
}

Expand Down Expand Up @@ -1118,7 +1119,15 @@ async fn handle_socks5_udp_associate(
client_peer_ip
);

let mut buf = vec![0u8; SOCKS5_UDP_RECV_BUF_BYTES];
// Fixed reusable recv buffer. We deliberately don't go the
// `BytesMut::split().freeze()` route here even though `tunnel_loop`
// does: in TCP the read region IS the payload, but UDP always
// slices the SOCKS5 header off, so we'd be copying out anyway —
// and a frozen `Bytes` from the recv buf would refcount-pin the
// full ~65 KB allocation behind a tiny DNS reply, ballooning
// memory under bursts. Right-sized `Bytes::copy_from_slice` on
// accepted payloads keeps retention proportional to actual data.
let mut recv_buf = vec![0u8; SOCKS5_UDP_RECV_BUF_BYTES];
let mut control_buf = [0u8; 1];
let mut client_addr: Option<SocketAddr> = None;
let state: Arc<Mutex<UdpRelayState>> = Arc::new(Mutex::new(UdpRelayState::new()));
Expand All @@ -1134,14 +1143,15 @@ async fn handle_socks5_udp_associate(

loop {
tokio::select! {
recv = udp.recv_from(&mut buf) => {
recv = udp.recv_from(&mut recv_buf) => {
let (n, peer) = match recv {
Ok(v) => v,
Err(e) => {
tracing::debug!("udp associate recv failed: {}", e);
break;
}
};

// Source-IP check: anything not from the SOCKS5 client's
// host is dropped silently.
if peer.ip() != client_peer_ip {
Expand All @@ -1162,9 +1172,10 @@ async fn handle_socks5_udp_associate(
// can race one bad packet to DoS the legitimate client
// (whose real datagram, sent from a different ephemeral
// port, would then be silently rejected).
let Some((target, payload)) = parse_socks5_udp_packet(&buf[..n]) else {
let Some((target, payload_off)) = parse_socks5_udp_packet_offsets(&recv_buf[..n]) else {
continue;
};
let payload_slice = &recv_buf[payload_off..n];

// Issue #213: client-side QUIC block. UDP/443 is
// HTTP/3 — drop the datagram silently so the client
Expand Down Expand Up @@ -1206,19 +1217,26 @@ async fn handle_socks5_udp_associate(
// the mux. Each datagram costs ~payload * 1.33 in the
// batched JSON envelope plus tunnel-node CPU; uncapped,
// a runaway client can exhaust Apps Script quota.
if payload.len() > MAX_UDP_PAYLOAD_BYTES {
if payload_slice.len() > MAX_UDP_PAYLOAD_BYTES {
oversized_dropped += 1;
if oversized_dropped == 1 || oversized_dropped.is_multiple_of(100) {
tracing::debug!(
"udp datagram dropped: {} B > {} B (count={})",
payload.len(),
payload_slice.len(),
MAX_UDP_PAYLOAD_BYTES,
oversized_dropped,
);
}
continue;
}
let payload = payload.to_vec();

// Right-sized copy: the queued/in-flight payload owns its
// own allocation, so the recv buffer can be reused on the
// next iteration without keeping every queued datagram
// alive. Sized to the actual payload (≤ MAX_UDP_PAYLOAD_BYTES
// = 9 KB after the guard above), not the full ~65 KB recv
// buffer.
let payload = Bytes::copy_from_slice(payload_slice);

// Fast path: existing session — push payload onto its
// bounded uplink queue, drop on overflow (UDP semantics).
Expand Down Expand Up @@ -1292,7 +1310,7 @@ async fn handle_socks5_udp_associate(
continue;
}

let (uplink_tx, uplink_rx) = mpsc::channel::<Vec<u8>>(UDP_UPLINK_QUEUE);
let (uplink_tx, uplink_rx) = mpsc::channel::<Bytes>(UDP_UPLINK_QUEUE);
let task_mux = mux.clone();
let task_udp = udp.clone();
let task_target = target.clone();
Expand Down Expand Up @@ -1365,7 +1383,7 @@ async fn udp_session_task(
sid: String,
target: SocksUdpTarget,
client_addr: SocketAddr,
mut uplink_rx: mpsc::Receiver<Vec<u8>>,
mut uplink_rx: mpsc::Receiver<Bytes>,
) {
let mut backoff = UDP_INITIAL_POLL_DELAY;
loop {
Expand Down Expand Up @@ -1473,7 +1491,20 @@ async fn write_socks5_reply(
sock.flush().await
}

fn parse_socks5_udp_packet(buf: &[u8]) -> Option<(SocksUdpTarget, &[u8])> {
/// Parse the SOCKS5 UDP frame header and return the target plus the byte
/// offset at which the payload starts. Splitting "structure parsing"
/// from "give me a payload slice" lets the recv hot path stay on a
/// fixed reusable `Vec<u8>` buffer and only allocate a right-sized
/// `Bytes::copy_from_slice(&recv_buf[off..n])` for accepted payloads
/// (after the size guard). DO NOT change this back to a zero-copy
/// `Bytes::slice` path: that was tried and reverted because slicing
/// the recv buffer with `bytes` 1.x refcounts the whole ~65 KB
/// allocation, so a queued tiny DNS reply pinned the full datagram-
/// sized buffer until it drained — burst retention regressed by
/// orders of magnitude on UDP-heavy workloads. The thin
/// `parse_socks5_udp_packet` wrapper below keeps existing `&[u8]`
/// callers (tests) working.
fn parse_socks5_udp_packet_offsets(buf: &[u8]) -> Option<(SocksUdpTarget, usize)> {
if buf.len() < 4 || buf[0] != 0 || buf[1] != 0 || buf[2] != 0 {
return None;
}
Expand Down Expand Up @@ -1528,10 +1559,15 @@ fn parse_socks5_udp_packet(buf: &[u8]) -> Option<(SocksUdpTarget, &[u8])> {
atyp,
addr,
},
&buf[pos..],
pos,
))
}

fn parse_socks5_udp_packet(buf: &[u8]) -> Option<(SocksUdpTarget, &[u8])> {
let (target, off) = parse_socks5_udp_packet_offsets(buf)?;
Some((target, &buf[off..]))
}

fn build_socks5_udp_packet(target: &SocksUdpTarget, payload: &[u8]) -> Vec<u8> {
let mut out = Vec::with_capacity(4 + target.addr.len() + 2 + payload.len() + 1);
out.extend_from_slice(&[0, 0, 0, target.atyp]);
Expand Down
Loading
Loading