Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/begin-immediate-claim-lifecycle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
'@colony/storage': patch
'@colony/core': patch
---

Fix read-then-write race in claim cleanup paths

`releaseExpiredQuotaClaims` and `bulkRescueStrandedSessions` previously read
eligible claims outside their DEFERRED transaction, allowing two concurrent
callers to both snapshot the same rows and each emit a duplicate
`claim-weakened` or `rescue-stranded` audit observation.

The fix moves the claim read inside a `BEGIN IMMEDIATE` transaction on both
paths so the write lock is acquired before any row is inspected. The storage
`transaction()` helper gains an `{ immediate: true }` option that maps to
better-sqlite3's `.immediate()` mode. A new idempotency test confirms that
calling each cleanup path twice produces exactly one audit observation.
27 changes: 22 additions & 5 deletions packages/core/src/stranded-rescue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,18 @@ export function bulkRescueStrandedSessions(
outcome.stranded.push(row);
if (dryRun) continue;

// BEGIN IMMEDIATE so the re-read of claims inside the transaction and all
// subsequent deletes/writes are atomic across processes. Two concurrent
// rescue callers could otherwise both read the same held claims outside
// the transaction and then both attempt to release and audit them.
const audit_observation_id = store.storage.transaction(() => {
for (const claim of claims) {
// Re-read claims inside the transaction so the set we release matches
// exactly what is visible under the write lock — guards against a
// concurrent caller having already released some of them between the
// outer read and this point.
const liveClaims = heldClaimsForCandidate(store, candidate);
if (liveClaims.length === 0) return -1;
for (const claim of liveClaims) {
store.storage.releaseClaim({
task_id: claim.task_id,
file_path: claim.file_path,
Expand All @@ -364,7 +374,7 @@ export function bulkRescueStrandedSessions(
const auditId = store.addObservation({
session_id,
kind: 'rescue-stranded',
content: `Bulk rescue released ${claims.length} claim(s) for stranded session ${session_id}; audit history retained.`,
content: `Bulk rescue released ${liveClaims.length} claim(s) for stranded session ${session_id}; audit history retained.`,
metadata: {
kind: 'rescue-stranded',
action: 'bulk-release-claims',
Expand All @@ -376,8 +386,8 @@ export function bulkRescueStrandedSessions(
branches: row.branches,
task_ids: row.task_ids,
last_activity: row.last_activity,
held_claim_count: row.held_claim_count,
released_claims: claims.map((claim) => ({
held_claim_count: liveClaims.length,
released_claims: liveClaims.map((claim) => ({
task_id: claim.task_id,
file_path: claim.file_path,
claimed_at: claim.claimed_at,
Expand All @@ -386,7 +396,14 @@ export function bulkRescueStrandedSessions(
});
store.storage.endSession(session_id, now);
return auditId;
});
}, { immediate: true });

// -1 means another concurrent caller already released the claims before
// this transaction acquired the write lock — skip rather than double-count.
if (audit_observation_id === -1) {
outcome.skipped.push({ session_id, reason: 'claims already released by concurrent caller' });
continue;
}

const rescued = {
...row,
Expand Down
30 changes: 17 additions & 13 deletions packages/core/src/task-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1746,20 +1746,24 @@ export class TaskThread {
this.assertTaskExists();
this.assertParticipant(args.session_id);
const normalizedFilePath = this.normalizeOptionalClaimPath(args.file_path);
const claims = this.claims().filter((claim) => {
if (claim.state !== 'handoff_pending') return false;
if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false;
if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false;
if (
args.handoff_observation_id !== undefined &&
claim.handoff_observation_id !== args.handoff_observation_id
) {
return false;
}
return true;
});

// BEGIN IMMEDIATE so that the read of expired claims and all subsequent
// writes are serialized across processes. Without IMMEDIATE, two concurrent
// cleanup callers both read the same expired claims in DEFERRED mode, then
// both attempt to write — producing duplicate claim-weakened observations.
return this.store.storage.transaction(() => {
const claims = this.claims().filter((claim) => {
if (claim.state !== 'handoff_pending') return false;
if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false;
if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false;
if (
args.handoff_observation_id !== undefined &&
claim.handoff_observation_id !== args.handoff_observation_id
) {
return false;
}
return true;
});
const audit_observation_ids: number[] = [];
const seenBatons = new Set<number>();
for (const claim of claims) {
Expand Down Expand Up @@ -1821,7 +1825,7 @@ export class TaskThread {
})),
audit_observation_ids,
};
});
}, { immediate: true });
}

private assertTaskExists(): void {
Expand Down
264 changes: 264 additions & 0 deletions packages/core/test/claim-lifecycle-concurrency.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/**
* Concurrency-safety tests for the claim cleanup paths:
* - releaseExpiredQuotaClaims (TaskThread)
* - bulkRescueStrandedSessions (stranded-rescue)
*
* Both paths were previously structured as read-outside / write-inside DEFERRED
* transactions, which allowed two callers to both read the same expired/stranded
* claims and then both emit audit observations. The fix moves the read inside a
* BEGIN IMMEDIATE transaction, so the second caller sees an already-processed
* state and produces no duplicate observations.
*
* Within a single Node process better-sqlite3 is synchronous, so we can't
* trigger a true cross-process write-lock race here. What these tests verify
* is idempotency: calling the cleanup twice against unchanged data must not
* produce duplicate audit records. That's the observable invariant that the
* IMMEDIATE fix preserves.
*/
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { defaultSettings } from '@colony/config';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemoryStore } from '../src/memory-store.js';
import { bulkRescueStrandedSessions } from '../src/stranded-rescue.js';
import { TaskThread } from '../src/task-thread.js';

// ---------------------------------------------------------------------------
// Hivemind mock — bulkRescueStrandedSessions checks whether the session is
// live before touching it. We make the stranded session appear live so the
// rescue path proceeds.
// ---------------------------------------------------------------------------
// `vi.hoisted` runs this factory before the (also hoisted) `vi.mock` below,
// so the mutable `sessions` array already exists when the mock factory
// captures it. Tests mutate `hivemind.sessions` to control what the mocked
// module reports — NOTE(review): relies on vitest hoisting both calls above
// the module imports; confirmed behavior per vitest's vi.hoisted docs.
const hivemind = vi.hoisted(() => ({
  sessions: [] as Array<{
    source: 'active-session';
    activity: 'working' | 'thinking' | 'idle' | 'stalled';
    session_key: string;
    file_path: string;
    worktree_path: string;
  }>,
}));

// Replace the real hivemind reader with a snapshot built from the array above.
// `session_count` and `sessions` are read on every call, so per-test mutations
// of `hivemind.sessions` are reflected immediately by `readHivemind()`.
vi.mock('../src/hivemind.js', () => ({
  readHivemind: () => ({
    generated_at: new Date(0).toISOString(),
    repo_roots: ['/repo'],
    session_count: hivemind.sessions.length,
    counts: {},
    sessions: hivemind.sessions,
  }),
}));

let dir: string;
let store: MemoryStore;

beforeEach(() => {
  // Fresh mock state and a fresh on-disk store per test — nothing leaks
  // between cases.
  hivemind.sessions = [];
  dir = mkdtempSync(join(tmpdir(), 'colony-claim-lifecycle-concurrency-'));
  const dbPath = join(dir, 'data.db');
  store = new MemoryStore({ dbPath, settings: defaultSettings });
});

afterEach(() => {
  // Close the store before deleting its backing directory.
  store.close();
  rmSync(dir, { recursive: true, force: true });
});

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Starts a session in the store with the fixed test cwd and IDE. */
function seedSession(id: string): void {
  store.startSession({ cwd: '/repo', id, ide: 'claude-code' });
}

/**
 * Seeds a task with a quota-exhausted handoff so the owner's claim becomes
 * `handoff_pending` with a finite `expires_at`. We then call
 * `releaseExpiredQuotaClaims` with a `now` value far in the future so the
 * claim is treated as expired without needing to reach into the private `db`.
 *
 * @param ownerSessionId  session that claims the file and then hands off
 * @param callerSessionId second participant that later runs the cleanup
 * @returns the opened thread plus the handoff observation id, used to match
 *          the resulting `claim-weakened` audit rows
 */
function seedTaskWithPendingHandoffClaim(
  ownerSessionId: string,
  callerSessionId: string,
): { thread: TaskThread; handoffId: number } {
  const thread = TaskThread.open(store, {
    repo_root: '/repo',
    branch: 'feat/quota-test',
    session_id: ownerSessionId,
  });
  // Both sessions must be participants: the owner to claim, the caller to
  // later invoke releaseExpiredQuotaClaims (which asserts participation).
  thread.join(ownerSessionId, 'claude');
  thread.join(callerSessionId, 'codex');
  thread.claimFile({ session_id: ownerSessionId, file_path: 'src/core.ts' });

  const handoffId = thread.handOff({
    from_session_id: ownerSessionId,
    from_agent: 'claude',
    to_agent: 'any',
    summary: 'quota exhausted',
    next_steps: ['take over'],
    reason: 'quota_exhausted',
    runtime_status: 'blocked_by_runtime_limit',
    quota_context: {
      agent: 'claude',
      session_id: ownerSessionId,
      repo_root: '/repo',
      branch: 'feat/quota-test',
      worktree_path: '/repo',
      task_id: thread.task_id,
      claimed_files: ['src/core.ts'],
      dirty_files: [],
      last_command: 'pnpm test',
      last_tool: 'Bash',
      last_verification: { command: 'pnpm test', result: 'blocked' },
      suggested_next_step: 'accept handoff',
      // Short TTL — the claim counts as expired once the tests pass
      // `now = FAR_FUTURE` (itself already ~100 days past Date.now()).
      handoff_ttl_ms: 1,
    },
  });

  return { thread, handoffId };
}

// A `now` timestamp guaranteed to be past any handoff TTL set in this test:
// one hundred days beyond the moment this module loaded.
const FAR_FUTURE = Date.now() + 100 * 24 * 60 * 60 * 1000;

// ---------------------------------------------------------------------------
// releaseExpiredQuotaClaims — idempotency under repeated calls
// ---------------------------------------------------------------------------

describe('releaseExpiredQuotaClaims idempotency', () => {
  it('emits exactly one claim-weakened observation even when called twice', () => {
    const owner = 'claude-quota-owner';
    const caller = 'codex-quota-caller';
    seedSession(owner);
    seedSession(caller);

    const { thread, handoffId } = seedTaskWithPendingHandoffClaim(owner, caller);

    // Counts claim-weakened audit rows tied to this specific handoff.
    const countWeakened = (): number =>
      store.storage
        .taskObservationsByKind(thread.task_id, 'claim-weakened')
        .filter((row) => {
          const meta = JSON.parse(row.metadata ?? '{}') as {
            handoff_observation_id?: number;
            reason?: string;
          };
          return (
            meta.reason === 'quota_pending_expired' &&
            meta.handoff_observation_id === handoffId
          );
        }).length;

    // First call — should release the expired claim and emit one audit obs.
    // FAR_FUTURE as `now` guarantees the handoff TTL is past.
    const first = thread.releaseExpiredQuotaClaims({
      session_id: caller,
      now: FAR_FUTURE,
    });
    expect(first.status).toBe('released_expired');
    expect(first.released_claims).toHaveLength(1);
    expect(first.released_claims[0]?.file_path).toBe('src/core.ts');
    expect(countWeakened()).toBe(1);

    // Second call — the claim is now weak_expired, so the IMMEDIATE
    // transaction's re-read finds nothing eligible and emits nothing new.
    const second = thread.releaseExpiredQuotaClaims({
      session_id: caller,
      now: FAR_FUTURE,
    });
    expect(second.status).toBe('released_expired');
    expect(second.released_claims).toHaveLength(0);

    // Still exactly one claim-weakened — no duplicate emitted.
    expect(countWeakened()).toBe(1);
  });
});

// ---------------------------------------------------------------------------
// bulkRescueStrandedSessions — idempotency under repeated calls
// ---------------------------------------------------------------------------

type StrandedCandidate = {
  session_id: string;
  repo_root: string;
  worktree_path: string;
  last_observation_ts?: number;
};

/** Overrides the storage stranded-session query to return a fixed list. */
function configureStrandedStorage(candidates: StrandedCandidate[]): void {
  type PatchableStorage = typeof store.storage & {
    findStrandedSessions: (args: { stranded_after_ms: number }) => StrandedCandidate[];
  };
  (store.storage as PatchableStorage).findStrandedSessions = () => candidates;
}

describe('bulkRescueStrandedSessions idempotency', () => {
  it('emits exactly one rescue-stranded observation even when called twice', () => {
    const stranded = 'codex-stranded-session';
    seedSession(stranded);

    // Seed a task whose claim is held by the stranded session.
    const owner = 'claude-owner';
    seedSession(owner);
    const thread = TaskThread.open(store, {
      repo_root: '/repo',
      branch: 'feat/rescue-test',
      session_id: owner,
    });
    thread.join(owner, 'claude');
    thread.join(stranded, 'codex');
    thread.claimFile({ session_id: stranded, file_path: 'src/stranded.ts' });

    // An observation makes the session appear active in colony memory.
    store.addObservation({
      session_id: stranded,
      kind: 'note',
      task_id: thread.task_id,
      content: 'working on stranded feature',
    });

    configureStrandedStorage([
      {
        session_id: stranded,
        repo_root: '/repo',
        worktree_path: '/repo',
        last_observation_ts: Date.now() - 20 * 60_000,
      },
    ]);

    // Counts rescue-stranded rows in the stranded session's timeline.
    const countRescueObs = (): number =>
      store.storage
        .timeline(stranded, undefined, 50)
        .filter((row) => row.kind === 'rescue-stranded').length;

    // First call — rescue should release the claim and emit one audit obs.
    const first = bulkRescueStrandedSessions(store, { dry_run: false });
    expect(first.rescued).toHaveLength(1);
    expect(first.rescued[0]?.session_id).toBe(stranded);
    expect(first.released_claim_count).toBe(1);
    expect(countRescueObs()).toBe(1);

    // Second call — the claim is already released; the transaction's re-read
    // sees no live claims, so the session is skipped (concurrent-already-
    // released) rather than rescued again.
    const second = bulkRescueStrandedSessions(store, { dry_run: false });
    expect(second.skipped.length + second.rescued.length).toBeGreaterThan(0);

    // Still exactly one rescue-stranded observation — no duplicate emitted.
    expect(countRescueObs()).toBe(1);
  });
});
Loading
Loading