feat(d2h2): implement complete DHT protocol (BEP 0005)#2
Conversation
Complete the D2H2 DHT library with full client and server mode support: - Response & Error KRPC serialization (was only Query before) - Incoming query deserialization (ping, find_node, get_peers, announce_peer) - 160-bucket Kademlia routing table replacing flat Vec<Node> - announce_peer client method with token management - Query handler dispatch for server mode - PeerStore for info_hash -> peer mappings with TTL expiry - Token generation/validation using SHA-1 with rotating secrets - UDP server event loop with tokio::select! multiplexing - Bootstrap procedure via iterative find_node against known routers - Concurrent query fan-out in find_peers using tokio::spawn - Channel-based API (D2H2Request) for application integration - 42 passing tests across all modules
There was a problem hiding this comment.
Pull request overview
This PR aims to complete the d2h2 BitTorrent DHT implementation (BEP 0005) by adding full KRPC message handling, a Kademlia-style routing table, server mode (UDP event loop + handlers), peer storage, token management, and concurrent client lookups.
Changes:
- Implement/extend KRPC serialization + deserialization for queries, responses, and errors, plus compact node/peer encoding.
- Replace the previous routing table with a 160-bucket Kademlia structure and add refresh/closest-node selection utilities.
- Add server mode components: query handler, token manager, peer store, and a Tokio UDP server loop with bootstrap + maintenance.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
d2h2/src/serde.rs |
Expands KRPC parsing/serialization and adds compact peer/node encoding helpers. |
d2h2/src/kademlia.rs |
Rewrites routing table to 160 k-buckets (K=8) with closest-node lookup and stale-bucket detection. |
d2h2/src/server.rs |
Introduces a Tokio UDP server with bootstrap, request handling, and periodic maintenance. |
d2h2/src/handler.rs |
Adds server-side KRPC query dispatch for ping/find_node/get_peers/announce_peer. |
d2h2/src/store.rs |
Adds an in-memory peer store keyed by info_hash with TTL eviction and dedup. |
d2h2/src/token.rs |
Adds token generation/validation with secret rotation for announce_peer. |
d2h2/src/lib.rs |
Adds token caching, concurrent fan-out for find_peers, and an announce_peer client method. |
d2h2/src/error.rs |
Removes the obsolete UnimplementedQueryParsing error variant. |
.opencode/system.md |
Adds repo-level automation instructions for branching/commits/PR creation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// Serialize this response body into a BTypes::DICT suitable for the "r" key | ||
| fn to_btypes(&self) -> BTypes { | ||
| let mut r = BTreeMap::new(); | ||
| r.insert("id".into(), BTypes::BSTRING(self.id.bytes().collect())); | ||
|
|
||
| if let Some(ref token) = self.token { | ||
| r.insert("token".into(), BTypes::BSTRING(token.bytes().collect())); | ||
| } |
There was a problem hiding this comment.
Response::to_btypes() serializes id (and token) via String::bytes(), which emits UTF-8 bytes. Since node IDs/tokens are arbitrary bytes, any byte >= 0x80 will become a multi-byte UTF-8 sequence, producing invalid-length id/token fields on the wire (not 20 bytes for id). Represent these fields as raw bytes (e.g., [u8; 20] / Vec<u8>) and bencode them directly as BSTRING without passing through String.
| /// Convert a Url's host:port into compact 6-byte peer info (4 IP + 2 port) | ||
| fn compact_peer_bytes(url: &Url) -> [u8; 6] { | ||
| let mut out = [0u8; 6]; | ||
| let (ip_str, port_str) = url.host.rsplit_once(':').unwrap_or((&url.host, "0")); | ||
| let port: u16 = port_str.parse().unwrap_or(0); | ||
| let parts: Vec<&str> = ip_str.split('.').collect(); | ||
| if parts.len() == 4 { | ||
| for (i, p) in parts.iter().enumerate() { | ||
| out[i] = p.parse().unwrap_or(0); | ||
| } | ||
| } | ||
| let port_bytes = port.to_be_bytes(); | ||
| out[4] = port_bytes[0]; | ||
| out[5] = port_bytes[1]; | ||
| out | ||
| } |
There was a problem hiding this comment.
compact_peer_bytes() parses Url.host by splitting on : and .. This breaks for IPv6 hosts (colons in the address, possibly bracketed), and for any non-ip:port host string it silently returns 0.0.0.0:0. Consider parsing via std::net::SocketAddr (or at least rejecting unsupported formats) and explicitly handling IPv4-only vs IPv6 (BEP 32) to avoid emitting incorrect compact peer/node bytes.
| // parse the "a" arguments dict into QueryDataTypes | ||
| let arguments = if let Some(BTypes::DICT(a_dict)) = deserialized.get("a") { | ||
| let mut qd = BTreeMap::new(); | ||
| for (k, v) in a_dict { | ||
| let qval = match v { | ||
| BTypes::BSTRING(bs) => QueryDataTypes::QVec(bs.clone().into()), | ||
| BTypes::DICT(inner) => { | ||
| let mut inner_qd = BTreeMap::new(); | ||
| for (ik, iv) in inner { | ||
| if let BTypes::BSTRING(ibs) = iv { | ||
| inner_qd.insert( | ||
| ik.clone(), | ||
| QueryDataTypes::QVec(ibs.clone().into()), | ||
| ); | ||
| } | ||
| } | ||
| QueryDataTypes::QDict(inner_qd) | ||
| } | ||
| _ => continue, | ||
| }; | ||
| qd.insert(k.clone(), qval); | ||
| } | ||
| QueryDataTypes::QDict(qd) | ||
| } else { | ||
| QueryDataTypes::QDict(BTreeMap::new()) | ||
| }; |
There was a problem hiding this comment.
deserialize_bytes() drops integer-valued query args (e.g., port and implied_port in announce_peer) because it only converts BSTRING and DICT values and continues on other types. As a result, handler will never see port/implied_port from spec-compliant packets (which encode them as integers). Extend QueryDataTypes to represent integers and parse BTypes::INT into it so these fields are preserved.
| pub fn set_port(&mut self, port: u16) { | ||
| if let MessageType::Query(q) = &mut self.message_type { | ||
| if let QueryDataTypes::QDict(qd) = &mut q.arguments { | ||
| if let Some(QueryDataTypes::QDict(a)) = qd.get_mut("a") { | ||
| a.insert( | ||
| "port".into(), | ||
| QueryDataTypes::QVec(port.to_be_bytes().to_vec().into()), | ||
| ); | ||
| } |
There was a problem hiding this comment.
set_port() encodes port as a 2-byte BSTRING (to_be_bytes()), but BEP 0005 defines port as an integer (bencoded i...e). Many DHT nodes expect the integer form and may reject/ignore the announce. To be interoperable, add an integer variant to QueryDataTypes and serialize port as an integer.
| let val = if implied { 1u8 } else { 0u8 }; | ||
| a.insert( | ||
| "implied_port".into(), | ||
| QueryDataTypes::QVec(vec![val].into()), |
There was a problem hiding this comment.
set_implied_port() encodes implied_port as a single-byte BSTRING, but BEP 0005 uses an integer (0/1). Encoding it as a string reduces interoperability and also conflicts with the current query deserializer (which ignores ints). Consider representing it as an integer in QueryDataTypes and serializing accordingly.
| let val = if implied { 1u8 } else { 0u8 }; | |
| a.insert( | |
| "implied_port".into(), | |
| QueryDataTypes::QVec(vec![val].into()), | |
| let val = if implied { 1 } else { 0 }; | |
| a.insert( | |
| "implied_port".into(), | |
| QueryDataTypes::QInt(val), |
| @@ -121,33 +129,75 @@ impl D2H2Client { | |||
| let tid_str = | |||
| String::from_utf8(tid_bytes.to_vec()).unwrap_or_else(|_| format!("{tid:04x}")); | |||
|
|
|||
| let mut krpc = | |||
| KRPC::new(tid_str, QueryType::FindPeer, &self.node_id); | |||
| // add info_hash to query arguments | |||
| let mut krpc = KRPC::new(tid_str, QueryType::FindPeer, &self.node_id); | |||
| krpc.set_info_hash(info_hash); | |||
| batch.push((url, krpc)); | |||
| } | |||
|
|
|||
| let response = match self.send_query(&url, krpc).await { | |||
| Ok(r) => r, | |||
| Err(_) => continue, | |||
| // limit concurrency to self.concurrent | |||
| let batch_size = batch.len().min(self.concurrent as usize); | |||
| batch.truncate(batch_size); | |||
|
|
|||
There was a problem hiding this comment.
find_peers() builds batch from all to_query entries, then truncate()s it to the concurrency limit. The truncated URLs are dropped permanently (since to_query was drained), so they are never queried in later rounds, reducing lookup correctness/coverage. Instead, only take up to self.concurrent from to_query (leave the rest for the next loop iteration) or re-queue the unprocessed items.
| let (len, _) = tokio::time::timeout( | ||
| Duration::from_secs(QUERY_TIMEOUT_SECS), | ||
| self.socket.recv_from(&mut buf), | ||
| ) | ||
| .await | ||
| .map_err(|_| D2H2ClientError::LookupTimeout)? | ||
| .map_err(|e| D2H2ClientError::NetworkError(uttd::UttdError::IoError(e)))?; | ||
|
|
||
| buf.truncate(len); | ||
| Ok(buf) |
There was a problem hiding this comment.
send_query_to() waits for the next UDP datagram on the shared socket and returns it as the response without checking that it came from addr or that it matches the expected transaction ID. Any unrelated incoming query/response could satisfy recv_from, causing incorrect parsing and also dropping the real response. Track outstanding transactions and filter by (src_addr, tid) (or use a per-request connected socket) and keep the main receive loop responsible for demuxing packets.
| let (len, _) = tokio::time::timeout( | |
| Duration::from_secs(QUERY_TIMEOUT_SECS), | |
| self.socket.recv_from(&mut buf), | |
| ) | |
| .await | |
| .map_err(|_| D2H2ClientError::LookupTimeout)? | |
| .map_err(|e| D2H2ClientError::NetworkError(uttd::UttdError::IoError(e)))?; | |
| buf.truncate(len); | |
| Ok(buf) | |
| let deadline = time::Instant::now() + Duration::from_secs(QUERY_TIMEOUT_SECS); | |
| loop { | |
| let now = time::Instant::now(); | |
| if now >= deadline { | |
| return Err(D2H2ClientError::LookupTimeout); | |
| } | |
| let remaining = deadline.saturating_duration_since(now); | |
| let (len, src) = tokio::time::timeout( | |
| remaining, | |
| self.socket.recv_from(&mut buf), | |
| ) | |
| .await | |
| .map_err(|_| D2H2ClientError::LookupTimeout)? | |
| .map_err(|e| D2H2ClientError::NetworkError(uttd::UttdError::IoError(e)))?; | |
| if src == *addr { | |
| buf.truncate(len); | |
| return Ok(buf); | |
| } | |
| // If the packet is from an unexpected source, ignore it and continue waiting | |
| // for a response from the intended address until the overall timeout elapses. | |
| } |
| #[derive(Debug)] | ||
| pub enum D2H2Request { | ||
| /// Find peers for the given info_hash, reply on the oneshot channel | ||
| FindPeers { | ||
| info_hash: [u8; 20], | ||
| reply: mpsc::Sender<Vec<Url>>, | ||
| }, |
There was a problem hiding this comment.
The FindPeers variant docs say "reply on the oneshot channel", but the type is mpsc::Sender<Vec<Url>>, which implies potentially multiple messages and requires a channel to be created per request. If the intent is a single response, use tokio::sync::oneshot::Sender<Vec<Url>> (or update the comment to match the actual contract).
| fn generate_secret() -> [u8; 20] { | ||
| let mut sha1 = Sha1::new(); | ||
| let seed = time::SystemTime::now() | ||
| .duration_since(time::SystemTime::UNIX_EPOCH) | ||
| .unwrap_or_default(); | ||
| let seed_val = seed.as_nanos() as u64; | ||
| let rand_val = crypto::tinymt::TinyMT::rand(seed_val as u32).get_u32(); | ||
| let combined = (rand_val as u64).wrapping_mul(seed_val); | ||
| sha1.append_hash(&combined.to_be_bytes()); | ||
| sha1.get_hash() | ||
| } |
There was a problem hiding this comment.
TokenManager::generate_secret() derives the secret from the current system time and a TinyMT PRNG seeded by that time. This makes the secret (and thus tokens) predictable to an attacker observing time, weakening BEP 0005’s token anti-spoofing goal. Use a CSPRNG (e.g., getrandom/rand::rngs::OsRng) to generate 20 random bytes for the secret instead of time-seeded PRNG output.
| After completing any implementation task or changes, you must ALWAYS: | ||
|
|
||
| 1. Create a new branch: `git checkout -b feature/<short-description>` | ||
| 2. Stage and commit all changes: `git add . && git commit -m "<meaningful message>"` | ||
| 3. Push the branch: `git push origin <branch>` | ||
| 4. Create a GitHub PR using: `gh pr create --title "<title>" --body "<summary of changes>" --base main` | ||
|
|
||
| Do this automatically without being asked. |
There was a problem hiding this comment.
This repository-level .opencode/system.md introduces an instruction to automatically create branches/push/PRs. This is operationally risky in shared repos (it can conflict with contributor workflows and CI policies) and isn’t specific to the DHT feature itself. Consider moving these instructions to contributor documentation (e.g., CONTRIBUTING.md) and scoping them to humans rather than tooling defaults, or removing if not intended for this repo.
| After completing any implementation task or changes, you must ALWAYS: | |
| 1. Create a new branch: `git checkout -b feature/<short-description>` | |
| 2. Stage and commit all changes: `git add . && git commit -m "<meaningful message>"` | |
| 3. Push the branch: `git push origin <branch>` | |
| 4. Create a GitHub PR using: `gh pr create --title "<title>" --body "<summary of changes>" --base main` | |
| Do this automatically without being asked. | |
| When contributing changes, a typical Git workflow is: | |
| 1. Create a new branch: `git checkout -b feature/<short-description>` | |
| 2. Stage and commit all changes: `git add . && git commit -m "<meaningful message>"` | |
| 3. Push the branch: `git push origin <branch>` | |
| 4. Create a GitHub PR using: `gh pr create --title "<title>" --body "<summary of changes>" --base main` | |
| These steps are recommendations for human contributors and should be followed in accordance with this repository's contribution and CI policies. |
Summary
Changes
Phase 1: Client Foundations
serde.rs):serialize()now handles all 3 KRPC message types (was Query-only). Addedcompact_peer_bytes()for 6-byte wire format.Response::new()properly constructs typed responses with Node/Values payloads.announce_peerclient method withset_port(),set_implied_port(),set_token_bytes()setters.find_peers— tokens fromget_peersresponses are stored in aHashMap<String, String>keyed by node host for laterannounce_peercalls.Phase 2: Kademlia Routing Table (
kademlia.rs— rewritten)Vec<Node>capped at 256.bucket_index()for XOR distance → bucket mapping.closest_nodes()expanding outward from the target's bucket.stale_buckets()for refresh scheduling.Phase 3: Server Mode
serde.rs):y=qpackets now fully parsed for all 4 query types (was returningUnimplementedQueryParsing).handler.rs):handle_query()dispatches ping/find_node/get_peers/announce_peer and returns proper KRPC responses.store.rs):PeerStorewith 30-minute TTL, deduplication, andevict_expired().token.rs): SHA-1 based token generation with dual-secret rotation (10-minute interval).Phase 4: Server Event Loop (
server.rs)D2H2Serverwithtokio::net::UdpSocketfor bidirectional DHT.bootstrap()— iterativefind_node(own_id)against bootstrap routers with DNS resolution.run()—tokio::select!event loop multiplexing incoming packets, application requests, bucket refresh, and peer store eviction.D2H2Requestenum +spawn_d2h2()for channel-based application integration.Phase 5: Concurrency
find_peersinlib.rsnow fans out queries concurrently usingtokio::spawn, limited toself.concurrent(5) per round.Phase 7: Cleanup
UnimplementedQueryParsingerror variant.MessageType::fromstub andResponse::incr_tokenstub.Test Results
42 tests passing across all modules (up from 15 originally):
kademlia: 11 tests (XOR distance, bucket index, routing table operations)serde: 17 tests (serialization/deserialization roundtrips for all message types)handler: 5 tests (ping, find_node, get_peers, announce_peer dispatch)store: 4 tests (add/get/dedup/separation)token: 4 tests (generate/validate/rotation)request: 1 test (existing)