diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 9edf79be..efc4b07f 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -95,7 +95,10 @@ export interface GetWorkflowRunParams { workflowRunId: string; } -export type ListWorkflowRunsParams = PaginationOptions; +export interface ListWorkflowRunsParams extends PaginationOptions { + status?: WorkflowRunStatus; + workflowName?: string; +} export interface ClaimWorkflowRunParams { workerId: string; diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index cf8188f0..42b05917 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -408,6 +408,14 @@ export class BackendPostgres implements Backend { const { after } = params; const conditions = [this.pg`"namespace_id" = ${this.namespaceId}`]; + if (params.status) { + conditions.push(this.pg`"status" = ${params.status}`); + } + + if (params.workflowName) { + conditions.push(this.pg`"workflow_name" = ${params.workflowName}`); + } + if (cursor) { const op = after ? this.pg`<` : this.pg`>`; conditions.push( diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 182d42e5..bf8bfe2f 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -847,11 +847,23 @@ export class BackendSqlite implements Backend { listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { + const conditions: string[] = [`"namespace_id" = ?`]; + const values: unknown[] = [this.namespaceId]; + + if (params.status) { + conditions.push(`"status" = ?`); + values.push(params.status); + } + if (params.workflowName) { + conditions.push(`"workflow_name" = ?`); + values.push(params.workflowName); + } + return this.listPaginated(params, { table: "workflow_runs", naturalOrder: "DESC", - baseWhere: `"namespace_id" = ?`, - baseParams: [this.namespaceId], + baseWhere: conditions.join(" AND "), + baseParams: values, mapRow: (row) => rowToWorkflowRun(row as WorkflowRunRow), }); } diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index d3e7372b..21fcc148 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -705,6 +705,214 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + + test("filters workflow runs by status", async () => { + const backend = await setup(); + + // Drive runs to completed/failed first; while no other pending runs + // exist, `createClaimedWorkflowRun` claims the run it just created. + const completedRun = await createClaimedWorkflowRun(backend); + const completedWorkerId = completedRun.workerId; + if (!completedWorkerId) throw new Error("Expected workerId"); + await backend.completeWorkflowRun({ + workflowRunId: completedRun.id, + workerId: completedWorkerId, + output: null, + }); + + const failedRun = await createClaimedWorkflowRun(backend); + const failedWorkerId = failedRun.workerId; + if (!failedWorkerId) throw new Error("Expected workerId"); + await backend.failWorkflowRun({ + workflowRunId: failedRun.id, + workerId: failedWorkerId, + error: { message: "failed run" }, + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 1, + }, + }); + + // Add the pending run last so nothing else can claim it. + const pendingRun = await createPendingWorkflowRun(backend); + + const pendingList = await backend.listWorkflowRuns({ + status: "pending", + }); + expect(pendingList.data.map((r: WorkflowRun) => r.id)).toEqual([ + pendingRun.id, + ]); + + const completedList = await backend.listWorkflowRuns({ + status: "completed", + }); + expect(completedList.data.map((r: WorkflowRun) => r.id)).toEqual([ + completedRun.id, + ]); + + const failedList = await backend.listWorkflowRuns({ + status: "failed", + }); + expect(failedList.data.map((r: WorkflowRun) => r.id)).toEqual([ + failedRun.id, + ]); + + const canceledList = await backend.listWorkflowRuns({ + status: "canceled", + }); + expect(canceledList.data).toHaveLength(0); + + await teardown(backend); + }); + + test("paginates workflow runs with a status filter", async () => { + const backend = await setup(); + + const failedRuns: WorkflowRun[] = []; + for (let i = 0; i < 3; i++) { + const claimed = await createClaimedWorkflowRun(backend); + const workerId = claimed.workerId; + if (!workerId) throw new Error("Expected workerId"); + const failed = await backend.failWorkflowRun({ + workflowRunId: claimed.id, + workerId, + error: { message: "failed run" }, + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 1, + }, + }); + failedRuns.push(failed); + await sleep(10); // ensure timestamp difference + } + + await createPendingWorkflowRun(backend); + await sleep(10); + await createPendingWorkflowRun(backend); + + const page1 = await backend.listWorkflowRuns({ + status: "failed", + limit: 2, + }); + expect(page1.data).toHaveLength(2); + expect(page1.data[0]?.id).toBe(failedRuns[2]?.id); + expect(page1.data[1]?.id).toBe(failedRuns[1]?.id); + expect(page1.pagination.next).not.toBeNull(); + + const page2 = await backend.listWorkflowRuns({ + status: "failed", + limit: 2, + after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + }); + expect(page2.data).toHaveLength(1); + expect(page2.data[0]?.id).toBe(failedRuns[0]?.id); + expect(page2.pagination.next).toBeNull(); + + await teardown(backend); + }); + + test("filters workflow runs by workflow name", async () => { + const backend = await setup(); + + const orderProcessor = await backend.createWorkflowRun({ + workflowName: "order-processor", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + await backend.createWorkflowRun({ + workflowName: "email-sender", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + const filtered = await backend.listWorkflowRuns({ + workflowName: "order-processor", + }); + expect(filtered.data.map((r: WorkflowRun) => r.id)).toEqual([ + orderProcessor.id, + ]); + + const missing = await backend.listWorkflowRuns({ + workflowName: "no-such-workflow", + }); + expect(missing.data).toHaveLength(0); + + await teardown(backend); + }); + + test("combines status and workflow name filters", async () => { + const backend = await setup(); + + // Failed run with a different name — should NOT match the name filter. + await backend.createWorkflowRun({ + workflowName: "email-sender", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + await claimAndFailNextPendingRun(backend); + + // Failed run with target name — the only run that matches both filters. + const target = await backend.createWorkflowRun({ + workflowName: "order-processor", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + const failedTargetId = await claimAndFailNextPendingRun(backend); + expect(failedTargetId).toBe(target.id); + + // Pending run with target name — should NOT match the "failed" filter. + await backend.createWorkflowRun({ + workflowName: "order-processor", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + parentStepAttemptNamespaceId: null, + parentStepAttemptId: null, + availableAt: null, + deadlineAt: null, + }); + + const filtered = await backend.listWorkflowRuns({ + status: "failed", + workflowName: "order-processor", + }); + expect(filtered.data.map((r: WorkflowRun) => r.id)).toEqual([ + target.id, + ]); + + await teardown(backend); + }); }); describe("countWorkflowRuns()", () => { @@ -2609,6 +2817,31 @@ async function createClaimedWorkflowRun(b: Backend) { return claimed; } +/** + * Claim the next available pending run and drive it to a terminal `failed` + * state. Useful when a test has already created the pending run with specific + * fields (e.g. an explicit workflow name) and just needs it failed. + * @param b - Backend + * @returns ID of the run that was failed + */ +async function claimAndFailNextPendingRun(b: Backend): Promise { + const claimed = await b.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 100, + }); + if (!claimed) throw new Error("Expected to claim a pending run"); + await b.failWorkflowRun({ + workflowRunId: claimed.id, + workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion + error: { message: "failed run" }, + retryPolicy: { + ...DEFAULT_WORKFLOW_RETRY_POLICY, + maximumAttempts: 1, + }, + }); + return claimed.id; +} + /** * Get delta in seconds from now. * @param date - Date to compare