-
Notifications
You must be signed in to change notification settings - Fork 2
feat(bundle): filter bundles with stale host tx nonces before SimCache #235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6369f79
98b0718
5fb52d6
7e4905b
c72ca42
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,13 +2,14 @@ | |
| use crate::config::BuilderConfig; | ||
| use futures_util::{TryFutureExt, TryStreamExt}; | ||
| use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; | ||
| use signet_sim::{ProviderStateSource, SimItemValidity, check_bundle_tx_list}; | ||
| use signet_tx_cache::{TxCacheError, types::CachedBundle}; | ||
| use tokio::{ | ||
| sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, | ||
| task::JoinHandle, | ||
| time::{self, Duration}, | ||
| }; | ||
| use tracing::{Instrument, debug, trace, trace_span, warn}; | ||
| use tracing::{Instrument, debug_span, trace, trace_span, warn}; | ||
|
|
||
| /// Poll interval for the bundle poller in milliseconds. | ||
| const POLL_INTERVAL_MS: u64 = 1000; | ||
|
|
@@ -56,6 +57,63 @@ impl BundlePoller { | |
| self.tx_cache.stream_bundles().try_collect().await | ||
| } | ||
|
|
||
| /// Spawns a tokio task to check the validity of all host transactions in a | ||
| /// bundle before sending it to the cache task via the outbound channel. | ||
| /// | ||
| /// Uses [`check_bundle_tx_list`] from `signet-sim` to validate host tx nonces | ||
| /// and balance against the host chain. Drops bundles that are not currently valid. | ||
| fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender<CachedBundle>) { | ||
| let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); | ||
| tokio::spawn(async move { | ||
| let recovered = match bundle.bundle.try_to_recovered() { | ||
| Ok(recovered) => recovered, | ||
| Err(error) => { | ||
| span_debug!(span, ?error, "Failed to recover bundle, dropping"); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| if recovered.host_txs().is_empty() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nested ifs. should be functional if possible
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in c72ca42. |
||
| let _ = outbound.send(bundle).inspect_err(|_| { | ||
| span_debug!(span, "Outbound channel closed"); | ||
| }); | ||
| return; | ||
| } | ||
|
|
||
| // Check if the receiver is still alive before doing expensive nonce validation over | ||
| // the network. | ||
| if outbound.is_closed() { | ||
| span_debug!(span, "Outbound channel closed, skipping nonce validation"); | ||
| return; | ||
| } | ||
|
|
||
| let Ok(host_provider) = | ||
| crate::config().connect_host_provider().instrument(span.clone()).await | ||
| else { | ||
| span_debug!(span, "Failed to connect to host provider, dropping bundle"); | ||
| return; | ||
| }; | ||
|
|
||
| let source = ProviderStateSource(host_provider); | ||
| match check_bundle_tx_list(recovered.host_tx_reqs(), &source) | ||
| .instrument(span.clone()) | ||
| .await | ||
| { | ||
| Ok(SimItemValidity::Now) | Ok(SimItemValidity::Future) => { | ||
| let _ = outbound.send(bundle).inspect_err(|_| { | ||
| span_debug!(span, "Outbound channel closed"); | ||
| }); | ||
| } | ||
| Ok(SimItemValidity::Never) => { | ||
| span_debug!(span, "Dropping bundle: host txs will never be valid"); | ||
| } | ||
| Err(error) => { | ||
| span_debug!(span, %error, "Failed to check bundle validity, dropping"); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| async fn task_future(self, outbound: UnboundedSender<CachedBundle>) { | ||
| loop { | ||
| let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); | ||
|
|
@@ -91,10 +149,7 @@ impl BundlePoller { | |
| crate::metrics::record_bundles_fetched(bundles.len()); | ||
| trace!(count = bundles.len(), "fetched bundles from tx-cache"); | ||
| for bundle in bundles { | ||
| if let Err(err) = outbound.send(bundle) { | ||
| debug!(?err, "Failed to send bundle - channel is dropped"); | ||
| break; | ||
| } | ||
| Self::spawn_check_bundle_nonces(bundle, outbound.clone()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor TOCTOU on the outbound. it may have closed after the await this is not a big deal, but may cause unnecessary resource usage validating nonces after the receiver is gone
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point! Addressed in c72ca42; there's still a toctou issue, but the window is mostly closed now. |
||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.