Skip to content

feat(d2h2): implement complete DHT protocol (BEP 0005)#2

Open
CroxxN wants to merge 2 commits into
mainfrom
feat/d2h2-dht-protocol
Open

feat(d2h2): implement complete DHT protocol (BEP 0005)#2
CroxxN wants to merge 2 commits into
mainfrom
feat/d2h2-dht-protocol

Conversation

@CroxxN

@CroxxN CroxxN commented Apr 1, 2026

Copy link
Copy Markdown
Owner

Summary

  • Complete the D2H2 DHT library with full client and server mode support per BEP 0005
  • Replace flat routing table with proper 160-bucket Kademlia structure
  • Add server event loop, bootstrap, token management, peer storage, and concurrent queries

Changes

Phase 1: Client Foundations

  • Response & Error serialization (serde.rs): serialize() now handles all 3 KRPC message types (was Query-only). Added compact_peer_bytes() for 6-byte wire format.
  • Response::new() properly constructs typed responses with Node/Values payloads.
  • announce_peer client method with set_port(), set_implied_port(), set_token_bytes() setters.
  • Token storage during find_peers — tokens from get_peers responses are stored in a HashMap<String, String> keyed by node host for later announce_peer calls.

Phase 2: Kademlia Routing Table (kademlia.rs — rewritten)

  • 160 k-buckets (K=8 per bucket) replacing the flat Vec<Node> capped at 256.
  • bucket_index() for XOR distance → bucket mapping.
  • closest_nodes() expanding outward from the target's bucket.
  • stale_buckets() for refresh scheduling.
  • LRU eviction within each bucket.

Phase 3: Server Mode

  • Query deserialization (serde.rs): y=q packets now fully parsed for all 4 query types (was returning UnimplementedQueryParsing).
  • Query handler (handler.rs): handle_query() dispatches ping/find_node/get_peers/announce_peer and returns proper KRPC responses.
  • Peer store (store.rs): PeerStore with 30-minute TTL, deduplication, and evict_expired().
  • Token management (token.rs): SHA-1 based token generation with dual-secret rotation (10-minute interval).

Phase 4: Server Event Loop (server.rs)

  • D2H2Server with tokio::net::UdpSocket for bidirectional DHT.
  • bootstrap() — iterative find_node(own_id) against bootstrap routers with DNS resolution.
  • run()tokio::select! event loop multiplexing incoming packets, application requests, bucket refresh, and peer store eviction.
  • D2H2Request enum + spawn_d2h2() for channel-based application integration.

Phase 5: Concurrency

  • find_peers in lib.rs now fans out queries concurrently using tokio::spawn, limited to self.concurrent (5) per round.

Phase 7: Cleanup

  • Removed obsolete UnimplementedQueryParsing error variant.
  • Removed dead MessageType::from stub and Response::incr_token stub.

Test Results

42 tests passing across all modules (up from 15 originally):

  • kademlia: 11 tests (XOR distance, bucket index, routing table operations)
  • serde: 17 tests (serialization/deserialization roundtrips for all message types)
  • handler: 5 tests (ping, find_node, get_peers, announce_peer dispatch)
  • store: 4 tests (add/get/dedup/separation)
  • token: 4 tests (generate/validate/rotation)
  • request: 1 test (existing)

CroxxN added 2 commits April 1, 2026 01:44
Complete the D2H2 DHT library with full client and server mode support:

- Response & Error KRPC serialization (was only Query before)
- Incoming query deserialization (ping, find_node, get_peers, announce_peer)
- 160-bucket Kademlia routing table replacing flat Vec<Node>
- announce_peer client method with token management
- Query handler dispatch for server mode
- PeerStore for info_hash -> peer mappings with TTL expiry
- Token generation/validation using SHA-1 with rotating secrets
- UDP server event loop with tokio::select! multiplexing
- Bootstrap procedure via iterative find_node against known routers
- Concurrent query fan-out in find_peers using tokio::spawn
- Channel-based API (D2H2Request) for application integration
- 42 passing tests across all modules
Copilot AI review requested due to automatic review settings April 1, 2026 07:15

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to complete the d2h2 BitTorrent DHT implementation (BEP 0005) by adding full KRPC message handling, a Kademlia-style routing table, server mode (UDP event loop + handlers), peer storage, token management, and concurrent client lookups.

