Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ pub enum MdbxError {
/// Operation requires DUP_FIXED flag on database.
#[error("operation requires DUP_FIXED flag on database")]
RequiresDupFixed,
/// Failed to open multiple read transactions on the same MVCC snapshot.
///
/// Concurrent write pressure prevented acquiring a consistent set of
/// read transactions within the retry limit.
#[error("failed to acquire consistent MVCC snapshot across multiple read transactions")]
SnapshotDivergence,
}

impl MdbxError {
Expand Down Expand Up @@ -263,6 +269,7 @@ impl MdbxError {
Self::BotchedTransaction => -96001,
Self::RequiresDupSort => -96002,
Self::RequiresDupFixed => -96003,
Self::SnapshotDivergence => -96004,
Self::Permission => ffi::MDBX_EPERM,
Self::Other(err_code) => *err_code,
}
Expand Down
88 changes: 88 additions & 0 deletions src/sys/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ use std::{
time::Duration,
};

/// Maximum number of open-and-verify attempts made by the optimistic
/// snapshot-matching loop behind [`Environment::begin_ro_sync_multi`] and
/// [`Environment::begin_ro_unsync_multi`].
///
/// Every attempt opens the full set of transactions again; once this many
/// attempts have failed to produce matching snapshot ids, the call gives up
/// with [`MdbxError::SnapshotDivergence`].
const MAX_MULTI_RETRIES: usize = 16;

/// An environment supports multiple databases, all residing in the same shared-memory map.
///
/// Accessing the environment is thread-safe.
Expand Down Expand Up @@ -116,6 +121,89 @@ impl Environment {
RwTxUnsync::begin(self.clone())
}

/// Open `n` read-only synchronized transactions guaranteed to share the
/// same MVCC snapshot.
///
/// This enables safe parallel iteration over large tables using multiple
/// cursors on separate threads without risking snapshot divergence.
///
/// Uses an optimistic open-and-verify loop: transactions are opened
/// sequentially, then their snapshot IDs are compared. If a writer
/// commits between opens causing divergence, all transactions are
/// dropped and the process retries. Returns
/// [`MdbxError::SnapshotDivergence`] if retries are exhausted.
///
/// # Examples
///
/// ```no_run
/// # use signet_libmdbx::{Environment, Geometry, MdbxResult};
/// # use std::path::Path;
/// # fn main() -> MdbxResult<()> {
/// let env = Environment::builder()
/// .set_geometry(Geometry {
/// size: Some(0..(1024 * 1024 * 1024)),
/// ..Default::default()
/// })
/// .open(Path::new("/tmp/my_database"))?;
///
/// // Open 4 read transactions on the same snapshot
/// let txns = env.begin_ro_sync_multi(4)?;
/// // All transactions see the same data
/// # Ok(())
/// # }
/// ```
pub fn begin_ro_sync_multi(&self, n: usize) -> MdbxResult<Vec<RoTxSync>> {
self.begin_ro_multi(n, Self::begin_ro_sync, |tx| tx.id())
}

/// Open `n` read-only unsynchronized transactions guaranteed to share the
/// same MVCC snapshot.
///
/// This is the `!Sync` counterpart to [`begin_ro_sync_multi`]. The
/// returned transactions cannot be shared between threads, but offer
/// ~30% lower overhead per operation.
///
/// See [`begin_ro_sync_multi`] for details on the optimistic retry
/// behavior.
///
/// [`begin_ro_sync_multi`]: Self::begin_ro_sync_multi
pub fn begin_ro_unsync_multi(&self, n: usize) -> MdbxResult<Vec<RoTxUnsync>> {
self.begin_ro_multi(n, Self::begin_ro_unsync, |tx| tx.id())
}

/// Open `n` read-only transactions guaranteed to share the same MVCC
/// snapshot.
///
/// This is the generic implementation backing both
/// [`begin_ro_sync_multi`] and [`begin_ro_unsync_multi`]. `begin` opens a
/// single transaction on this environment and `id` reports the snapshot id
/// of an open transaction.
///
/// * `n == 0` returns an empty vector without opening anything.
/// * `n == 1` returns the single transaction directly; there is nothing to
///   compare it against.
/// * Otherwise up to [`MAX_MULTI_RETRIES`] attempts are made. Each attempt
///   opens all `n` transactions, then compares every id against the first
///   one. A mismatching batch is dropped before the next attempt.
///
/// # Errors
///
/// * Any error returned by `begin` or `id` is propagated immediately and is
///   never retried.
/// * [`MdbxError::SnapshotDivergence`] if no attempt produced `n`
///   transactions with identical ids.
///
/// [`begin_ro_sync_multi`]: Self::begin_ro_sync_multi
/// [`begin_ro_unsync_multi`]: Self::begin_ro_unsync_multi
fn begin_ro_multi<T>(
    &self,
    n: usize,
    begin: fn(&Self) -> MdbxResult<T>,
    id: fn(&T) -> MdbxResult<u64>,
) -> MdbxResult<Vec<T>> {
    if n == 0 {
        return Ok(Vec::new());
    }
    if n == 1 {
        return begin(self).map(|t| vec![t]);
    }

    for _ in 0..MAX_MULTI_RETRIES {
        let txns: Vec<T> = (0..n).map(|_| begin(self)).collect::<MdbxResult<_>>()?;

        // `n >= 2` here, so index 0 and the `[1..]` tail both exist.
        let first = id(&txns[0])?;

        // A failing `id` call is a genuine error, not a snapshot mismatch:
        // surface it with `?` instead of burning retries and reporting a
        // misleading `SnapshotDivergence`.
        let mut consistent = true;
        for txn in &txns[1..] {
            if id(txn)? != first {
                consistent = false;
                break;
            }
        }

        if consistent {
            return Ok(txns);
        }
        // Mismatch: `txns` is dropped here, closing the whole batch before
        // the next attempt.
    }

    Err(MdbxError::SnapshotDivergence)
}

/// Returns a raw pointer to the underlying MDBX environment.
///
/// The caller **must** ensure that the pointer is never dereferenced after the environment has
Expand Down
5 changes: 4 additions & 1 deletion src/tx/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ where
/// Returns the transaction id.
///
/// # Errors
///
/// After this change, an id of `0` reported by `mdbx_txn_id` is turned into
/// [`MdbxError::BadTxn`] instead of being returned as a valid id.
#[inline(always)]
pub fn id(&self) -> MdbxResult<u64> {
// Pre-change body (the single deleted line of this hunk): returned whatever
// `mdbx_txn_id` reported, unchecked.
self.with_txn_ptr(|txn_ptr| Ok(unsafe { ffi::mdbx_txn_id(txn_ptr) }))
// Post-change body (the four added lines of this hunk).
self.with_txn_ptr(|txn_ptr| {
// SAFETY: NOTE(review): relies on `with_txn_ptr` handing the closure a
// valid, live transaction pointer — confirm against its contract.
let id = unsafe { ffi::mdbx_txn_id(txn_ptr) };
// Zero is treated as "no usable transaction" rather than a real id.
if id == 0 { Err(MdbxError::BadTxn) } else { Ok(id) }
})
}

/// Gets an item from a database.
Expand Down
120 changes: 120 additions & 0 deletions tests/multi_ro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#![allow(missing_docs)]
use signet_libmdbx::*;
use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
};
use tempfile::tempdir;

