Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/openworkflow/core/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ export interface GetWorkflowRunParams {
workflowRunId: string;
}

export type ListWorkflowRunsParams = PaginationOptions;
export interface ListWorkflowRunsParams extends PaginationOptions {
status?: WorkflowRunStatus;
workflowName?: string;
}

export interface ClaimWorkflowRunParams {
workerId: string;
Expand Down
8 changes: 8 additions & 0 deletions packages/openworkflow/postgres/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,14 @@ export class BackendPostgres implements Backend {
const { after } = params;
const conditions = [this.pg`"namespace_id" = ${this.namespaceId}`];

if (params.status) {
conditions.push(this.pg`"status" = ${params.status}`);
}

if (params.workflowName) {
conditions.push(this.pg`"workflow_name" = ${params.workflowName}`);
}

if (cursor) {
const op = after ? this.pg`<` : this.pg`>`;
conditions.push(
Expand Down
16 changes: 14 additions & 2 deletions packages/openworkflow/sqlite/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -847,11 +847,23 @@ export class BackendSqlite implements Backend {
listWorkflowRuns(
params: ListWorkflowRunsParams,
): Promise<PaginatedResponse<WorkflowRun>> {
const conditions: string[] = [`"namespace_id" = ?`];
const values: unknown[] = [this.namespaceId];

if (params.status) {
conditions.push(`"status" = ?`);
values.push(params.status);
}
if (params.workflowName) {
conditions.push(`"workflow_name" = ?`);
values.push(params.workflowName);
}

return this.listPaginated(params, {
table: "workflow_runs",
naturalOrder: "DESC",
baseWhere: `"namespace_id" = ?`,
baseParams: [this.namespaceId],
baseWhere: conditions.join(" AND "),
baseParams: values,
mapRow: (row) => rowToWorkflowRun(row as WorkflowRunRow),
});
}
Expand Down
233 changes: 233 additions & 0 deletions packages/openworkflow/testing/backend.testsuite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,214 @@ export function testBackend(options: TestBackendOptions): void {

await teardown(backend);
});

test("filters workflow runs by status", async () => {
const backend = await setup();

// Drive runs to completed/failed first; while no other pending runs
// exist, `createClaimedWorkflowRun` claims the run it just created.
const completedRun = await createClaimedWorkflowRun(backend);
const completedWorkerId = completedRun.workerId;
if (!completedWorkerId) throw new Error("Expected workerId");
await backend.completeWorkflowRun({
workflowRunId: completedRun.id,
workerId: completedWorkerId,
output: null,
});

const failedRun = await createClaimedWorkflowRun(backend);
const failedWorkerId = failedRun.workerId;
if (!failedWorkerId) throw new Error("Expected workerId");
await backend.failWorkflowRun({
workflowRunId: failedRun.id,
workerId: failedWorkerId,
error: { message: "failed run" },
retryPolicy: {
...DEFAULT_WORKFLOW_RETRY_POLICY,
maximumAttempts: 1,
},
});

// Add the pending run last so nothing else can claim it.
const pendingRun = await createPendingWorkflowRun(backend);

const pendingList = await backend.listWorkflowRuns({
status: "pending",
});
expect(pendingList.data.map((r: WorkflowRun) => r.id)).toEqual([
pendingRun.id,
]);

const completedList = await backend.listWorkflowRuns({
status: "completed",
});
expect(completedList.data.map((r: WorkflowRun) => r.id)).toEqual([
completedRun.id,
]);

const failedList = await backend.listWorkflowRuns({
status: "failed",
});
expect(failedList.data.map((r: WorkflowRun) => r.id)).toEqual([
failedRun.id,
]);

const canceledList = await backend.listWorkflowRuns({
status: "canceled",
});
expect(canceledList.data).toHaveLength(0);

await teardown(backend);
});

test("paginates workflow runs with a status filter", async () => {
const backend = await setup();

const failedRuns: WorkflowRun[] = [];
for (let i = 0; i < 3; i++) {
const claimed = await createClaimedWorkflowRun(backend);
const workerId = claimed.workerId;
if (!workerId) throw new Error("Expected workerId");
const failed = await backend.failWorkflowRun({
workflowRunId: claimed.id,
workerId,
error: { message: "failed run" },
retryPolicy: {
...DEFAULT_WORKFLOW_RETRY_POLICY,
maximumAttempts: 1,
},
});
failedRuns.push(failed);
await sleep(10); // ensure timestamp difference
}

await createPendingWorkflowRun(backend);
await sleep(10);
await createPendingWorkflowRun(backend);

const page1 = await backend.listWorkflowRuns({
status: "failed",
limit: 2,
});
expect(page1.data).toHaveLength(2);
expect(page1.data[0]?.id).toBe(failedRuns[2]?.id);
expect(page1.data[1]?.id).toBe(failedRuns[1]?.id);
expect(page1.pagination.next).not.toBeNull();

const page2 = await backend.listWorkflowRuns({
status: "failed",
limit: 2,
after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
});
expect(page2.data).toHaveLength(1);
expect(page2.data[0]?.id).toBe(failedRuns[0]?.id);
expect(page2.pagination.next).toBeNull();

await teardown(backend);
});

test("filters workflow runs by workflow name", async () => {
const backend = await setup();

const orderProcessor = await backend.createWorkflowRun({
workflowName: "order-processor",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});

await backend.createWorkflowRun({
workflowName: "email-sender",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});

const filtered = await backend.listWorkflowRuns({
workflowName: "order-processor",
});
expect(filtered.data.map((r: WorkflowRun) => r.id)).toEqual([
orderProcessor.id,
]);

const missing = await backend.listWorkflowRuns({
workflowName: "no-such-workflow",
});
expect(missing.data).toHaveLength(0);

await teardown(backend);
});

test("combines status and workflow name filters", async () => {
const backend = await setup();

// Failed run with a different name — should NOT match the name filter.
await backend.createWorkflowRun({
workflowName: "email-sender",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});
await claimAndFailNextPendingRun(backend);

// Failed run with target name — the only run that matches both filters.
const target = await backend.createWorkflowRun({
workflowName: "order-processor",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});
const failedTargetId = await claimAndFailNextPendingRun(backend);
expect(failedTargetId).toBe(target.id);

// Pending run with target name — should NOT match the "failed" filter.
await backend.createWorkflowRun({
workflowName: "order-processor",
version: null,
idempotencyKey: null,
input: null,
config: {},
context: null,
parentStepAttemptNamespaceId: null,
parentStepAttemptId: null,
availableAt: null,
deadlineAt: null,
});

const filtered = await backend.listWorkflowRuns({
status: "failed",
workflowName: "order-processor",
});
expect(filtered.data.map((r: WorkflowRun) => r.id)).toEqual([
target.id,
]);

await teardown(backend);
});
});

describe("countWorkflowRuns()", () => {
Expand Down Expand Up @@ -2609,6 +2817,31 @@ async function createClaimedWorkflowRun(b: Backend) {
return claimed;
}

/**
* Claim the next available pending run and drive it to a terminal `failed`
* state. Useful when a test has already created the pending run with specific
* fields (e.g. an explicit workflow name) and just needs it failed.
* @param b - Backend
* @returns ID of the run that was failed
*/
async function claimAndFailNextPendingRun(b: Backend): Promise<string> {
const claimed = await b.claimWorkflowRun({
workerId: randomUUID(),
leaseDurationMs: 100,
});
if (!claimed) throw new Error("Expected to claim a pending run");
await b.failWorkflowRun({
workflowRunId: claimed.id,
workerId: claimed.workerId!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
error: { message: "failed run" },
retryPolicy: {
...DEFAULT_WORKFLOW_RETRY_POLICY,
maximumAttempts: 1,
},
});
return claimed.id;
}

/**
* Get delta in seconds from now.
* @param date - Date to compare
Expand Down