Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ path = "bin/builder.rs"
[dependencies]
init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] }

signet-constants = { version = "0.16.0-rc.16" }
signet-sim = { version = "0.16.0-rc.16" }
signet-tx-cache = { version = "0.16.0-rc.16" }
signet-types = { version = "0.16.0-rc.16" }
signet-zenith = { version = "0.16.0-rc.16" }
signet-constants = { version = "0.16.0-rc.17" }
signet-sim = { version = "0.16.0-rc.17" }
signet-tx-cache = { version = "0.16.0-rc.17" }
signet-types = { version = "0.16.0-rc.17" }
signet-zenith = { version = "0.16.0-rc.17" }
signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" }
signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" }

Expand Down
65 changes: 60 additions & 5 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
use crate::config::BuilderConfig;
use futures_util::{TryFutureExt, TryStreamExt};
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_sim::{ProviderStateSource, SimItemValidity, check_bundle_tx_list};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, debug, trace, trace_span, warn};
use tracing::{Instrument, debug_span, trace, trace_span, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -56,6 +57,63 @@ impl BundlePoller {
self.tx_cache.stream_bundles().try_collect().await
}

/// Spawns a tokio task to check the validity of all host transactions in a
/// bundle before sending it to the cache task via the outbound channel.
///
/// Uses [`check_bundle_tx_list`] from `signet-sim` to validate host tx nonces
/// and balance against the host chain. Drops bundles that are not currently valid.
fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) {
Comment thread
prestwich marked this conversation as resolved.
let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id);
tokio::spawn(async move {
let recovered = match bundle.bundle.try_to_recovered() {
Ok(recovered) => recovered,
Err(error) => {
span_debug!(span, ?error, "Failed to recover bundle, dropping");
return;
}
};

if recovered.host_txs().is_empty() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nested ifs. should be functional if possible

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c72ca42.

let _ = outbound.send(bundle).inspect_err(|_| {
span_debug!(span, "Outbound channel closed");
});
return;
}

// Check if the receiver is still alive before doing expensive nonce validation over
// the network.
if outbound.is_closed() {
span_debug!(span, "Outbound channel closed, skipping nonce validation");
return;
}

let Ok(host_provider) =
crate::config().connect_host_provider().instrument(span.clone()).await
else {
span_debug!(span, "Failed to connect to host provider, dropping bundle");
return;
};

let source = ProviderStateSource(host_provider);
match check_bundle_tx_list(recovered.host_tx_reqs(), &source)
.instrument(span.clone())
.await
{
Ok(SimItemValidity::Now) | Ok(SimItemValidity::Future) => {
let _ = outbound.send(bundle).inspect_err(|_| {
span_debug!(span, "Outbound channel closed");
});
}
Ok(SimItemValidity::Never) => {
span_debug!(span, "Dropping bundle: host txs will never be valid");
}
Err(error) => {
span_debug!(span, %error, "Failed to check bundle validity, dropping");
}
}
});
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
Expand Down Expand Up @@ -91,10 +149,7 @@ impl BundlePoller {
crate::metrics::record_bundles_fetched(bundles.len());
trace!(count = bundles.len(), "fetched bundles from tx-cache");
for bundle in bundles {
if let Err(err) = outbound.send(bundle) {
debug!(?err, "Failed to send bundle - channel is dropped");
break;
}
Self::spawn_check_bundle_nonces(bundle, outbound.clone());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor TOCTOU on the outbound. it may have closed after the await

this is not a big deal, but may cause unnecessary resource usage validating nonces after the receiver is gone

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Addressed in c72ca42; there's still a toctou issue, but the window is mostly closed now.

}
}

Expand Down
Loading