diff --git a/apps/dashboard/src/lib/metrics.server.test.ts b/apps/dashboard/src/lib/metrics.server.test.ts index 2c14cd97..97b27401 100644 --- a/apps/dashboard/src/lib/metrics.server.test.ts +++ b/apps/dashboard/src/lib/metrics.server.test.ts @@ -1,6 +1,10 @@ import { getBackend } from "./backend"; import { getMetricsResponse } from "./metrics.server"; -import type { Backend, WorkflowRunCounts } from "openworkflow/internal"; +import type { + Backend, + WorkflowRunCounts, + WorkflowRunCountsByWorkflowName, +} from "openworkflow/internal"; import { beforeEach, describe, expect, it, vi } from "vitest"; vi.mock("./backend", () => ({ @@ -23,16 +27,18 @@ describe("getMetricsResponse()", () => { }); it("returns Prometheus exposition format with expected metric labels", async () => { - const counts: WorkflowRunCounts = { - ...ZERO_COUNTS, - pending: 3, - running: 3, - completed: 4, - failed: 2, + const counts: WorkflowRunCountsByWorkflowName = { + "send-email": { + ...ZERO_COUNTS, + pending: 3, + running: 3, + completed: 4, + failed: 2, + }, }; - const backend: Pick = { - countWorkflowRuns: vi.fn().mockResolvedValue(counts), + const backend: Pick = { + countWorkflowRunsByWorkflowName: vi.fn().mockResolvedValue(counts), }; mockedGetBackend.mockResolvedValue(backend); @@ -45,17 +51,29 @@ describe("getMetricsResponse()", () => { ); expect(body).toContain("# HELP openworkflow_workflow_runs"); expect(body).toContain("# TYPE openworkflow_workflow_runs gauge"); - expect(body).toContain('openworkflow_workflow_runs{status="pending"} 3'); - expect(body).toContain('openworkflow_workflow_runs{status="running"} 3'); + expect(body).toContain( + 'openworkflow_workflow_runs{workflow_name="send-email",status="pending"} 3', + ); + expect(body).toContain( + 'openworkflow_workflow_runs{workflow_name="send-email",status="running"} 3', + ); expect(body).not.toContain('openworkflow_workflow_runs{status="sleeping"}'); - expect(body).toContain('openworkflow_workflow_runs{status="completed"} 4'); - expect(body).toContain('openworkflow_workflow_runs{status="failed"} 2'); - expect(body).toContain('openworkflow_workflow_runs{status="canceled"} 0'); + expect(body).toContain( + 'openworkflow_workflow_runs{workflow_name="send-email",status="completed"} 4', + ); + expect(body).toContain( + 'openworkflow_workflow_runs{workflow_name="send-email",status="failed"} 2', + ); + expect(body).toContain( + 'openworkflow_workflow_runs{workflow_name="send-email",status="canceled"} 0', + ); }); - it("calls backend.countWorkflowRuns() on every scrape", async () => { - const backend: Pick = { - countWorkflowRuns: vi.fn().mockResolvedValue(ZERO_COUNTS), + it("calls backend.countWorkflowRunsByWorkflowName() on every scrape", async () => { + const backend: Pick = { + countWorkflowRunsByWorkflowName: vi + .fn() + .mockResolvedValue({ "send-email": ZERO_COUNTS }), }; mockedGetBackend.mockResolvedValue(backend); @@ -63,12 +81,12 @@ describe("getMetricsResponse()", () => { await getMetricsResponse(); expect(mockedGetBackend).toHaveBeenCalledTimes(2); - expect(backend.countWorkflowRuns).toHaveBeenCalledTimes(2); + expect(backend.countWorkflowRunsByWorkflowName).toHaveBeenCalledTimes(2); }); it("returns 500 when backend aggregation fails", async () => { - const backend: Pick = { - countWorkflowRuns: vi + const backend: Pick = { + countWorkflowRunsByWorkflowName: vi .fn() .mockRejectedValue(new Error("failed to aggregate")), }; diff --git a/apps/dashboard/src/lib/metrics.server.ts b/apps/dashboard/src/lib/metrics.server.ts index 47abf892..74b20755 100644 --- a/apps/dashboard/src/lib/metrics.server.ts +++ b/apps/dashboard/src/lib/metrics.server.ts @@ -1,5 +1,8 @@ import { getBackend } from "./backend"; -import type { WorkflowRunCounts } from "openworkflow/internal"; +import type { + WorkflowRunCounts, + WorkflowRunCountsByWorkflowName, +} from "openworkflow/internal"; import { Gauge, Registry } from "prom-client"; /** @@ -9,7 +12,7 @@ import { Gauge, Registry } from "prom-client"; export async function getMetricsResponse(): Promise { try { const backend = await getBackend(); - const workflowRunCounts = await backend.countWorkflowRuns(); + const workflowRunCounts = await backend.countWorkflowRunsByWorkflowName(); const registry = new Registry(); registerWorkflowRunCounts(registry, workflowRunCounts); @@ -40,18 +43,24 @@ const PROMETHEUS_WORKFLOW_RUN_STATUSES = [ function registerWorkflowRunCounts( registry: Registry, - workflowRunCounts: WorkflowRunCounts, + workflowRunCounts: WorkflowRunCountsByWorkflowName, ) { - const prometheusCounts = toPrometheusWorkflowRunCounts(workflowRunCounts); const workflowRunsGauge = new Gauge({ name: "openworkflow_workflow_runs", help: "Current count of workflow runs in each status.", - labelNames: ["status"] as const, + labelNames: ["workflow_name", "status"] as const, registers: [registry], }); - for (const status of PROMETHEUS_WORKFLOW_RUN_STATUSES) { - workflowRunsGauge.set({ status }, prometheusCounts[status]); + for (const [workflowName, counts] of Object.entries(workflowRunCounts)) { + const prometheusCounts = toPrometheusWorkflowRunCounts(counts); + + for (const status of PROMETHEUS_WORKFLOW_RUN_STATUSES) { + workflowRunsGauge.set( + { workflow_name: workflowName, status }, + prometheusCounts[status], + ); + } } } diff --git a/apps/docs/docs/prometheus.mdx b/apps/docs/docs/prometheus.mdx index 49cf9de4..bb334fbc 100644 --- a/apps/docs/docs/prometheus.mdx +++ b/apps/docs/docs/prometheus.mdx @@ -38,11 +38,11 @@ You should see output like: ```text # HELP openworkflow_workflow_runs Current count of workflow runs in each status. # TYPE openworkflow_workflow_runs gauge -openworkflow_workflow_runs{status="pending"} 12 -openworkflow_workflow_runs{status="running"} 3 -openworkflow_workflow_runs{status="completed"} 847 -openworkflow_workflow_runs{status="failed"} 2 -openworkflow_workflow_runs{status="canceled"} 0 +openworkflow_workflow_runs{workflow_name="send-welcome-email",status="pending"} 12 +openworkflow_workflow_runs{workflow_name="send-welcome-email",status="running"} 3 +openworkflow_workflow_runs{workflow_name="send-welcome-email",status="completed"} 847 +openworkflow_workflow_runs{workflow_name="send-welcome-email",status="failed"} 2 +openworkflow_workflow_runs{workflow_name="send-welcome-email",status="canceled"} 0 ``` ### 3. Configure Prometheus to scrape the dashboard @@ -66,7 +66,7 @@ Replace `localhost:3000` with the address where your dashboard is running. | Field | Value | | ------ | ------------------------------------------------------- | | Type | Gauge | -| Labels | `status` | +| Labels | `workflow_name`, `status` | | Values | `pending`, `running`, `completed`, `failed`, `canceled` | Current count of workflow runs in each status. One query is executed per diff --git a/packages/openworkflow/core/backend.test.ts b/packages/openworkflow/core/backend.test.ts index 34a051d5..288f33b2 100644 --- a/packages/openworkflow/core/backend.test.ts +++ b/packages/openworkflow/core/backend.test.ts @@ -1,4 +1,7 @@ -import { toWorkflowRunCounts } from "./backend.js"; +import { + toWorkflowRunCounts, + toWorkflowRunCountsByWorkflowName, +} from "./backend.js"; import { describe, expect, test } from "vitest"; describe("toWorkflowRunCounts", () => { @@ -49,3 +52,31 @@ describe("toWorkflowRunCounts", () => { }); }); }); + +describe("toWorkflowRunCountsByWorkflowName", () => { + test("returns normalized counts grouped by workflow name", () => { + const counts = toWorkflowRunCountsByWorkflowName([ + { workflowName: "send-email", status: "pending", count: 2 }, + { workflowName: "send-email", status: "sleeping", count: 1 }, + { workflowName: "charge-card", status: "succeeded", count: 3 }, + { workflowName: "charge-card", status: "completed", count: 4 }, + ]); + + expect(counts).toEqual({ + "send-email": { + pending: 2, + running: 1, + completed: 0, + failed: 0, + canceled: 0, + }, + "charge-card": { + pending: 0, + running: 0, + completed: 7, + failed: 0, + canceled: 0, + }, + }); + }); +}); diff --git a/packages/openworkflow/core/backend.ts b/packages/openworkflow/core/backend.ts index 9edf79be..8ab0fd19 100644 --- a/packages/openworkflow/core/backend.ts +++ b/packages/openworkflow/core/backend.ts @@ -26,6 +26,7 @@ export interface Backend { params: Readonly, ): Promise>; countWorkflowRuns(): Promise; + countWorkflowRunsByWorkflowName(): Promise; claimWorkflowRun( params: Readonly, ): Promise; @@ -212,6 +213,8 @@ export type WorkflowRunCounts = Omit< "succeeded" | "sleeping" >; +export type WorkflowRunCountsByWorkflowName = Record; + /** * Convert status-count rows from a `GROUP BY "status"` query into a * typed {@link WorkflowRunCounts} object. @@ -249,3 +252,32 @@ export function toWorkflowRunCounts( return counts; } + +/** + * Convert workflow-name/status-count rows into normalized workflow run counts. + * @param rows - Rows from the database query + * @returns Workflow run counts keyed by workflow name, then status + */ +export function toWorkflowRunCountsByWorkflowName( + rows: readonly { + workflowName: string; + status: string; + count: number | string; + }[], +): WorkflowRunCountsByWorkflowName { + const countsByWorkflowName: WorkflowRunCountsByWorkflowName = {}; + + for (const row of rows) { + const workflowCounts = (countsByWorkflowName[row.workflowName] ??= + toWorkflowRunCounts([])); + const counts = toWorkflowRunCounts([ + { status: row.status, count: row.count }, + ]); + + for (const status of Object.keys(counts) as (keyof WorkflowRunCounts)[]) { + workflowCounts[status] += counts[status]; + } + } + + return countsByWorkflowName; +} diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index cf8188f0..9769adb3 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -1,9 +1,11 @@ import { toWorkflowRunCounts, + toWorkflowRunCountsByWorkflowName, DEFAULT_NAMESPACE_ID, DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS, Backend, WorkflowRunCounts, + WorkflowRunCountsByWorkflowName, CancelWorkflowRunParams, ClaimWorkflowRunParams, CreateStepAttemptParams, @@ -431,6 +433,21 @@ export class BackendPostgres implements Backend { return toWorkflowRunCounts(rows); } + async countWorkflowRunsByWorkflowName(): Promise { + const workflowRunsTable = this.workflowRunsTable(); + + const rows = await this.pg< + { workflowName: string; status: string; count: string }[] + >` + SELECT "workflow_name" AS "workflowName", "status", COUNT(*) AS "count" + FROM ${workflowRunsTable} + WHERE "namespace_id" = ${this.namespaceId} + GROUP BY "workflow_name", "status" + `; + + return toWorkflowRunCountsByWorkflowName(rows); + } + async claimWorkflowRun( params: ClaimWorkflowRunParams, ): Promise { diff --git a/packages/openworkflow/sqlite/backend.ts b/packages/openworkflow/sqlite/backend.ts index 182d42e5..af5b55c8 100644 --- a/packages/openworkflow/sqlite/backend.ts +++ b/packages/openworkflow/sqlite/backend.ts @@ -24,6 +24,8 @@ import { SendSignalResult, GetSignalDeliveryParams, toWorkflowRunCounts, + toWorkflowRunCountsByWorkflowName, + WorkflowRunCountsByWorkflowName, } from "../core/backend.js"; import { buildPaginatedResponse, @@ -844,6 +846,23 @@ export class BackendSqlite implements Backend { return toWorkflowRunCounts(rows); } + // eslint-disable-next-line @typescript-eslint/require-await + async countWorkflowRunsByWorkflowName(): Promise { + const stmt = this.db.prepare(` + SELECT "workflow_name" AS "workflowName", "status", COUNT(*) AS "count" + FROM "workflow_runs" + WHERE "namespace_id" = ? + GROUP BY "workflow_name", "status" + `); + + const rows = stmt.all(this.namespaceId) as { + workflowName: string; + status: string; + count: number; + }[]; + return toWorkflowRunCountsByWorkflowName(rows); + } + listWorkflowRuns( params: ListWorkflowRunsParams, ): Promise> { diff --git a/packages/openworkflow/testing/backend.testsuite.ts b/packages/openworkflow/testing/backend.testsuite.ts index d3e7372b..3c161d5f 100644 --- a/packages/openworkflow/testing/backend.testsuite.ts +++ b/packages/openworkflow/testing/backend.testsuite.ts @@ -776,6 +776,39 @@ export function testBackend(options: TestBackendOptions): void { await teardown(backend); }); + test("returns counts grouped by workflow name and status", async () => { + const backend = await setup(); + + await createPendingWorkflowRun(backend, "send-email"); + await createPendingWorkflowRun(backend, "charge-card"); + await createPendingWorkflowRun(backend, "charge-card"); + + const claimed = await backend.claimWorkflowRun({ + workerId: randomUUID(), + leaseDurationMs: 60_000, + }); + if (!claimed) throw new Error("Expected claimed workflow run"); + + expect(await backend.countWorkflowRunsByWorkflowName()).toEqual({ + "send-email": { + pending: 0, + running: 1, + completed: 0, + failed: 0, + canceled: 0, + }, + "charge-card": { + pending: 2, + running: 0, + completed: 0, + failed: 0, + canceled: 0, + }, + }); + + await teardown(backend); + }); + test("updates counts when workflow runs transition statuses", async () => { const backend = await setup(); @@ -2574,11 +2607,15 @@ export function testBackend(options: TestBackendOptions): void { /** * Create a pending workflow run for tests. * @param b - Backend + * @param workflowName - Workflow name * @returns Created workflow run */ -async function createPendingWorkflowRun(b: Backend) { +async function createPendingWorkflowRun( + b: Backend, + workflowName: string = randomUUID(), +) { return await b.createWorkflowRun({ - workflowName: randomUUID(), + workflowName, version: null, idempotencyKey: null, input: null,