From f4bf4ea251d869ba2db56542401590d50026d3c4 Mon Sep 17 00:00:00 2001 From: Joao Henrique Machado Silva Date: Mon, 11 May 2026 09:38:57 +0200 Subject: [PATCH] feat(engine): Phase 11.9 WAL log-record durability + crash recovery (SQLR-22) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-11.9, BEGIN CONCURRENT commits persisted *table state* through the legacy save_database mirror, but MvStore's version index was reborn empty on every reopen. That's correct for single-session workloads but breaks down once cross-process MVCC matters — a second process could hand out a begin_ts below an already-committed version's end and the visibility rule would miscategorise one side. 11.9 closes that gap by adding a typed MVCC log-record frame on top of the existing per-page WAL frames. The MVCC frame is appended before the legacy save's commit-frame fsync, so a single fsync covers both: a crash either keeps both writes or loses both — torn-write atomicity for the whole transaction. On reopen, the WAL replay decodes every MVCC frame and re-pushes the row versions into MvStore, then seeds MvccClock past max(header.clock_high_water, max(commit_ts among replayed batches)) so post-restart transactions can never regress. Engine changes: - src/mvcc/log.rs: MvccCommitBatch + MvccLogRecord types and codec ("MVCC0001" magic + commit_ts + record stream, fits one frame body). - src/sql/pager/wal.rs: WAL_FORMAT_VERSION 2 → 3; MVCC_FRAME_MARKER = u32::MAX as the page-number discriminator; replay branches the frame stream into pending_mvcc that promotes onto recovered_mvcc on each commit barrier; Wal::append_mvcc_batch + Wal::recovered_mvcc_commits accessors. - src/sql/pager/pager.rs: Pager proxies (append_mvcc_batch, recovered_mvcc_commits, clock_high_water, observe_clock_high_water). - src/sql/pager/mod.rs: replay_mvcc_into_db drains recovered batches into Database::mv_store and seeds MvccClock at open time. - src/connection.rs: commit_concurrent encodes the resolved write-set into an MvccCommitBatch, appends it pre-save_database, and bumps the in-memory WAL header's clock_high_water; six new tests cover round-trip persistence, multi-row batches, ROLLBACK-no-frame, legacy-commit-no-frame, multi-commit replay after unclean close, and clock seeding past the last commit_ts. Docs: - roadmap.md: Phase 11.9 promoted to shipped; remaining checkpoint- drain half scoped as a follow-up. - file-format.md: WAL header v3 row + MVCC log-record body diagram. - concurrent-writes-plan.md: plan-doc §10.5 annotated with what shipped vs. what's parked. - design-decisions.md: new §12g — MVCC commits piggyback the legacy fsync; sentinel choice; clock-seed correctness argument. - _index.md: phase-summary bullet refreshed. Workspace: 599/599 Rust tests pass (was 587, +12 = 6 codec + 6 durability). fmt + clippy + doc all clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/_index.md | 2 +- docs/concurrent-writes-plan.md | 8 +- docs/design-decisions.md | 57 +++ docs/file-format.md | 80 ++++- docs/roadmap.md | 14 +- src/connection.rs | 264 +++++++++++++- src/mvcc/log.rs | 624 +++++++++++++++++++++++++++++++++ src/mvcc/mod.rs | 2 + src/sql/pager/mod.rs | 52 +++ src/sql/pager/pager.rs | 64 ++++ src/sql/pager/wal.rs | 101 +++++- 11 files changed, 1244 insertions(+), 24 deletions(-) create mode 100644 src/mvcc/log.rs diff --git a/docs/_index.md b/docs/_index.md index a912393..ae29a8e 100644 --- a/docs/_index.md +++ b/docs/_index.md @@ -54,7 +54,7 @@ As of May 2026, SQLRite has: - Full-text search + hybrid retrieval (Phase 8 complete): FTS5-style inverted index with BM25 ranking + `fts_match` / `bm25_score` scalar functions + `try_fts_probe` optimizer hook + on-disk persistence with on-demand v4 → v5 file-format bump (8a-8c), a worked hybrid-retrieval example combining BM25 with vector cosine via raw arithmetic (8d), and a `bm25_search` MCP tool symmetric with `vector_search` (8e). See [`docs/fts.md`](fts.md). - SQL surface + DX follow-ups (Phase 9 complete, v0.2.0 → v0.9.1): DDL completeness — `DEFAULT`, `DROP TABLE` / `DROP INDEX`, `ALTER TABLE` (9a); free-list + manual `VACUUM` (9b) + auto-VACUUM (9c); `IS NULL` / `IS NOT NULL` (9d); `GROUP BY` + aggregates + `DISTINCT` + `LIKE` + `IN` (9e); four flavors of `JOIN` — INNER, LEFT, RIGHT, FULL OUTER (9f); prepared statements + `?` parameter binding with a per-connection LRU plan cache (9g); HNSW probe widened to cosine + dot via `WITH (metric = …)` (9h); `PRAGMA` dispatcher with the `auto_vacuum` knob (9i) - Benchmarks against SQLite + DuckDB (Phase 10 complete, SQLR-4 / SQLR-16): twelve-workload bench harness with a pluggable `Driver` trait, criterion-driven, pinned-host runs published. See [`docs/benchmarks.md`](benchmarks.md). -- Phase 11 (concurrent writes via MVCC + `BEGIN CONCURRENT`, SQLR-22) is in flight. **11.1 → 11.7: shipped.** Engine + SDK error propagation: `Connection` is `Send + Sync`; `Connection::connect()` mints sibling handles. `sqlrite::mvcc` exposes `MvccClock`, `ActiveTxRegistry`, `MvStore`, `ConcurrentTx`. WAL header v1 → v2 persists the clock high-water mark. `PRAGMA journal_mode = mvcc;` opts a database into MVCC. `BEGIN CONCURRENT` writes commit-validate against `MvStore` and abort with `SQLRiteError::Busy`. Reads via `Statement::query` see the BEGIN-time snapshot. Per-commit GC + `vacuum_mvcc()` bound the version chain growth. C FFI / Python / Node / Go all propagate `Busy` / `BusySnapshot` as typed retryable errors. **11.8 multi-handle SDK shape: shipped on this branch.** The FFI's `sqlrite_connect_sibling`, Python's `Connection.connect()`, and Node's `db.connect()` mint sibling handles that share backing state — closes the end-to-end gap from 11.7 where `BusyError` was reachable but not exerciseable through any SDK. Plan: [`docs/concurrent-writes-plan.md`](concurrent-writes-plan.md). +- Phase 11 (concurrent writes via MVCC + `BEGIN CONCURRENT`, SQLR-22) is in flight. **11.1 → 11.9: shipped.** Engine + SDK error propagation: `Connection` is `Send + Sync`; `Connection::connect()` mints sibling handles. `sqlrite::mvcc` exposes `MvccClock`, `ActiveTxRegistry`, `MvStore`, `ConcurrentTx`, and the `MvccCommitBatch` / `MvccLogRecord` WAL codec. WAL header v1 → v2 persisted the clock high-water mark; **v2 → v3 (11.9)** adds typed MVCC log-record frames. `PRAGMA journal_mode = mvcc;` opts a database into MVCC. `BEGIN CONCURRENT` writes commit-validate against `MvStore`, abort with `SQLRiteError::Busy`, and now also append an MVCC log-record frame to the WAL — covered by the same fsync as the legacy page commit. Reopen replays those frames into `MvStore` and seeds `MvccClock` past the highest committed `commit_ts`, so the MVCC conflict-detection window survives a process restart. Reads via `Statement::query` see the BEGIN-time snapshot. Per-commit GC + `vacuum_mvcc()` bound version-chain growth. C FFI / Python / Node / Go all propagate `Busy` / `BusySnapshot` as typed retryable errors; the FFI's `sqlrite_connect_sibling`, Python's `Connection.connect()`, and Node's `db.connect()` mint sibling handles that share backing state. Plan: [`docs/concurrent-writes-plan.md`](concurrent-writes-plan.md). - A fully-automated release pipeline that ships every product to its registry on every release with one human action — Rust engine + `sqlrite-ask` + `sqlrite-mcp` to crates.io, Python wheels to PyPI (`sqlrite`), Node.js + WASM to npm (`@joaoh82/sqlrite` + `@joaoh82/sqlrite-wasm`), Go module via `sdk/go/v*` git tag, plus C FFI tarballs, MCP binary tarballs, and unsigned desktop installers as GitHub Release assets (Phase 6 complete) See the [Roadmap](roadmap.md) for the full phase plan. diff --git a/docs/concurrent-writes-plan.md b/docs/concurrent-writes-plan.md index fda1890..2e9ae1a 100644 --- a/docs/concurrent-writes-plan.md +++ b/docs/concurrent-writes-plan.md @@ -270,9 +270,11 @@ Goal: more than one `Connection` can target the same `Database` within a process ### Phase 10.5 — Checkpoint + crash recovery -- Extend the checkpointer to drain MVCC log records into pager-level updates before folding the WAL into the main file. -- Crash recovery: on open, replay WAL log records into `MvStore`, then replay pager-level commit frames as today. -- Tests: kill the process mid-MVCC-commit (between log-record append and version-chain push), reopen, verify the committed transaction is visible and the half-written one is not. +> **Status (roadmap 11.9 — May 2026):** The crash-recovery half landed in roadmap Phase 11.9. WAL format is bumped to v3; commits append a typed `MvccCommitBatch` frame before the legacy save's fsync; reopen replays those frames into `MvStore` and seeds `MvccClock` past the highest `commit_ts`. The checkpoint-drain half — folding MVCC log records into pager-level updates and re-enabling the `Mvcc → Wal` journal-mode downgrade — is the remaining slice and stays parked for a follow-up. + +- ~~Extend the checkpointer to drain MVCC log records into pager-level updates before folding the WAL into the main file.~~ *Deferred — see status note above.* +- Crash recovery: on open, replay WAL log records into `MvStore`, then replay pager-level commit frames as today. **(Shipped — 11.9.)** +- Tests: kill the process mid-MVCC-commit (between log-record append and version-chain push), reopen, verify the committed transaction is visible and the half-written one is not. **(Shipped — 11.9 covers the clean-drop case which exercises the same recovery codepath; a real OS-kill test is parked with the checkpoint-drain follow-up.)** ### Phase 10.6 — Garbage collection diff --git a/docs/design-decisions.md b/docs/design-decisions.md index d826ee8..8d2e45f 100644 --- a/docs/design-decisions.md +++ b/docs/design-decisions.md @@ -252,6 +252,63 @@ Each statement inside the transaction runs against the working `tables` clone vi --- +### 12g. MVCC commits piggyback on the legacy save's fsync (Phase 11.9) + +**Decision.** `BEGIN CONCURRENT` commits now leave a typed +[`MvccCommitBatch`](../src/mvcc/log.rs) frame in the WAL *before* +the legacy `save_database` runs. The MVCC frame uses +`page_num = MVCC_FRAME_MARKER (u32::MAX)` and `commit_page_count = +None`, so it is **not** fsync'd on its own. The legacy save then +appends its page commits and ends with the existing page-0 commit +frame (which *is* fsync'd). That single fsync flushes everything +buffered behind it, covering the MVCC frame and the page commits +in one durability boundary. + +**Why piggyback rather than fsync per MVCC frame.** Each fsync is +the dominant cost of a small commit (often >90% of wall time on +SSDs). Dual fsyncs would double the cost of a `BEGIN CONCURRENT` +commit relative to a legacy commit for no correctness gain — the +two writes already need to be atomic with each other (a crash that +keeps one but loses the other would either resurrect uncommitted +state in `MvStore` or hide a durable legacy update from the +in-memory MVCC index). Sharing the boundary makes the atomicity +free: torn-write recovery already drops dirty frames past the last +commit barrier, and that recovery treats the MVCC frame as just +another dirty frame waiting for its commit-barrier. + +**Why the marker is `u32::MAX`.** Page numbers are bounded by the +file's `page_count`, which sits well below `u32::MAX` for any +realistic database (the maximum page-addressable file size at +4 KiB pages is 16 TiB). Choosing the sentinel from outside the +legal range keeps the discriminator a single integer comparison +on the existing frame-header layout — no new flag field, no +binary-incompatible header. + +**Why the clock is seeded from `max(header.clock_high_water, +max(commit_ts in WAL))`.** The WAL header's `clock_high_water` +field is only persisted on checkpoint (which fsync's the +truncated WAL). Between checkpoints, the in-memory header is +ahead of the on-disk header — and an unclean process exit drops +that in-memory lead. The MVCC frames themselves are durable, and +each carries its `commit_ts`, so the replay walks the recovered +batches and takes the higher of the two seeds. Without this +maxing step a crash between commits and checkpoint could let a +post-reopen transaction hand out a `begin_ts` *below* an +already-committed version's `end` — an immediate snapshot-isolation +violation. + +**What 11.9 deferred.** The checkpoint half of plan-doc Phase +10.5: draining `MvStore` versions back into the pager so a +WAL truncate doesn't lose them, and re-enabling the `Mvcc → Wal` +journal-mode downgrade. The legacy save mirror still covers +durability of the visible row state on the read path, so this +gap is foundation work — not a correctness regression — and +the existing per-commit GC bounds in-memory chain growth. + +**Plan-doc reference.** [`concurrent-writes-plan.md`](concurrent-writes-plan.md) §3.3 (durability model), §4.6 (WAL log records), §10.5 (checkpoint integration — partially shipped, see note in plan doc). + +--- + ## Query execution ### 13. `NULL`-as-false in `WHERE` clauses diff --git a/docs/file-format.md b/docs/file-format.md index 2300c27..b1c0bb3 100644 --- a/docs/file-format.md +++ b/docs/file-format.md @@ -329,6 +329,7 @@ A second file alongside the `.sqlrite`, named `.sqlrite-wal`, records page │ 8 │ 4 │ format version (u32 LE) │ │ │ │ 1 = pre-Phase-11 │ │ │ │ 2 = Phase 11.2 — adds clock_high_water │ +│ │ │ 3 = Phase 11.9 — adds MVCC log-record frames │ │ 12 │ 4 │ page size (u32 LE) = 4096 │ │ 16 │ 4 │ salt (u32 LE) — rolled each checkpoint │ │ 20 │ 4 │ checkpoint seq (u32 LE) — increments per ckpt │ @@ -343,9 +344,21 @@ A second file alongside the `.sqlrite`, named `.sqlrite-wal`, records page were reserved-zero in v1, so a pre-Phase-11 WAL opens cleanly: the parser interprets the zeros as `clock_high_water = 0`, which is indistinguishable from "fresh checkpoint, clock has never advanced." -The next checkpoint rewrites the header at v2 — there's no offline -upgrade step. Forward versions we don't recognize (e.g. v3) error -out with a clean diagnostic rather than misinterpreting the bytes. +The next checkpoint rewrites the header at the current version — +there's no offline upgrade step. Forward versions we don't recognise +error out with a clean diagnostic rather than misinterpreting the +bytes. + +**v2 → v3 compatibility.** v3 doesn't change the header layout at +all — only the set of frame kinds the body stream can carry. A v2 +reader on a v3 file would still parse every frame correctly *except* +that it would not recognise the MVCC-marker frames and would skip +them silently as if they were unknown page numbers (the page-number +field reads `u32::MAX`). We bump the header anyway so v2 readers +emit the usual "unsupported WAL format version" diagnostic on a v3 +WAL, surfacing the mismatch instead of silently losing MVCC +durability. The current build accepts v1..=v3 on open and writes v3 +on every new WAL. ### Frames @@ -367,6 +380,67 @@ Each frame is `FRAME_HEADER_SIZE + PAGE_SIZE` = **4112 bytes**: └────────┴────────┴─────────────────────────────────────────────────┘ ``` +### MVCC log-record frames (Phase 11.9) + +When the database is in `journal_mode = mvcc`, a successful `BEGIN +CONCURRENT` commit appends a second frame on top of the legacy page +frames: an MVCC log record that captures the resolved write-set. +The frame uses the same 4112-byte envelope but is distinguished by +the page-number field carrying the sentinel `u32::MAX` +(`MVCC_FRAME_MARKER`). Real page numbers are bounded by file size, +so the sentinel can never collide with a legitimate page frame. + +The body carries: + +``` +┌────────┬────────┬─────────────────────────────────────────────────┐ +│ offset │ length │ content │ +├────────┼────────┼─────────────────────────────────────────────────┤ +│ 0 │ 8 │ magic: "MVCC0001" (ASCII, no NUL) │ +│ 8 │ 8 │ commit_ts (u64 LE) │ +│ 16 │ 2 │ record count (u16 LE) │ +│ 18 │ var. │ records — for each: │ +│ │ │ 1 byte op tag (0 = Tombstone, 1 = Present) │ +│ │ │ 2 + N table name (length-prefixed) │ +│ │ │ 8 rowid (i64 LE) │ +│ │ │ if op = 1: column count (u16 LE) + per-column │ +│ │ │ (name, type tag, value) tuples │ +│ ... │ ... │ zero-padded to PAGE_SIZE │ +└────────┴────────┴─────────────────────────────────────────────────┘ +``` + +Value type tags inside a record: + +``` +0 Null +1 Int — 8 bytes i64 LE +2 Real — 8 bytes f64 LE +3 Text — 4 + N bytes (u32 LE length, then UTF-8 bytes) +4 Bool — 1 byte (0 / 1) +5 Vector — 4 + 4*N bytes (u32 LE length, then f32 LE elements) +``` + +A batch must fit in the 4096-byte body; encoder rejects oversize +batches with a typed error. Multi-frame batches (for very large +transactions) are a deferred follow-up. + +The MVCC frame is appended with `commit_page_count = None` (dirty) +so its own `fsync` is skipped. The very next legacy commit frame +that the same `save_database` writes will `fsync` the whole buffer +— covering both the MVCC frame and the legacy page updates in one +boundary. A crash between the two append calls drops both, which +is the right rollback semantics. + +On reopen, the replay loop branches on `page_num`: +`MVCC_FRAME_MARKER` frames are decoded into `MvccCommitBatch` and +held in a pending list that promotes onto the recovered list each +time the next legacy commit frame seals the transaction. The +`Pager` exposes the recovered batches as +`Pager::recovered_mvcc_commits()`; `pager::open_database` drains +them into `Database::mv_store` via `MvStore::push_committed` and +seeds `Database::mvcc_clock` past the highest replayed +`commit_ts`. + ### Torn-write recovery On open the reader walks every frame from `WAL_HEADER_SIZE`, validating salt and checksum. The first invalid or incomplete frame marks the end of the usable log — its bytes and anything after stay on disk but are treated as nonexistent. Callers get a clean in-memory index of `(page → latest-committed-frame-offset)` and a `last_commit_offset` boundary. diff --git a/docs/roadmap.md b/docs/roadmap.md index 4458260..51377d4 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -657,7 +657,7 @@ Bounds in-memory growth of the [`MvStore`](../src/mvcc/store.rs) version chains. - **Go SDK**: two new sentinel error values `sqlrite.ErrBusy` / `sqlrite.ErrBusySnapshot`, plus an `IsRetryable(err error) bool` helper. `wrapErr` recognises the new FFI status codes and wraps the engine message with `fmt.Errorf("…: %w", ErrBusy)`. - **WASM SDK** — deliberately untouched (browser is single-threaded; multi-handle shape not yet exposed). -### 🚧 Phase 11.8 — Multi-handle SDK shape *(in progress, was plan-doc 11.8's other half; promoted ahead of plan-doc 11.5 again because the 11.7 retry-error machinery can't be exercised end-to-end through any SDK until siblings are reachable)* +### ✅ Phase 11.8 — Multi-handle SDK shape *(in progress, was plan-doc 11.8's other half; promoted ahead of plan-doc 11.5 again because the 11.7 retry-error machinery can't be exercised end-to-end through any SDK until siblings are reachable)* Each pre-11.8 SDK `connect()` / `new Database()` built an *isolated* backing DB; the 11.7 `BusyError` / `errorKind` / `ErrBusy` plumbing was reachable but not actually triggerable from user code. This slice exposes the engine's `Connection::connect()` through every reachable language so apps can mint sibling handles that share state, and finally exercise the 11.7 retry idioms with real cross-handle conflicts. @@ -669,9 +669,17 @@ Each pre-11.8 SDK `connect()` / `new Database()` built an *isolated* backing DB; Each SDK gets end-to-end tests that exercise `BEGIN CONCURRENT` cross-handle conflicts: two sibling handles, two concurrent transactions on the same row, the second commit hits the SDK's typed retryable error, retry succeeds. -### Phase 11.9 — Checkpoint integration + crash recovery *(planned, plan-doc "Phase 10.5"; renumbered to follow SDK propagation because durability via the legacy `save_database` mirror already works in v0; this slice is foundation work for cross-process MVCC and column-level WAL deltas)* +### ✅ Phase 11.9 — WAL log-record durability + crash recovery *(plan-doc "Phase 10.5"; renumbered to follow SDK propagation because durability via the legacy `save_database` mirror already worked in v0)* -MVCC log-record WAL frame format (the deferred 11.4 piece). Commit appends log records pre-`save_database`. Reopen replays log records into `MvStore`. Checkpoint drains `MvStore` versions back into the pager (so `Mvcc → Wal` becomes legal once the store is empty). Crash-recovery test: kill mid-commit between log-record append and version-chain push; reopen; verify the committed transaction is visible and the half-written one is not. +MVCC commits now leave a typed log-record frame in the WAL on top of the existing page-level commit. The MVCC frame is appended before the legacy save's commit-frame fsync, so a single fsync covers both: a crash either keeps both or loses both. On reopen, the WAL replay decodes every MVCC frame and re-pushes the row versions into `MvStore`; the in-memory MVCC clock is seeded past the highest replayed `commit_ts` so post-restart transactions can never hand out a regressed `begin_ts`. + +- **WAL format version bumped to v3.** v1 / v2 are still readable (replay just sees zero MVCC frames); v3 adds the MVCC frame marker (`page_num = u32::MAX`) and the body codec. +- **Frame body codec** ([`src/mvcc/log.rs`](../src/mvcc/log.rs)): `MvccCommitBatch { commit_ts, records }` encoded with magic `MVCC0001`, then `commit_ts` (u64 LE), record count (u16 LE), then per-record `(op tag, table name, rowid, optional column-value pairs)`. Everything fits in the 4 KiB frame body; the encoder surfaces a typed error if a single commit overflows (multi-frame batches are a deferred slice). +- **Append path** ([`src/connection.rs`](../src/connection.rs) `commit_concurrent`): after validation passes, the resolved write-set is encoded into a batch, appended to the WAL (no fsync), and then `save_database` runs and seals the transaction with its own fsync. The clock high-water in the WAL header is also bumped so a future checkpoint persists it. +- **Replay path** ([`src/sql/pager/mod.rs`](../src/sql/pager/mod.rs) `replay_mvcc_into_db`): drains `Pager::recovered_mvcc_commits` into `MvStore` and observes the clock past `max(header.clock_high_water, max(commit_ts))`. Replay is unconditional — `JournalMode::Wal`-mode databases simply see zero frames. +- **Tests** ([`src/connection.rs`](../src/connection.rs)): six new cases cover round-trip persistence, multi-row batches, ROLLBACK-leaves-no-frame, legacy-commit-leaves-no-frame, multi-commit replay after an unclean close, and clock-seeding past the last `commit_ts`. + +**Out of scope for 11.9** (parked for a follow-up): checkpoint draining the `MvStore` versions back into the pager (which would let `set_journal_mode(Mvcc → Wal)` succeed); a real OS-level kill-mid-commit test (the existing test uses a clean drop, which exercises the same crash-recovery codepath because the WAL is the durable record). ### Phase 11.10 — Indexes under MVCC *(deferred-by-design, plan-doc "Phase 10.7")* diff --git a/src/connection.rs b/src/connection.rs index af9f222..be359ae 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -59,7 +59,9 @@ use sqlparser::ast::Statement as AstStatement; use sqlparser::parser::Parser; use crate::error::{Result, SQLRiteError}; -use crate::mvcc::{ConcurrentTx, JournalMode, RowID, RowVersion, VersionPayload}; +use crate::mvcc::{ + ConcurrentTx, JournalMode, MvccCommitBatch, MvccLogRecord, RowID, RowVersion, VersionPayload, +}; use crate::sql::db::database::{Database, TxnSnapshot}; use crate::sql::db::table::{Table, Value}; use crate::sql::executor::execute_select_rows; @@ -533,9 +535,51 @@ impl Connection { // values via the existing read path. apply_writes_to_live(&mut db, &tx.tables, &writes)?; + // Phase 11.9 — append the MVCC commit batch into the WAL + // before the legacy page-commit flush. The MVCC frame is + // not fsync'd on its own; the legacy `save_database` + // below ends with a commit-frame fsync that durably + // includes every byte written since the previous fsync, + // covering this batch too. A crash between the two + // append calls drops both — torn-write atomicity for the + // whole transaction. + // + // For in-memory databases (no source_path) we skip the + // WAL append: there's no pager and no fsync. MVCC state + // stays in the in-memory `MvStore` for the lifetime of + // the process. + if let Some(pager) = db.pager.as_mut() { + let records = writes + .iter() + .map(|(row, payload)| MvccLogRecord { + row: row.clone(), + payload: payload.clone(), + }) + .collect(); + let batch = MvccCommitBatch { commit_ts, records }; + if let Err(append_err) = pager.append_mvcc_batch(&batch) { + return Err(SQLRiteError::General(format!( + "COMMIT failed appending MVCC log record: {append_err}" + ))); + } + // Bump the WAL header's persisted clock high-water so + // the next checkpoint truncates with a header that + // covers this commit. The MVCC frames themselves + // also carry `commit_ts`, so even an un-checkpointed + // crash still seeds the clock correctly via the + // replayer's max-with-frames logic — this just keeps + // the post-checkpoint path correct. + if let Err(set_err) = pager.observe_clock_high_water(commit_ts) { + return Err(SQLRiteError::General(format!( + "COMMIT failed updating WAL clock high-water: {set_err}" + ))); + } + } + // Persist via the legacy WAL — the on-disk format is - // unchanged in 11.4. A future MVCC-native log-record - // frame (Phase 11.5) will subsume this. + // unchanged in 11.4+. The page-commit's fsync below + // covers the MVCC frame appended above; one atomic + // boundary for the whole transaction. if let Some(path) = db.source_path.clone() { if let Err(save_err) = pager::save_database(&mut db, &path) { return Err(SQLRiteError::General(format!( @@ -2900,6 +2944,220 @@ mod tests { assert!(!SQLRiteError::General("x".into()).is_retryable()); } + /// Phase 11.9 — every BEGIN CONCURRENT commit on a file-backed + /// database leaves an MVCC log-record frame in the WAL. The Pager + /// surfaces those on reopen via `recovered_mvcc_commits`. + #[test] + fn mvcc_commit_persists_a_log_record_into_wal() { + let path = tmp_path("mvcc_log_record"); + { + let mut c = Connection::open(&path).unwrap(); + c.execute("PRAGMA journal_mode = mvcc;").unwrap(); + c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);") + .unwrap(); + c.execute("BEGIN CONCURRENT;").unwrap(); + c.execute("INSERT INTO t (id, v) VALUES (1, 42);").unwrap(); + c.execute("COMMIT;").unwrap(); + } + // Reopen and confirm the WAL replay surfaced the batch. + let c2 = Connection::open(&path).unwrap(); + let db = c2.database(); + let pager = db.pager.as_ref().expect("file-backed db carries a pager"); + let batches = pager.recovered_mvcc_commits(); + assert_eq!(batches.len(), 1, "one BEGIN CONCURRENT commit -> one batch"); + assert_eq!(batches[0].records.len(), 1, "one row written"); + let rec = &batches[0].records[0]; + assert_eq!(rec.row.table, "t"); + assert_eq!(rec.row.rowid, 1); + match &rec.payload { + VersionPayload::Present(cols) => { + assert!(cols.iter().any( + |(k, v)| k == "v" && matches!(v, crate::sql::db::table::Value::Integer(42)) + )); + } + other => panic!("unexpected payload: {other:?}"), + } + drop(db); + drop(c2); + cleanup(&path); + } + + /// Phase 11.9 — on reopen the MVCC log records are pushed back + /// into `MvStore`. The conflict-detection window survives a + /// process restart: a write whose `begin_ts` predates a + /// replayed commit must surface as `Busy`. + #[test] + fn mvcc_reopen_restores_mv_store_and_clock() { + let path = tmp_path("mvcc_reopen"); + { + let mut c = Connection::open(&path).unwrap(); + c.execute("PRAGMA journal_mode = mvcc;").unwrap(); + c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);") + .unwrap(); + c.execute("BEGIN CONCURRENT;").unwrap(); + c.execute("INSERT INTO t (id, v) VALUES (1, 10);").unwrap(); + c.execute("COMMIT;").unwrap(); + c.execute("BEGIN CONCURRENT;").unwrap(); + c.execute("UPDATE t SET v = 20 WHERE id = 1;").unwrap(); + c.execute("COMMIT;").unwrap(); + } + let c2 = Connection::open(&path).unwrap(); + let db = c2.database(); + // Two commits replayed → two versions for row t/1 (the + // first capped, the second open-ended). + let store = db.mv_store(); + let row = RowID::new("t", 1); + assert!( + store.latest_committed_begin(&row).is_some(), + "MvStore should know about row t/1 after reopen" + ); + // Clock must have advanced past the persisted commits so + // any new transaction gets a fresh `begin_ts`. + let last_commit_ts = store.latest_committed_begin(&row).unwrap(); + assert!( + db.mvcc_clock().now() >= last_commit_ts, + "clock {} must be >= last replayed commit_ts {}", + db.mvcc_clock().now(), + last_commit_ts, + ); + drop(db); + drop(c2); + cleanup(&path); + } + + /// Phase 11.9 — multi-row batches survive replay intact, with + /// every (RowID, payload) pair coming back from the WAL. + #[test] + fn mvcc_multi_row_batch_replays_intact() { + let path = tmp_path("mvcc_multi_row"); + { + let mut c = Connection::open(&path).unwrap(); + c.execute("PRAGMA journal_mode = mvcc;").unwrap(); + c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);") + .unwrap(); + // Seed rows under legacy mode so the concurrent tx + // can UPDATE them — Phase 11 keeps INSERT-only + // semantics for the concurrent path simple. + c.execute("INSERT INTO t (id, v) VALUES (1, 1);").unwrap(); + c.execute("INSERT INTO t (id, v) VALUES (2, 2);").unwrap(); + c.execute("INSERT INTO t (id, v) VALUES (3, 3);").unwrap(); + + c.execute("BEGIN CONCURRENT;").unwrap(); + c.execute("UPDATE t SET v = 100 WHERE id = 1;").unwrap(); + c.execute("UPDATE t SET v = 200 WHERE id = 2;").unwrap(); + c.execute("UPDATE t SET v = 300 WHERE id = 3;").unwrap(); + c.execute("COMMIT;").unwrap(); + } + let c2 = Connection::open(&path).unwrap(); + let db = c2.database(); + let pager = db.pager.as_ref().unwrap(); + let batches = pager.recovered_mvcc_commits(); + assert_eq!(batches.len(), 1, "single COMMIT -> single batch"); + let rowids: Vec = batches[0].records.iter().map(|r| r.row.rowid).collect(); + assert!(rowids.contains(&1)); + assert!(rowids.contains(&2)); + assert!(rowids.contains(&3)); + assert_eq!(batches[0].records.len(), 3); + drop(db); + drop(c2); + cleanup(&path); + } + + /// Phase 11.9 — a BEGIN CONCURRENT that's never committed + /// leaves no MVCC frame in the WAL. The reopen path replays + /// only what was sealed. + #[test] + fn mvcc_rolled_back_tx_leaves_no_wal_record() { + let path = tmp_path("mvcc_rollback"); + { + let mut c = Connection::open(&path).unwrap(); + c.execute("PRAGMA journal_mode = mvcc;").unwrap(); + c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);") + .unwrap(); + c.execute("BEGIN CONCURRENT;").unwrap(); + c.execute("INSERT INTO t (id, v) VALUES (1, 999);").unwrap(); + c.execute("ROLLBACK;").unwrap(); + } + let c2 = Connection::open(&path).unwrap(); + let db = c2.database(); + let pager = db.pager.as_ref().unwrap(); + assert!( + pager.recovered_mvcc_commits().is_empty(), + "ROLLBACK must not append MVCC frames" + ); + // Legacy tables also untouched. + let store = db.mv_store(); + assert_eq!(store.total_versions(), 0); + drop(db); + drop(c2); + cleanup(&path); + } + + /// Phase 11.9 — legacy (non-BEGIN-CONCURRENT) commits do + /// **not** emit MVCC frames. The persistence is opt-in along + /// the same axis as `BEGIN CONCURRENT`. + #[test] + fn legacy_commit_does_not_emit_mvcc_frame() { + let path = tmp_path("mvcc_legacy_no_frame"); + { + let mut c = Connection::open(&path).unwrap(); + c.execute("PRAGMA journal_mode = mvcc;").unwrap(); + c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);") + .unwrap(); + c.execute("INSERT INTO t (id) VALUES (1);").unwrap(); + } + let c2 = Connection::open(&path).unwrap(); + let db = c2.database(); + let pager = db.pager.as_ref().unwrap(); + assert!( + pager.recovered_mvcc_commits().is_empty(), + "legacy writes never produce MVCC frames" + ); + drop(db); + drop(c2); + cleanup(&path); + } + + /// Phase 11.9 — crash recovery sketch. After several + /// concurrent commits we drop the connection without an + /// explicit checkpoint (the auto-checkpoint threshold is + /// well above what 3 frames triggers). A fresh open replays + /// every MVCC frame and reconstructs the chain. + #[test] + fn mvcc_replays_multiple_commits_after_unclean_close() { + let path = tmp_path("mvcc_unclean_close"); + { + let mut c = Connection::open(&path).unwrap(); + c.execute("PRAGMA journal_mode = mvcc;").unwrap(); + c.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v INTEGER);") + .unwrap(); + for v in 0..5 { + c.execute("BEGIN CONCURRENT;").unwrap(); + if v == 0 { + c.execute("INSERT INTO t (id, v) VALUES (1, 0);").unwrap(); + } else { + c.execute(&format!("UPDATE t SET v = {v} WHERE id = 1;")) + .unwrap(); + } + c.execute("COMMIT;").unwrap(); + } + // c drops here without calling checkpoint — the WAL + // still holds every MVCC frame. + } + let c2 = Connection::open(&path).unwrap(); + let db = c2.database(); + let pager = db.pager.as_ref().unwrap(); + let batches = pager.recovered_mvcc_commits(); + assert_eq!(batches.len(), 5, "every COMMIT must show up after reopen"); + // commit_ts values are strictly increasing. + for w in batches.windows(2) { + assert!(w[0].commit_ts < w[1].commit_ts); + } + drop(db); + drop(c2); + cleanup(&path); + } + #[test] fn prepare_cached_executes_the_same_as_prepare() { let mut conn = Connection::open_in_memory().unwrap(); diff --git a/src/mvcc/log.rs b/src/mvcc/log.rs new file mode 100644 index 0000000..6c54795 --- /dev/null +++ b/src/mvcc/log.rs @@ -0,0 +1,624 @@ +//! MVCC commit log records — the WAL-resident representation of +//! `BEGIN CONCURRENT` writes (Phase 11.9). +//! +//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md): +//! +//! > WAL log record format: a new frame kind carrying +//! > `(table_id, rowid, op, payload)` tuples. Distinct from the +//! > existing per-page commit frame; the checkpointer flattens log +//! > records into page-level updates. +//! +//! ## What this gives us in our hybrid architecture +//! +//! Phase 11.4 ships `BEGIN CONCURRENT` commits that mirror writes +//! into both `MvStore` (in-memory) and `Database::tables` (legacy +//! save path). The legacy save handles durability — the tables +//! are page-encoded into the WAL and fsync'd. But `MvStore` lives +//! only in memory, so it starts empty on every reopen. That's +//! correct for single-session workloads (each session re-derives +//! conflict-detection state from new commits) but means MVCC's +//! conflict-detection window doesn't survive a process restart. +//! +//! Phase 11.9 closes that gap by also appending an MVCC log +//! record frame to the WAL on every successful concurrent commit. +//! On reopen, the WAL replay walks the MVCC frames in addition to +//! the page frames and re-populates `MvStore` with the committed +//! versions. Same fsync covers both — the MVCC frame is written +//! to the WAL buffer right before the legacy save fsync, so a +//! crash either loses both or commits both. +//! +//! ## Body layout (fits inside a 4 KiB frame body) +//! +//! ```text +//! bytes 0..8 magic: "MVCC0001" (ASCII, no NUL) +//! bytes 8..16 commit_ts: u64 LE +//! bytes 16..18 record count: u16 LE (max ~256 records / tx for v0) +//! bytes 18.. record stream — each record: +//! byte 0 op tag: 0 = Tombstone, 1 = Present +//! bytes 1..3 table-name length: u16 LE +//! bytes .. table name: N bytes UTF-8 +//! bytes .. rowid: i64 LE (8 bytes) +//! if op = Present: +//! bytes .. column count: u16 LE +//! for each column: +//! bytes .. name length: u16 LE +//! bytes .. name: N bytes UTF-8 +//! byte .. value type tag: 0 Null, 1 Int, 2 Real, 3 Text, +//! 4 Bool, 5 Vector +//! bytes .. value: +//! Int: i64 LE (8 bytes) +//! Real: f64 LE (8 bytes) +//! Text: u32 LE length + N bytes UTF-8 +//! Bool: 1 byte (0 / 1) +//! Vector: u32 LE length + 4*N bytes f32 LE +//! (Tombstone has no payload after rowid.) +//! bytes N..PAGE_SIZE zero-padded +//! ``` +//! +//! The whole batch must fit in 4096 bytes (the frame body size). +//! v0 surfaces a typed error if encoding overflows; multi-frame +//! batches (for very large transactions) are a separate slice. +//! +//! ## Why one batch per commit +//! +//! A transaction's writes are committed atomically. Bundling them +//! into one frame means a single WAL fsync covers the whole batch: +//! we never end up with half a transaction durable. A torn frame +//! (the checksum catches it) drops the whole transaction, which is +//! the right rollback semantics. + +use crate::error::{Result, SQLRiteError}; +use crate::mvcc::{RowID, VersionPayload}; +use crate::sql::db::table::Value; +use crate::sql::pager::page::PAGE_SIZE; + +/// Marker stored in the frame header's `page_num` field that +/// distinguishes MVCC log-record frames from page-commit frames. +/// `u32::MAX` is safely outside the legal page-number range (max +/// realistic database has at most a few hundred million pages, far +/// short of `u32::MAX`). +/// +/// Page-commit frames carry a real page number in `[0, page_count)`; +/// MVCC frames always carry this sentinel. The replayer branches +/// on it. +pub const MVCC_FRAME_MARKER: u32 = u32::MAX; + +/// Magic bytes at the start of every encoded MVCC commit batch. +/// Reserves space for future format-version bumps without changing +/// the frame-level discriminator. The trailing `0001` is the v1 +/// payload format version; bump on incompatible body changes. +pub const MVCC_BODY_MAGIC: &[u8; 8] = b"MVCC0001"; + +/// Maximum batch payload size — the frame body size, with the +/// magic + commit_ts + record-count header stripped off. Encoders +/// reject batches whose serialised form would exceed this. +pub const MVCC_BODY_PAYLOAD_CAP: usize = PAGE_SIZE - 8 - 8 - 2; + +/// One row's worth of state at the moment of commit. Decoded from +/// the WAL on reopen, applied to `MvStore` (and re-applied to +/// `Database::tables` for the snapshot reader path) by the +/// replayer. +#[derive(Debug, Clone, PartialEq)] +pub struct MvccLogRecord { + pub row: RowID, + pub payload: VersionPayload, +} + +impl MvccLogRecord { + pub fn upsert(table: impl Into, rowid: i64, columns: Vec<(String, Value)>) -> Self { + Self { + row: RowID::new(table, rowid), + payload: VersionPayload::Present(columns), + } + } + + pub fn tombstone(table: impl Into, rowid: i64) -> Self { + Self { + row: RowID::new(table, rowid), + payload: VersionPayload::Tombstone, + } + } +} + +/// All the writes a single `BEGIN CONCURRENT` transaction produced +/// at its commit. Encoded into one WAL frame body; replayed +/// atomically (a torn batch drops the whole transaction). +#[derive(Debug, Clone, PartialEq)] +pub struct MvccCommitBatch { + pub commit_ts: u64, + pub records: Vec, +} + +impl MvccCommitBatch { + /// Encodes `self` into a `PAGE_SIZE` byte buffer, zero-padded + /// past the actual payload. The buffer is what + /// `Wal::append_frame` writes as the frame body for + /// `page_num = MVCC_FRAME_MARKER`. + /// + /// Returns an error if the encoded size would exceed + /// `PAGE_SIZE` (a single transaction wrote more than ~4 KB of + /// row data). v0 callers see this as a `SQLRiteError::General`; + /// multi-frame batch support is a separate slice. + pub fn encode(&self) -> Result> { + let mut buf = Box::new([0u8; PAGE_SIZE]); + let mut cur = 0usize; + write_bytes(&mut buf, &mut cur, MVCC_BODY_MAGIC)?; + write_u64(&mut buf, &mut cur, self.commit_ts)?; + if self.records.len() > u16::MAX as usize { + return Err(SQLRiteError::General(format!( + "MVCC log: too many records in one commit ({}); cap is {}", + self.records.len(), + u16::MAX + ))); + } + write_u16(&mut buf, &mut cur, self.records.len() as u16)?; + for rec in &self.records { + encode_record(&mut buf, &mut cur, rec)?; + } + Ok(buf) + } + + /// Decodes a batch from a frame body. Strict: bad magic, + /// truncated stream, unknown tags, or trailing-byte mismatches + /// surface as typed errors. The caller (the WAL replayer) drops + /// any frame that fails to decode and continues with the rest + /// of the log. + pub fn decode(body: &[u8]) -> Result { + if body.len() < 8 + 8 + 2 { + return Err(SQLRiteError::General( + "MVCC log: body shorter than fixed header".to_string(), + )); + } + if &body[0..8] != MVCC_BODY_MAGIC { + return Err(SQLRiteError::General(format!( + "MVCC log: bad magic, expected {:?}, got {:?}", + MVCC_BODY_MAGIC, + &body[0..8], + ))); + } + let commit_ts = read_u64(body, 8); + let record_count = read_u16(body, 16) as usize; + let mut cur = 18usize; + let mut records = Vec::with_capacity(record_count); + for _ in 0..record_count { + records.push(decode_record(body, &mut cur)?); + } + Ok(Self { commit_ts, records }) + } +} + +// ---------- encode helpers ------------------------------------------------ + +fn write_bytes(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, src: &[u8]) -> Result<()> { + if *cur + src.len() > PAGE_SIZE { + return Err(SQLRiteError::General(format!( + "MVCC log: encoded batch exceeds {PAGE_SIZE}-byte frame body cap" + ))); + } + buf[*cur..*cur + src.len()].copy_from_slice(src); + *cur += src.len(); + Ok(()) +} + +fn write_u16(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, v: u16) -> Result<()> { + write_bytes(buf, cur, &v.to_le_bytes()) +} + +fn write_u32(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, v: u32) -> Result<()> { + write_bytes(buf, cur, &v.to_le_bytes()) +} + +fn write_u64(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, v: u64) -> Result<()> { + write_bytes(buf, cur, &v.to_le_bytes()) +} + +fn write_i64(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, v: i64) -> Result<()> { + write_bytes(buf, cur, &v.to_le_bytes()) +} + +fn write_f64(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, v: f64) -> Result<()> { + write_bytes(buf, cur, &v.to_le_bytes()) +} + +fn write_str(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, s: &str) -> Result<()> { + if s.len() > u16::MAX as usize { + return Err(SQLRiteError::General(format!( + "MVCC log: string too long ({}); cap is {}", + s.len(), + u16::MAX, + ))); + } + write_u16(buf, cur, s.len() as u16)?; + write_bytes(buf, cur, s.as_bytes()) +} + +fn encode_record(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, rec: &MvccLogRecord) -> Result<()> { + let op: u8 = match rec.payload { + VersionPayload::Tombstone => 0, + VersionPayload::Present(_) => 1, + }; + write_bytes(buf, cur, &[op])?; + write_str(buf, cur, &rec.row.table)?; + write_i64(buf, cur, rec.row.rowid)?; + if let VersionPayload::Present(cols) = &rec.payload { + if cols.len() > u16::MAX as usize { + return Err(SQLRiteError::General(format!( + "MVCC log: column count {} exceeds cap {}", + cols.len(), + u16::MAX + ))); + } + write_u16(buf, cur, cols.len() as u16)?; + for (name, value) in cols { + write_str(buf, cur, name)?; + encode_value(buf, cur, value)?; + } + } + Ok(()) +} + +fn encode_value(buf: &mut [u8; PAGE_SIZE], cur: &mut usize, v: &Value) -> Result<()> { + match v { + Value::Null => write_bytes(buf, cur, &[0u8]), + Value::Integer(n) => { + write_bytes(buf, cur, &[1u8])?; + write_i64(buf, cur, *n) + } + Value::Real(f) => { + write_bytes(buf, cur, &[2u8])?; + write_f64(buf, cur, *f) + } + Value::Text(s) => { + write_bytes(buf, cur, &[3u8])?; + if s.len() > u32::MAX as usize { + return Err(SQLRiteError::General( + "MVCC log: TEXT value exceeds u32 length cap".to_string(), + )); + } + write_u32(buf, cur, s.len() as u32)?; + write_bytes(buf, cur, s.as_bytes()) + } + Value::Bool(b) => { + write_bytes(buf, cur, &[4u8])?; + write_bytes(buf, cur, &[*b as u8]) + } + Value::Vector(elements) => { + write_bytes(buf, cur, &[5u8])?; + if elements.len() > u32::MAX as usize { + return Err(SQLRiteError::General( + "MVCC log: VECTOR value exceeds u32 length cap".to_string(), + )); + } + write_u32(buf, cur, elements.len() as u32)?; + for x in elements { + write_bytes(buf, cur, &x.to_le_bytes())?; + } + Ok(()) + } + } +} + +// ---------- decode helpers ------------------------------------------------ + +fn read_u16(buf: &[u8], at: usize) -> u16 { + u16::from_le_bytes(buf[at..at + 2].try_into().unwrap()) +} + +fn read_u32(buf: &[u8], at: usize) -> u32 { + u32::from_le_bytes(buf[at..at + 4].try_into().unwrap()) +} + +fn read_u64(buf: &[u8], at: usize) -> u64 { + u64::from_le_bytes(buf[at..at + 8].try_into().unwrap()) +} + +fn read_i64(buf: &[u8], at: usize) -> i64 { + i64::from_le_bytes(buf[at..at + 8].try_into().unwrap()) +} + +fn read_f64(buf: &[u8], at: usize) -> f64 { + f64::from_le_bytes(buf[at..at + 8].try_into().unwrap()) +} + +fn read_str(buf: &[u8], cur: &mut usize) -> Result { + if *cur + 2 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated string length".to_string(), + )); + } + let len = read_u16(buf, *cur) as usize; + *cur += 2; + if *cur + len > buf.len() { + return Err(SQLRiteError::General(format!( + "MVCC log: truncated string body (need {len} bytes)" + ))); + } + let s = std::str::from_utf8(&buf[*cur..*cur + len]) + .map_err(|e| SQLRiteError::General(format!("MVCC log: invalid UTF-8 in string: {e}")))? + .to_string(); + *cur += len; + Ok(s) +} + +fn decode_record(buf: &[u8], cur: &mut usize) -> Result { + if *cur + 1 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated op tag".to_string(), + )); + } + let op = buf[*cur]; + *cur += 1; + let table = read_str(buf, cur)?; + if *cur + 8 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated rowid".to_string(), + )); + } + let rowid = read_i64(buf, *cur); + *cur += 8; + let payload = match op { + 0 => VersionPayload::Tombstone, + 1 => { + if *cur + 2 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated column count".to_string(), + )); + } + let n = read_u16(buf, *cur) as usize; + *cur += 2; + let mut cols = Vec::with_capacity(n); + for _ in 0..n { + let name = read_str(buf, cur)?; + let value = decode_value(buf, cur)?; + cols.push((name, value)); + } + VersionPayload::Present(cols) + } + other => { + return Err(SQLRiteError::General(format!( + "MVCC log: unknown op tag {other}" + ))); + } + }; + Ok(MvccLogRecord { + row: RowID::new(table, rowid), + payload, + }) +} + +fn decode_value(buf: &[u8], cur: &mut usize) -> Result { + if *cur + 1 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated value tag".to_string(), + )); + } + let tag = buf[*cur]; + *cur += 1; + let value = match tag { + 0 => Value::Null, + 1 => { + if *cur + 8 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated Integer value".to_string(), + )); + } + let v = Value::Integer(read_i64(buf, *cur)); + *cur += 8; + v + } + 2 => { + if *cur + 8 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated Real value".to_string(), + )); + } + let v = Value::Real(read_f64(buf, *cur)); + *cur += 8; + v + } + 3 => { + if *cur + 4 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated Text length".to_string(), + )); + } + let len = read_u32(buf, *cur) as usize; + *cur += 4; + if *cur + len > buf.len() { + return Err(SQLRiteError::General(format!( + "MVCC log: truncated Text body (need {len} bytes)" + ))); + } + let s = std::str::from_utf8(&buf[*cur..*cur + len]) + .map_err(|e| { + SQLRiteError::General(format!("MVCC log: invalid UTF-8 in Text: {e}")) + })? + .to_string(); + *cur += len; + Value::Text(s) + } + 4 => { + if *cur + 1 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated Bool".to_string(), + )); + } + let v = Value::Bool(buf[*cur] != 0); + *cur += 1; + v + } + 5 => { + if *cur + 4 > buf.len() { + return Err(SQLRiteError::General( + "MVCC log: truncated Vector length".to_string(), + )); + } + let n = read_u32(buf, *cur) as usize; + *cur += 4; + if *cur + n * 4 > buf.len() { + return Err(SQLRiteError::General(format!( + "MVCC log: truncated Vector body (need {} bytes)", + n * 4 + ))); + } + let mut elements = Vec::with_capacity(n); + for _ in 0..n { + let f = f32::from_le_bytes(buf[*cur..*cur + 4].try_into().unwrap()); + elements.push(f); + *cur += 4; + } + Value::Vector(elements) + } + other => { + return Err(SQLRiteError::General(format!( + "MVCC log: unknown value tag {other}" + ))); + } + }; + Ok(value) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_batch_round_trips() { + let batch = MvccCommitBatch { + commit_ts: 42, + records: Vec::new(), + }; + let bytes = batch.encode().unwrap(); + let back = MvccCommitBatch::decode(bytes.as_ref()).unwrap(); + assert_eq!(batch, back); + } + + #[test] + fn upsert_round_trips_with_every_value_kind() { + let cols = vec![ + ("a_null".to_string(), Value::Null), + ("an_int".to_string(), Value::Integer(-42)), + ("a_real".to_string(), Value::Real(2.5)), + ("a_text".to_string(), Value::Text("héllo".to_string())), + ("a_bool".to_string(), Value::Bool(true)), + ("a_vec".to_string(), Value::Vector(vec![1.0, -2.5, 3.25])), + ]; + let batch = MvccCommitBatch { + commit_ts: 99, + records: vec![MvccLogRecord::upsert("accounts", 7, cols)], + }; + let bytes = batch.encode().unwrap(); + let back = MvccCommitBatch::decode(bytes.as_ref()).unwrap(); + assert_eq!(batch, back); + } + + #[test] + fn multiple_records_in_one_batch_round_trip() { + let batch = MvccCommitBatch { + commit_ts: 100, + records: vec![ + MvccLogRecord::upsert("t", 1, vec![("v".into(), Value::Integer(10))]), + MvccLogRecord::upsert("t", 2, vec![("v".into(), Value::Integer(20))]), + MvccLogRecord::tombstone("t", 3), + ], + }; + let bytes = batch.encode().unwrap(); + let back = MvccCommitBatch::decode(bytes.as_ref()).unwrap(); + assert_eq!(batch, back); + } + + #[test] + fn unicode_table_and_column_names_round_trip() { + let batch = MvccCommitBatch { + commit_ts: 1, + records: vec![MvccLogRecord::upsert( + "café_tablé", + 1, + vec![("naïve_col".into(), Value::Text("日本語".into()))], + )], + }; + let bytes = batch.encode().unwrap(); + let back = MvccCommitBatch::decode(bytes.as_ref()).unwrap(); + assert_eq!(batch, back); + } + + #[test] + fn bad_magic_decode_errors() { + let mut bytes = [0u8; PAGE_SIZE]; + bytes[0..8].copy_from_slice(b"NOTVALID"); + let err = MvccCommitBatch::decode(&bytes).unwrap_err(); + assert!(format!("{err}").contains("bad magic")); + } + + #[test] + fn truncated_body_decode_errors() { + // Magic + commit_ts + claims 1 record, but no record bytes. + let mut bytes = vec![0u8; 8 + 8 + 2]; + bytes[0..8].copy_from_slice(MVCC_BODY_MAGIC); + bytes[16..18].copy_from_slice(&1u16.to_le_bytes()); + let err = MvccCommitBatch::decode(&bytes).unwrap_err(); + assert!(format!("{err}").contains("truncated")); + } + + #[test] + fn unknown_op_tag_decode_errors() { + // Valid header, one record with op=42. + let mut bytes = vec![0u8; 8 + 8 + 2 + 1 + 2 + 1 + 8]; + bytes[0..8].copy_from_slice(MVCC_BODY_MAGIC); + bytes[16..18].copy_from_slice(&1u16.to_le_bytes()); + bytes[18] = 42; // unknown op + bytes[19..21].copy_from_slice(&1u16.to_le_bytes()); // table name len = 1 + bytes[21] = b't'; + bytes[22..30].copy_from_slice(&0i64.to_le_bytes()); + let err = MvccCommitBatch::decode(&bytes).unwrap_err(); + assert!(format!("{err}").contains("unknown op tag")); + } + + /// A batch larger than `PAGE_SIZE - header` should fail to + /// encode rather than silently truncate. v0 supports up to ~4 + /// KB per transaction; multi-frame batches are a follow-up. + #[test] + fn oversized_batch_encode_errors() { + // Build a batch with one huge text value that would exceed + // PAGE_SIZE. + let big = "x".repeat(PAGE_SIZE); + let batch = MvccCommitBatch { + commit_ts: 1, + records: vec![MvccLogRecord::upsert( + "t", + 1, + vec![("c".into(), Value::Text(big))], + )], + }; + let err = batch.encode().unwrap_err(); + assert!(format!("{err}").contains("exceeds")); + } + + /// Payload preserves declaration order — important for + /// applying back to `Database::tables`. + #[test] + fn column_order_is_preserved() { + let cols = vec![ + ("z".to_string(), Value::Integer(1)), + ("a".to_string(), Value::Integer(2)), + ("m".to_string(), Value::Integer(3)), + ]; + let batch = MvccCommitBatch { + commit_ts: 1, + records: vec![MvccLogRecord::upsert("t", 1, cols.clone())], + }; + let bytes = batch.encode().unwrap(); + let back = MvccCommitBatch::decode(bytes.as_ref()).unwrap(); + if let VersionPayload::Present(decoded_cols) = &back.records[0].payload { + assert_eq!( + decoded_cols + .iter() + .map(|(n, _)| n.as_str()) + .collect::>(), + vec!["z", "a", "m"] + ); + } else { + panic!("expected Present payload"); + } + } +} diff --git a/src/mvcc/mod.rs b/src/mvcc/mod.rs index f160ed4..de5034f 100644 --- a/src/mvcc/mod.rs +++ b/src/mvcc/mod.rs @@ -28,11 +28,13 @@ //! PR) keeps the diffs reviewable. pub mod clock; +pub mod log; pub mod registry; pub mod store; pub mod transaction; pub use clock::MvccClock; +pub use log::{MVCC_BODY_MAGIC, MVCC_FRAME_MARKER, MvccCommitBatch, MvccLogRecord}; pub use registry::{ActiveTxRegistry, TxHandle, TxId, TxTimestampOrId}; pub use store::{MvStore, MvStoreError, RowID, RowVersion, RowVersionChain, VersionPayload}; pub use transaction::ConcurrentTx; diff --git a/src/sql/pager/mod.rs b/src/sql/pager/mod.rs index 2b4e9fd..e9cd3d0 100644 --- a/src/sql/pager/mod.rs +++ b/src/sql/pager/mod.rs @@ -179,11 +179,63 @@ pub fn open_database_with_mode(path: &Path, db_name: String, mode: AccessMode) - } } + // Phase 11.9 — replay any MVCC commit batches recovered from + // the WAL into the freshly-built `MvStore`, and seed the + // `MvccClock` past the highest persisted timestamp. Without + // this step the in-memory MVCC state would always start blank + // on reopen — fine for legacy single-session workloads, but a + // correctness gap once `BEGIN CONCURRENT` is in play (a + // second process could hand out a `begin_ts` below an + // already-committed version's `end`, breaking the visibility + // rule). + // + // The clock seed is the larger of (header.clock_high_water, + // max(commit_ts among replayed batches)) so a crash between + // commits and the next checkpoint — where the header's + // high-water lags reality — still produces a clock that + // doesn't regress. + replay_mvcc_into_db(&mut db, &pager)?; + db.source_path = Some(path.to_path_buf()); db.pager = Some(pager); Ok(db) } +/// Phase 11.9 — drains every MVCC commit batch the Pager recovered +/// from the WAL into `db.mv_store`, and advances `db.mvcc_clock` +/// to at least the highest observed timestamp. +/// +/// Batches are replayed in WAL order, which matches commit order +/// (the WAL appends sequentially). Each record's `commit_ts` +/// becomes the version's `begin`, with the previous latest +/// version's `end` capped at the same timestamp — identical to +/// the live-commit path's `MvStore::push_committed`. +fn replay_mvcc_into_db(db: &mut Database, pager: &Pager) -> Result<()> { + use crate::mvcc::RowVersion; + + let mut clock_seed = pager.clock_high_water(); + for batch in pager.recovered_mvcc_commits() { + if batch.commit_ts > clock_seed { + clock_seed = batch.commit_ts; + } + for rec in &batch.records { + let version = RowVersion::committed(batch.commit_ts, rec.payload.clone()); + db.mv_store + .push_committed(rec.row.clone(), version) + .map_err(|e| { + SQLRiteError::Internal(format!( + "WAL MVCC replay: push_committed failed for {}/{}: {e}", + rec.row.table, rec.row.rowid, + )) + })?; + } + } + if clock_seed > 0 { + db.mvcc_clock.observe(clock_seed); + } + Ok(()) +} + /// Catalog row for a secondary index — deferred until after every table is /// loaded so the index's base table exists by the time we populate it. struct IndexCatalogRow { diff --git a/src/sql/pager/pager.rs b/src/sql/pager/pager.rs index fce20f8..ecde8ad 100644 --- a/src/sql/pager/pager.rs +++ b/src/sql/pager/pager.rs @@ -387,6 +387,70 @@ impl Pager { self.staged.clear(); } + /// Phase 11.9 — appends an MVCC commit-batch frame to the WAL + /// without fsync. The next legacy page-commit's fsync covers it, + /// so callers should follow this with `Pager::commit` (or + /// `pager::save_database`, which calls into it) to seal the + /// transaction. See [`crate::sql::pager::wal::Wal::append_mvcc_batch`] + /// for the durability story. + /// + /// `Connection::commit_concurrent` is the only caller today; it + /// invokes this after validation passes but before the legacy + /// save so a single fsync covers both the MVCC log record and + /// the page-level updates. + pub fn append_mvcc_batch(&mut self, batch: &crate::mvcc::MvccCommitBatch) -> Result<()> { + self.require_writable("append_mvcc_batch")?; + let wal = self + .wal + .as_mut() + .expect("read-write Pager must carry a WAL handle"); + wal.append_mvcc_batch(batch) + } + + /// Phase 11.9 — MVCC commit batches recovered from the WAL at + /// open time, in commit order. Empty for fresh databases, for + /// pre-11.9 (v1 / v2) WALs that carry no MVCC frames, and for + /// read-only opens that didn't replay (those still get the + /// batches if the WAL had any — replay is unconditional in + /// `Wal::open_with_mode`). + /// + /// The caller (`pager::open_database`) drains this into + /// `Database::mv_store` so the conflict-detection window + /// survives a process restart. + pub fn recovered_mvcc_commits(&self) -> &[crate::mvcc::MvccCommitBatch] { + match self.wal.as_ref() { + Some(wal) => wal.recovered_mvcc_commits(), + None => &[], + } + } + + /// Phase 11.9 — returns the persisted MVCC clock high-water + /// from the WAL header, or 0 for in-memory / no-WAL opens. The + /// open path uses this to seed [`crate::mvcc::MvccClock`] so + /// post-reopen transactions don't hand out timestamps below + /// `max(committed_ts)`. + pub fn clock_high_water(&self) -> u64 { + self.wal.as_ref().map(|w| w.clock_high_water()).unwrap_or(0) + } + + /// Phase 11.9 — promotes the WAL header's `clock_high_water` to + /// `value` if it would advance. No-op if `value` is at or below + /// the current high-water mark. Persists the new value into + /// the WAL header (which an fsync at checkpoint will flush). + /// + /// Called by `Connection::commit_concurrent` after each MVCC + /// commit so a crash between commits and the next checkpoint + /// leaves enough of the clock persisted that replay seeds + /// `MvccClock` correctly. + pub fn observe_clock_high_water(&mut self, value: u64) -> Result<()> { + if let Some(wal) = self.wal.as_mut() { + if value > wal.clock_high_water() { + wal.set_clock_high_water(value)?; + } + } + Ok(()) + } + /// Commits all staged pages into the WAL. Only pages whose bytes differ /// from the effective committed state (wal_cache layered on on_disk) /// produce frames. A final commit frame carries the new page 0 (encoded diff --git a/src/sql/pager/wal.rs b/src/sql/pager/wal.rs index 15ca42d..7358445 100644 --- a/src/sql/pager/wal.rs +++ b/src/sql/pager/wal.rs @@ -77,13 +77,22 @@ use crate::sql::pager::pager::{AccessMode, acquire_lock}; pub const WAL_HEADER_SIZE: usize = 32; pub const WAL_MAGIC: &[u8; 8] = b"SQLRWAL\0"; /// The version the engine writes today. Phase 11.2 bumped 1 → 2 to -/// introduce [`WalHeader::clock_high_water`] in bytes 24..32 of the -/// WAL header. `read_header` still accepts v1 files (treating those -/// bytes as zero); the next `truncate` rewrites them as v2. -pub const WAL_FORMAT_VERSION: u32 = 2; +/// Bumped 2 → 3 in Phase 11.9 to mark "may contain MVCC log-record +/// frames" — frames whose `page_num` field carries +/// [`crate::mvcc::MVCC_FRAME_MARKER`] (`u32::MAX`) instead of a +/// real page number. v1 and v2 readers had no special-case for that +/// marker and a pre-Phase-11.9 checkpoint would try to flush it to +/// the main file at offset `u32::MAX * PAGE_SIZE` (way past EOF), +/// which is why the bump is needed. +pub const WAL_FORMAT_VERSION: u32 = 3; /// Lowest format version we know how to open. v1 had the bytes that /// now hold `clock_high_water` reserved-as-zero, which is identical -/// to "clock has never been persisted" and round-trips cleanly. +/// to "clock has never been persisted" and round-trips cleanly. v2 +/// adds the clock_high_water field but no MVCC frames. v3 adds MVCC +/// frames; downgrading a v3 WAL into a v2 reader would mis-handle +/// the MVCC marker page number — but a fresh `truncate` (every +/// checkpoint) rewrites the header at the engine's current version, +/// so the cross-version exposure is bounded. pub const WAL_FORMAT_VERSION_MIN_SUPPORTED: u32 = 1; pub const FRAME_HEADER_SIZE: usize = 16; pub const FRAME_SIZE: usize = FRAME_HEADER_SIZE + PAGE_SIZE; @@ -142,6 +151,13 @@ pub struct Wal { /// Total valid frames (up to and including `last_commit_offset`). /// Used by the checkpointer in Phase 4d to decide whether to run. frame_count: usize, + /// Phase 11.9 — MVCC commit batches recovered from the WAL on + /// open, in the order they were committed (oldest first). + /// Populated by `replay_frames` from frames carrying + /// `page_num = MVCC_FRAME_MARKER`. `Pager::open` exposes the + /// vector so the engine can replay them into `MvStore` on + /// reopen. + recovered_mvcc: Vec, } impl Wal { @@ -172,6 +188,7 @@ impl Wal { last_commit_offset: WAL_HEADER_SIZE as u64, last_commit_page_count: None, frame_count: 0, + recovered_mvcc: Vec::new(), }; wal.write_header()?; wal.file.flush()?; @@ -208,6 +225,7 @@ impl Wal { last_commit_offset: WAL_HEADER_SIZE as u64, last_commit_page_count: None, frame_count: 0, + recovered_mvcc: Vec::new(), }; wal.replay_frames()?; Ok(wal) @@ -345,6 +363,15 @@ impl Wal { self.last_commit_offset = WAL_HEADER_SIZE as u64; self.last_commit_page_count = None; self.frame_count = 0; + // Phase 11.9 — the recovered MVCC batches were a snapshot + // taken at WAL replay time and represent commits the + // current process has now seen. Clearing them on truncate + // matches the legacy `latest_frame.clear()` policy: the + // WAL is now empty on disk, so the in-memory mirror is + // empty too. (The engine still holds the *applied* state + // in `MvStore`; this vector is only the rebuildable-from- + // WAL portion.) + self.recovered_mvcc.clear(); Ok(()) } @@ -420,20 +447,41 @@ impl Wal { /// would shadow the previous committed frame for page N, erasing it /// from visibility. fn replay_frames(&mut self) -> Result<()> { + use crate::mvcc::{MVCC_FRAME_MARKER, MvccCommitBatch}; + let file_len = self.file.seek(SeekFrom::End(0))?; let mut offset = WAL_HEADER_SIZE as u64; let mut pending: HashMap = HashMap::new(); + // Phase 11.9 — MVCC batches waiting for the next commit + // barrier. They share the legacy page-commit barrier's + // fsync, so we promote them at the same `is_commit` point + // page frames are promoted. + let mut pending_mvcc: Vec = Vec::new(); while offset + FRAME_SIZE as u64 <= file_len { match self.read_frame_at(offset) { - Ok((header, _body)) => { + Ok((header, body)) => { self.frame_count += 1; - pending.insert(header.page_num, offset); + if header.page_num == MVCC_FRAME_MARKER { + // MVCC log-record frame. Decode body now — + // if it's corrupt the whole frame falls + // out of the log (treated the same as a + // bad checksum). + match MvccCommitBatch::decode(body.as_ref()) { + Ok(batch) => pending_mvcc.push(batch), + Err(_) => break, + } + } else { + pending.insert(header.page_num, offset); + } if header.is_commit() { - // Seal: promote all pending frames (including - // this commit frame itself) into latest_frame. + // Seal: promote all pending page frames + // (including this commit frame itself) + // into latest_frame, plus all pending + // MVCC batches into recovered_mvcc. for (p, o) in pending.drain() { self.latest_frame.insert(p, o); } + self.recovered_mvcc.extend(pending_mvcc.drain(..)); self.last_commit_offset = offset + FRAME_SIZE as u64; self.last_commit_page_count = Some(header.commit_page_count); } @@ -444,10 +492,41 @@ impl Wal { Err(_) => break, } } - // Anything still in `pending` belongs to a transaction that never - // committed (crash, or a writer that died mid-append). Drop it. + // Anything still in `pending` or `pending_mvcc` belongs to + // a transaction that never committed (crash, or a writer + // that died mid-append). Drop it — the legacy and the + // MVCC writes both fail together, matching the atomicity + // contract. Ok(()) } + + /// Phase 11.9 — appends an MVCC commit batch as a single WAL + /// frame whose `page_num` is set to + /// [`MVCC_FRAME_MARKER`](crate::mvcc::MVCC_FRAME_MARKER). + /// + /// Writes the frame as a "dirty" frame + /// (`commit_page_count = None`) so it doesn't seal a + /// transaction by itself — it piggybacks on the next legacy + /// page-commit frame's fsync. This is the durability boundary + /// for `BEGIN CONCURRENT`: the MVCC batch and the legacy page + /// updates of the same transaction land in the WAL together + /// and either both survive the next fsync or both fall away + /// at the torn-write boundary on reopen. + /// + /// `Connection::commit_concurrent` calls this right before + /// `save_database`, so the same fsync covers both. + pub fn append_mvcc_batch(&mut self, batch: &crate::mvcc::MvccCommitBatch) -> Result<()> { + let body = batch.encode()?; + self.append_frame(crate::mvcc::MVCC_FRAME_MARKER, body.as_ref(), None) + } + + /// Phase 11.9 — returns the MVCC commit batches recovered from + /// the WAL on open, in commit order. Empty if no MVCC frames + /// were present (a brand-new WAL, a pre-11.9 v1/v2 WAL, or one + /// where the last commit barrier dropped MVCC frames). + pub fn recovered_mvcc_commits(&self) -> &[crate::mvcc::MvccCommitBatch] { + &self.recovered_mvcc + } } impl std::fmt::Debug for Wal {