diff --git a/CLAUDE.md b/CLAUDE.md index f8fc8c1..09c55a8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -33,13 +33,28 @@ be mediated via the `TxAccess` trait. ## API Patterns -### Cursor Creation +### Cursor Creation and Caching ```rust let db = txn.open_db(None).unwrap(); // Returns Database (has dbi + flags) let cursor = txn.cursor(db).unwrap(); // Takes Database, NOT raw dbi ``` +Cursors are transparently cached within transactions. When a cursor is +dropped, its raw pointer is returned to the transaction's cache. Subsequent +`cursor()` calls reuse cached pointers, avoiding `mdbx_cursor_open`/ +`mdbx_cursor_close` overhead (~100 ns per cycle). The cache is drained +and all pointers closed on commit or abort. + +`DbCache` (in `src/tx/cache.rs`) stores raw `*mut ffi::MDBX_cursor` +pointers, which makes it `!Send + !Sync` by default. Explicit `unsafe impl +Send + Sync for DbCache` is required because: +- `SyncKind::Cache` requires `Cache + Send` (for `RefCell: Send`) +- `SharedCache` uses `Arc>` which requires `DbCache: Send + Sync` +- This is sound because `Cursor` itself is already `unsafe impl Send + Sync`, + and all access to cached pointers is mediated by `RefCell` (unsync) or + `RwLock` (sync) + ### Database Flags Validation DUP_SORT/DUP_FIXED methods validate flags at runtime: diff --git a/src/tx/cache.rs b/src/tx/cache.rs index ff236bc..8c079cf 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -/// Cache trait for transaction-local database handles. +/// Cache trait for transaction-local database handles and cursors. /// /// This is used by the [`SyncKind`] trait to define the cache type for each /// transaction kind. @@ -38,6 +38,20 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi); + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor>; + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor); + + /// Drain all cached cursors, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; } /// Cached database entry. @@ -73,37 +87,85 @@ impl From for Database { } } -/// Simple cache container for database handles. +/// Simple cache container for database handles and cursor pointers. /// /// Uses inline storage for the common case (most apps use < 16 databases). -#[derive(Debug, Default, Clone)] -#[repr(transparent)] -pub struct DbCache(SmallVec<[CachedDb; 16]>); +#[derive(Debug)] +pub struct DbCache { + dbs: SmallVec<[CachedDb; 16]>, + cursors: SmallVec<[(ffi::MDBX_dbi, *mut ffi::MDBX_cursor); 8]>, +} + +// SAFETY: DbCache contains `*mut ffi::MDBX_cursor` which is `!Send + !Sync`. +// These are raw MDBX cursor pointers bound to a transaction, not a thread. +// `Cursor` itself is already `Send + Sync` (see cursor.rs), so caching the +// same pointers here introduces no new unsoundness. All access to these +// pointers is mediated by `RefCell` (unsync path) or `RwLock` (sync path), +// ensuring no concurrent mutation. +unsafe impl Send for DbCache {} +unsafe impl Sync for DbCache {} + +impl Default for DbCache { + fn default() -> Self { + Self { dbs: SmallVec::new(), cursors: SmallVec::new() } + } +} + +impl Clone for DbCache { + fn clone(&self) -> Self { + Self { dbs: self.dbs.clone(), cursors: SmallVec::new() } + } +} impl DbCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - for entry in self.0.iter() { - if entry.name_hash == name_hash { - return Some(entry.db); - } - } - None + self.dbs.iter().find(|e| e.name_hash == name_hash).map(|e| e.db) } /// Write a database entry to the cache. fn write_db(&mut self, db: CachedDb) { - for entry in self.0.iter() { - if entry.name_hash == db.name_hash { - return; // Another thread beat us - } + if self.dbs.iter().any(|e| e.name_hash == db.name_hash) { + return; } - self.0.push(db); + self.dbs.push(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&mut self, dbi: ffi::MDBX_dbi) { - self.0.retain(|entry| entry.db.dbi() != dbi); + self.dbs.retain(|entry| entry.db.dbi() != dbi); + } + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&mut self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.cursors.iter().position(|(d, _)| *d == dbi).map(|i| self.cursors.swap_remove(i).1) + } + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&mut self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.cursors.push((dbi, cursor)); + } + + /// Drain all cached cursors, returning their raw pointers. + fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.cursors.drain(..).map(|(_, c)| c).collect() + } + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + fn drain_cursors_for_dbi( + &mut self, + dbi: ffi::MDBX_dbi, + ) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + let mut drained = SmallVec::new(); + self.cursors.retain(|(d, c)| { + if *d == dbi { + drained.push(*c); + false + } else { + true + } + }); + drained } } @@ -135,20 +197,33 @@ impl SharedCache { impl Cache for SharedCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.read(); - cache.read_db(name_hash) + self.read().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.write(); - cache.write_db(db); + self.write().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.write(); - cache.remove_dbi(dbi); + self.write().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.write().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.write().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.write().drain_cursors() + } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.write().drain_cursors_for_dbi(dbi) } } @@ -161,19 +236,32 @@ impl Default for SharedCache { impl Cache for RefCell { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.borrow(); - cache.read_db(name_hash) + self.borrow().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.borrow_mut(); - cache.write_db(db); + self.borrow_mut().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.borrow_mut(); - cache.remove_dbi(dbi); + self.borrow_mut().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.borrow_mut().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.borrow_mut().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors() + } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors_for_dbi(dbi) } } diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index 9c058e5..e10ccd4 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -6,6 +6,7 @@ use crate::{ tx::{ TxPtrAccess, aliases::IterKeyVals, + cache::Cache, iter::{Iter, IterDup, IterDupFixed, IterDupFixedOfKey, IterDupOfKey}, kind::WriteMarker, }, @@ -33,6 +34,7 @@ where K: TransactionKind, { access: &'tx K::Access, + cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, db: Database, _kind: PhantomData, @@ -43,12 +45,28 @@ where K: TransactionKind, { /// Creates a new cursor from a reference to a transaction access type. - pub(crate) fn new(access: &'tx K::Access, db: Database) -> MdbxResult { + pub(crate) fn new( + access: &'tx K::Access, + cache: &'tx K::Cache, + db: Database, + ) -> MdbxResult { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); access.with_txn_ptr(|txn_ptr| unsafe { mdbx_result(ffi::mdbx_cursor_open(txn_ptr, db.dbi(), &mut cursor)) })?; - Ok(Self { access, cursor, db, _kind: PhantomData }) + Ok(Self { access, cache, cursor, db, _kind: PhantomData }) + } + + /// Wraps an existing raw cursor pointer with cache support. + /// + /// The cursor must already be bound to the correct transaction and DBI. + pub(crate) const fn from_raw( + access: &'tx K::Access, + cache: &'tx K::Cache, + cursor: *mut ffi::MDBX_cursor, + db: Database, + ) -> Self { + Self { access, cache, cursor, db, _kind: PhantomData } } /// Helper function for `Clone`. This should only be invoked within @@ -58,12 +76,20 @@ where let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + if let Err(e) = mdbx_result(res) { + // Close directly — do NOT construct Self, as Drop would + // push this unbound cursor into the cache. + ffi::mdbx_cursor_close(cursor); + return Err(e); + } - let s = Self { access: other.access, cursor, db: other.db, _kind: PhantomData }; - - mdbx_result(res)?; - - Ok(s) + Ok(Self { + access: other.access, + cache: other.cache, + cursor, + db: other.db, + _kind: PhantomData, + }) } } @@ -1072,11 +1098,10 @@ where K: TransactionKind, { fn drop(&mut self) { - // MDBX cursors MUST be closed. Failure to do so is a memory leak. - // - // To be able to close a cursor of a timed out transaction, we need to - // renew it first. Hence the usage of `with_txn_ptr_for_cleanup` here. - self.access.with_txn_ptr(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); + // Return the cursor pointer to the transaction cache for reuse. + // The transaction's commit/drop path will call mdbx_cursor_close on + // all cached pointers once the transaction is still valid. + self.cache.return_cursor(self.db.dbi(), self.cursor); } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 70e3513..89782ca 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -47,7 +47,7 @@ impl fmt::Debug for TxMeta { /// /// [`TxSync`]: crate::tx::aliases::TxSync /// [`TxUnsync`]: crate::tx::aliases::TxUnsync -pub struct Tx::Access> { +pub struct Tx::Access> { txn: U, cache: K::Cache, @@ -55,7 +55,7 @@ pub struct Tx::Access> { meta: TxMeta, } -impl fmt::Debug for Tx { +impl fmt::Debug for Tx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Tx").finish_non_exhaustive() } @@ -224,12 +224,26 @@ where /// Closes the database handle. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the handle is closed. + /// /// # Safety /// /// This will invalidate data cached in [`Database`] instances with the /// DBI, and may result in bad behavior when using those instances after /// calling this function. pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> MdbxResult<()> { + // Drain and close any cached cursors for this DBI before closing it. + let stale = self.cache.drain_cursors_for_dbi(dbi); + if !stale.is_empty() { + self.with_txn_ptr(|_| { + for cursor in stale { + // SAFETY: cursor pointers are valid — returned by + // Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } // SAFETY: Caller ensures no other references exist. unsafe { ops::close_db_raw(self.meta.env.env_ptr(), dbi) }?; self.cache.remove_dbi(dbi); @@ -238,11 +252,42 @@ where /// Opens a cursor on the given database. /// - /// Multiple cursors can be open simultaneously on different databases - /// within the same transaction. The cursor borrows the transaction's - /// inner access type, allowing concurrent cursor operations. + /// Cursors are transparently cached: dropped cursors return their + /// raw pointer to the cache, and subsequent calls reuse them without + /// a new `mdbx_cursor_open` allocation. Cached cursors are renewed + /// via `mdbx_cursor_renew` to reset their position. pub fn cursor(&self, db: Database) -> MdbxResult> { - Cursor::new(&self.txn, db) + if let Some(raw) = self.cache.take_cursor(db.dbi()) { + self.with_txn_ptr(|txn_ptr| { + // SAFETY: txn_ptr is valid from with_txn_ptr, raw is a + // valid cursor pointer returned by a prior Cursor::drop. + let rc = unsafe { ffi::mdbx_cursor_renew(txn_ptr, raw) }; + mdbx_result(rc)?; + Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + }) + } else { + Cursor::new(&self.txn, &self.cache, db) + } + } + + /// Drains the cursor cache and closes all cached cursor pointers. + /// + /// Must be called before commit or abort to ensure all cursors are + /// closed while the transaction is still valid. + /// + /// NB: keep in sync with the inlined logic in `Tx::Drop`. + fn drain_cached_cursors(&self) { + let cursors = self.cache.drain_cursors(); + if cursors.is_empty() { + return; + } + self.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers are valid — they were returned + // by Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); } } @@ -459,12 +504,26 @@ impl Tx { /// Drops the database from the environment. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the database is dropped. + /// /// # Safety /// /// Caller must ensure no [`Cursor`] or other references to the database /// exist. [`Database`] instances with the DBI will be invalidated, and /// use after calling this function may result in bad behavior. pub unsafe fn drop_db(&self, db: Database) -> MdbxResult<()> { + // Drain and close any cached cursors for this DBI before dropping it. + let stale = self.cache.drain_cursors_for_dbi(db.dbi()); + if !stale.is_empty() { + self.with_txn_ptr(|_| { + for cursor in stale { + // SAFETY: cursor pointers are valid — returned by + // Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } self.with_txn_ptr(|txn| { // SAFETY: txn is a valid RW transaction pointer, caller ensures // no other references to dbi exist. @@ -489,6 +548,8 @@ where /// /// SAFETY: latency pointer must be valid for the duration of the commit. fn commit_inner(self, latency: *mut MDBX_commit_latency) -> MdbxResult<()> { + self.drain_cached_cursors(); + let was_aborted = self.with_txn_ptr(|txn| { if K::IS_READ_ONLY { mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency) }) @@ -542,6 +603,8 @@ where // span scope. let _guard = self.meta.span.clone().entered(); + self.drain_cached_cursors(); + // SAFETY: txn_ptr is valid from with_txn_ptr. let was_aborted = self.with_txn_ptr(|txn_ptr| unsafe { ops::commit_raw(txn_ptr, latency) })?; @@ -652,6 +715,32 @@ where } } +// NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). +// Rust requires Drop bounds to match the struct definition exactly, so we +// cannot call `self.drain_cached_cursors()` here (it lives on `impl Tx`). +// NB: keep in sync with `drain_cached_cursors`. +// The drain-and-close logic is inlined instead. +impl Drop for Tx +where + K: TransactionKind, + U: TxPtrAccess, +{ + fn drop(&mut self) { + let cursors = self.cache.drain_cursors(); + if cursors.is_empty() { + return; + } + self.txn.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers were returned by Cursor::drop + // during the lifetime of this transaction, which is still + // alive (we are in Tx::drop, before txn ptr is dropped). + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/cursor.rs b/tests/cursor.rs index c5d6cfa..619d9a4 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1729,6 +1729,277 @@ fn test_put_multiple_empty_values_v2() { test_put_multiple_empty_values_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_v1() { + test_cursor_cache_reuse_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_v2() { + test_cursor_cache_reuse_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_multiple_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"a", b"1", WriteFlags::empty()).unwrap(); + txn.put(db, b"b", b"2", WriteFlags::empty()).unwrap(); + txn.put(db, b"c", b"3", WriteFlags::empty()).unwrap(); + + // Open two cursors, drop both (both return to cache) + { + let _c1 = txn.cursor(db).unwrap(); + let _c2 = txn.cursor(db).unwrap(); + } + + // Open two again — both should reuse cached pointers + { + let mut c1 = txn.cursor(db).unwrap(); + let mut c2 = txn.cursor(db).unwrap(); + + let (k1, _) = c1.first::, Vec>().unwrap().unwrap(); + let (k2, _) = c2.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k1, b"a"); + assert_eq!(&k2, b"c"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_multiple_v1() { + test_cursor_cache_multiple_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_multiple_v2() { + test_cursor_cache_multiple_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v1() { + test_cursor_cache_repeated_cycles_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v2() { + test_cursor_cache_repeated_cycles_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_reuse_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + // Populate via RW, then commit + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + // Test cursor caching in RO txn + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_ro_v1() { + test_cursor_cache_reuse_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_ro_v2() { + test_cursor_cache_reuse_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v1() { + test_cursor_cache_repeated_cycles_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v2() { + test_cursor_cache_repeated_cycles_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_reuse_across_writes_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + + // cursor -> read -> drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Write new data (B-tree COW) + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // cursor (from cache) -> read -> should see updated data + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + + // Verify both entries visible + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v1() { + test_cursor_cache_reuse_across_writes_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v2() { + test_cursor_cache_reuse_across_writes_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test]