/// Four synchronized read transactions opened in one call must all report
/// the same transaction id.
#[test]
fn begin_ro_sync_multi_all_same_snapshot() {
    let scratch = tempdir().unwrap();
    let env = Environment::builder().open(scratch.path()).unwrap();

    // Commit a single record so the readers have a populated snapshot.
    let seed = env.begin_rw_sync().unwrap();
    let table = seed.create_db(None, DatabaseFlags::empty()).unwrap();
    seed.put(table, b"k", b"v", WriteFlags::empty()).unwrap();
    seed.commit().unwrap();

    let readers = env.begin_ro_sync_multi(4).unwrap();
    assert_eq!(readers.len(), 4);

    // Every id has to equal the first one.
    let ids: Vec<u64> = readers.iter().map(|reader| reader.id().unwrap()).collect();
    let reference = ids.first();
    assert!(ids.iter().all(|id| Some(id) == reference));
}

/// Four unsynchronized read transactions opened in one call must all report
/// the same transaction id.
#[test]
fn begin_ro_unsync_multi_all_same_snapshot() {
    let scratch = tempdir().unwrap();
    let env = Environment::builder().open(scratch.path()).unwrap();

    // Commit a single record so the readers have a populated snapshot.
    let seed = env.begin_rw_sync().unwrap();
    let table = seed.create_db(None, DatabaseFlags::empty()).unwrap();
    seed.put(table, b"k", b"v", WriteFlags::empty()).unwrap();
    seed.commit().unwrap();

    let readers = env.begin_ro_unsync_multi(4).unwrap();
    assert_eq!(readers.len(), 4);

    // Every id has to equal the first one.
    let ids: Vec<u64> = readers.iter().map(|reader| reader.id().unwrap()).collect();
    let reference = ids.first();
    assert!(ids.iter().all(|id| Some(id) == reference));
}

