From 6f0d2eadfb41e6b9a7bf21a1d49f72d61640326c Mon Sep 17 00:00:00 2001 From: NagyVikt Date: Fri, 8 May 2026 19:46:37 +0200 Subject: [PATCH] fix(storage,core): use BEGIN IMMEDIATE for claim cleanup reads Move the expired-claim read inside the transaction in releaseExpiredQuotaClaims and bulkRescueStrandedSessions. Add { immediate: true } option to Storage.transaction() so both cleanup callers acquire the write lock before reading, preventing duplicate claim-weakened/rescue-stranded audit observations. Idempotency test covers both paths. Co-Authored-By: Claude Sonnet 4.6 --- .changeset/begin-immediate-claim-lifecycle.md | 17 ++ packages/core/src/stranded-rescue.ts | 27 +- packages/core/src/task-thread.ts | 30 +- .../test/claim-lifecycle-concurrency.test.ts | 264 ++++++++++++++++++ packages/storage/src/storage.ts | 15 +- 5 files changed, 332 insertions(+), 21 deletions(-) create mode 100644 .changeset/begin-immediate-claim-lifecycle.md create mode 100644 packages/core/test/claim-lifecycle-concurrency.test.ts diff --git a/.changeset/begin-immediate-claim-lifecycle.md b/.changeset/begin-immediate-claim-lifecycle.md new file mode 100644 index 00000000..28dcc1f3 --- /dev/null +++ b/.changeset/begin-immediate-claim-lifecycle.md @@ -0,0 +1,17 @@ +--- +'@colony/storage': patch +'@colony/core': patch +--- + +Fix read-then-write race in claim cleanup paths + +`releaseExpiredQuotaClaims` and `bulkRescueStrandedSessions` previously read +eligible claims outside their DEFERRED transaction, allowing two concurrent +callers to both snapshot the same rows and each emit a duplicate +`claim-weakened` or `rescue-stranded` audit observation. + +The fix moves the claim read inside a `BEGIN IMMEDIATE` transaction on both +paths so the write lock is acquired before any row is inspected. The storage +`transaction()` helper gains an `{ immediate: true }` option that maps to +better-sqlite3's `.immediate()` mode. 
A new idempotency test confirms that +calling each cleanup path twice produces exactly one audit observation. diff --git a/packages/core/src/stranded-rescue.ts b/packages/core/src/stranded-rescue.ts index 8985635e..5d520ff5 100644 --- a/packages/core/src/stranded-rescue.ts +++ b/packages/core/src/stranded-rescue.ts @@ -353,8 +353,18 @@ export function bulkRescueStrandedSessions( outcome.stranded.push(row); if (dryRun) continue; + // BEGIN IMMEDIATE so the re-read of claims inside the transaction and all + // subsequent deletes/writes are atomic across processes. Two concurrent + // rescue callers could otherwise both read the same held claims outside + // the transaction and then both attempt to release and audit them. const audit_observation_id = store.storage.transaction(() => { - for (const claim of claims) { + // Re-read claims inside the transaction so the set we release matches + // exactly what is visible under the write lock — guards against a + // concurrent caller having already released some of them between the + // outer read and this point. 
+ const liveClaims = heldClaimsForCandidate(store, candidate); + if (liveClaims.length === 0) return -1; + for (const claim of liveClaims) { store.storage.releaseClaim({ task_id: claim.task_id, file_path: claim.file_path, @@ -364,7 +374,7 @@ export function bulkRescueStrandedSessions( const auditId = store.addObservation({ session_id, kind: 'rescue-stranded', - content: `Bulk rescue released ${claims.length} claim(s) for stranded session ${session_id}; audit history retained.`, + content: `Bulk rescue released ${liveClaims.length} claim(s) for stranded session ${session_id}; audit history retained.`, metadata: { kind: 'rescue-stranded', action: 'bulk-release-claims', @@ -376,8 +386,8 @@ export function bulkRescueStrandedSessions( branches: row.branches, task_ids: row.task_ids, last_activity: row.last_activity, - held_claim_count: row.held_claim_count, - released_claims: claims.map((claim) => ({ + held_claim_count: liveClaims.length, + released_claims: liveClaims.map((claim) => ({ task_id: claim.task_id, file_path: claim.file_path, claimed_at: claim.claimed_at, @@ -386,7 +396,14 @@ export function bulkRescueStrandedSessions( }); store.storage.endSession(session_id, now); return auditId; - }); + }, { immediate: true }); + + // -1 means another concurrent caller already released the claims before + // this transaction acquired the write lock — skip rather than double-count. 
+ if (audit_observation_id === -1) { + outcome.skipped.push({ session_id, reason: 'claims already released by concurrent caller' }); + continue; + } const rescued = { ...row, diff --git a/packages/core/src/task-thread.ts b/packages/core/src/task-thread.ts index d2f78a41..435194aa 100644 --- a/packages/core/src/task-thread.ts +++ b/packages/core/src/task-thread.ts @@ -1746,20 +1746,24 @@ export class TaskThread { this.assertTaskExists(); this.assertParticipant(args.session_id); const normalizedFilePath = this.normalizeOptionalClaimPath(args.file_path); - const claims = this.claims().filter((claim) => { - if (claim.state !== 'handoff_pending') return false; - if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false; - if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false; - if ( - args.handoff_observation_id !== undefined && - claim.handoff_observation_id !== args.handoff_observation_id - ) { - return false; - } - return true; - }); + // BEGIN IMMEDIATE so that the read of expired claims and all subsequent + // writes are serialized across processes. Without IMMEDIATE, two concurrent + // cleanup callers both read the same expired claims in DEFERRED mode, then + // both attempt to write — producing duplicate claim-weakened observations. 
return this.store.storage.transaction(() => { + const claims = this.claims().filter((claim) => { + if (claim.state !== 'handoff_pending') return false; + if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false; + if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false; + if ( + args.handoff_observation_id !== undefined && + claim.handoff_observation_id !== args.handoff_observation_id + ) { + return false; + } + return true; + }); const audit_observation_ids: number[] = []; const seenBatons = new Set(); for (const claim of claims) { @@ -1821,7 +1825,7 @@ export class TaskThread { })), audit_observation_ids, }; - }); + }, { immediate: true }); } private assertTaskExists(): void { diff --git a/packages/core/test/claim-lifecycle-concurrency.test.ts b/packages/core/test/claim-lifecycle-concurrency.test.ts new file mode 100644 index 00000000..e773ea7e --- /dev/null +++ b/packages/core/test/claim-lifecycle-concurrency.test.ts @@ -0,0 +1,264 @@ +/** + * Concurrency-safety tests for the claim cleanup paths: + * - releaseExpiredQuotaClaims (TaskThread) + * - bulkRescueStrandedSessions (stranded-rescue) + * + * Both paths were previously structured as read-outside / write-inside DEFERRED + * transactions, which allowed two callers to both read the same expired/stranded + * claims and then both emit audit observations. The fix moves the read inside a + * BEGIN IMMEDIATE transaction, so the second caller sees an already-processed + * state and produces no duplicate observations. + * + * Within a single Node process better-sqlite3 is synchronous, so we can't + * trigger a true cross-process write-lock race here. What these tests verify + * is idempotency: calling the cleanup twice against unchanged data must not + * produce duplicate audit records. That's the observable invariant that the + * IMMEDIATE fix preserves. 
+ */ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { defaultSettings } from '@colony/config'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { MemoryStore } from '../src/memory-store.js'; +import { bulkRescueStrandedSessions } from '../src/stranded-rescue.js'; +import { TaskThread } from '../src/task-thread.js'; + +// --------------------------------------------------------------------------- +// Hivemind mock — bulkRescueStrandedSessions checks whether the session is +// live before touching it. The mock reports no active sessions by default, so +// the stranded session does not appear live and the rescue path proceeds. +// --------------------------------------------------------------------------- +const hivemind = vi.hoisted(() => ({ + sessions: [] as Array<{ + source: 'active-session'; + activity: 'working' | 'thinking' | 'idle' | 'stalled'; + session_key: string; + file_path: string; + worktree_path: string; + }>, +})); + +vi.mock('../src/hivemind.js', () => ({ + readHivemind: () => ({ + generated_at: new Date(0).toISOString(), + repo_roots: ['/repo'], + session_count: hivemind.sessions.length, + counts: {}, + sessions: hivemind.sessions, + }), +})); + +let dir: string; +let store: MemoryStore; + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), 'colony-claim-lifecycle-concurrency-')); + store = new MemoryStore({ dbPath: join(dir, 'data.db'), settings: defaultSettings }); + hivemind.sessions = []; +}); + +afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function seedSession(id: string): void { + store.startSession({ id, ide: 'claude-code', cwd: '/repo' }); +} + +/** + * Seeds a task with a quota-exhausted handoff so the owner's claim becomes + * `handoff_pending` with a finite 
`expires_at`. We then call + * `releaseExpiredQuotaClaims` with a `now` value far in the future so the + * claim is treated as expired without needing to reach into the private `db`. + */ +function seedTaskWithPendingHandoffClaim( + ownerSessionId: string, + callerSessionId: string, +): { thread: TaskThread; handoffId: number } { + const thread = TaskThread.open(store, { + repo_root: '/repo', + branch: 'feat/quota-test', + session_id: ownerSessionId, + }); + thread.join(ownerSessionId, 'claude'); + thread.join(callerSessionId, 'codex'); + thread.claimFile({ session_id: ownerSessionId, file_path: 'src/core.ts' }); + + const handoffId = thread.handOff({ + from_session_id: ownerSessionId, + from_agent: 'claude', + to_agent: 'any', + summary: 'quota exhausted', + next_steps: ['take over'], + reason: 'quota_exhausted', + runtime_status: 'blocked_by_runtime_limit', + quota_context: { + agent: 'claude', + session_id: ownerSessionId, + repo_root: '/repo', + branch: 'feat/quota-test', + worktree_path: '/repo', + task_id: thread.task_id, + claimed_files: ['src/core.ts'], + dirty_files: [], + last_command: 'pnpm test', + last_tool: 'Bash', + last_verification: { command: 'pnpm test', result: 'blocked' }, + suggested_next_step: 'accept handoff', + // Short TTL — claim will be expired when we pass now = FAR_FUTURE. + handoff_ttl_ms: 1, + }, + }); + + return { thread, handoffId }; +} + +// A `now` value guaranteed to be past any handoff TTL set in this test. 
+const FAR_FUTURE = Date.now() + 100 * 24 * 60 * 60_000; + +// --------------------------------------------------------------------------- +// releaseExpiredQuotaClaims — idempotency under repeated calls +// --------------------------------------------------------------------------- + +describe('releaseExpiredQuotaClaims idempotency', () => { + it('emits exactly one claim-weakened observation even when called twice', () => { + const owner = 'claude-quota-owner'; + const caller = 'codex-quota-caller'; + seedSession(owner); + seedSession(caller); + + const { thread, handoffId } = seedTaskWithPendingHandoffClaim(owner, caller); + + // First call — should release the expired claim and emit one audit obs. + // We pass FAR_FUTURE as `now` so the handoff TTL is guaranteed to be past. + const result1 = thread.releaseExpiredQuotaClaims({ + session_id: caller, + now: FAR_FUTURE, + }); + expect(result1.status).toBe('released_expired'); + expect(result1.released_claims).toHaveLength(1); + expect(result1.released_claims[0]?.file_path).toBe('src/core.ts'); + + const weakenedAfterFirst = store.storage + .taskObservationsByKind(thread.task_id, 'claim-weakened') + .filter((row) => { + const meta = JSON.parse(row.metadata ?? '{}') as { + handoff_observation_id?: number; + reason?: string; + }; + return ( + meta.reason === 'quota_pending_expired' && + meta.handoff_observation_id === handoffId + ); + }); + expect(weakenedAfterFirst).toHaveLength(1); + + // Second call — claim is now weak_expired so the IMMEDIATE transaction's + // re-read finds nothing eligible and emits no additional observations. + const result2 = thread.releaseExpiredQuotaClaims({ + session_id: caller, + now: FAR_FUTURE, + }); + expect(result2.status).toBe('released_expired'); + expect(result2.released_claims).toHaveLength(0); + + // Still exactly one claim-weakened — no duplicate emitted. 
+ const weakenedAfterSecond = store.storage + .taskObservationsByKind(thread.task_id, 'claim-weakened') + .filter((row) => { + const meta = JSON.parse(row.metadata ?? '{}') as { + handoff_observation_id?: number; + reason?: string; + }; + return ( + meta.reason === 'quota_pending_expired' && + meta.handoff_observation_id === handoffId + ); + }); + expect(weakenedAfterSecond).toHaveLength(1); + }); +}); + +// --------------------------------------------------------------------------- +// bulkRescueStrandedSessions — idempotency under repeated calls +// --------------------------------------------------------------------------- + +type StrandedCandidate = { + session_id: string; + repo_root: string; + worktree_path: string; + last_observation_ts?: number; +}; + +function configureStrandedStorage(candidates: StrandedCandidate[]): void { + ( + store.storage as typeof store.storage & { + findStrandedSessions: (args: { stranded_after_ms: number }) => StrandedCandidate[]; + } + ).findStrandedSessions = () => candidates; +} + +describe('bulkRescueStrandedSessions idempotency', () => { + it('emits exactly one rescue-stranded observation even when called twice', () => { + const stranded = 'codex-stranded-session'; + seedSession(stranded); + + // Seed a task with a claim held by the stranded session. + const owner = 'claude-owner'; + seedSession(owner); + const thread = TaskThread.open(store, { + repo_root: '/repo', + branch: 'feat/rescue-test', + session_id: owner, + }); + thread.join(owner, 'claude'); + thread.join(stranded, 'codex'); + thread.claimFile({ session_id: stranded, file_path: 'src/stranded.ts' }); + + // Add an observation so the session appears active in colony memory. 
+ store.addObservation({ + session_id: stranded, + kind: 'note', + task_id: thread.task_id, + content: 'working on stranded feature', + }); + + configureStrandedStorage([ + { + session_id: stranded, + repo_root: '/repo', + worktree_path: '/repo', + last_observation_ts: Date.now() - 20 * 60_000, + }, + ]); + + // First call — rescue should release the claim and emit one audit obs. + const result1 = bulkRescueStrandedSessions(store, { dry_run: false }); + expect(result1.rescued).toHaveLength(1); + expect(result1.rescued[0]?.session_id).toBe(stranded); + expect(result1.released_claim_count).toBe(1); + + const rescueObsAfterFirst = store.storage + .timeline(stranded, undefined, 50) + .filter((row) => row.kind === 'rescue-stranded'); + expect(rescueObsAfterFirst).toHaveLength(1); + + // Second call — claim is already released; transaction sees no live claims. + const result2 = bulkRescueStrandedSessions(store, { dry_run: false }); + // The session is expected to be skipped; this check only requires that it is still accounted for (skipped or rescued). + const skippedOrRescuedCount = result2.skipped.length + result2.rescued.length; + expect(skippedOrRescuedCount).toBeGreaterThan(0); + + // Still exactly one rescue-stranded observation — no duplicate emitted. + const rescueObsAfterSecond = store.storage + .timeline(stranded, undefined, 50) + .filter((row) => row.kind === 'rescue-stranded'); + expect(rescueObsAfterSecond).toHaveLength(1); + }); +}); diff --git a/packages/storage/src/storage.ts b/packages/storage/src/storage.ts index db826806..aad10a3e 100644 --- a/packages/storage/src/storage.ts +++ b/packages/storage/src/storage.ts @@ -2399,9 +2399,18 @@ export class Storage { return row.t ?? 0; } - /** Run a function inside a SQLite transaction. All-or-nothing. */ - transaction<T>(fn: () => T): T { - return this.db.transaction(fn)(); + /** + * Run a function inside a SQLite transaction. All-or-nothing. + * + * Pass `{ immediate: true }` to use BEGIN IMMEDIATE instead of the default + * BEGIN DEFERRED. 
IMMEDIATE acquires the write lock at transaction start, + * which prevents read-then-write races when two callers both read the same + * rows and then try to modify them (e.g. claim cleanup loops running in + * parallel processes). + */ + transaction<T>(fn: () => T, options?: { immediate?: boolean }): T { + const txFn = this.db.transaction(fn); + return options?.immediate ? txFn.immediate() : txFn(); } // --- foraging food sources (indexed /examples/) ---