Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 105 additions & 5 deletions crates/hot-mdbx/src/db_info.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,66 @@
use bytes::Buf;
use parking_lot::RwLock;
use signet_hot::ValSer;
use std::collections::HashMap;
use signet_hot::{ValSer, tables::NUM_TABLES};
use std::{collections::HashMap, sync::Arc};

/// Inner storage for the two-tier FSI cache.
///
/// The `known` array holds pre-populated entries for the standard tables,
/// searched via lock-free linear scan. The `dynamic` map holds entries for
/// tables created at runtime.
#[derive(Debug)]
struct FsiCacheInner {
/// Pre-populated at open time. Lock-free linear scan.
known: [(&'static str, FixedSizeInfo); NUM_TABLES],
/// Locking fallback for dynamically created tables.
dynamic: RwLock<HashMap<&'static str, FixedSizeInfo>>,
}

/// Two-tier cache for [`FixedSizeInfo`].
///
/// The fast path is a lock-free linear scan over the known table entries.
/// The slow path acquires a `RwLock` for dynamically created tables.
#[derive(Debug, Clone)]
pub(crate) struct FsiCache(Arc<FsiCacheInner>);

impl Default for FsiCache {
fn default() -> Self {
Self::new([("", FixedSizeInfo::None); NUM_TABLES])
}
}

/// Type alias for the FixedSizeInfo cache.
pub type FsiCache = std::sync::Arc<RwLock<HashMap<&'static str, FixedSizeInfo>>>;
impl FsiCache {
/// Create a new `FsiCache` pre-populated with the known table entries.
pub(crate) fn new(known: [(&'static str, FixedSizeInfo); NUM_TABLES]) -> Self {
Self(Arc::new(FsiCacheInner { known, dynamic: RwLock::new(HashMap::new()) }))
}

/// Look up a table's [`FixedSizeInfo`].
///
/// Checks the lock-free known array first, then the locked dynamic map.
/// Returns `None` if the table is not cached.
pub(crate) fn get(&self, name: &str) -> Option<FixedSizeInfo> {
// Fast path: linear scan over known tables (no lock).
for &(known_name, fsi) in &self.0.known {
if known_name == name {
return Some(fsi);
}
}
// Slow path: check dynamic map.
self.0.dynamic.read().get(name).copied()
}

/// Insert a dynamically created table's [`FixedSizeInfo`].
pub(crate) fn insert_dynamic(&self, name: &'static str, fsi: FixedSizeInfo) {
self.0.dynamic.write().insert(name, fsi);
}
}

/// Information about fixed size values in a database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum FixedSizeInfo {
/// Not a DUPSORT table.
#[default]
None,
/// DUPSORT table without DUP_FIXED (variable value size).
DupSort {
Expand Down Expand Up @@ -144,4 +195,53 @@ mod tests {
other => panic!("expected InsufficientData, got: {other:?}"),
}
}

#[test]
fn fsi_cache_known_path() {
let known = [
("TableA", FixedSizeInfo::None),
("TableB", FixedSizeInfo::DupSort { key2_size: 32 }),
("TableC", FixedSizeInfo::DupFixed { key2_size: 32, total_size: 64 }),
("TableD", FixedSizeInfo::None),
("TableE", FixedSizeInfo::None),
("TableF", FixedSizeInfo::None),
("TableG", FixedSizeInfo::None),
("TableH", FixedSizeInfo::None),
("TableI", FixedSizeInfo::None),
];
let cache = FsiCache::new(known);

assert_eq!(cache.get("TableA"), Some(FixedSizeInfo::None));
assert_eq!(cache.get("TableB"), Some(FixedSizeInfo::DupSort { key2_size: 32 }));
assert_eq!(
cache.get("TableC"),
Some(FixedSizeInfo::DupFixed { key2_size: 32, total_size: 64 })
);
// Unknown table returns None
assert_eq!(cache.get("Unknown"), None);
}

#[test]
fn fsi_cache_dynamic_path() {
let known = [
("T1", FixedSizeInfo::None),
("T2", FixedSizeInfo::None),
("T3", FixedSizeInfo::None),
("T4", FixedSizeInfo::None),
("T5", FixedSizeInfo::None),
("T6", FixedSizeInfo::None),
("T7", FixedSizeInfo::None),
("T8", FixedSizeInfo::None),
("T9", FixedSizeInfo::None),
];
let cache = FsiCache::new(known);

// Not in known set
assert_eq!(cache.get("DynTable"), None);

// Insert dynamically
let fsi = FixedSizeInfo::DupSort { key2_size: 20 };
cache.insert_dynamic("DynTable", fsi);
assert_eq!(cache.get("DynTable"), Some(fsi));
}
}
97 changes: 72 additions & 25 deletions crates/hot-mdbx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg))]