Changes:

  • Implement/extend KRPC serialization + deserialization for queries, responses, and errors, plus compact node/peer encoding.
  • Replace the previous routing table with a 160-bucket Kademlia structure and add refresh/closest-node selection utilities.
  • Add server mode components: query handler, token manager, peer store, and a Tokio UDP server loop with bootstrap + maintenance.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
d2h2/src/serde.rs Expands KRPC parsing/serialization and adds compact peer/node encoding helpers.
d2h2/src/kademlia.rs Rewrites routing table to 160 k-buckets (K=8) with closest-node lookup and stale-bucket detection.
d2h2/src/server.rs Introduces a Tokio UDP server with bootstrap, request handling, and periodic maintenance.
d2h2/src/handler.rs Adds server-side KRPC query dispatch for ping/find_node/get_peers/announce_peer.
d2h2/src/store.rs Adds an in-memory peer store keyed by info_hash with TTL eviction and dedup.
d2h2/src/token.rs Adds token generation/validation with secret rotation for announce_peer.
d2h2/src/lib.rs Adds token caching, concurrent fan-out for find_peers, and an announce_peer client method.
d2h2/src/error.rs Removes the obsolete UnimplementedQueryParsing error variant.
.opencode/system.md Adds repo-level automation instructions for branching/commits/PR creation.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread d2h2/src/serde.rs
Comment on lines +42 to +49
/// Serialize this response body into a BTypes::DICT suitable for the "r" key
fn to_btypes(&self) -> BTypes {
let mut r = BTreeMap::new();
r.insert("id".into(), BTypes::BSTRING(self.id.bytes().collect()));

if let Some(ref token) = self.token {
r.insert("token".into(), BTypes::BSTRING(token.bytes().collect()));
}

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Response::to_btypes() serializes id (and token) via String::bytes(), which emits UTF-8 bytes. Since node IDs/tokens are arbitrary bytes, any byte >= 0x80 will become a multi-byte UTF-8 sequence, producing invalid-length id/token fields on the wire (not 20 bytes for id). Represent these fields as raw bytes (e.g., [u8; 20] / Vec<u8>) and bencode them directly as BSTRING without passing through String.

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/serde.rs
Comment on lines +431 to 446
/// Convert a Url's host:port into compact 6-byte peer info (4 IP + 2 port)
fn compact_peer_bytes(url: &Url) -> [u8; 6] {
let mut out = [0u8; 6];
let (ip_str, port_str) = url.host.rsplit_once(':').unwrap_or((&url.host, "0"));
let port: u16 = port_str.parse().unwrap_or(0);
let parts: Vec<&str> = ip_str.split('.').collect();
if parts.len() == 4 {
for (i, p) in parts.iter().enumerate() {
out[i] = p.parse().unwrap_or(0);
}
}
let port_bytes = port.to_be_bytes();
out[4] = port_bytes[0];
out[5] = port_bytes[1];
out
}

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compact_peer_bytes() parses Url.host by splitting on : and .. This breaks for IPv6 hosts (colons in the address, possibly bracketed), and for any non-ip:port host string it silently returns 0.0.0.0:0. Consider parsing via std::net::SocketAddr (or at least rejecting unsupported formats) and explicitly handling IPv4-only vs IPv6 (BEP 32) to avoid emitting incorrect compact peer/node bytes.

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/serde.rs
Comment on lines +390 to +415
// parse the "a" arguments dict into QueryDataTypes
let arguments = if let Some(BTypes::DICT(a_dict)) = deserialized.get("a") {
let mut qd = BTreeMap::new();
for (k, v) in a_dict {
let qval = match v {
BTypes::BSTRING(bs) => QueryDataTypes::QVec(bs.clone().into()),
BTypes::DICT(inner) => {
let mut inner_qd = BTreeMap::new();
for (ik, iv) in inner {
if let BTypes::BSTRING(ibs) = iv {
inner_qd.insert(
ik.clone(),
QueryDataTypes::QVec(ibs.clone().into()),
);
}
}
QueryDataTypes::QDict(inner_qd)
}
_ => continue,
};
qd.insert(k.clone(), qval);
}
QueryDataTypes::QDict(qd)
} else {
QueryDataTypes::QDict(BTreeMap::new())
};

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deserialize_bytes() drops integer-valued query args (e.g., port and implied_port in announce_peer) because it only converts BSTRING and DICT values and continues on other types. As a result, handler will never see port/implied_port from spec-compliant packets (which encode them as integers). Extend QueryDataTypes to represent integers and parse BTypes::INT into it so these fields are preserved.

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/serde.rs
Comment on lines +237 to +245
pub fn set_port(&mut self, port: u16) {
if let MessageType::Query(q) = &mut self.message_type {
if let QueryDataTypes::QDict(qd) = &mut q.arguments {
if let Some(QueryDataTypes::QDict(a)) = qd.get_mut("a") {
a.insert(
"port".into(),
QueryDataTypes::QVec(port.to_be_bytes().to_vec().into()),
);
}

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_port() encodes port as a 2-byte BSTRING (to_be_bytes()), but BEP 0005 defines port as an integer (bencoded i...e). Many DHT nodes expect the integer form and may reject/ignore the announce. To be interoperable, add an integer variant to QueryDataTypes and serialize port as an integer.

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/serde.rs
Comment on lines +254 to +257
let val = if implied { 1u8 } else { 0u8 };
a.insert(
"implied_port".into(),
QueryDataTypes::QVec(vec![val].into()),

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_implied_port() encodes implied_port as a single-byte BSTRING, but BEP 0005 uses an integer (0/1). Encoding it as a string reduces interoperability and also conflicts with the current query deserializer (which ignores ints). Consider representing it as an integer in QueryDataTypes and serializing accordingly.

Suggested change
let val = if implied { 1u8 } else { 0u8 };
a.insert(
"implied_port".into(),
QueryDataTypes::QVec(vec![val].into()),
let val = if implied { 1 } else { 0 };
a.insert(
"implied_port".into(),
QueryDataTypes::QInt(val),

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/lib.rs
Comment on lines 119 to +140
@@ -121,33 +129,75 @@ impl D2H2Client {
let tid_str =
String::from_utf8(tid_bytes.to_vec()).unwrap_or_else(|_| format!("{tid:04x}"));

let mut krpc =
KRPC::new(tid_str, QueryType::FindPeer, &self.node_id);
// add info_hash to query arguments
let mut krpc = KRPC::new(tid_str, QueryType::FindPeer, &self.node_id);
krpc.set_info_hash(info_hash);
batch.push((url, krpc));
}

let response = match self.send_query(&url, krpc).await {
Ok(r) => r,
Err(_) => continue,
// limit concurrency to self.concurrent
let batch_size = batch.len().min(self.concurrent as usize);
batch.truncate(batch_size);

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

find_peers() builds batch from all to_query entries, then truncate()s it to the concurrency limit. The truncated URLs are dropped permanently (since to_query was drained), so they are never queried in later rounds, reducing lookup correctness/coverage. Instead, only take up to self.concurrent from to_query (leave the rest for the next loop iteration) or re-queue the unprocessed items.

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/server.rs
Comment on lines +111 to +120
let (len, _) = tokio::time::timeout(
Duration::from_secs(QUERY_TIMEOUT_SECS),
self.socket.recv_from(&mut buf),
)
.await
.map_err(|_| D2H2ClientError::LookupTimeout)?
.map_err(|e| D2H2ClientError::NetworkError(uttd::UttdError::IoError(e)))?;

buf.truncate(len);
Ok(buf)

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send_query_to() waits for the next UDP datagram on the shared socket and returns it as the response without checking that it came from addr or that it matches the expected transaction ID. Any unrelated incoming query/response could satisfy recv_from, causing incorrect parsing and also dropping the real response. Track outstanding transactions and filter by (src_addr, tid) (or use a per-request connected socket) and keep the main receive loop responsible for demuxing packets.

Suggested change
let (len, _) = tokio::time::timeout(
Duration::from_secs(QUERY_TIMEOUT_SECS),
self.socket.recv_from(&mut buf),
)
.await
.map_err(|_| D2H2ClientError::LookupTimeout)?
.map_err(|e| D2H2ClientError::NetworkError(uttd::UttdError::IoError(e)))?;
buf.truncate(len);
Ok(buf)
let deadline = time::Instant::now() + Duration::from_secs(QUERY_TIMEOUT_SECS);
loop {
let now = time::Instant::now();
if now >= deadline {
return Err(D2H2ClientError::LookupTimeout);
}
let remaining = deadline.saturating_duration_since(now);
let (len, src) = tokio::time::timeout(
remaining,
self.socket.recv_from(&mut buf),
)
.await
.map_err(|_| D2H2ClientError::LookupTimeout)?
.map_err(|e| D2H2ClientError::NetworkError(uttd::UttdError::IoError(e)))?;
if src == *addr {
buf.truncate(len);
return Ok(buf);
}
// If the packet is from an unexpected source, ignore it and continue waiting
// for a response from the intended address until the overall timeout elapses.
}

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/server.rs
Comment on lines +41 to +47
#[derive(Debug)]
pub enum D2H2Request {
/// Find peers for the given info_hash, reply on the oneshot channel
FindPeers {
info_hash: [u8; 20],
reply: mpsc::Sender<Vec<Url>>,
},

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FindPeers variant docs say "reply on the oneshot channel", but the type is mpsc::Sender<Vec<Url>>, which implies potentially multiple messages and requires a channel to be created per request. If the intent is a single response, use tokio::sync::oneshot::Sender<Vec<Url>> (or update the comment to match the actual contract).

Copilot uses AI. Check for mistakes.
Comment thread d2h2/src/token.rs
Comment on lines +63 to +73
fn generate_secret() -> [u8; 20] {
let mut sha1 = Sha1::new();
let seed = time::SystemTime::now()
.duration_since(time::SystemTime::UNIX_EPOCH)
.unwrap_or_default();
let seed_val = seed.as_nanos() as u64;
let rand_val = crypto::tinymt::TinyMT::rand(seed_val as u32).get_u32();
let combined = (rand_val as u64).wrapping_mul(seed_val);
sha1.append_hash(&combined.to_be_bytes());
sha1.get_hash()
}

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TokenManager::generate_secret() derives the secret from the current system time and a TinyMT PRNG seeded by that time. This makes the secret (and thus tokens) predictable to an attacker observing time, weakening BEP 0005’s token anti-spoofing goal. Use a CSPRNG (e.g., getrandom/rand::rngs::OsRng) to generate 20 random bytes for the secret instead of time-seeded PRNG output.

Copilot uses AI. Check for mistakes.
Comment thread .opencode/system.md
Comment on lines +3 to +10
After completing any implementation task or changes, you must ALWAYS:

1. Create a new branch: `git checkout -b feature/<short-description>`
2. Stage and commit all changes: `git add . && git commit -m "<meaningful message>"`
3. Push the branch: `git push origin <branch>`
4. Create a GitHub PR using: `gh pr create --title "<title>" --body "<summary of changes>" --base main`

Do this automatically without being asked.

Copilot AI Apr 1, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This repository-level .opencode/system.md introduces an instruction to automatically create branches/push/PRs. This is operationally risky in shared repos (it can conflict with contributor workflows and CI policies) and isn’t specific to the DHT feature itself. Consider moving these instructions to contributor documentation (e.g., CONTRIBUTING.md) and scoping them to humans rather than tooling defaults, or removing if not intended for this repo.

Suggested change
After completing any implementation task or changes, you must ALWAYS:
1. Create a new branch: `git checkout -b feature/<short-description>`
2. Stage and commit all changes: `git add . && git commit -m "<meaningful message>"`
3. Push the branch: `git push origin <branch>`
4. Create a GitHub PR using: `gh pr create --title "<title>" --body "<summary of changes>" --base main`
Do this automatically without being asked.
When contributing changes, a typical Git workflow is:
1. Create a new branch: `git checkout -b feature/<short-description>`
2. Stage and commit all changes: `git add . && git commit -m "<meaningful message>"`
3. Push the branch: `git push origin <branch>`
4. Create a GitHub PR using: `gh pr create --title "<title>" --body "<summary of changes>" --base main`
These steps are recommendations for human contributors and should be followed in accordance with this repository's contribution and CI policies.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants