Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,28 @@ be mediated via the `TxAccess` trait.

## API Patterns

### Cursor Creation
### Cursor Creation and Caching

```rust
let db = txn.open_db(None).unwrap(); // Returns Database (has dbi + flags)
let cursor = txn.cursor(db).unwrap(); // Takes Database, NOT raw dbi
```

Cursors are transparently cached within transactions. When a cursor is
dropped, its raw pointer is returned to the transaction's cache. Subsequent
`cursor()` calls reuse cached pointers, avoiding `mdbx_cursor_open`/
`mdbx_cursor_close` overhead (~100 ns per cycle). The cache is drained
and all pointers closed on commit or abort.

`DbCache` (in `src/tx/cache.rs`) stores raw `*mut ffi::MDBX_cursor`
pointers, which makes it `!Send + !Sync` by default. Explicit
`unsafe impl Send for DbCache` and `unsafe impl Sync for DbCache` are
required because:
- `SyncKind::Cache` requires `Cache + Send`, and `RefCell<DbCache>: Send`
  only holds when `DbCache: Send`
- `SharedCache` uses `Arc<RwLock<DbCache>>`, which requires
  `DbCache: Send + Sync`

These impls are sound because `Cursor` itself already has `unsafe impl`s of
`Send` and `Sync`, and all access to cached pointers is mediated by `RefCell`
(unsync) or `RwLock` (sync).

### Database Flags Validation

DUP_SORT/DUP_FIXED methods validate flags at runtime:
Expand Down
146 changes: 117 additions & 29 deletions src/tx/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::Arc,
};

/// Cache trait for transaction-local database handles.
/// Cache trait for transaction-local database handles and cursors.
///
/// This is used by the [`SyncKind`] trait to define the cache type for each
/// transaction kind.
Expand All @@ -38,6 +38,20 @@ pub trait Cache: Clone + Default + std::fmt::Debug {

/// Remove a database entry from the cache by dbi.
fn remove_dbi(&self, dbi: ffi::MDBX_dbi);

/// Take a cached cursor for the given DBI, if one exists.
///
/// Returns `None` when no cursor for `dbi` is currently cached. A returned
/// pointer is no longer held by the cache.
fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor>;

/// Return a cursor to the cache for later reuse.
///
/// The pointer is stored together with `dbi` so that a later
/// `take_cursor(dbi)` can hand it out again.
fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor);

/// Drain all cached cursors, returning their raw pointers.
/// The caller is responsible for closing them via FFI.
///
/// After this call the cache holds no cursors.
fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>;

/// Drain cached cursors for a specific DBI, returning their raw pointers.
/// The caller is responsible for closing them via FFI.
///
/// Cursors cached for other DBIs are left in place.
fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]>;
}

/// Cached database entry.
Expand Down Expand Up @@ -73,37 +87,85 @@ impl From<CachedDb> for Database {
}
}

/// Simple cache container for database handles.
/// Simple cache container for database handles and cursor pointers.
///
/// Uses inline storage for the common case (most apps use < 16 databases).
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct DbCache(SmallVec<[CachedDb; 16]>);
#[derive(Debug)]
pub struct DbCache {
/// Cached database handles; entries are looked up by their `name_hash`.
dbs: SmallVec<[CachedDb; 16]>,
/// Cached cursors, stored as `(dbi, raw cursor pointer)` pairs awaiting reuse.
cursors: SmallVec<[(ffi::MDBX_dbi, *mut ffi::MDBX_cursor); 8]>,
}

// SAFETY: DbCache contains `*mut ffi::MDBX_cursor` which is `!Send + !Sync`.
// These are raw MDBX cursor pointers bound to a transaction, not a thread.
// `Cursor` itself is already `Send + Sync` (see cursor.rs), so caching the
// same pointers here introduces no new unsoundness. All access to these
// pointers is mediated by `RefCell` (unsync path) or `RwLock` (sync path),
// ensuring no concurrent mutation. `Clone` for `DbCache` starts the copy
// with an empty cursor list, so cloning a cache never duplicates a cached
// pointer.
unsafe impl Send for DbCache {}
unsafe impl Sync for DbCache {}

impl Default for DbCache {
    /// Builds an empty cache: no database handles and no cached cursors.
    fn default() -> Self {
        let dbs = SmallVec::new();
        let cursors = SmallVec::new();
        Self { dbs, cursors }
    }
}

