diff --git a/src/builder.rs b/src/builder.rs index b0ff1d03b..ea734ae7f 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -26,7 +26,6 @@ use lightning::chain::{chainmonitor, BestBlock}; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; -use lightning::log_trace; use lightning::onion_message::dns_resolution::DNSResolverMessageHandler; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::DefaultRouter; @@ -37,11 +36,12 @@ use lightning::routing::scoring::{ use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::config::HTLCInterceptionFlags; use lightning::util::persist::{ - KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStore, MigratableKVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; use lightning::util::sweep::OutputSweeper; +use lightning::{log_info, log_trace}; use lightning_dns_resolver::OMDomainResolver; use lightning_persister::fs_store::v1::FilesystemStore; use vss_client::headers::VssHeaderProvider; @@ -57,7 +57,9 @@ use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; +use crate::io::recovery::{list_existing_durable_keys, restore_from_backup}; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::{BackupMode, BackupRetryQueue, TierStore}; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, @@ -155,6 +157,29 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +pub(crate) struct TierStoreConfig { + pub(crate) ephemeral: Option>, + pub(crate) backup: Option>, + pub(crate) backup_mode: BackupMode, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .field("backup_mode", &self.backup_mode) + .finish() + } +} + +#[derive(Debug, Clone, Copy, Default)] +struct RecoveryConfig { + wallet_recovery: bool, + restore_from_backup: bool, +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -201,6 +226,12 @@ pub enum BuildError { AsyncPaymentsConfigMismatch, /// An attempt to setup a DNS Resolver failed. DNSResolverSetupFailed, + /// Restore was requested but no backup store is configured. + RestoreRequiresBackupStore, + /// Restore was requested but the primary store already contains durable data. + RestorePrimaryNotEmpty, + /// Failed to restore state from the backup store. + RestoreFailed, } impl fmt::Display for BuildError { @@ -238,6 +269,15 @@ impl fmt::Display for BuildError { Self::DNSResolverSetupFailed => { write!(f, "An attempt to setup a DNS resolver has failed.") }, + Self::RestoreRequiresBackupStore => { + write!(f, "Restore from backup was requested but no backup store is configured.") + }, + Self::RestorePrimaryNotEmpty => { + write!(f, "Restore from backup was requested but the primary store already contains durable data.") + }, + Self::RestoreFailed => { + write!(f, "Failed to restore state from the backup store.") + }, } } } @@ -290,9 +330,10 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, - recovery_mode: bool, + recovery_config: RecoveryConfig, } impl NodeBuilder { @@ -308,19 +349,21 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; - let recovery_mode = false; + let recovery_config = RecoveryConfig::default(); Self { config, chain_data_source_config, gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, - recovery_mode, + recovery_config, } } @@ -622,7 +665,69 @@ impl NodeBuilder { /// This should only be set on first startup when importing an older wallet from a previously /// used [`NodeEntropy`]. pub fn set_wallet_recovery_mode(&mut self) -> &mut Self { - self.recovery_mode = true; + self.recovery_config.wallet_recovery = true; + self + } + + /// Configures the [`Node`] to restore durable state from the configured backup + /// store before normal startup reads occur. + /// + /// This is intended for disaster recovery: when the primary store has been lost + /// or corrupted, the node can be rebuilt from a backup store that was previously + /// configured via [`set_backup_store`] and actively receiving writes during + /// normal operation. + /// + /// The primary store must be empty (contain no durable state) else the build will + /// fail with [`BuildError::RestorePrimaryNotEmpty`] if existing durable data + /// is detected. + /// + /// Note: this is distinct from wallet recovery mode, which controls whether the + /// on-chain wallet is rescanned from genesis on first startup. + /// + /// [`set_backup_store`]: Self::set_backup_store + pub fn restore_from_backup(&mut self) -> &mut Self { + self.recovery_config.restore_from_backup = true; + self + } + + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives a second durable + /// copy of data written to the primary store. + /// + /// Backup failure handling depends on `backup_mode`: + /// - [`BackupMode::BestEffortBackup`] logs backup failures and still returns + /// success as long as the primary store succeeds. + /// - [`BackupMode::SemiSync`] returns success only if a failed backup + /// operation can be durably persisted locally and enqueued for + /// asynchronous retry. + /// + /// Note: in [`BackupMode::SemiSync`], writes and removals are still not atomic + /// across the primary and backup stores. A call may return an error after the + /// primary store has already been updated if immediate backup replication + /// fails and the retry intent cannot be durably persisted locally. + /// + /// If no backup store is configured, durable data will be stored only in the + /// primary store. + pub fn set_backup_store( + &mut self, backup_store: Arc, backup_mode: BackupMode, + ) -> &mut Self { + let cfg = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + cfg.backup = Some(backup_store); + cfg.backup_mode = backup_mode; + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); self } @@ -774,8 +879,23 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. - pub fn build_with_store( + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store + pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, + ) -> Result { + let primary_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + self.build_with_dynstore(node_entropy, primary_store) + } + + fn build_with_dynstore( + &self, node_entropy: NodeEntropy, primary_store: Arc, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; @@ -788,9 +908,43 @@ impl NodeBuilder { })?) }; + let ts_config = self.tier_store_config.as_ref(); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + + if let Some(backup_store) = config.backup.as_ref() { + match config.backup_mode { + BackupMode::BestEffortBackup => { + tier_store.set_backup_store(Arc::clone(backup_store), None); + }, + BackupMode::SemiSync => { + let retry_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new( + PathBuf::from(&self.config.storage_dir_path) + .join("backup_retry_queue"), + ))); + + let retry_queue = Arc::new(BackupRetryQueue::new( + Arc::clone(&retry_store), + Arc::clone(&logger), + )); + tier_store.set_backup_store( + Arc::clone(backup_store), + Some(Arc::clone(&retry_queue)), + ); + }, + } + } + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); + let kv_store: Arc = Arc::new(DynStoreWrapper(tier_store.clone())); + let tier_store = Arc::new(tier_store); + build_with_store_internal( config, self.chain_data_source_config.as_ref(), @@ -798,11 +952,12 @@ impl NodeBuilder { self.liquidity_source_config.as_ref(), self.pathfinding_scores_sync_config.as_ref(), self.async_payments_role, - self.recovery_mode, + self.recovery_config, seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + kv_store, + tier_store, ) } } @@ -1227,7 +1382,7 @@ impl ArcedNodeBuilder { /// Builds a [`Node`] instance according to the options previously configured. // Note that the generics here don't actually work for Uniffi, but we don't currently expose // this so its not needed. - pub fn build_with_store( + pub fn build_with_store( &self, node_entropy: Arc, kv_store: S, ) -> Result, BuildError> { self.inner.read().expect("lock").build_with_store(*node_entropy, kv_store).map(Arc::new) @@ -1240,9 +1395,38 @@ fn build_with_store_internal( gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, - async_payments_role: Option, recovery_mode: bool, seed_bytes: [u8; 64], - runtime: Arc, logger: Arc, kv_store: Arc, + async_payments_role: Option, recovery_config: RecoveryConfig, + seed_bytes: [u8; 64], runtime: Arc, logger: Arc, kv_store: Arc, + tier_store: Arc, ) -> Result { + if recovery_config.restore_from_backup { + let backup = Arc::clone( + &tier_store.backup_store().ok_or(BuildError::RestoreRequiresBackupStore)?.store, + ); + + let existing_durable_keys = list_existing_durable_keys(&*tier_store.primary_store()) + .map_err(|e| { + log_error!(logger, "Failed to enumerate primary store during restore: {}", e); + BuildError::ReadFailed + })?; + + if !existing_durable_keys.is_empty() { + log_error!( + logger, + "Restore refused: primary store already contains {} durable key(s)", + existing_durable_keys.len() + ); + return Err(BuildError::RestorePrimaryNotEmpty); + } + + restore_from_backup(&*tier_store.primary_store(), &*backup).map_err(|e| { + log_error!(logger, "Failed to restore from backup: {}", e); + BuildError::RestoreFailed + })?; + + log_info!(logger, "Successfully restored durable state from backup store"); + } + optionally_install_rustls_cryptoprovider(); if let Err(err) = may_announce_channel(&config) { @@ -1448,7 +1632,7 @@ fn build_with_store_internal( BuildError::WalletSetupFailed })?; - if !recovery_mode { + if !recovery_config.wallet_recovery { if let Some(best_block) = chain_tip_opt { // Insert the first checkpoint if we have it, to avoid resyncing from genesis. // TODO: Use a proper wallet birthday once BDK supports it. @@ -2062,6 +2246,7 @@ fn build_with_store_internal( om_mailbox, async_payments_role, hrn_resolver: Arc::new(hrn_resolver), + tier_store, #[cfg(cycle_tests)] _leak_checker, }) @@ -2148,7 +2333,11 @@ pub(crate) fn sanitize_alias(alias_str: &str) -> Result { #[cfg(test)] mod tests { - use super::{sanitize_alias, BuildError, NodeAlias}; + use lightning::util::persist::KVStoreSync; + + use crate::io::test_utils::InMemoryStore; + + use super::*; #[test] fn sanitize_empty_node_alias() { @@ -2185,4 +2374,39 @@ mod tests { let node = sanitize_alias(alias); assert_eq!(node.err().unwrap(), BuildError::InvalidNodeAlias); } + + #[test] + fn restore_requires_backup_store() { + let mut builder = NodeBuilder::new(); + let entropy = NodeEntropy::from_seed_bytes([42; 64]); + let primary = InMemoryStore::new(); + + let res = builder.restore_from_backup().build_with_store(entropy, primary); + + assert!(matches!(res, Err(BuildError::RestoreRequiresBackupStore))); + } + + #[test] + fn restore_refuses_nonempty_primary() { + let mut builder = NodeBuilder::new(); + let entropy = NodeEntropy::from_seed_bytes([43; 64]); + + let primary = InMemoryStore::new(); + let backup: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); + + KVStoreSync::write( + &primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + b"existing".to_vec(), + ) + .unwrap(); + + builder.set_backup_store(backup, BackupMode::BestEffortBackup); + builder.restore_from_backup(); + + let res = builder.build_with_store(entropy, primary); + assert!(matches!(res, Err(BuildError::RestorePrimaryNotEmpty))); + } } diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..150748b86 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -7,9 +7,11 @@ //! Objects and traits for data persistence. +pub(crate) mod recovery; pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/recovery.rs b/src/io/recovery.rs new file mode 100644 index 000000000..27273d944 --- /dev/null +++ b/src/io/recovery.rs @@ -0,0 +1,368 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::io; +use lightning::util::persist::{ + KVStoreSync, ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, +}; + +use crate::io::utils::EXTERNAL_PATHFINDING_SCORES_CACHE_KEY; +use crate::io::{ + BDK_WALLET_CHANGE_DESCRIPTOR_KEY, BDK_WALLET_CHANGE_DESCRIPTOR_PRIMARY_NAMESPACE, + BDK_WALLET_CHANGE_DESCRIPTOR_SECONDARY_NAMESPACE, BDK_WALLET_DESCRIPTOR_KEY, + BDK_WALLET_DESCRIPTOR_PRIMARY_NAMESPACE, BDK_WALLET_DESCRIPTOR_SECONDARY_NAMESPACE, + BDK_WALLET_INDEXER_KEY, BDK_WALLET_INDEXER_PRIMARY_NAMESPACE, + BDK_WALLET_INDEXER_SECONDARY_NAMESPACE, BDK_WALLET_LOCAL_CHAIN_KEY, + BDK_WALLET_LOCAL_CHAIN_PRIMARY_NAMESPACE, BDK_WALLET_LOCAL_CHAIN_SECONDARY_NAMESPACE, + BDK_WALLET_NETWORK_KEY, BDK_WALLET_NETWORK_PRIMARY_NAMESPACE, + BDK_WALLET_NETWORK_SECONDARY_NAMESPACE, BDK_WALLET_TX_GRAPH_KEY, + BDK_WALLET_TX_GRAPH_PRIMARY_NAMESPACE, BDK_WALLET_TX_GRAPH_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, +}; +use crate::types::DynStore; + +type KeyTuple = (&'static str, &'static str, &'static str); +type NamespacePair = (&'static str, &'static str); + +/// Durable entries with well-known, fixed keys. +const DURABLE_EXACT_KEYS: &[KeyTuple] = &[ + ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ), + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ), + ( + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + ), + ( + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + ), + ( + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ), + (NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, NODE_METRICS_KEY), + ( + BDK_WALLET_DESCRIPTOR_PRIMARY_NAMESPACE, + BDK_WALLET_DESCRIPTOR_SECONDARY_NAMESPACE, + BDK_WALLET_DESCRIPTOR_KEY, + ), + ( + BDK_WALLET_CHANGE_DESCRIPTOR_PRIMARY_NAMESPACE, + BDK_WALLET_CHANGE_DESCRIPTOR_SECONDARY_NAMESPACE, + BDK_WALLET_CHANGE_DESCRIPTOR_KEY, + ), + ( + BDK_WALLET_NETWORK_PRIMARY_NAMESPACE, + BDK_WALLET_NETWORK_SECONDARY_NAMESPACE, + BDK_WALLET_NETWORK_KEY, + ), + ( + BDK_WALLET_LOCAL_CHAIN_PRIMARY_NAMESPACE, + BDK_WALLET_LOCAL_CHAIN_SECONDARY_NAMESPACE, + BDK_WALLET_LOCAL_CHAIN_KEY, + ), + ( + BDK_WALLET_TX_GRAPH_PRIMARY_NAMESPACE, + BDK_WALLET_TX_GRAPH_SECONDARY_NAMESPACE, + BDK_WALLET_TX_GRAPH_KEY, + ), + ( + BDK_WALLET_INDEXER_PRIMARY_NAMESPACE, + BDK_WALLET_INDEXER_SECONDARY_NAMESPACE, + BDK_WALLET_INDEXER_KEY, + ), + ( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + ), + ( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + ), +]; + +/// Durable namespaces where the key is dynamic (e.g. one entry per payment or monitor). +const DURABLE_DYNAMIC_KEY_NAMESPACES: &[NamespacePair] = &[ + (PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE), + ( + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ), + (STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, ""), + ( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + ), + ( + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + ), +]; + +/// Durable namespaces where both the secondary namespace and key are dynamic +/// (e.g. monitor updates are keyed by monitor ID in the secondary namespace +/// and update ID in the key). +const DURABLE_WILDCARD_PRIMARY_NAMESPACES: &[&str] = + &[CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE]; + +pub(crate) fn is_known_durable_key( + primary_namespace: &str, secondary_namespace: &str, key: &str, +) -> bool { + DURABLE_EXACT_KEYS + .iter() + .any(|(p, s, k)| primary_namespace == *p && secondary_namespace == *s && key == *k) + || DURABLE_DYNAMIC_KEY_NAMESPACES + .iter() + .any(|(p, s)| primary_namespace == *p && secondary_namespace == *s) + || DURABLE_WILDCARD_PRIMARY_NAMESPACES.iter().any(|p| primary_namespace == *p) +} + +pub(crate) fn filter_known_durable_keys( + keys: Vec<(String, String, String)>, +) -> Vec<(String, String, String)> { + keys.into_iter().filter(|(p, s, k)| is_known_durable_key(p, s, k)).collect() +} + +/// Restores durable state from a backup store into an empty primary store. +/// +/// Enumerates all keys in the backup, filters to the known durable scope, +/// and copies each matching entry to the primary store. +pub(crate) fn restore_from_backup(primary: &DynStore, backup: &DynStore) -> Result<(), io::Error> { + let all_keys = backup.list_all_keys()?; + let durable_keys = filter_known_durable_keys(all_keys); + + for (primary_namespace, secondary_namespace, key) in &durable_keys { + let value = KVStoreSync::read(backup, primary_namespace, secondary_namespace, key)?; + + KVStoreSync::write(primary, primary_namespace, secondary_namespace, key, value)?; + } + + Ok(()) +} + +/// Returns all durable keys currently present in the given store. +/// +/// This is used to check whether the primary store already contains state +/// before performing a restore from backup. +pub(crate) fn list_existing_durable_keys( + store: &DynStore, +) -> Result, io::Error> { + let all_keys = store.list_all_keys()?; + Ok(filter_known_durable_keys(all_keys)) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::{io::test_utils::InMemoryStore, types::DynStoreWrapper}; + + use super::*; + + fn dyn_store() -> Arc { + Arc::new(DynStoreWrapper(InMemoryStore::new())) + } + + #[test] + fn exact_key_matches() { + assert!(is_known_durable_key( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + )); + assert!(is_known_durable_key( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + )); + assert!(is_known_durable_key( + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, + )); + } + + #[test] + fn dynamic_key_namespace_matches() { + assert!(is_known_durable_key( + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + "some_payment_id", + )); + assert!(is_known_durable_key( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + "some_monitor_id", + )); + assert!(is_known_durable_key(STATIC_INVOICE_STORE_PRIMARY_NAMESPACE, "", "invoice_42",)); + } + + #[test] + fn scorer_exact_keys_match() { + assert!(is_known_durable_key( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + )); + assert!(is_known_durable_key( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + )); + assert!(!is_known_durable_key( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + "unknown_scorer_key", + )); + } + + #[test] + fn wildcard_primary_namespace_matches() { + assert!(is_known_durable_key( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + "any_secondary", + "any_key", + )); + } + + #[test] + fn unknown_keys_rejected() { + assert!(!is_known_durable_key("unknown_namespace", "", "unknown_key")); + assert!(!is_known_durable_key("backup_retry_queue", "", "some_entry")); + assert!(!is_known_durable_key("", "", "unknown_key")); + } + + #[test] + fn filter_retains_only_durable_keys() { + let keys = vec![ + ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ), + ("backup_retry_queue".to_string(), "".to_string(), "entry_1".to_string()), + ( + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + "pay_001".to_string(), + ), + ]; + + let filtered = filter_known_durable_keys(keys); + assert_eq!(filtered.len(), 2); + assert_eq!(filtered[0].2, CHANNEL_MANAGER_PERSISTENCE_KEY); + assert_eq!(filtered[1].2, "pay_001"); + } + + #[test] + fn list_existing_durable_keys_returns_only_durable_scope() { + let store = dyn_store(); + + KVStoreSync::write( + &*store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + b"manager".to_vec(), + ) + .expect("Failed to write to channel manager namespace."); + + KVStoreSync::write(&*store, "backup_retry_queue", "", "entry_1", b"internal".to_vec()) + .expect("Failed to write to backup retry queue."); + + let durable = list_existing_durable_keys(&*store).expect("Failure listing durable keys."); + assert_eq!(durable.len(), 1); + assert_eq!( + durable[0], + ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ) + ); + } + + #[test] + fn restore_from_backup_copies_only_known_durable_keys() { + let primary = dyn_store(); + let backup = dyn_store(); + + KVStoreSync::write( + &*backup, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + b"manager".to_vec(), + ) + .expect("Failed to write to channel manager namespace."); + + KVStoreSync::write( + &*backup, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + "payment_1", + b"payment".to_vec(), + ) + .expect("Failed to write to payment info namespace."); + + KVStoreSync::write(&*backup, "unrelated", "", "garbage", b"nope".to_vec()) + .expect("Failed to write to non-durable namespace."); + + restore_from_backup(&*primary, &*backup).expect("Backup restoration failed."); + + assert_eq!( + KVStoreSync::read( + &*primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .expect("Failed to read from channel manager namespace."), + b"manager".to_vec() + ); + + assert_eq!( + KVStoreSync::read( + &*primary, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + "payment_1", + ) + .expect("Failed to read from payment info namespace."), + b"payment".to_vec() + ); + + assert!(KVStoreSync::read(&*primary, "unrelated", "", "garbage").is_err()); + } +} diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 84af03adc..0122fc0d6 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -15,7 +15,8 @@ use std::sync::{Arc, Mutex}; use lightning::io; use lightning::util::persist::{ - KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse, + KVStore, KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedKVStoreSync, + PaginatedListResponse, }; use lightning_types::string::PrintableString; use rusqlite::{named_params, Connection}; @@ -255,6 +256,44 @@ impl PaginatedKVStore for SqliteStore { } } +impl MigratableKVStore for SqliteStore { + fn list_all_keys(&self) -> io::Result> { + let locked_conn = self.inner.connection.lock().expect("lock"); + + let sql = format!( + "SELECT DISTINCT primary_namespace, secondary_namespace, key FROM {}", + self.inner.kv_table_name + ); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let rows = stmt + .query_map([], |row| { + let primary_namespace: String = row.get(0)?; + let secondary_namespace: String = row.get(1)?; + let key: String = row.get(2)?; + Ok((primary_namespace, secondary_namespace, key)) + }) + .map_err(|e| { + let msg = format!("Failed to list all keys: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let mut keys = Vec::new(); + for row in rows { + keys.push(row.map_err(|e| { + let msg = format!("Failed to decode row while listing all keys: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?); + } + + Ok(keys) + } +} + struct SqliteStoreInner { connection: Arc>, data_dir: PathBuf, diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs index eed8c3e2d..1267177e8 100644 --- a/src/io/test_utils.rs +++ b/src/io/test_utils.rs @@ -21,7 +21,7 @@ use lightning::ln::functional_test_utils::{ TestChanMonCfg, }; use lightning::util::persist::{ - KVStore, KVStoreSync, MonitorUpdatingPersister, PageToken, PaginatedKVStore, + KVStore, KVStoreSync, MigratableKVStore, MonitorUpdatingPersister, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse, KVSTORE_NAMESPACE_KEY_MAX_LEN, }; use lightning::util::test_utils; @@ -245,6 +245,29 @@ impl PaginatedKVStore for InMemoryStore { } } +impl MigratableKVStore for InMemoryStore { + fn list_all_keys(&self) -> io::Result> { + let persisted_lock = self.persisted_bytes.lock().unwrap(); + let mut keys = Vec::new(); + + for (namespace, entries) in persisted_lock.iter() { + let mut parts = namespace.splitn(2, '/'); + let primary_namespace = parts.next().unwrap_or_default(); + let secondary_namespace = parts.next().unwrap_or_default(); + + for key in entries.keys() { + keys.push(( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.clone(), + )); + } + } + + Ok(keys) + } +} + unsafe impl Sync for InMemoryStore {} unsafe impl Send for InMemoryStore {} diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..550916b9c --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,2076 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +use bitcoin::io::Read; +use lightning::ln::msgs::DecodeError; +use lightning::util::persist::{ + KVStore, KVStoreSync, MigratableKVStore, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::util::ser::{Readable, Writeable, Writer}; +use lightning::{io, log_error}; + +use std::collections::HashMap; +use std::future::Future; +use std::ops::Deref; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::Duration; + +/// A 3-tiered [`KVStore`]/[`KVStoreSync`] implementation that routes data across +/// storage backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed +/// data are issued to the primary and backup stores concurrently. Success +/// semantics depend on the configured [`BackupMode`]. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary +/// and backup stores. One store may already reflect the change even if the +/// overall operation returns an error. +#[derive(Clone)] +pub(crate) struct TierStore { + inner: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier are issued to the + /// primary and backup stores concurrently. + /// + /// If `retry_queue` is `None`, backup failures are logged and otherwise ignored + /// as long as the primary store succeeds. + /// + /// If `retry_queue` is `Some`, backup failures are treated with durable + /// semi-synchronous semantics: the operation succeeds only if the failed backup + /// intent can be persisted locally and enqueued for asynchronous retry. + /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the requested backup + /// guarantee could not be achieved. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store( + &mut self, backup: Arc, retry_queue: Option>>>, + ) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(BackupStore { store: backup, retry_queue }); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } + + pub(crate) fn backup_store(&self) -> Option<&BackupStore> { + self.inner.backup_store.as_ref() + } + + pub(crate) fn primary_store(&self) -> Arc { + Arc::clone(&self.inner.primary_store) + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.inner.read_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.inner.write_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.inner.remove_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.inner + .list_internal_sync(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +pub(crate) struct BackupStore { + /// Store may be remote. + pub(crate) store: Arc, + /// Present only when backup failures should be durably queued for retry. + pub(crate) retry_queue: Option>>>, +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// Optional backup configuration for primary-backed data. + backup_store: Option, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { primary_store, ephemeral_store: None, backup_store: None, logger } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + fn read_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStoreSync::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + fn list_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStoreSync::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list keys in namespace {}/{} from primary store: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::write( + backup_store.store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + PendingBackupOp::Write { buf }, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn write_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let backup_res = KVStoreSync::write( + backup_store.store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + PendingBackupOp::Write { buf }, + primary_res, + backup_res, + ) + } else { + KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ) + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + PendingBackupOp::Remove { lazy }, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn remove_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let backup_res = KVStoreSync::remove( + backup_store.store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + PendingBackupOp::Remove { lazy }, + primary_res, + backup_res, + ) + } else { + KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + } + + fn read_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key) + } else { + self.read_primary_sync(&primary_namespace, &secondary_namespace, &key) + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + } + + fn write_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::write( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } else { + self.write_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + } + + fn remove_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::remove( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } else { + self.remove_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn list_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(ephemeral_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list( + ephemeral_store.as_ref(), + &primary_namespace, + &secondary_namespace, + ) + } else { + self.list_primary_sync(&primary_namespace, &secondary_namespace) + } + }, + _ => self.list_primary_sync(&primary_namespace, &secondary_namespace), + } + } + + fn ephemeral_store( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Option<&Arc> { + self.ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(primary_namespace, secondary_namespace, key)) + } + + fn handle_primary_backup_results( + &self, op_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + pending_backup_op: PendingBackupOp, primary_res: io::Result<()>, + backup_res: io::Result<()>, + ) -> io::Result<()> { + match ( + primary_res, + backup_res, + self.backup_store.as_ref().and_then(|b| b.retry_queue.as_ref()), + ) { + (Ok(()), Ok(()), _) => Ok(()), + (Err(primary_err), Ok(()), _) => Err(primary_err), + (Ok(()), Err(backup_err), None) => { + log_error!( + self.logger, + "Backup {} failed for key {}/{}/{}: {}", + op_name, + primary_namespace, + secondary_namespace, + key, + backup_err + ); + Ok(()) + }, + (Ok(()), Err(backup_err), Some(retry_queue)) => { + retry_queue.enqueue_sync( + ( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ), + pending_backup_op, + )?; + log_error!( + self.logger, + "Backup {} failed for key {}/{}/{}: {}. Operation was durably queued for retry.", + op_name, + primary_namespace, + secondary_namespace, + key, + backup_err + ); + Ok(()) + }, + (Err(primary_err), Err(backup_err), _) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op_name, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +// Backup retry constants. +const BACKUP_RETRY_QUEUE_PRIMARY_NAMESPACE: &str = "backup_retry"; +const BACKUP_RETRY_QUEUE_SECONDARY_NAMESPACE: &str = ""; +const BACKUP_RETRY_QUEUE_KEY: &str = "pending_ops"; + +// Backoff parameters for retry task. +const RETRY_INITIAL_BACKOFF_MS: u64 = 500; +const RETRY_MAX_BACKOFF_MS: u64 = 60_000; +const RETRY_BACKOFF_MULTIPLIER: u64 = 2; + +/// Controls how TierStore treats backup-store failures for primary-backed writes and removals. +#[derive(Debug)] +pub enum BackupMode { + /// Writes must succeed on the primary store. + /// + /// Backup writes/removals are attempted immediately, but if the backup + /// store fails, the operation still succeeds as long as the primary store + /// succeeds. Backup failures are logged. + BestEffortBackup, + + /// Writes must succeed on the primary store. + /// + /// Backup writes/removals are attempted immediately. If the backup store + /// fails but the primary succeeds, the operation still succeeds only if the + /// failed backup operation is durably persisted locally and enqueued for + /// asynchronous retry. + SemiSync, +} + +impl Default for BackupMode { + fn default() -> Self { + Self::BestEffortBackup + } +} + +/// A pending operation against the backup store that failed on the write path +/// and needs to be retried asynchronously. +/// +/// Limitation: +/// `Write` carries the exact `buf` at enqueue time. We never re-read from +/// primary at retry time — if a `Remove` for the same key succeeds on primary +/// after this `Write` was enqueued, re-reading primary would find nothing and +/// potentially resurrect deleted data on the backup. +#[derive(Clone, Debug)] +pub(crate) enum PendingBackupOp { + Write { buf: Vec }, + Remove { lazy: bool }, +} + +impl Writeable for PendingBackupOp { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { + match self { + PendingBackupOp::Write { buf } => { + 0u8.write(writer)?; + (buf.len() as u32).write(writer)?; + writer.write_all(buf)?; + }, + PendingBackupOp::Remove { lazy } => { + 1u8.write(writer)?; + lazy.write(writer)?; + }, + } + Ok(()) + } +} + +impl Readable for PendingBackupOp { + fn read(reader: &mut R) -> Result { + let tag: u8 = Readable::read(reader)?; + match tag { + 0 => { + let len: u32 = Readable::read(reader)?; + let mut buf = vec![0u8; len as usize]; + reader.read_exact(&mut buf)?; + Ok(PendingBackupOp::Write { buf }) + }, + 1 => { + let lazy: bool = Readable::read(reader)?; + Ok(PendingBackupOp::Remove { lazy }) + }, + _ => Err(DecodeError::InvalidValue), + } + } +} + +/// Deserialization wrapper for the pending ops map. +/// Decodes from a length-prefixed list of `(pn, sn, key, op_tag, [buf])` entries. +struct PendingOpsDeserWrapper(HashMap<(String, String, String), PendingBackupOp>); + +impl Readable for PendingOpsDeserWrapper { + fn read(reader: &mut R) -> Result { + let count: u32 = Readable::read(reader)?; + let mut map = HashMap::with_capacity(count as usize); + for _ in 0..count { + let pn: String = Readable::read(reader)?; + let sn: String = Readable::read(reader)?; + let key: String = Readable::read(reader)?; + let op: PendingBackupOp = Readable::read(reader)?; + map.insert((pn, sn, key), op); + } + Ok(Self(map)) + } +} + +/// Serialization wrapper for the pending ops map. +/// Encodes as a length-prefixed list of `(pn, sn, key, op_tag, [buf])` entries. +struct PendingOpsSerWrapper<'a>(&'a HashMap<(String, String, String), PendingBackupOp>); + +impl Writeable for PendingOpsSerWrapper<'_> { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { + (self.0.len() as u32).write(writer)?; + let mut entries: Vec<_> = self.0.iter().collect(); + entries.sort_by(|a, b| a.0.cmp(b.0)); + + for ((pn, sn, key), op) in entries { + pn.write(writer)?; + sn.write(writer)?; + key.write(writer)?; + op.write(writer)?; + } + + Ok(()) + } +} + +/// A durable, locally-persisted queue of backup operations that failed on the +/// write path and are pending async retry. +/// +/// The queue is keyed by `(primary_namespace, secondary_namespace, key)` so +/// that a newer op for the same key always replaces an older one — +/// deduplication is structural. This means: +/// - `Write` followed by `Write` to same key: only latest `buf` is retried. +/// - `Write` followed by `Remove` to same key: only `Remove` is retried. +/// - `Remove` followed by `Write` to same key: only latest `Write` is retried. +/// +/// The queue persists to a local `FilesystemStoreV2` at a dedicated namespace, +/// never through `TierStore`'s backup replication path, to avoid recursion +/// and to remain available even when primary is a remote store. +pub(crate) struct BackupRetryQueue +where + L::Target: LdkLogger, +{ + pending: Arc>>, + waker: Arc>>, + /// Always a local store — FilesystemStoreV2 at node data dir. + /// Never the user's primary or backup store. + local_store: Arc, + logger: L, +} + +impl BackupRetryQueue +where + L::Target: LdkLogger, +{ + /// Creates a new queue, loading any previously persisted pending ops from + /// `local_store` so that ops enqueued before a crash are not lost. + pub(crate) fn new(local_store: Arc, logger: L) -> Self { + let pending = Arc::new(Mutex::new(HashMap::new())); + + match KVStoreSync::read( + local_store.as_ref(), + BACKUP_RETRY_QUEUE_PRIMARY_NAMESPACE, + BACKUP_RETRY_QUEUE_SECONDARY_NAMESPACE, + BACKUP_RETRY_QUEUE_KEY, + ) { + Ok(data) => match PendingOpsDeserWrapper::read(&mut io::Cursor::new(data)) { + Ok(wrapper) => { + *pending.lock().expect("lock") = wrapper.0; + }, + Err(e) => { + log_error!( + logger, + "Failed to decode persisted backup retry queue, \ + starting fresh. Backup store may be missing recent \ + writes. Error: {}", + e + ); + }, + }, + Err(e) if e.kind() == io::ErrorKind::NotFound => { + // No persisted queue — normal on first startup. + }, + Err(e) => { + log_error!( + logger, + "Failed to read persisted backup retry queue: {}. \ + Starting fresh.", + e + ); + }, + } + + Self { pending, waker: Arc::new(Mutex::new(None)), local_store, logger } + } + + /// Enqueues a failed backup operation for asynchronous retry. + /// + /// If a pending op already exists for this key, it is replaced by the newer + /// op. The updated queue is persisted synchronously to the local store before + /// this function returns. + /// + /// Returns an error if the retry intent could not be durably persisted locally. + /// In that case, the op may still remain queued in memory for the current + /// process, but it is not guaranteed to survive a restart. + /// + /// This is the sync enqueue path, used from synchronous write/remove flows. + pub(crate) fn enqueue_sync( + &self, key: (String, String, String), op: PendingBackupOp, + ) -> io::Result<()> { + let encoded = { + let mut locked = self.pending.lock().expect("lock"); + locked.insert(key, op); + PendingOpsSerWrapper(&*locked).encode() + }; + + KVStoreSync::write( + self.local_store.as_ref(), + BACKUP_RETRY_QUEUE_PRIMARY_NAMESPACE, + BACKUP_RETRY_QUEUE_SECONDARY_NAMESPACE, + BACKUP_RETRY_QUEUE_KEY, + encoded, + )?; + + if let Some(waker) = self.waker.lock().expect("lock").take() { + waker.wake(); + } + + Ok(()) + } + + /// Removes a successfully retried entry from the in-memory queue and + /// best-effort persists the updated state. + /// + /// Failure to persist this cleanup is logged but not returned. At that + /// point the backup operation has already succeeded, so correctness relies + /// on at-least-once replay semantics: if the process restarts before a + /// later successful queue persistence, the completed op may be retried + /// again from the on-disk snapshot. + /// + /// This is acceptable because retried backup writes/removals are expected + /// to be idempotent at the `KVStore` layer. + pub(crate) fn remove_completed(&self, key: &(String, String, String)) { + let encoded = { + let mut locked = self.pending.lock().expect("lock"); + locked.remove(key); + PendingOpsSerWrapper(&*locked).encode() + }; + + if let Err(e) = KVStoreSync::write( + self.local_store.as_ref(), + BACKUP_RETRY_QUEUE_PRIMARY_NAMESPACE, + BACKUP_RETRY_QUEUE_SECONDARY_NAMESPACE, + BACKUP_RETRY_QUEUE_KEY, + encoded, + ) { + // Cleanup persistence is best-effort only. The backup op has already + // succeeded, so a later restart may replay it from the stale on-disk + // queue snapshot. This is acceptable because backup ops are expected + // to be idempotent. + log_error!( + self.logger, + "Failed to persist backup retry queue after successful retry for key {:?}: {}", + key, + e + ); + } + } + + /// Returns a snapshot of all pending entries for the retry task to drain. + /// + /// Returns cloned entries so the lock is not held during I/O. + pub(crate) fn snapshot(&self) -> Vec<((String, String, String), PendingBackupOp)> { + self.pending.lock().expect("lock").iter().map(|(k, v)| (k.clone(), v.clone())).collect() + } + + /// Returns true if the queue has no pending entries. + pub(crate) fn is_empty(&self) -> bool { + self.pending.lock().expect("lock").is_empty() + } + + /// Waits asynchronously until the queue has entries to process. + pub(crate) async fn wait_for_entries(&self) { + RetryQueueFuture { pending: Arc::clone(&self.pending), waker: Arc::clone(&self.waker) } + .await + } +} + +/// Future that resolves when the retry queue has pending entries. +/// Mirrors `EventFuture` in `EventQueue`. +struct RetryQueueFuture { + pending: Arc>>, + waker: Arc>>, +} + +impl Future for RetryQueueFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if !self.pending.lock().expect("lock").is_empty() { + Poll::Ready(()) + } else { + *self.waker.lock().expect("lock") = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +/// Runs the background retry loop for the given `BackupRetryQueue` against +/// `backup_store`. +/// +/// The task waits for pending queue entries, a periodic fallback wake, or a +/// shutdown signal. When woken, it snapshots the current queue and retries +/// each pending operation against the backup store. +/// +/// `Write` retries write the originally queued buffer directly to the backup +/// store and never re-read from primary. `Remove` retries issue a remove +/// against the backup store and treat `io::ErrorKind::NotFound` as success. +/// +/// Successfully retried entries are removed from the queue and the updated +/// queue state is persisted locally. Failed entries remain queued for later +/// retry. If any retries fail in a round, the task waits using exponential +/// backoff with jitter before retrying again. +/// +/// This function runs for node lifetime and should be spawned as a background +/// task during [`Node::start`] when semisynchronous backup retry is configured. +/// +/// [`Node::start`]: crate::Node::start +pub(crate) async fn run_backup_retry_task( + queue: Arc>>, backup_store: Arc, logger: Arc, + mut stop_receiver: tokio::sync::watch::Receiver<()>, +) { + let mut backoff_ms = RETRY_INITIAL_BACKOFF_MS; + + loop { + let fallback_sleep = tokio::time::sleep(Duration::from_secs(60)); + tokio::pin!(fallback_sleep); + + tokio::select! { + _ = stop_receiver.changed() => break, + _ = queue.wait_for_entries() => {}, + _ = &mut fallback_sleep => {}, + } + + if queue.is_empty() { + continue; + } + + let entries = queue.snapshot(); + let mut any_failed = false; + + for (key, op) in &entries { + if stop_receiver.has_changed().unwrap_or(false) { + return; + } + + let (pn, sn, k) = key; + + // Skip stale snapshot entries that were removed or replaced + // after we took the snapshot. + { + let locked = queue.pending.lock().expect("lock"); + match locked.get(key) { + None => continue, + Some(current_op) if !ops_match(current_op, op) => continue, + Some(_) => {}, + } + } + + let result = match op { + PendingBackupOp::Write { buf } => { + KVStoreSync::write(backup_store.as_ref(), pn, sn, k, buf.clone()) + }, + PendingBackupOp::Remove { lazy } => { + match KVStoreSync::remove(backup_store.as_ref(), pn, sn, k, *lazy) { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + }, + }; + + match result { + Ok(()) => { + queue.remove_completed(key); + }, + Err(e) => { + any_failed = true; + log_error!(logger, "Backup retry failed for key {}/{}/{}: {}", pn, sn, k, e); + }, + } + } + + if any_failed { + let jitter_ms = (backoff_ms / 10).max(1); + let sleep_ms = backoff_ms + (jitter_ms % 17); + let sleep = tokio::time::sleep(Duration::from_millis(sleep_ms)); + tokio::pin!(sleep); + + tokio::select! { + _ = stop_receiver.changed() => break, + _ = &mut sleep => {}, + } + + backoff_ms = (backoff_ms * RETRY_BACKOFF_MULTIPLIER).min(RETRY_MAX_BACKOFF_MS); + } else { + backoff_ms = RETRY_INITIAL_BACKOFF_MS; + } + } +} + +/// Returns true if two `PendingBackupOp` values are the same variant with +/// equivalent data. Used to detect staleness before applying a retried op. +fn ops_match(a: &PendingBackupOp, b: &PendingBackupOp) -> bool { + match (a, b) { + (PendingBackupOp::Write { buf: ba }, PendingBackupOp::Write { buf: bb }) => ba == bb, + (PendingBackupOp::Remove { lazy: la }, PendingBackupOp::Remove { lazy: lb }) => la == lb, + _ => false, + } +} + +impl MigratableKVStore for TierStore { + fn list_all_keys(&self) -> io::Result> { + self.inner.primary_store.list_all_keys() + } +} + +#[cfg(test)] +mod tests { + use std::future::Future; + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + use std::time::Duration; + + use bitcoin::io::ErrorKind; + use lightning::util::logger::Level; + use lightning::util::persist::{ + MigratableKVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::logger::Logger; + use crate::types::{DynStore, DynStoreWrapper}; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + struct TestStores { + _cleanup: CleanupDir, + logger: Arc, + primary: Arc, + backup: Arc, + retry: Arc, + ephemeral: Arc, + } + + impl TestStores { + fn new() -> Self { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new( + Logger::new_fs_writer(log_path, Level::Trace).expect("logger setup failed."), + ); + + let primary: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let backup: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + let retry: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("retry_queue")))); + let ephemeral: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + + Self { _cleanup: CleanupDir(base_dir), logger, primary, backup, retry, ephemeral } + } + } + + #[derive(Clone, Copy)] + enum FailMode { + Write, + Remove, + } + + struct FailingStore { + mode: FailMode, + } + + impl FailingStore { + fn new(mode: FailMode) -> Self { + Self { mode } + } + + fn should_fail_write(&self) -> bool { + matches!(self.mode, FailMode::Write) + } + + fn should_fail_remove(&self) -> bool { + matches!(self.mode, FailMode::Remove) + } + } + + impl KVStore for FailingStore { + fn read( + &self, _primary_namespace: &str, _secondary_namespace: &str, _key: &str, + ) -> impl Future, io::Error>> + Send + 'static { + async { Err(io::Error::from(ErrorKind::NotFound)) } + } + + fn write( + &self, _primary_namespace: &str, _secondary_namespace: &str, _key: &str, _buf: Vec, + ) -> impl Future> + Send + 'static { + let fail = self.should_fail_write(); + async move { + if fail { + Err(io::Error::new(ErrorKind::Other, "intentional backup write failure")) + } else { + Ok(()) + } + } + } + + fn remove( + &self, _primary_namespace: &str, _secondary_namespace: &str, _key: &str, _lazy: bool, + ) -> impl Future> + Send + 'static { + let fail = self.should_fail_remove(); + async move { + if fail { + Err(io::Error::new(ErrorKind::Other, "intentional backup remove failure")) + } else { + Ok(()) + } + } + } + + fn list( + &self, _primary_namespace: &str, _secondary_namespace: &str, + ) -> impl Future, io::Error>> + Send + 'static { + async { Ok(Vec::new()) } + } + } + + impl KVStoreSync for FailingStore { + fn read( + &self, _primary_namespace: &str, _secondary_namespace: &str, _key: &str, + ) -> Result, io::Error> { + Err(io::Error::from(ErrorKind::NotFound)) + } + + fn write( + &self, _primary_namespace: &str, _secondary_namespace: &str, _key: &str, _buf: Vec, + ) -> Result<(), io::Error> { + if self.should_fail_write() { + Err(io::Error::new(ErrorKind::Other, "intentional backup write failure")) + } else { + Ok(()) + } + } + + fn remove( + &self, _primary_namespace: &str, _secondary_namespace: &str, _key: &str, _lazy: bool, + ) -> Result<(), io::Error> { + if self.should_fail_remove() { + Err(io::Error::new(ErrorKind::Other, "intentional backup remove failure")) + } else { + Ok(()) + } + } + + fn list( + &self, _primary_namespace: &str, _secondary_namespace: &str, + ) -> Result, io::Error> { + Ok(Vec::new()) + } + } + + impl MigratableKVStore for FailingStore { + fn list_all_keys(&self) -> Result, io::Error> { + Ok(Vec::new()) + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + fn read_retry_queue_ops( + retry_store: Arc, + ) -> HashMap<(String, String, String), PendingBackupOp> { + let data = KVStoreSync::read( + &*retry_store, + BACKUP_RETRY_QUEUE_PRIMARY_NAMESPACE, + BACKUP_RETRY_QUEUE_SECONDARY_NAMESPACE, + BACKUP_RETRY_QUEUE_KEY, + ) + .expect("Failed to read backup retry queue."); + + PendingOpsDeserWrapper::read(&mut io::Cursor::new(data)) + .expect("Failed to deserialization retry queue.") + .0 + } + + #[test] + fn write_read_list_remove() { + let stores = TestStores::new(); + let tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + tier.set_ephemeral_store(Arc::clone(&stores.ephemeral)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .expect("Failed to write network graph."); + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .expect("Failed to write channel manager to tier store."); + + let primary_read_ng = KVStoreSync::read( + &*stores.primary, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*stores.ephemeral, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*stores.ephemeral, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!( + ephemeral_read_ng.expect("Failed to read network graph from ephemeral store."), + data + ); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!( + primary_read_cm.expect("Failed to read channel manager from primary store."), + data + ); + } + + #[test] + fn backup_write_is_part_of_success_path() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + tier.set_backup_store(Arc::clone(&stores.backup), None); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .expect("Failed to write channel manager to tier store."); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let backup_read = KVStoreSync::read( + &*stores.backup, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.expect("Failed to read channel manager from primary store."), data); + assert_eq!(backup_read.expect("Failed to read channel manager from backup store."), data); + } + + #[test] + fn backup_remove_is_part_of_success_path() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + tier.set_backup_store(Arc::clone(&stores.backup), None); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .expect("Failed to write channel manager to tier store."); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .expect("Failed to remove channel manager from tier store."); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + let backup_read = KVStoreSync::read( + &*stores.backup, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } + + #[test] + fn best_effort_backup_write_failure_returns_success() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + let failing_backup: Arc = + Arc::new(DynStoreWrapper(FailingStore::new(FailMode::Write))); + + tier.set_backup_store(Arc::clone(&failing_backup), None); + + let data = vec![42u8; 32]; + + let res = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ); + + assert!(res.is_ok()); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.expect("Failed to read channel manager from primary store."), data); + } + + #[test] + fn best_effort_backup_remove_failure_returns_success() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + let failing_backup: Arc = + Arc::new(DynStoreWrapper(FailingStore::new(FailMode::Remove))); + + tier.set_backup_store(Arc::clone(&failing_backup), None); + + let data = vec![11u8; 20]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .expect("Failed to write channel manager to tier store."); + + let res = KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ); + + assert!(res.is_ok()); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(primary_read.is_err()); + } + + #[test] + fn semisync_backup_write_failure_enqueues_retry() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + let failing_backup: Arc = + Arc::new(DynStoreWrapper(FailingStore::new(FailMode::Write))); + + let retry_queue = + Arc::new(BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger))); + + tier.set_backup_store(Arc::clone(&failing_backup), Some(Arc::clone(&retry_queue))); + + let data = vec![7u8; 16]; + + let res = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ); + + assert!(res.is_ok()); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.expect("Failed to read channel manager from primary store."), data); + + let queued_ops = read_retry_queue_ops(stores.retry); + let queued_op = queued_ops.get(&( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + )); + + match queued_op { + Some(PendingBackupOp::Write { buf }) => assert_eq!(buf, &data), + other => panic!("expected queued write op, got {:?}", other), + } + } + + #[test] + fn semisync_backup_remove_failure_enqueues_retry() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + let failing_backup: Arc = + Arc::new(DynStoreWrapper(FailingStore::new(FailMode::Remove))); + + let retry_queue = + Arc::new(BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger))); + + tier.set_backup_store(Arc::clone(&failing_backup), Some(Arc::clone(&retry_queue))); + + let data = vec![5u8; 24]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .expect("Failed to write channel manager to tier store."); + + let res = KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ); + + assert!(res.is_ok()); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + assert!(primary_read.is_err()); + + let queued_ops = read_retry_queue_ops(stores.retry); + let queued_op = queued_ops.get(&( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + key.to_string(), + )); + + match queued_op { + Some(PendingBackupOp::Remove { lazy }) => assert!(*lazy), + other => panic!("expected queued remove op, got {:?}", other), + } + } + + #[test] + fn semisync_queue_persist_failure_returns_error() { + let stores = TestStores::new(); + let mut tier = setup_tier_store(Arc::clone(&stores.primary), Arc::clone(&stores.logger)); + + let failing_backup: Arc = + Arc::new(DynStoreWrapper(FailingStore::new(FailMode::Write))); + + let failing_retry_store: Arc = + Arc::new(DynStoreWrapper(FailingStore::new(FailMode::Write))); + let retry_queue = Arc::new(BackupRetryQueue::new( + Arc::clone(&failing_retry_store), + Arc::clone(&stores.logger), + )); + + tier.set_backup_store(Arc::clone(&failing_backup), Some(Arc::clone(&retry_queue))); + + let data = vec![9u8; 8]; + + let res = KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ); + + assert!(res.is_err()); + + let primary_read = KVStoreSync::read( + &*stores.primary, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.expect("Failed to read channel manager from primary store."), data); + } + + #[test] + fn retry_task_replays_queued_write() { + let stores = TestStores::new(); + + let queue = + Arc::new(BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger))); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + let data = vec![3u8; 12]; + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: data.clone() }) + .expect("Failed to enqueue write op."); + + let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime."); + let (stop_tx, stop_rx) = tokio::sync::watch::channel(()); + + runtime.block_on(async { + let task_queue = Arc::clone(&queue); + let task_backup = Arc::clone(&stores.backup); + let task_logger = Arc::clone(&stores.logger); + + let handle = tokio::spawn(async move { + run_backup_retry_task(task_queue, task_backup, task_logger, stop_rx).await; + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + let _ = stop_tx.send(()); + let _ = handle.await; + }); + + let backup_read = KVStoreSync::read( + &*stores.backup, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(backup_read.expect("Failed to read channel manager from backup store."), data); + + let queued_ops = read_retry_queue_ops(stores.retry); + assert!(queued_ops.is_empty()); + } + + #[test] + fn retry_task_replays_queued_remove() { + let stores = TestStores::new(); + + let queue = + Arc::new(BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger))); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + KVStoreSync::write( + &*stores.backup, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + vec![1u8; 4], + ) + .expect("Failed to write channel manager to backup store."); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Remove { lazy: true }) + .expect("Failed to enqueue remove op."); + + let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime."); + let (stop_tx, stop_rx) = tokio::sync::watch::channel(()); + + runtime.block_on(async { + let task_queue = Arc::clone(&queue); + let task_backup = Arc::clone(&stores.backup); + let task_logger = Arc::clone(&stores.logger); + + let handle = tokio::spawn(async move { + run_backup_retry_task(task_queue, task_backup, task_logger, stop_rx).await; + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + let _ = stop_tx.send(()); + let _ = handle.await; + }); + + let backup_read = KVStoreSync::read( + &*stores.backup, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(backup_read.is_err()); + + let queued_ops = read_retry_queue_ops(stores.retry); + assert!(queued_ops.is_empty()); + } + + #[test] + fn retry_task_remove_not_found_is_success() { + let stores = TestStores::new(); + + let queue = + Arc::new(BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger))); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Remove { lazy: true }) + .expect("Failed to enqueue remove op."); + + let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime."); + let (stop_tx, stop_rx) = tokio::sync::watch::channel(()); + + runtime.block_on(async { + let task_queue = Arc::clone(&queue); + let task_backup = Arc::clone(&stores.backup); + let task_logger = Arc::clone(&stores.logger); + + let handle = tokio::spawn(async move { + run_backup_retry_task(task_queue, task_backup, task_logger, stop_rx).await; + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + let _ = stop_tx.send(()); + let _ = handle.await; + }); + + let queued_ops = read_retry_queue_ops(stores.retry); + assert!(queued_ops.is_empty()); + } + + #[test] + fn retry_task_skips_stale_snapshot_entries() { + let stores = TestStores::new(); + + let queue = + Arc::new(BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger))); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![1u8; 4] }) + .expect("Failed to enqueue first write op."); + + let snapshot = queue.snapshot(); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![2u8; 4] }) + .expect("Failed to enqueue second write op."); + + for (snap_key, snap_op) in &snapshot { + let locked = queue.pending.lock().expect("Failed to lock pending ops."); + match locked.get(snap_key) { + None => continue, + Some(current_op) if !ops_match(current_op, snap_op) => continue, + Some(_) => panic!("expected stale snapshot op to be skipped"), + } + } + } + + #[test] + fn pending_ops_roundtrip_ser_deser() { + let mut map = HashMap::new(); + map.insert( + ("ns1".to_string(), "".to_string(), "k1".to_string()), + PendingBackupOp::Write { buf: vec![1, 2, 3] }, + ); + map.insert( + ("ns2".to_string(), "sub".to_string(), "k2".to_string()), + PendingBackupOp::Remove { lazy: true }, + ); + + let encoded = PendingOpsSerWrapper(&map).encode(); + let decoded = PendingOpsDeserWrapper::read(&mut io::Cursor::new(encoded)) + .expect("Failed to deserialize pending ops.") + .0; + + assert_eq!(decoded.len(), 2); + + match decoded.get(&("ns1".to_string(), "".to_string(), "k1".to_string())) { + Some(PendingBackupOp::Write { buf }) => assert_eq!(buf, &vec![1, 2, 3]), + other => panic!("expected queued write op, got {:?}", other), + } + + match decoded.get(&("ns2".to_string(), "sub".to_string(), "k2".to_string())) { + Some(PendingBackupOp::Remove { lazy }) => assert!(*lazy), + other => panic!("expected queued remove op, got {:?}", other), + } + } + + #[test] + fn retry_queue_reloads_persisted_entries_on_restart() { + let stores = TestStores::new(); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + { + let queue = Arc::new(BackupRetryQueue::new( + Arc::clone(&stores.retry), + Arc::clone(&stores.logger), + )); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![1u8, 2u8, 3u8] }) + .expect("Failed to enqueue write op."); + } + + let reloaded_queue = + BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger)); + let snapshot = reloaded_queue.snapshot(); + + assert_eq!(snapshot.len(), 1); + + match snapshot.first() { + Some((queued_key, PendingBackupOp::Write { buf })) => { + assert_eq!(queued_key, &key); + assert_eq!(buf, &vec![1u8, 2u8, 3u8]); + }, + other => panic!("expected reloaded queued write op, got {:?}", other), + } + } + + #[test] + fn retry_queue_dedups_write_then_write_to_latest() { + let stores = TestStores::new(); + + let queue = BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger)); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![1u8, 2u8] }) + .expect("Failed to enqueue first write op."); + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![9u8, 8u8] }) + .expect("Failed to enqueue second write op."); + + let snapshot = queue.snapshot(); + assert_eq!(snapshot.len(), 1); + + match snapshot.first() { + Some((queued_key, PendingBackupOp::Write { buf })) => { + assert_eq!(queued_key, &key); + assert_eq!(buf, &vec![9u8, 8u8]); + }, + other => panic!("expected latest queued write op, got {:?}", other), + } + } + + #[test] + fn retry_queue_dedups_write_then_remove_to_remove() { + let stores = TestStores::new(); + + let queue = BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger)); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![1u8, 2u8] }) + .expect("Failed to enqueue write op."); + queue + .enqueue_sync(key.clone(), PendingBackupOp::Remove { lazy: true }) + .expect("Failed to enqueue remove op."); + + let snapshot = queue.snapshot(); + assert_eq!(snapshot.len(), 1); + + match snapshot.first() { + Some((queued_key, PendingBackupOp::Remove { lazy })) => { + assert_eq!(queued_key, &key); + assert!(*lazy); + }, + other => panic!("expected queued remove op, got {:?}", other), + } + } + + #[test] + fn retry_queue_dedups_remove_then_write_to_write() { + let stores = TestStores::new(); + + let queue = BackupRetryQueue::new(Arc::clone(&stores.retry), Arc::clone(&stores.logger)); + + let key = ( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(), + ); + + queue + .enqueue_sync(key.clone(), PendingBackupOp::Remove { lazy: true }) + .expect("Failed to enqueue remove op."); + queue + .enqueue_sync(key.clone(), PendingBackupOp::Write { buf: vec![5u8, 6u8, 7u8] }) + .expect("Failed to enqueue write op."); + + let snapshot = queue.snapshot(); + assert_eq!(snapshot.len(), 1); + + match snapshot.first() { + Some((queued_key, PendingBackupOp::Write { buf })) => { + assert_eq!(queued_key, &key); + assert_eq!(buf, &vec![5u8, 6u8, 7u8]); + }, + other => panic!("expected queued write op, got {:?}", other), + } + } +} diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 97883b5d5..4c59672d8 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -24,7 +24,7 @@ use bitcoin::Network; use lightning::impl_writeable_tlv_based_enum; use lightning::io::{self, Error, ErrorKind}; use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes}; -use lightning::util::persist::{KVStore, KVStoreSync}; +use lightning::util::persist::{KVStore, KVStoreSync, MigratableKVStore}; use lightning::util::ser::{Readable, Writeable}; use prost::Message; use vss_client::client::VssClient; @@ -386,6 +386,27 @@ impl Drop for VssStore { } } +impl MigratableKVStore for VssStore { + fn list_all_keys(&self) -> io::Result> { + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let inner = Arc::clone(&self.inner); + let fut = async move { + let stored_keys = inner.list_all_stored_keys(&inner.blocking_client).await?; + let mut decoded = Vec::with_capacity(stored_keys.len()); + for stored_key in stored_keys { + if let Some(key_parts) = inner.decode_stored_key(&stored_key)? { + decoded.push(key_parts); + } + } + Ok(decoded) + }; + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) + } +} struct VssStoreInner { schema_version: VssSchemaVersion, blocking_client: VssClient, @@ -507,6 +528,93 @@ impl VssStoreInner { Ok(keys) } + async fn list_all_stored_keys( + &self, client: &VssClient, + ) -> io::Result> { + let mut page_token = None; + let mut keys = Vec::new(); + + while page_token != Some("".to_string()) { + let request = ListKeyVersionsRequest { + store_id: self.store_id.clone(), + key_prefix: None, + page_token, + page_size: None, + }; + + let response = client.list_key_versions(&request).await.map_err(|e| { + let msg = format!("Failed to list all stored keys: {}", e); + Error::new(ErrorKind::Other, msg) + })?; + + for kv in response.key_versions { + keys.push(kv.key); + } + + page_token = response.next_page_token; + } + + Ok(keys) + } + + fn decode_stored_key(&self, stored_key: &str) -> io::Result> { + match self.schema_version { + VssSchemaVersion::V0 => { + if !stored_key.contains('#') { + let key = self.key_obfuscator.deobfuscate(stored_key)?; + if key == VSS_SCHEMA_VERSION_KEY { + return Ok(None); + } + return Ok(Some(("".to_string(), "".to_string(), key))); + } + + let mut parts = stored_key.splitn(3, '#'); + let primary_namespace = parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?; + let secondary_namespace = parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?; + let obfuscated_key = parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?; + + let key = self.key_obfuscator.deobfuscate(obfuscated_key)?; + if key == VSS_SCHEMA_VERSION_KEY { + return Ok(None); + } + + Ok(Some((primary_namespace.to_string(), secondary_namespace.to_string(), key))) + }, + VssSchemaVersion::V1 => { + let mut parts = stored_key.splitn(2, '#'); + let obfuscated_prefix = parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?; + let obfuscated_key = parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?; + + let prefix = self.key_obfuscator.deobfuscate(obfuscated_prefix)?; + let key = self.key_obfuscator.deobfuscate(obfuscated_key)?; + + if key == VSS_SCHEMA_VERSION_KEY { + return Ok(None); + } + + let mut prefix_parts = prefix.splitn(2, '#'); + let primary_namespace = prefix_parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key prefix"))?; + let secondary_namespace = prefix_parts + .next() + .ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key prefix"))?; + + Ok(Some((primary_namespace.to_string(), secondary_namespace.to_string(), key))) + }, + } + } + async fn read_internal( &self, client: &VssClient, primary_namespace: String, secondary_namespace: String, key: String, diff --git a/src/lib.rs b/src/lib.rs index faeb6d339..8b7f59b02 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -143,6 +143,7 @@ use fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use ffi::*; use gossip::GossipSource; use graph::NetworkGraph; +pub use io::tier_store::BackupMode; use io::utils::update_and_persist_node_metrics; pub use lightning; use lightning::chain::BestBlock; @@ -173,13 +174,16 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, HRNResolver, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, +}; +pub use types::{ + ChannelDetails, CustomTlvRecord, DynStore, DynStoreWrapper, PeerDetails, SyncAndAsyncKVStore, + UserChannelId, }; -pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; +use crate::io::tier_store::{run_backup_retry_task, TierStore}; use crate::scoring::setup_background_pathfinding_scores_sync; use crate::wallet::FundingAmount; @@ -239,6 +243,7 @@ pub struct Node { om_mailbox: Option>, async_payments_role: Option, hrn_resolver: Arc, + tier_store: Arc, #[cfg(cycle_tests)] _leak_checker: LeakChecker, } @@ -346,6 +351,26 @@ impl Node { ); } + // Spawn background task to asynchronously retry failed backup operations. + if let Some(backup_store) = self.tier_store.backup_store() { + if let Some(retry_queue) = backup_store.retry_queue.as_ref() { + let retry_queue = Arc::clone(retry_queue); + let retry_backup_store = Arc::clone(&backup_store.store); + let retry_logger = Arc::clone(&self.logger); + let stop_retry = self.stop_sender.subscribe(); + + self.runtime.spawn_background_task(async move { + run_backup_retry_task( + retry_queue, + retry_backup_store, + retry_logger, + stop_retry, + ) + .await; + }); + } + } + if let Some(listening_addresses) = &self.config.listening_addresses { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); diff --git a/src/types.rs b/src/types.rs index 5d5515dcc..a1cb1f001 100644 --- a/src/types.rs +++ b/src/types.rs @@ -31,7 +31,9 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync}; +use lightning::util::persist::{ + KVStore, KVStoreSync, MigratableKVStore, MonitorUpdatingPersisterAsync, +}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::GossipVerifier; @@ -59,7 +61,23 @@ where { } -pub(crate) trait DynStoreTrait: Send + Sync { +/// An object-safe façade over [`KVStore`], [`KVStoreSync`], and +/// [`MigratableKVStore`]. +/// +/// The LDK store traits are convenient for concrete backends, but they are not +/// directly suitable for dynamic dispatch because the async methods on +/// [`KVStore`] return `impl Future`, which is not object-safe. `DynStoreTrait` +/// bridges that gap by: +/// - exposing boxed async methods for object-safe async access +/// - exposing synchronous methods mirroring [`KVStoreSync`] +/// - exposing [`list_all_keys`] for exhaustive key enumeration during +/// migration and restoration +/// +/// This trait is the common erased storage interface used by the builder, +/// tiered storage, runtime plumbing, and FFI-facing store integrations. +/// +/// [`list_all_keys`]: DynStoreTrait::list_all_keys +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; @@ -85,6 +103,16 @@ pub(crate) trait DynStoreTrait: Send + Sync { fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Result, bitcoin::io::Error>; + + /// Returns all known keys as `(primary_namespace, secondary_namespace, key)` tuples. + /// + /// As with [`lightning::util::persist::MigratableKVStore::list_all_keys`], + /// implementations must exhaustively return all entries known to the store so + /// migration and restoration do not miss data. + /// + /// Implementations that do not support exhaustive enumeration may return an + /// error with [`bitcoin::io::ErrorKind::Other`]. + fn list_all_keys(&self) -> Result, bitcoin::io::Error>; } impl<'a> KVStore for dyn DynStoreTrait + 'a { @@ -139,7 +167,24 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +/// An object-safe, type-erased key-value store used throughout the node. +/// +/// [`KVStore`] and [`KVStoreSync`] are not directly object-safe because their +/// async methods return `impl Future`. `DynStoreTrait` provides an object-safe +/// façade over both traits by boxing the async futures and exposing the sync +/// methods directly. +/// +/// `DynStore` is the common erased store type used when the node needs to hold +/// storage behind dynamic dispatch, such as: +/// - primary, backup, or ephemeral stores in tiered storage +/// - FFI-facing stores +/// - generic builder and runtime plumbing that should not depend on a concrete +/// backend type +/// +/// Implementations are also expected to support exhaustive key enumeration +/// mirroring [`lightning::util::persist::MigratableKVStore`], so the same +/// erased store abstraction can be used for migration and restoration. +pub type DynStore = dyn DynStoreTrait; // Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` // methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by @@ -174,9 +219,23 @@ impl KVStore for DynStoreRef { } } -pub(crate) struct DynStoreWrapper(pub(crate) T); +/// Wraps a concrete store so it can be used as a [`DynStore`]. +/// +/// This adapter bridges a concrete backend implementing +/// [`SyncAndAsyncKVStore`] and [`MigratableKVStore`] into the object-safe +/// `DynStoreTrait` abstraction by: +/// - delegating synchronous operations to [`KVStoreSync`] +/// - boxing async operations from [`KVStore`] +/// - forwarding exhaustive key enumeration to [`MigratableKVStore`] +/// +/// In practice, this is the main entry point for erasing native store types +/// such as SQLite-, filesystem-, tiered-, or VSS-backed stores into the common +/// dynamic store abstraction used by the builder and runtime. +pub struct DynStoreWrapper(pub T); -impl DynStoreTrait for DynStoreWrapper { +impl DynStoreTrait + for DynStoreWrapper +{ fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { @@ -224,6 +283,10 @@ impl DynStoreTrait for DynStoreWrapper ) -> Result, bitcoin::io::Error> { KVStoreSync::list(&self.0, primary_namespace, secondary_namespace) } + + fn list_all_keys(&self) -> Result, bitcoin::io::Error> { + MigratableKVStore::list_all_keys(&self.0) + } } pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 850c6f22c..c3935aba5 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -42,7 +42,7 @@ use ldk_node::{ use lightning::io; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; -use lightning::util::persist::{KVStore, KVStoreSync}; +use lightning::util::persist::{KVStore, KVStoreSync, MigratableKVStore}; use lightning::util::test_utils::TestStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_persister::fs_store::v1::FilesystemStore; @@ -442,7 +442,17 @@ pub(crate) fn setup_two_nodes_with_store( } pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestNode { + setup_node_with_builder(chain_source, config, |_| {}) +} + +pub(crate) fn setup_node_with_builder( + chain_source: &TestChainSource, config: TestConfig, configure_builder: F, +) -> TestNode +where + F: FnOnce(&mut Builder), +{ setup_builder!(builder, config.node_config); + match chain_source { TestChainSource::Esplora(electrsd) => { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -501,6 +511,8 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + configure_builder(&mut builder); + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -509,10 +521,6 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), }; - if config.recovery_mode { - builder.set_wallet_recovery_mode(); - } - node.start().unwrap(); assert!(node.status().is_running); assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); @@ -1616,6 +1624,12 @@ impl KVStoreSync for TestSyncStore { } } +impl MigratableKVStore for TestSyncStore { + fn list_all_keys(&self) -> lightning::io::Result> { + self.inner.list_all_keys_internal() + } +} + struct TestSyncStoreInner { serializer: RwLock<()>, test_store: TestStore, @@ -1789,4 +1803,19 @@ impl TestSyncStoreInner { let _guard = self.serializer.read().unwrap(); self.do_list(primary_namespace, secondary_namespace) } + + fn list_all_keys_internal(&self) -> lightning::io::Result> { + let _guard = self.serializer.read().unwrap(); + + let mut fs_keys = MigratableKVStore::list_all_keys(&self.fs_store)?; + fs_keys.sort(); + + let mut sqlite_keys = MigratableKVStore::list_all_keys(&self.sqlite_store)?; + sqlite_keys.sort(); + assert_eq!(fs_keys, sqlite_keys); + + // TODO(enigbe): Upstream MigratableKVStore implementation for TestStore + + Ok(fs_keys) + } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index d2c057a16..2038ae8d2 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -23,22 +23,28 @@ use common::{ expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, generate_listening_addresses, open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + random_storage_path, setup_bitcoind_and_electrsd, setup_builder, setup_node, + setup_node_with_builder, setup_two_nodes, splice_in_with_all, wait_for_tx, TestChainSource, + TestStoreType, TestSyncStore, }; use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; +use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::liquidity::LSPS2ServiceConfig; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, UnifiedPaymentResult, }; -use ldk_node::{Builder, Event, NodeError}; +use ldk_node::{BackupMode, Builder, DynStore, DynStoreWrapper, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::{ + MigratableKVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -2957,3 +2963,82 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn restore_from_backup_full_cycle() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let config_a = random_config(true); + let config_b = random_config(true); + let entropy_a = config_a.node_entropy.clone(); + + let backup_dir = random_storage_path(); + let backup_db_file = "backup_db".to_string(); + let backup_table = "backup_table".to_string(); + + let make_backup_store = || { + SqliteStore::new( + backup_dir.clone(), + Some(backup_db_file.clone()), + Some(backup_table.clone()), + ) + .expect("Failed to create SQLite store.") + }; + + // 1. Run a real node cycle with backup replication enabled. + let original_node_id = { + let backup_store: Arc = Arc::new(DynStoreWrapper(make_backup_store())); + + let node_a = setup_node_with_builder(&chain_source, config_a.clone(), |builder| { + builder.set_backup_store(Arc::clone(&backup_store), BackupMode::BestEffortBackup); + }); + let node_b = setup_node(&chain_source, config_b.clone()); + + let node_id = node_a.node_id(); + + do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + false, + ) + .await; + + node_id + }; + + // 2. Reopen the backup store concretely and verify it contains a known + // durable key from normal node operation. + let reopened_backup = make_backup_store(); + let backup_keys = + MigratableKVStore::list_all_keys(&reopened_backup).expect("Failed to list backup keys"); + + assert!( + backup_keys.iter().any(|(p, s, k)| { + p == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE + && s == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE + && k == CHANNEL_MANAGER_PERSISTENCE_KEY + }), + "Backup store should contain the persisted ChannelManager" + ); + + // 3. Restore into a fresh primary and ensure the restored node can boot. + let mut restore_config = config_a.clone(); + restore_config.node_config.storage_dir_path = + random_storage_path().to_str().unwrap().to_owned(); + + let restore_backup_store: Arc = Arc::new(DynStoreWrapper(make_backup_store())); + + let restored_node = setup_node_with_builder(&chain_source, restore_config, |builder| { + builder.set_backup_store(restore_backup_store, BackupMode::BestEffortBackup); + builder.restore_from_backup(); + }); + + assert_eq!(restored_node.node_id(), original_node_id); + + restored_node.stop().unwrap(); +}