use parking_lot::RwLock;
use signet_libmdbx::{
Environment, EnvironmentFlags, Geometry, Mode, Ro, RoSync, Rw, RwSync, SyncMode, ffi,
sys::{HandleSlowReadersReturnCode, PageSize},
};
use std::{collections::HashMap, ops::Range, path::Path, sync::Arc};
use std::{ops::Range, path::Path};

mod cursor;
pub use cursor::{Cursor, CursorRo, CursorRoSync, CursorRw, CursorRwSync};

mod db_info;
pub use db_info::{FixedSizeInfo, FsiCache};
pub use db_info::FixedSizeInfo;
use db_info::FsiCache;

mod error;
pub use error::MdbxError;
Expand All @@ -78,7 +78,26 @@ pub use tx::Tx;

mod utils;

use signet_hot::model::{HotKv, HotKvError, HotKvWrite};
use signet_hot::{
model::{HotKv, HotKvError, HotKvWrite},
tables::{
AccountChangeSets, AccountsHistory, Bytecodes, HeaderNumbers, Headers, NUM_TABLES,
PlainAccountState, PlainStorageState, StorageChangeSets, StorageHistory, Table,
},
};

/// The known table names, used to pre-populate the FSI cache at open time.
const KNOWN_TABLE_NAMES: [&str; NUM_TABLES] = [
Headers::NAME,
HeaderNumbers::NAME,
Bytecodes::NAME,
PlainAccountState::NAME,
PlainStorageState::NAME,
AccountsHistory::NAME,
AccountChangeSets::NAME,
StorageHistory::NAME,
StorageChangeSets::NAME,
];