impl Clone for DbCache {
    /// Duplicates the cached database handles only.
    ///
    /// The copy always starts with an empty cursor list, so the raw cursor
    /// pointers cached in `self` are never shared with the clone.
    fn clone(&self) -> Self {
        let dbs = self.dbs.clone();
        let cursors = SmallVec::new();
        Self { dbs, cursors }
    }
}

impl DbCache {
/// Read a database entry from the cache.
fn read_db(&self, name_hash: u64) -> Option<Database> {
for entry in self.0.iter() {
if entry.name_hash == name_hash {
return Some(entry.db);
}
}
None
self.dbs.iter().find(|e| e.name_hash == name_hash).map(|e| e.db)
}

/// Write a database entry to the cache.
fn write_db(&mut self, db: CachedDb) {
for entry in self.0.iter() {
if entry.name_hash == db.name_hash {
return; // Another thread beat us
}
if self.dbs.iter().any(|e| e.name_hash == db.name_hash) {
return;
}
self.0.push(db);
self.dbs.push(db);
}

/// Remove a database entry from the cache by dbi.
fn remove_dbi(&mut self, dbi: ffi::MDBX_dbi) {
self.0.retain(|entry| entry.db.dbi() != dbi);
self.dbs.retain(|entry| entry.db.dbi() != dbi);
}

/// Removes and returns one cached cursor that was stored for `dbi`.
///
/// Yields `None` when no cached entry matches. The entry is removed with
/// `swap_remove`, so the order of the remaining entries is not preserved.
fn take_cursor(&mut self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> {
    let idx = self.cursors.iter().position(|(cached_dbi, _)| *cached_dbi == dbi)?;
    let (_, cursor) = self.cursors.swap_remove(idx);
    Some(cursor)
}

/// Stores `cursor` in the cache, tagged with the `dbi` it was opened on,
/// so that a later `take_cursor(dbi)` can reuse it.
fn return_cursor(&mut self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) {
    let entry = (dbi, cursor);
    self.cursors.push(entry);
}

/// Drain all cached cursors, returning their raw pointers.
fn drain_cursors(&mut self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> {
self.cursors.drain(..).map(|(_, c)| c).collect()
}

/// Removes every cached cursor stored for `dbi` and returns the raw
/// pointers, leaving entries for other DBIs in place and in order.
fn drain_cursors_for_dbi(
    &mut self,
    dbi: ffi::MDBX_dbi,
) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> {
    let mut removed = SmallVec::new();
    self.cursors.retain(|entry| {
        let keep = entry.0 != dbi;
        if !keep {
            // Matching entry: hand its pointer to the caller and drop it
            // from the cache.
            removed.push(entry.1);
        }
        keep
    });
    removed
}
}

Expand Down Expand Up @@ -135,20 +197,33 @@ impl SharedCache {
impl Cache for SharedCache {
/// Read a database entry from the cache.
fn read_db(&self, name_hash: u64) -> Option<Database> {
let cache = self.read();
cache.read_db(name_hash)
self.read().read_db(name_hash)
}

/// Write a database entry to the cache.
fn write_db(&self, db: CachedDb) {
let mut cache = self.write();
cache.write_db(db);
self.write().write_db(db);
}

/// Remove a database entry from the cache by dbi.
fn remove_dbi(&self, dbi: ffi::MDBX_dbi) {
let mut cache = self.write();
cache.remove_dbi(dbi);
self.write().remove_dbi(dbi);
}

/// Take a cached cursor for the given DBI, if one exists.
fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> {
    let mut cache = self.write();
    cache.take_cursor(dbi)
}

/// Return a cursor to the cache for later reuse.
fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) {
    let mut cache = self.write();
    cache.return_cursor(dbi, cursor);
}

/// Drain all cached cursors, returning their raw pointers.
fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> {
    let mut cache = self.write();
    cache.drain_cursors()
}

/// Drain cached cursors for a specific DBI, returning their raw pointers.
fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> {
    let mut cache = self.write();
    cache.drain_cursors_for_dbi(dbi)
}
}

Expand All @@ -161,19 +236,32 @@ impl Default for SharedCache {
impl Cache for RefCell<DbCache> {
/// Read a database entry from the cache.
fn read_db(&self, name_hash: u64) -> Option<Database> {
let cache = self.borrow();
cache.read_db(name_hash)
self.borrow().read_db(name_hash)
}

/// Write a database entry to the cache.
fn write_db(&self, db: CachedDb) {
let mut cache = self.borrow_mut();
cache.write_db(db);
self.borrow_mut().write_db(db);
}

/// Remove a database entry from the cache by dbi.
fn remove_dbi(&self, dbi: ffi::MDBX_dbi) {
let mut cache = self.borrow_mut();
cache.remove_dbi(dbi);
self.borrow_mut().remove_dbi(dbi);
}

/// Take a cached cursor for the given DBI, if one exists.
fn take_cursor(&self, dbi: ffi::MDBX_dbi) -> Option<*mut ffi::MDBX_cursor> {
    let mut cache = self.borrow_mut();
    cache.take_cursor(dbi)
}

/// Return a cursor to the cache for later reuse.
fn return_cursor(&self, dbi: ffi::MDBX_dbi, cursor: *mut ffi::MDBX_cursor) {
    let mut cache = self.borrow_mut();
    cache.return_cursor(dbi, cursor);
}

/// Drain all cached cursors, returning their raw pointers.
fn drain_cursors(&self) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> {
    let mut cache = self.borrow_mut();
    cache.drain_cursors()
}

/// Drain cached cursors for a specific DBI, returning their raw pointers.
fn drain_cursors_for_dbi(&self, dbi: ffi::MDBX_dbi) -> SmallVec<[*mut ffi::MDBX_cursor; 8]> {
    let mut cache = self.borrow_mut();
    cache.drain_cursors_for_dbi(dbi)
}
}
49 changes: 37 additions & 12 deletions src/tx/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
tx::{
TxPtrAccess,
aliases::IterKeyVals,
cache::Cache,
iter::{Iter, IterDup, IterDupFixed, IterDupFixedOfKey, IterDupOfKey},
kind::WriteMarker,
},
Expand Down Expand Up @@ -33,6 +34,7 @@ where
K: TransactionKind,
{
access: &'tx K::Access,
cache: &'tx K::Cache,
cursor: *mut ffi::MDBX_cursor,
db: Database,
_kind: PhantomData<K>,
Expand All @@ -43,12 +45,28 @@ where
K: TransactionKind,
{
/// Creates a new cursor from a reference to a transaction access type.
pub(crate) fn new(access: &'tx K::Access, db: Database) -> MdbxResult<Self> {
pub(crate) fn new(
access: &'tx K::Access,
cache: &'tx K::Cache,
db: Database,
) -> MdbxResult<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
access.with_txn_ptr(|txn_ptr| unsafe {
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, db.dbi(), &mut cursor))
})?;
Ok(Self { access, cursor, db, _kind: PhantomData })
Ok(Self { access, cache, cursor, db, _kind: PhantomData })
}

/// Wraps an existing raw cursor pointer with cache support.
///
/// The cursor must already be bound to the correct transaction and DBI.
///
/// No FFI call is made here: the pointer is stored as-is next to the
/// given access handle, cache reference and database.
pub(crate) const fn from_raw(
access: &'tx K::Access,
cache: &'tx K::Cache,
cursor: *mut ffi::MDBX_cursor,
db: Database,
) -> Self {
Self { access, cache, cursor, db, _kind: PhantomData }
}

/// Helper function for `Clone`. This should only be invoked within
Expand All @@ -58,12 +76,20 @@ where
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());

let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
if let Err(e) = mdbx_result(res) {
// Close directly — do NOT construct Self, as Drop would
// push this unbound cursor into the cache.
ffi::mdbx_cursor_close(cursor);
return Err(e);
}

let s = Self { access: other.access, cursor, db: other.db, _kind: PhantomData };

mdbx_result(res)?;

Ok(s)
Ok(Self {
access: other.access,
cache: other.cache,
cursor,
db: other.db,
_kind: PhantomData,
})
}
}

Expand Down Expand Up @@ -1072,11 +1098,10 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
// MDBX cursors MUST be closed. Failure to do so is a memory leak.
//
// To be able to close a cursor of a timed out transaction, we need to
// renew it first. Hence the usage of `with_txn_ptr_for_cleanup` here.
self.access.with_txn_ptr(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) });
// Return the cursor pointer to the transaction cache for reuse.
// The transaction's commit/drop path will call mdbx_cursor_close on
// all cached pointers while the transaction is still valid.
self.cache.return_cursor(self.db.dbi(), self.cursor);
}
}

Expand Down
Loading
Loading