From 1cc149d72b6a5caa8c32ddad39c1a0f3a02c514f Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:12:40 -0400 Subject: [PATCH 01/11] feat: add cursor caching methods to Cache trait Extend DbCache with cursor pointer storage and add take_cursor, return_cursor, drain_cursors to the Cache trait. Both RefCell and SharedCache implement the new methods. Co-Authored-By: Claude Sonnet 4.6 --- src/tx/cache.rs | 117 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 88 insertions(+), 29 deletions(-) diff --git a/src/tx/cache.rs b/src/tx/cache.rs index ff236bc..2767062 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, }; -/// Cache trait for transaction-local database handles. +/// Cache trait for transaction-local database handles and cursors. /// /// This is used by the [`SyncKind`] trait to define the cache type for each /// transaction kind. @@ -38,6 +38,16 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi); + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor>; + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor); + + /// Drain all cached cursors, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; } /// Cached database entry. @@ -73,37 +83,68 @@ impl From for Database { } } -/// Simple cache container for database handles. +/// Simple cache container for database handles and cursor pointers. /// /// Uses inline storage for the common case (most apps use < 16 databases). -#[derive(Debug, Default, Clone)] -#[repr(transparent)] -pub struct DbCache(SmallVec<[CachedDb; 16]>); +#[derive(Debug)] +pub struct DbCache { + dbs: SmallVec<[CachedDb; 16]>, + cursors: SmallVec<[(ffi::MDBX_dbi, *mut ffi::MDBX_cursor); 8]>, +} + +// SAFETY: DbCache contains `*mut ffi::MDBX_cursor` which is `!Send + !Sync`. +// These are raw MDBX cursor pointers bound to a transaction, not a thread. +// `Cursor` itself is already `Send + Sync` (see cursor.rs), so caching the +// same pointers here introduces no new unsoundness. All access to these +// pointers is mediated by `RefCell` (unsync path) or `RwLock` (sync path), +// ensuring no concurrent mutation. +unsafe impl Send for DbCache {} +unsafe impl Sync for DbCache {} + +impl Default for DbCache { + fn default() -> Self { + Self { dbs: SmallVec::new(), cursors: SmallVec::new() } + } +} + +impl Clone for DbCache { + fn clone(&self) -> Self { + Self { dbs: self.dbs.clone(), cursors: SmallVec::new() } + } +} impl DbCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - for entry in self.0.iter() { - if entry.name_hash == name_hash { - return Some(entry.db); - } - } - None + self.dbs.iter().find(|e| e.name_hash == name_hash).map(|e| e.db) } /// Write a database entry to the cache. fn write_db(&mut self, db: CachedDb) { - for entry in self.0.iter() { - if entry.name_hash == db.name_hash { - return; // Another thread beat us - } + if self.dbs.iter().any(|e| e.name_hash == db.name_hash) { + return; } - self.0.push(db); + self.dbs.push(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&mut self, dbi: ffi::MDBX_dbi) { - self.0.retain(|entry| entry.db.dbi() != dbi); + self.dbs.retain(|entry| entry.db.dbi() != dbi); + } + + /// Take a cached cursor for the given DBI, if one exists. + fn take_cursor(&mut self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.cursors.iter().position(|(d, _)| *d == dbi).map(|i| self.cursors.swap_remove(i).1) + } + + /// Return a cursor to the cache for later reuse. + fn return_cursor(&mut self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.cursors.push((dbi, cursor)); + } + + /// Drain all cached cursors, returning their raw pointers. + fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.cursors.drain(..).map(|(_, c)| c).collect() } } @@ -135,20 +176,29 @@ impl SharedCache { impl Cache for SharedCache { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.read(); - cache.read_db(name_hash) + self.read().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.write(); - cache.write_db(db); + self.write().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.write(); - cache.remove_dbi(dbi); + self.write().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.write().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.write().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.write().drain_cursors() } } @@ -161,19 +211,28 @@ impl Default for SharedCache { impl Cache for RefCell { /// Read a database entry from the cache. fn read_db(&self, name_hash: u64) -> Option { - let cache = self.borrow(); - cache.read_db(name_hash) + self.borrow().read_db(name_hash) } /// Write a database entry to the cache. fn write_db(&self, db: CachedDb) { - let mut cache = self.borrow_mut(); - cache.write_db(db); + self.borrow_mut().write_db(db); } /// Remove a database entry from the cache by dbi. fn remove_dbi(&self, dbi: ffi::MDBX_dbi) { - let mut cache = self.borrow_mut(); - cache.remove_dbi(dbi); + self.borrow_mut().remove_dbi(dbi); + } + + fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> { + self.borrow_mut().take_cursor(dbi) + } + + fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) { + self.borrow_mut().return_cursor(dbi, cursor); + } + + fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors() } } From d5868e5286cdf85667105bacfa0900fe0cfb52c4 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:18:26 -0400 Subject: [PATCH 02/11] feat: add cache field to Cursor, return pointer on drop Cursor now holds a &'tx K::Cache reference. Drop returns the raw pointer to the cache instead of calling mdbx_cursor_close. Add from_raw constructor for cache-hit path. Co-Authored-By: Claude Sonnet 4.6 --- src/tx/cursor.rs | 39 +++++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index 9c058e5..b0c389a 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -6,6 +6,7 @@ use crate::{ tx::{ TxPtrAccess, aliases::IterKeyVals, + cache::Cache, iter::{Iter, IterDup, IterDupFixed, IterDupFixedOfKey, IterDupOfKey}, kind::WriteMarker, }, @@ -33,6 +34,7 @@ where K: TransactionKind, { access: &'tx K::Access, + cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, db: Database, _kind: PhantomData, @@ -43,12 +45,28 @@ where K: TransactionKind, { /// Creates a new cursor from a reference to a transaction access type. - pub(crate) fn new(access: &'tx K::Access, db: Database) -> MdbxResult { + pub(crate) fn new( + access: &'tx K::Access, + cache: &'tx K::Cache, + db: Database, + ) -> MdbxResult { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); access.with_txn_ptr(|txn_ptr| unsafe { mdbx_result(ffi::mdbx_cursor_open(txn_ptr, db.dbi(), &mut cursor)) })?; - Ok(Self { access, cursor, db, _kind: PhantomData }) + Ok(Self { access, cache, cursor, db, _kind: PhantomData }) + } + + /// Wraps an existing raw cursor pointer with cache support. + /// + /// The cursor must already be bound to the correct transaction and DBI. + pub(crate) fn from_raw( + access: &'tx K::Access, + cache: &'tx K::Cache, + cursor: *mut ffi::MDBX_cursor, + db: Database, + ) -> Self { + Self { access, cache, cursor, db, _kind: PhantomData } } /// Helper function for `Clone`. This should only be invoked within @@ -59,7 +77,13 @@ where let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); - let s = Self { access: other.access, cursor, db: other.db, _kind: PhantomData }; + let s = Self { + access: other.access, + cache: other.cache, + cursor, + db: other.db, + _kind: PhantomData, + }; mdbx_result(res)?; @@ -1072,11 +1096,10 @@ where K: TransactionKind, { fn drop(&mut self) { - // MDBX cursors MUST be closed. Failure to do so is a memory leak. - // - // To be able to close a cursor of a timed out transaction, we need to - // renew it first. Hence the usage of `with_txn_ptr_for_cleanup` here. - self.access.with_txn_ptr(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); + // Return the cursor pointer to the transaction cache for reuse. + // The transaction's commit/drop path will call mdbx_cursor_close on + // all cached pointers once the transaction is still valid. + self.cache.return_cursor(self.db.dbi(), self.cursor); } } From 158d9184f9d89e09271d58c1bdcb4eeb9d00cc6b Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:21:03 -0400 Subject: [PATCH 03/11] feat: cache-aware Tx::cursor() and drain on commit/drop Tx::cursor() checks the cursor cache before allocating. Commit and drop paths drain cached cursors inside with_txn_ptr to ensure all FFI close calls are properly serialized. Co-Authored-By: Claude Sonnet 4.6 --- src/tx/cursor.rs | 2 +- src/tx/impl.rs | 59 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index b0c389a..bb2b92d 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -60,7 +60,7 @@ where /// Wraps an existing raw cursor pointer with cache support. /// /// The cursor must already be bound to the correct transaction and DBI. - pub(crate) fn from_raw( + pub(crate) const fn from_raw( access: &'tx K::Access, cache: &'tx K::Cache, cursor: *mut ffi::MDBX_cursor, diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 70e3513..1b45012 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -47,7 +47,7 @@ impl fmt::Debug for TxMeta { /// /// [`TxSync`]: crate::tx::aliases::TxSync /// [`TxUnsync`]: crate::tx::aliases::TxUnsync -pub struct Tx::Access> { +pub struct Tx::Access> { txn: U, cache: K::Cache, @@ -55,7 +55,7 @@ pub struct Tx::Access> { meta: TxMeta, } -impl fmt::Debug for Tx { +impl fmt::Debug for Tx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Tx").finish_non_exhaustive() } @@ -238,11 +238,33 @@ where /// Opens a cursor on the given database. /// - /// Multiple cursors can be open simultaneously on different databases - /// within the same transaction. The cursor borrows the transaction's - /// inner access type, allowing concurrent cursor operations. + /// Cursors are transparently cached: dropped cursors return their + /// raw pointer to the cache, and subsequent calls reuse them without + /// a new `mdbx_cursor_open` allocation. pub fn cursor(&self, db: Database) -> MdbxResult> { - Cursor::new(&self.txn, db) + if let Some(raw) = self.cache.take_cursor(db.dbi()) { + Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + } else { + Cursor::new(&self.txn, &self.cache, db) + } + } + + /// Drains the cursor cache and closes all cached cursor pointers. + /// + /// Must be called before commit or abort to ensure all cursors are + /// closed while the transaction is still valid. + fn drain_cached_cursors(&self) { + let cursors = self.cache.drain_cursors(); + if cursors.is_empty() { + return; + } + self.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers are valid — they were returned + // by Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); } } @@ -489,6 +511,8 @@ where /// /// SAFETY: latency pointer must be valid for the duration of the commit. fn commit_inner(self, latency: *mut MDBX_commit_latency) -> MdbxResult<()> { + self.drain_cached_cursors(); + let was_aborted = self.with_txn_ptr(|txn| { if K::IS_READ_ONLY { mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, latency) }) @@ -542,6 +566,8 @@ where // span scope. let _guard = self.meta.span.clone().entered(); + self.drain_cached_cursors(); + // SAFETY: txn_ptr is valid from with_txn_ptr. let was_aborted = self.with_txn_ptr(|txn_ptr| unsafe { ops::commit_raw(txn_ptr, latency) })?; @@ -652,6 +678,27 @@ where } } +impl Drop for Tx +where + K: TransactionKind, + U: TxPtrAccess, +{ + fn drop(&mut self) { + let cursors = self.cache.drain_cursors(); + if cursors.is_empty() { + return; + } + self.txn.with_txn_ptr(|_| { + for cursor in cursors { + // SAFETY: cursor pointers were returned by Cursor::drop + // during the lifetime of this transaction, which is still + // alive (we are in Tx::drop, before txn ptr is dropped). + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } +} + #[cfg(test)] mod tests { use super::*; From ab5817d6a5ecd5834c3c0f5f86b13b071d726d5a Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:25:13 -0400 Subject: [PATCH 04/11] test: add cursor caching tests Verify cursor reuse across open/drop cycles, multiple cursors on the same DB, and repeated cycle stability. Co-Authored-By: Claude Sonnet 4.6 --- tests/cursor.rs | 128 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/tests/cursor.rs b/tests/cursor.rs index c5d6cfa..f655dee 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1729,6 +1729,134 @@ fn test_put_multiple_empty_values_v2() { test_put_multiple_empty_values_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_v1() { + test_cursor_cache_reuse_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_v2() { + test_cursor_cache_reuse_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_multiple_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"a", b"1", WriteFlags::empty()).unwrap(); + txn.put(db, b"b", b"2", WriteFlags::empty()).unwrap(); + txn.put(db, b"c", b"3", WriteFlags::empty()).unwrap(); + + // Open two cursors, drop both (both return to cache) + { + let _c1 = txn.cursor(db).unwrap(); + let _c2 = txn.cursor(db).unwrap(); + } + + // Open two again — both should reuse cached pointers + { + let mut c1 = txn.cursor(db).unwrap(); + let mut c2 = txn.cursor(db).unwrap(); + + let (k1, _) = c1.first::, Vec>().unwrap().unwrap(); + let (k2, _) = c2.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k1, b"a"); + assert_eq!(&k2, b"c"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_multiple_v1() { + test_cursor_cache_multiple_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_multiple_v2() { + test_cursor_cache_multiple_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v1() { + test_cursor_cache_repeated_cycles_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_v2() { + test_cursor_cache_repeated_cycles_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test] From 3ceb76afaa2db2fa33ef43c0124dc97111b17e95 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 7 Apr 2026 11:30:26 -0400 Subject: [PATCH 05/11] docs: document cursor caching Send/Sync and Drop constraint Document the unsafe Send + Sync impls on DbCache in CLAUDE.md with full justification. Add code comment explaining why Tx::Drop inlines drain logic instead of calling drain_cached_cursors (type system constraint: Drop is on Tx, helper is on Tx). --- CLAUDE.md | 17 ++++++++++++++++- src/tx/impl.rs | 4 ++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index f8fc8c1..09c55a8 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -33,13 +33,28 @@ be mediated via the `TxAccess` trait. ## API Patterns -### Cursor Creation +### Cursor Creation and Caching ```rust let db = txn.open_db(None).unwrap(); // Returns Database (has dbi + flags) let cursor = txn.cursor(db).unwrap(); // Takes Database, NOT raw dbi ``` +Cursors are transparently cached within transactions. When a cursor is +dropped, its raw pointer is returned to the transaction's cache. Subsequent +`cursor()` calls reuse cached pointers, avoiding `mdbx_cursor_open`/ +`mdbx_cursor_close` overhead (~100 ns per cycle). The cache is drained +and all pointers closed on commit or abort. + +`DbCache` (in `src/tx/cache.rs`) stores raw `*mut ffi::MDBX_cursor` +pointers, which makes it `!Send + !Sync` by default. Explicit `unsafe impl +Send + Sync for DbCache` is required because: +- `SyncKind::Cache` requires `Cache + Send` (for `RefCell: Send`) +- `SharedCache` uses `Arc>` which requires `DbCache: Send + Sync` +- This is sound because `Cursor` itself is already `unsafe impl Send + Sync`, + and all access to cached pointers is mediated by `RefCell` (unsync) or + `RwLock` (sync) + ### Database Flags Validation DUP_SORT/DUP_FIXED methods validate flags at runtime: diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 1b45012..777efea 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -678,6 +678,10 @@ where } } +// NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). +// Rust requires Drop bounds to match the struct definition exactly, so we +// cannot call `self.drain_cached_cursors()` here (it lives on `impl Tx`). +// The drain-and-close logic is inlined instead. impl Drop for Tx where K: TransactionKind, From b97eff257a0f96d29ec0d8b67edaf9412c8fad76 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:30:11 -0400 Subject: [PATCH 06/11] fix: drain cached cursors on close_db and drop_db Prevents use-after-free when a DBI is closed or dropped while stale cursor pointers for it remain in the cursor cache. --- src/tx/cache.rs | 29 +++++++++++++++++++++++++++++ src/tx/impl.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/tx/cache.rs b/src/tx/cache.rs index 2767062..8c079cf 100644 --- a/src/tx/cache.rs +++ b/src/tx/cache.rs @@ -48,6 +48,10 @@ pub trait Cache: Clone + Default + std::fmt::Debug { /// Drain all cached cursors, returning their raw pointers. /// The caller is responsible for closing them via FFI. fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + /// The caller is responsible for closing them via FFI. + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>; } /// Cached database entry. @@ -146,6 +150,23 @@ impl DbCache { fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.cursors.drain(..).map(|(_, c)| c).collect() } + + /// Drain cached cursors for a specific DBI, returning their raw pointers. + fn drain_cursors_for_dbi( + &mut self, + dbi: ffi::MDBX_dbi, + ) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + let mut drained = SmallVec::new(); + self.cursors.retain(|(d, c)| { + if *d == dbi { + drained.push(*c); + false + } else { + true + } + }); + drained + } } /// Simple cache container for database handles. @@ -200,6 +221,10 @@ impl Cache for SharedCache { fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.write().drain_cursors() } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.write().drain_cursors_for_dbi(dbi) + } } impl Default for SharedCache { @@ -235,4 +260,8 @@ impl Cache for RefCell { fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { self.borrow_mut().drain_cursors() } + + fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> { + self.borrow_mut().drain_cursors_for_dbi(dbi) + } } diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 777efea..fd10653 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -224,12 +224,26 @@ where /// Closes the database handle. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the handle is closed. + /// /// # Safety /// /// This will invalidate data cached in [`Database`] instances with the /// DBI, and may result in bad behavior when using those instances after /// calling this function. pub unsafe fn close_db(&self, dbi: ffi::MDBX_dbi) -> MdbxResult<()> { + // Drain and close any cached cursors for this DBI before closing it. + let stale = self.cache.drain_cursors_for_dbi(dbi); + if !stale.is_empty() { + self.with_txn_ptr(|_| { + for cursor in stale { + // SAFETY: cursor pointers are valid — returned by + // Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } // SAFETY: Caller ensures no other references exist. unsafe { ops::close_db_raw(self.meta.env.env_ptr(), dbi) }?; self.cache.remove_dbi(dbi); @@ -481,12 +495,26 @@ impl Tx { /// Drops the database from the environment. /// + /// Any cached cursor pointers for this DBI are drained and closed + /// before the database is dropped. + /// /// # Safety /// /// Caller must ensure no [`Cursor`] or other references to the database /// exist. [`Database`] instances with the DBI will be invalidated, and /// use after calling this function may result in bad behavior. pub unsafe fn drop_db(&self, db: Database) -> MdbxResult<()> { + // Drain and close any cached cursors for this DBI before dropping it. + let stale = self.cache.drain_cursors_for_dbi(db.dbi()); + if !stale.is_empty() { + self.with_txn_ptr(|_| { + for cursor in stale { + // SAFETY: cursor pointers are valid — returned by + // Cursor::drop during the lifetime of this transaction. + unsafe { ffi::mdbx_cursor_close(cursor) }; + } + }); + } self.with_txn_ptr(|txn| { // SAFETY: txn is a valid RW transaction pointer, caller ensures // no other references to dbi exist. From 33507af02313cfe6633f1538f8d35933b14773bc Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:31:51 -0400 Subject: [PATCH 07/11] fix: prevent cache poisoning on mdbx_cursor_copy failure Construct Cursor only after mdbx_cursor_copy succeeds. On failure, close the raw pointer directly instead of letting Drop push an unbound cursor into the cache. --- src/tx/cursor.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index bb2b92d..e10ccd4 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -76,18 +76,20 @@ where let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + if let Err(e) = mdbx_result(res) { + // Close directly — do NOT construct Self, as Drop would + // push this unbound cursor into the cache. + ffi::mdbx_cursor_close(cursor); + return Err(e); + } - let s = Self { + Ok(Self { access: other.access, cache: other.cache, cursor, db: other.db, _kind: PhantomData, - }; - - mdbx_result(res)?; - - Ok(s) + }) } } From 9c6dc2bb3a3acbd692ced96cce51cc6e6d65f8f7 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:33:08 -0400 Subject: [PATCH 08/11] fix: renew cached cursors to reset B-tree position Call mdbx_cursor_renew on the cache-hit path so reused cursors start at a clean position rather than retaining stale state. --- src/tx/impl.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/tx/impl.rs b/src/tx/impl.rs index fd10653..8185216 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -254,10 +254,17 @@ where /// /// Cursors are transparently cached: dropped cursors return their /// raw pointer to the cache, and subsequent calls reuse them without - /// a new `mdbx_cursor_open` allocation. + /// a new `mdbx_cursor_open` allocation. Cached cursors are renewed + /// via `mdbx_cursor_renew` to reset their position. pub fn cursor(&self, db: Database) -> MdbxResult> { if let Some(raw) = self.cache.take_cursor(db.dbi()) { - Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + self.with_txn_ptr(|txn_ptr| { + // SAFETY: txn_ptr is valid from with_txn_ptr, raw is a + // valid cursor pointer returned by a prior Cursor::drop. + let rc = unsafe { ffi::mdbx_cursor_renew(txn_ptr, raw) }; + mdbx_result(rc)?; + Ok(Cursor::from_raw(&self.txn, &self.cache, raw, db)) + }) } else { Cursor::new(&self.txn, &self.cache, db) } From 4144c177f70d8c6280bca106d43cbf343fbf6f10 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:33:42 -0400 Subject: [PATCH 09/11] docs: add keep-in-sync comments on duplicated drain logic Co-Authored-By: Claude Opus 4.6 (1M context) --- src/tx/impl.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/tx/impl.rs b/src/tx/impl.rs index 8185216..89782ca 100644 --- a/src/tx/impl.rs +++ b/src/tx/impl.rs @@ -274,6 +274,8 @@ where /// /// Must be called before commit or abort to ensure all cursors are /// closed while the transaction is still valid. + /// + /// NB: keep in sync with the inlined logic in `Tx::Drop`. fn drain_cached_cursors(&self) { let cursors = self.cache.drain_cursors(); if cursors.is_empty() { @@ -716,6 +718,7 @@ where // NOTE: This impl is on Tx with free U, not Tx (where U = K::Access). // Rust requires Drop bounds to match the struct definition exactly, so we // cannot call `self.drain_cached_cursors()` here (it lives on `impl Tx`). +// NB: keep in sync with `drain_cached_cursors`. // The drain-and-close logic is inlined instead. impl Drop for Tx where From 8504d37eab62ca358a7b06b52e6a1a79cdae16fa Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:46:32 -0400 Subject: [PATCH 10/11] test: add RO cursor-cache tests Cover cursor caching in read-only transactions (reuse and repeated cycles), complementing existing RW-only coverage. Co-Authored-By: Claude Sonnet 4.6 --- tests/cursor.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tests/cursor.rs b/tests/cursor.rs index f655dee..f8ac163 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1857,6 +1857,98 @@ fn test_cursor_cache_repeated_cycles_v2() { test_cursor_cache_repeated_cycles_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + // Populate via RW, then commit + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + // Test cursor caching in RO txn + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + // First cursor: open, use, drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Second cursor: should reuse cached pointer + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + + let (k, v) = cursor.next::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_ro_v1() { + test_cursor_cache_reuse_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_ro_v2() { + test_cursor_cache_reuse_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + +fn test_cursor_cache_repeated_cycles_ro_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key", b"val", WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + + let txn = begin_ro(&env).unwrap(); + let db = txn.open_db(None).unwrap(); + + for _ in 0..100 { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key"); + assert_eq!(&v, b"val"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v1() { + test_cursor_cache_repeated_cycles_ro_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_repeated_cycles_ro_v2() { + test_cursor_cache_repeated_cycles_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test] From 426be07b3228f4bc3d1fdf8f3c18622b3f2408f8 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 09:46:58 -0400 Subject: [PATCH 11/11] test: add cursor reuse across writes test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Verify cached cursors see updated data after B-tree COW from interleaved put operations — the primary hot path for caching. Co-Authored-By: Claude Sonnet 4.6 --- tests/cursor.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/cursor.rs b/tests/cursor.rs index f8ac163..619d9a4 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -1949,6 +1949,57 @@ fn test_cursor_cache_repeated_cycles_ro_v2() { test_cursor_cache_repeated_cycles_ro_impl(V2Factory::begin_rw, V2Factory::begin_ro); } +fn test_cursor_cache_reuse_across_writes_impl( + begin_rw: impl Fn(&Environment) -> MdbxResult, + _begin_ro: impl Fn(&Environment) -> MdbxResult, +) where + RwTx: TestRwTxn, + RoTx: TestRoTxn, +{ + let dir = tempdir().unwrap(); + let env = Environment::builder().open(dir.path()).unwrap(); + + let txn = begin_rw(&env).unwrap(); + let db = txn.create_db(None, DatabaseFlags::empty()).unwrap(); + txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); + + // cursor -> read -> drop (returns to cache) + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + // Write new data (B-tree COW) + txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); + + // cursor (from cache) -> read -> should see updated data + { + let mut cursor = txn.cursor(db).unwrap(); + let (k, v) = cursor.last::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key2"); + assert_eq!(&v, b"val2"); + + // Verify both entries visible + let (k, v) = cursor.first::, Vec>().unwrap().unwrap(); + assert_eq!(&k, b"key1"); + assert_eq!(&v, b"val1"); + } + + txn.commit().unwrap(); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v1() { + test_cursor_cache_reuse_across_writes_impl(V1Factory::begin_rw, V1Factory::begin_ro); +} + +#[test] +fn test_cursor_cache_reuse_across_writes_v2() { + test_cursor_cache_reuse_across_writes_impl(V2Factory::begin_rw, V2Factory::begin_ro); +} + // Release-build test: verify runtime error instead of panic #[cfg(not(debug_assertions))] #[test]