/// 1 KB in bytes
pub const KILOBYTE: usize = 1024;
Expand Down Expand Up @@ -247,12 +266,11 @@ impl DatabaseArguments {
pub struct DatabaseEnv {
/// Libmdbx-sys environment.
inner: Environment,
/// Cached FixedSizeInfo for tables.
/// Cached FixedSizeInfo for tables, pre-populated at open time.
///
/// Important: Do not manually close these DBIs, like via `mdbx_dbi_close`.
/// More generally, do not dynamically create, re-open, or drop tables at
/// runtime. It's better to perform table creation and migration only once
/// at startup.
/// The standard tables are created and their FSI entries cached during
/// [`DatabaseEnv::open`]. Do not manually close DBIs (e.g. via
/// `mdbx_dbi_close`) or dynamically drop tables at runtime.
fsi_cache: FsiCache,

/// Write lock for when dealing with a read-write environment.
Expand Down Expand Up @@ -366,24 +384,15 @@ impl DatabaseEnv {
// https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
inner_env.set_rp_augment_limit(256 * 1024);

let fsi_cache = Arc::new(RwLock::new(HashMap::new()));
let env = Self { inner: inner_env.open(path)?, fsi_cache, _lock_file };

if kind.is_rw() {
env.create_tables()?;
}
let inner = inner_env.open(path)?;

Ok(env)
}
let fsi_cache = if kind.is_rw() {
create_tables_and_populate_cache(&inner)?
} else {
populate_cache_ro(&inner)?
};

/// Create all standard hot storage tables.
///
/// Called automatically when opening in read-write mode.
fn create_tables(&self) -> Result<(), MdbxError> {
let tx = self.tx_rw()?;
tx.queue_db_init()?;
tx.raw_commit()?;
Ok(())
Ok(Self { inner, fsi_cache, _lock_file })
}

/// Start a new read-only transaction.
Expand Down Expand Up @@ -431,3 +440,41 @@ impl HotKv for DatabaseEnv {
self.tx_rw().map_err(HotKvError::from_err)
}
}

/// Create all standard hot storage tables and return a pre-populated
/// [`FsiCache`]. Called during RW open.
fn create_tables_and_populate_cache(env: &Environment) -> Result<FsiCache, MdbxError> {
let inner_tx = env.begin_rw_unsync().map_err(MdbxError::Mdbx)?;
// Tx requires an FsiCache, so we pass a throwaway empty one. The FSI
// entries written by queue_db_init's store_fsi calls land in this
// temporary cache's dynamic map — they are discarded. We re-read the
// authoritative values from the metadata table via read_known_fsi.
let tmp_cache = FsiCache::new(Default::default());
let tx = Tx::new(inner_tx, tmp_cache);
tx.queue_db_init()?;

let known = read_known_fsi(&tx)?;
tx.raw_commit()?;
Ok(FsiCache::new(known))
}

/// Read FSI entries for all known tables from the metadata table.
fn read_known_fsi<K: signet_libmdbx::TransactionKind>(
tx: &Tx<K>,
) -> Result<[(&'static str, FixedSizeInfo); NUM_TABLES], MdbxError> {
let mut known = [("", FixedSizeInfo::None); NUM_TABLES];
for (i, &name) in KNOWN_TABLE_NAMES.iter().enumerate() {
known[i] = (name, tx.read_fsi_from_table(name)?);
}
Ok(known)
}

/// Read FSI entries for all known tables via a temporary RO transaction.
/// Called during RO open.
fn populate_cache_ro(env: &Environment) -> Result<FsiCache, MdbxError> {
let inner_tx = env.begin_ro_unsync().map_err(MdbxError::Mdbx)?;
let tmp_cache = FsiCache::new(Default::default());
let tx = Tx::new(inner_tx, tmp_cache);
let known = read_known_fsi(&tx)?;
Ok(FsiCache::new(known))
}
15 changes: 9 additions & 6 deletions crates/hot-mdbx/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ impl<K: TransactionKind> Tx<K> {
}

/// Reads FixedSizeInfo from the metadata table.
fn read_fsi_from_table(&self, name: &'static str) -> Result<FixedSizeInfo, MdbxError> {
pub(crate) fn read_fsi_from_table(
&self,
name: &'static str,
) -> Result<FixedSizeInfo, MdbxError> {
let db = self.inner.open_db(None)?;

let data: [u8; 8] = self
Expand All @@ -54,13 +57,13 @@ impl<K: TransactionKind> Tx<K> {

/// Gets cached FixedSizeInfo for a table.
pub fn get_fsi(&self, name: &'static str) -> Result<FixedSizeInfo, MdbxError> {
// Fast path: read lock
if let Some(&fsi) = self.fsi_cache.read().get(name) {
// Fast path: lock-free scan over known tables, then locked dynamic map.
if let Some(fsi) = self.fsi_cache.get(name) {
return Ok(fsi);
}
// Slow path: read from table, then write lock
// Slow path: read from table, then insert into dynamic map.
let fsi = self.read_fsi_from_table(name)?;
self.fsi_cache.write().insert(name, fsi);
self.fsi_cache.insert_dynamic(name, fsi);
Ok(fsi)
}

Expand Down Expand Up @@ -135,7 +138,7 @@ impl<K: TransactionKind + WriteMarker> Tx<K> {
fsi.encode_value_to(&mut value_buf.as_mut_slice());

self.inner.put(db, fsi_name_to_key(table).as_slice(), value_buf, WriteFlags::UPSERT)?;
self.fsi_cache.write().insert(table, fsi);
self.fsi_cache.insert_dynamic(table, fsi);

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions crates/hot/src/tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ mod macros;
mod definitions;
pub use definitions::*;

/// The number of standard hot storage tables created by
/// [`queue_db_init`](crate::model::HotKvWrite::queue_db_init). Update this
/// constant whenever a table is added to or removed from `queue_db_init`.
pub const NUM_TABLES: usize = 9;

use crate::{
DeserError, KeySer, MAX_FIXED_VAL_SIZE, MAX_KEY_SIZE, ValSer,
model::{DualKeyValue, KeyValue},
Expand Down
Loading