From 8209058e6489088b2f25d8528a762b7e4780c32e Mon Sep 17 00:00:00 2001 From: Christopher Felegy Date: Mon, 11 May 2026 16:27:01 -0700 Subject: [PATCH 1/2] feat: listWorkflowRuns status filter --- packages/openworkflow/core/backend.ts | 4 +- packages/openworkflow/postgres/backend.ts | 4 + packages/openworkflow/sqlite/backend.ts | 11 +- .../openworkflow/testing/backend.testsuite.ts | 102 ++++++++++++++++++ 4 files changed, 118 insertions(+), 3 deletions(-) diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 9edf79be..e7abf4fb 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -95,7 +95,9 @@ export interface GetWorkflowRunParams { workflowRunId: string; } -export type ListWorkflowRunsParams = PaginationOptions; +export interface ListWorkflowRunsParams extends PaginationOptions { + status?: WorkflowRunStatus; +} export interface ClaimWorkflowRunParams { workerId: string; diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index cf8188f0..38fc1ea0 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -408,6 +408,10 @@ export class BackendPostgres implements Backend { const { after } = params; const conditions = [this.pg`"namespace_id" = ${this.namespaceId}`]; + if (params.status) { + conditions.push(this.pg`"status" = ${params.status}`); + } + if (cursor) { const op = after ? this.pg`<` : this.pg`>`; conditions.push( diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 182d42e5..6f71c259 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -847,11 +847,18 @@ export class BackendSqlite implements Backend { listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { + const baseWhere = params.status + ? `"namespace_id" = ? AND "status" = ?` + : `"namespace_id" = ?`; + const baseParams: readonly unknown[] = params.status + ? [this.namespaceId, params.status] + : [this.namespaceId]; + return this.listPaginated(params, { table: "workflow_runs", naturalOrder: "DESC", - baseWhere: `"namespace_id" = ?`, - baseParams: [this.namespaceId], + baseWhere, + baseParams, mapRow: (row) => rowToWorkflowRun(row as WorkflowRunRow), }); } diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index d3e7372b..6ba49733 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -705,6 +705,108 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + + test("filters workflow runs by status", async () => { + const backend = await setup(); + + const pendingRun = await createPendingWorkflowRun(backend); + + const completedRun = await createClaimedWorkflowRun(backend); + const completedWorkerId = completedRun.workerId; + if (!completedWorkerId) throw new Error("Expected workerId"); + await backend.completeWorkflowRun({ + workflowRunId: completedRun.id, + workerId: completedWorkerId, + output: null, + }); + + const failedRun = await createClaimedWorkflowRun(backend); + const failedWorkerId = failedRun.workerId; + if (!failedWorkerId) throw new Error("Expected workerId"); + await backend.failWorkflowRun({ + workflowRunId: failedRun.id, + workerId: failedWorkerId, + error: { message: "failed run" }, + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 1, + }, + }); + + const pendingList = await backend.listWorkflowRuns({ + status: "pending", + }); + expect(pendingList.data.map((r: WorkflowRun) => r.id)).toEqual([ + pendingRun.id, + ]); + + const completedList = await backend.listWorkflowRuns({ + status: "completed", + }); + expect(completedList.data.map((r: WorkflowRun) => r.id)).toEqual([ + completedRun.id, + ]); + + const failedList = await backend.listWorkflowRuns({ + status: "failed", + }); + expect(failedList.data.map((r: WorkflowRun) => r.id)).toEqual([ + failedRun.id, + ]); + + const canceledList = await backend.listWorkflowRuns({ + status: "canceled", + }); + expect(canceledList.data).toHaveLength(0); + + await teardown(backend); + }); + + test("paginates workflow runs with a status filter", async () => { + const backend = await setup(); + + const failedRuns: WorkflowRun[] = []; + for (let i = 0; i < 3; i++) { + const claimed = await createClaimedWorkflowRun(backend); + const workerId = claimed.workerId; + if (!workerId) throw new Error("Expected workerId"); + const failed = await backend.failWorkflowRun({ + workflowRunId: claimed.id, + workerId, + error: { message: "failed run" }, + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 1, + }, + }); + failedRuns.push(failed); + await sleep(10); // ensure timestamp difference + } + + await createPendingWorkflowRun(backend); + await sleep(10); + await createPendingWorkflowRun(backend); + + const page1 = await backend.listWorkflowRuns({ + status: "failed", + limit: 2, + }); + expect(page1.data).toHaveLength(2); + expect(page1.data[0]?.id).toBe(failedRuns[2]?.id); + expect(page1.data[1]?.id).toBe(failedRuns[1]?.id); + expect(page1.pagination.next).not.toBeNull(); + + const page2 = await backend.listWorkflowRuns({ + status: "failed", + limit: 2, + after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(page2.data).toHaveLength(1); + expect(page2.data[0]?.id).toBe(failedRuns[0]?.id); + expect(page2.pagination.next).toBeNull(); + + await teardown(backend); + }); }); describe("countWorkflowRuns()", () => { From 0ee5177fc5b8c18d821543fcb724cb610285c263 Mon Sep 17 00:00:00 2001 From: Christopher Felegy Date: Tue, 12 May 2026 09:47:25 -0700 Subject: [PATCH 2/2] feat: allow filter on workflow name --- packages/openworkflow/core/backend.ts | 1 + packages/openworkflow/postgres/backend.ts | 4 + packages/openworkflow/sqlite/backend.ts | 21 +-- .../openworkflow/testing/backend.testsuite.ts | 135 +++++++++++++++++- 4 files changed, 151 insertions(+), 10 deletions(-) diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index e7abf4fb..efc4b07f 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -97,6 +97,7 @@ export interface GetWorkflowRunParams { export interface ListWorkflowRunsParams extends PaginationOptions { status?: WorkflowRunStatus; + workflowName?: string; } export interface ClaimWorkflowRunParams { diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index 38fc1ea0..42b05917 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -412,6 +412,10 @@ export class BackendPostgres implements Backend { conditions.push(this.pg`"status" = ${params.status}`); } + if (params.workflowName) { + conditions.push(this.pg`"workflow_name" = ${params.workflowName}`); + } + if (cursor) { const op = after ? this.pg`<` : this.pg`>`; conditions.push( diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 6f71c259..bf8bfe2f 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -847,18 +847,23 @@ export class BackendSqlite implements Backend { listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { - const baseWhere = params.status - ? `"namespace_id" = ? AND "status" = ?` - : `"namespace_id" = ?`; - const baseParams: readonly unknown[] = params.status - ? [this.namespaceId, params.status] - : [this.namespaceId]; + const conditions: string[] = [`"namespace_id" = ?`]; + const values: unknown[] = [this.namespaceId]; + + if (params.status) { + conditions.push(`"status" = ?`); + values.push(params.status); + } + if (params.workflowName) { + conditions.push(`"workflow_name" = ?`); + values.push(params.workflowName); + } return this.listPaginated(params, { table: "workflow_runs", naturalOrder: "DESC", - baseWhere, - baseParams, + baseWhere: conditions.join(" AND "), + baseParams: values, mapRow: (row) => rowToWorkflowRun(row as WorkflowRunRow), }); } diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index 6ba49733..21fcc148 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -709,8 +709,8 @@ export function testBackend(options: TestBackendOptions): void { test("filters workflow runs by status", async () => { const backend = await setup(); - const pendingRun = await createPendingWorkflowRun(backend); - + // Drive runs to completed/failed first; while no other pending runs + // exist, `createClaimedWorkflowRun` claims the run it just created. const completedRun = await createClaimedWorkflowRun(backend); const completedWorkerId = completedRun.workerId; if (!completedWorkerId) throw new Error("Expected workerId"); @@ -733,6 +733,9 @@ export function testBackend(options: TestBackendOptions): void { }, }); + // Add the pending run last so nothing else can claim it. + const pendingRun = await createPendingWorkflowRun(backend); + const pendingList = await backend.listWorkflowRuns({ status: "pending", }); @@ -807,6 +810,109 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + + test("filters workflow runs by workflow name", async () => { + const backend = await setup(); + + const orderProcessor = await backend.createWorkflowRun({ + workflowName: "order-processor", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + await backend.createWorkflowRun({ + workflowName: "email-sender", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + const filtered = await backend.listWorkflowRuns({ + workflowName: "order-processor", + }); + expect(filtered.data.map((r: WorkflowRun) => r.id)).toEqual([ + orderProcessor.id, + ]); + + const missing = await backend.listWorkflowRuns({ + workflowName: "no-such-workflow", + }); + expect(missing.data).toHaveLength(0); + + await teardown(backend); + }); + + test("combines status and workflow name filters", async () => { + const backend = await setup(); + + // Failed run with a different name — should NOT match the name filter. + await backend.createWorkflowRun({ + workflowName: "email-sender", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + await claimAndFailNextPendingRun(backend); + + // Failed run with target name — the only run that matches both filters. + const target = await backend.createWorkflowRun({ + workflowName: "order-processor", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + const failedTargetId = await claimAndFailNextPendingRun(backend); + expect(failedTargetId).toBe(target.id); + + // Pending run with target name — should NOT match the "failed" filter. + await backend.createWorkflowRun({ + workflowName: "order-processor", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + const filtered = await backend.listWorkflowRuns({ + status: "failed", + workflowName: "order-processor", + }); + expect(filtered.data.map((r: WorkflowRun) => r.id)).toEqual([ + target.id, + ]); + + await teardown(backend); + }); }); describe("countWorkflowRuns()", () => { @@ -2711,6 +2817,31 @@ async function createClaimedWorkflowRun(b: Backend) { return claimed; } +/** + * Claim the next available pending run and drive it to a terminal `failed` + * state. Useful when a test has already created the pending run with specific + * fields (e.g. an explicit workflow name) and just needs it failed. + * @param b - Backend + * @returns ID of the run that was failed + */ +async function claimAndFailNextPendingRun(b: Backend): Promise { + const claimed = await b.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + if (!claimed) throw new Error("Expected to claim a pending run"); + await b.failWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + error: { message: "failed run" }, + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 1, + }, + }); + return claimed.id; +} + /** * Get delta in seconds from now. * @param date - Date to compare