/// Asking for zero synchronized transactions yields an empty vector.
#[test]
fn begin_ro_sync_multi_zero_returns_empty() {
    let scratch = tempdir().unwrap();
    let db_path = scratch.path();
    let env = Environment::builder().open(db_path).unwrap();

    let opened = env.begin_ro_sync_multi(0).unwrap();
    assert!(opened.is_empty());
}

/// Asking for zero unsynchronized transactions yields an empty vector.
#[test]
fn begin_ro_unsync_multi_zero_returns_empty() {
    let scratch = tempdir().unwrap();
    let db_path = scratch.path();
    let env = Environment::builder().open(db_path).unwrap();

    let opened = env.begin_ro_unsync_multi(0).unwrap();
    assert!(opened.is_empty());
}

/// Asking for one synchronized transaction yields exactly one.
#[test]
fn begin_ro_sync_multi_one_returns_single() {
    let scratch = tempdir().unwrap();
    let db_path = scratch.path();
    let env = Environment::builder().open(db_path).unwrap();

    let opened = env.begin_ro_sync_multi(1).unwrap();
    assert_eq!(opened.len(), 1);
}

/// Asking for one unsynchronized transaction yields exactly one.
#[test]
fn begin_ro_unsync_multi_one_returns_single() {
    let scratch = tempdir().unwrap();
    let db_path = scratch.path();
    let env = Environment::builder().open(db_path).unwrap();

    let opened = env.begin_ro_unsync_multi(1).unwrap();
    assert_eq!(opened.len(), 1);
}

/// Opens four-transaction sets twenty times while a background thread
/// commits as fast as it can, and checks that every set agrees on a single
/// transaction id.
#[test]
fn begin_ro_sync_multi_consistent_under_write_pressure() {
    let dir = tempdir().unwrap();
    let env = Environment::builder().open(dir.path()).unwrap();

    // Seed the database
    let txn = env.begin_rw_sync().unwrap();
    let db = txn.create_db(None, DatabaseFlags::empty()).unwrap();
    txn.put(db, b"k", b"v", WriteFlags::empty()).unwrap();
    txn.commit().unwrap();

    let stop = Arc::new(AtomicBool::new(false));

    // Writer thread: commit as fast as possible to create snapshot churn
    let writer = {
        let env = env.clone();
        let stop = Arc::clone(&stop);
        thread::spawn(move || {
            let mut i = 0u64;
            while !stop.load(Ordering::Relaxed) {
                let txn = env.begin_rw_sync().unwrap();
                let db = txn.open_db(None).unwrap();
                txn.put(db, b"counter", i.to_le_bytes(), WriteFlags::empty()).unwrap();
                txn.commit().unwrap();
                i += 1;
            }
        })
    };

    // Open multi-txn sets concurrently with the writer, but only *record*
    // the outcome here. Unwrapping or asserting inside this loop could panic
    // before `stop` is set, leaving the writer spinning un-joined while the
    // temp directory is torn down during unwinding.
    let observed: MdbxResult<Vec<Vec<u64>>> = (0..20)
        .map(|_| -> MdbxResult<Vec<u64>> {
            let txns = env.begin_ro_sync_multi(4)?;
            txns.iter().map(|tx| tx.id()).collect()
        })
        .collect();

    // Always shut the writer down before judging the results.
    stop.store(true, Ordering::Relaxed);
    writer.join().unwrap();

    for ids in observed.unwrap() {
        assert!(ids.windows(2).all(|w| w[0] == w[1]), "snapshot divergence detected: {ids:?}");
    }
}
Loading