Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions apps/dashboard/src/lib/metrics.server.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { getBackend } from "./backend";
import { getMetricsResponse } from "./metrics.server";
import type { Backend, WorkflowRunCounts } from "openworkflow/internal";
import type {
Backend,
WorkflowRunCounts,
WorkflowRunCountsByWorkflowName,
} from "openworkflow/internal";
import { beforeEach, describe, expect, it, vi } from "vitest";

vi.mock("./backend", () => ({
Expand All @@ -23,16 +27,18 @@ describe("getMetricsResponse()", () => {
});

it("returns Prometheus exposition format with expected metric labels", async () => {
const counts: WorkflowRunCounts = {
...ZERO_COUNTS,
pending: 3,
running: 3,
completed: 4,
failed: 2,
const counts: WorkflowRunCountsByWorkflowName = {
"send-email": {
...ZERO_COUNTS,
pending: 3,
running: 3,
completed: 4,
failed: 2,
},
};

const backend: Pick<Backend, "countWorkflowRuns"> = {
countWorkflowRuns: vi.fn().mockResolvedValue(counts),
const backend: Pick<Backend, "countWorkflowRunsByWorkflowName"> = {
countWorkflowRunsByWorkflowName: vi.fn().mockResolvedValue(counts),
};
mockedGetBackend.mockResolvedValue(backend);

Expand All @@ -45,30 +51,42 @@ describe("getMetricsResponse()", () => {
);
expect(body).toContain("# HELP openworkflow_workflow_runs");
expect(body).toContain("# TYPE openworkflow_workflow_runs gauge");
expect(body).toContain('openworkflow_workflow_runs{status="pending"} 3');
expect(body).toContain('openworkflow_workflow_runs{status="running"} 3');
expect(body).toContain(
'openworkflow_workflow_runs{workflow_name="send-email",status="pending"} 3',
);
expect(body).toContain(
'openworkflow_workflow_runs{workflow_name="send-email",status="running"} 3',
);
expect(body).not.toContain('openworkflow_workflow_runs{status="sleeping"}');
expect(body).toContain('openworkflow_workflow_runs{status="completed"} 4');
expect(body).toContain('openworkflow_workflow_runs{status="failed"} 2');
expect(body).toContain('openworkflow_workflow_runs{status="canceled"} 0');
expect(body).toContain(
'openworkflow_workflow_runs{workflow_name="send-email",status="completed"} 4',
);
expect(body).toContain(
'openworkflow_workflow_runs{workflow_name="send-email",status="failed"} 2',
);
expect(body).toContain(
'openworkflow_workflow_runs{workflow_name="send-email",status="canceled"} 0',
);
});

it("calls backend.countWorkflowRuns() on every scrape", async () => {
const backend: Pick<Backend, "countWorkflowRuns"> = {
countWorkflowRuns: vi.fn().mockResolvedValue(ZERO_COUNTS),
it("calls backend.countWorkflowRunsByWorkflowName() on every scrape", async () => {
const backend: Pick<Backend, "countWorkflowRunsByWorkflowName"> = {
countWorkflowRunsByWorkflowName: vi
.fn()
.mockResolvedValue({ "send-email": ZERO_COUNTS }),
};
mockedGetBackend.mockResolvedValue(backend);

await getMetricsResponse();
await getMetricsResponse();

expect(mockedGetBackend).toHaveBeenCalledTimes(2);
expect(backend.countWorkflowRuns).toHaveBeenCalledTimes(2);
expect(backend.countWorkflowRunsByWorkflowName).toHaveBeenCalledTimes(2);
});

it("returns 500 when backend aggregation fails", async () => {
const backend: Pick<Backend, "countWorkflowRuns"> = {
countWorkflowRuns: vi
const backend: Pick<Backend, "countWorkflowRunsByWorkflowName"> = {
countWorkflowRunsByWorkflowName: vi
.fn()
.mockRejectedValue(new Error("failed to aggregate")),
};
Expand Down
23 changes: 16 additions & 7 deletions apps/dashboard/src/lib/metrics.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { getBackend } from "./backend";
import type { WorkflowRunCounts } from "openworkflow/internal";
import type {
WorkflowRunCounts,
WorkflowRunCountsByWorkflowName,
} from "openworkflow/internal";
import { Gauge, Registry } from "prom-client";

/**
Expand All @@ -9,7 +12,7 @@ import { Gauge, Registry } from "prom-client";
export async function getMetricsResponse(): Promise<Response> {
try {
const backend = await getBackend();
const workflowRunCounts = await backend.countWorkflowRuns();
const workflowRunCounts = await backend.countWorkflowRunsByWorkflowName();

const registry = new Registry();
registerWorkflowRunCounts(registry, workflowRunCounts);
Expand Down Expand Up @@ -40,18 +43,24 @@ const PROMETHEUS_WORKFLOW_RUN_STATUSES = [

function registerWorkflowRunCounts(
registry: Registry,
workflowRunCounts: WorkflowRunCounts,
workflowRunCounts: WorkflowRunCountsByWorkflowName,
) {
const prometheusCounts = toPrometheusWorkflowRunCounts(workflowRunCounts);
const workflowRunsGauge = new Gauge({
name: "openworkflow_workflow_runs",
help: "Current count of workflow runs in each status.",
labelNames: ["status"] as const,
labelNames: ["workflow_name", "status"] as const,
registers: [registry],
});

for (const status of PROMETHEUS_WORKFLOW_RUN_STATUSES) {
workflowRunsGauge.set({ status }, prometheusCounts[status]);
for (const [workflowName, counts] of Object.entries(workflowRunCounts)) {
const prometheusCounts = toPrometheusWorkflowRunCounts(counts);

for (const status of PROMETHEUS_WORKFLOW_RUN_STATUSES) {
workflowRunsGauge.set(
{ workflow_name: workflowName, status },
prometheusCounts[status],
);
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions apps/docs/docs/prometheus.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ You should see output like:
```text
# HELP openworkflow_workflow_runs Current count of workflow runs in each status.
# TYPE openworkflow_workflow_runs gauge
openworkflow_workflow_runs{status="pending"} 12
openworkflow_workflow_runs{status="running"} 3
openworkflow_workflow_runs{status="completed"} 847
openworkflow_workflow_runs{status="failed"} 2
openworkflow_workflow_runs{status="canceled"} 0
openworkflow_workflow_runs{workflow_name="send-welcome-email",status="pending"} 12
openworkflow_workflow_runs{workflow_name="send-welcome-email",status="running"} 3
openworkflow_workflow_runs{workflow_name="send-welcome-email",status="completed"} 847
openworkflow_workflow_runs{workflow_name="send-welcome-email",status="failed"} 2
openworkflow_workflow_runs{workflow_name="send-welcome-email",status="canceled"} 0
```

### 3. Configure Prometheus to scrape the dashboard
Expand All @@ -66,7 +66,7 @@ Replace `localhost:3000` with the address where your dashboard is running.
| Field | Value |
| ------ | ------------------------------------------------------- |
| Type | Gauge |
| Labels | `status` |
| Labels | `workflow_name`, `status` |
| Values | `pending`, `running`, `completed`, `failed`, `canceled` |

Current count of workflow runs in each status. One query is executed per
Expand Down
33 changes: 32 additions & 1 deletion packages/openworkflow/core/backend.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { toWorkflowRunCounts } from "./backend.js";
import {
toWorkflowRunCounts,
toWorkflowRunCountsByWorkflowName,
} from "./backend.js";
import { describe, expect, test } from "vitest";

describe("toWorkflowRunCounts", () => {
Expand Down Expand Up @@ -49,3 +52,31 @@ describe("toWorkflowRunCounts", () => {
});
});
});

describe("toWorkflowRunCountsByWorkflowName", () => {
test("returns normalized counts grouped by workflow name", () => {
const counts = toWorkflowRunCountsByWorkflowName([
{ workflowName: "send-email", status: "pending", count: 2 },
{ workflowName: "send-email", status: "sleeping", count: 1 },
{ workflowName: "charge-card", status: "succeeded", count: 3 },
{ workflowName: "charge-card", status: "completed", count: 4 },
]);

expect(counts).toEqual({
"send-email": {
pending: 2,
running: 1,
completed: 0,
failed: 0,
canceled: 0,
},
"charge-card": {
pending: 0,
running: 0,
completed: 7,
failed: 0,
canceled: 0,
},
});
});
});
32 changes: 32 additions & 0 deletions packages/openworkflow/core/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface Backend {
params: Readonly<ListWorkflowRunsParams>,
): Promise<PaginatedResponse<WorkflowRun>>;
countWorkflowRuns(): Promise<WorkflowRunCounts>;
countWorkflowRunsByWorkflowName(): Promise<WorkflowRunCountsByWorkflowName>;
claimWorkflowRun(
params: Readonly<ClaimWorkflowRunParams>,
): Promise<WorkflowRun | null>;
Expand Down Expand Up @@ -212,6 +213,8 @@ export type WorkflowRunCounts = Omit<
"succeeded" | "sleeping"
>;

export type WorkflowRunCountsByWorkflowName = Record<string, WorkflowRunCounts>;

/**
* Convert status-count rows from a `GROUP BY "status"` query into a
* typed {@link WorkflowRunCounts} object.
Expand Down Expand Up @@ -249,3 +252,32 @@ export function toWorkflowRunCounts(

return counts;
}

/**
* Convert workflow-name/status-count rows into normalized workflow run counts.
* @param rows - Rows from the database query
* @returns Workflow run counts keyed by workflow name, then status
*/
export function toWorkflowRunCountsByWorkflowName(
rows: readonly {
workflowName: string;
status: string;
count: number | string;
}[],
): WorkflowRunCountsByWorkflowName {
const countsByWorkflowName: WorkflowRunCountsByWorkflowName = {};

for (const row of rows) {
const workflowCounts = (countsByWorkflowName[row.workflowName] ??=
toWorkflowRunCounts([]));
const counts = toWorkflowRunCounts([
{ status: row.status, count: row.count },
]);

for (const status of Object.keys(counts) as (keyof WorkflowRunCounts)[]) {
workflowCounts[status] += counts[status];
}
}

return countsByWorkflowName;
}
17 changes: 17 additions & 0 deletions packages/openworkflow/postgres/backend.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {
toWorkflowRunCounts,
toWorkflowRunCountsByWorkflowName,
DEFAULT_NAMESPACE_ID,
DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS,
Backend,
WorkflowRunCounts,
WorkflowRunCountsByWorkflowName,
CancelWorkflowRunParams,
ClaimWorkflowRunParams,
CreateStepAttemptParams,
Expand Down Expand Up @@ -431,6 +433,21 @@ export class BackendPostgres implements Backend {
return toWorkflowRunCounts(rows);
}

async countWorkflowRunsByWorkflowName(): Promise<WorkflowRunCountsByWorkflowName> {
const workflowRunsTable = this.workflowRunsTable();

const rows = await this.pg<
{ workflowName: string; status: string; count: string }[]
>`
SELECT "workflow_name" AS "workflowName", "status", COUNT(*) AS "count"
FROM ${workflowRunsTable}
WHERE "namespace_id" = ${this.namespaceId}
GROUP BY "workflow_name", "status"
`;

return toWorkflowRunCountsByWorkflowName(rows);
}

async claimWorkflowRun(
params: ClaimWorkflowRunParams,
): Promise<WorkflowRun | null> {
Expand Down
19 changes: 19 additions & 0 deletions packages/openworkflow/sqlite/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
SendSignalResult,
GetSignalDeliveryParams,
toWorkflowRunCounts,
toWorkflowRunCountsByWorkflowName,
WorkflowRunCountsByWorkflowName,
} from "../core/backend.js";
import {
buildPaginatedResponse,
Expand Down Expand Up @@ -844,6 +846,23 @@ export class BackendSqlite implements Backend {
return toWorkflowRunCounts(rows);
}

// eslint-disable-next-line @typescript-eslint/require-await
async countWorkflowRunsByWorkflowName(): Promise<WorkflowRunCountsByWorkflowName> {
const stmt = this.db.prepare(`
SELECT "workflow_name" AS "workflowName", "status", COUNT(*) AS "count"
FROM "workflow_runs"
WHERE "namespace_id" = ?
GROUP BY "workflow_name", "status"
`);

const rows = stmt.all(this.namespaceId) as {
workflowName: string;
status: string;
count: number;
}[];
return toWorkflowRunCountsByWorkflowName(rows);
}

listWorkflowRuns(
params: ListWorkflowRunsParams,
): Promise<PaginatedResponse<WorkflowRun>> {
Expand Down
41 changes: 39 additions & 2 deletions packages/openworkflow/testing/backend.testsuite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,39 @@ export function testBackend(options: TestBackendOptions): void {
await teardown(backend);
});

test("returns counts grouped by workflow name and status", async () => {
const backend = await setup();

await createPendingWorkflowRun(backend, "send-email");
await createPendingWorkflowRun(backend, "charge-card");
await createPendingWorkflowRun(backend, "charge-card");

const claimed = await backend.claimWorkflowRun({
workerId: randomUUID(),
leaseDurationMs: 60_000,
});
if (!claimed) throw new Error("Expected claimed workflow run");

expect(await backend.countWorkflowRunsByWorkflowName()).toEqual({
"send-email": {
pending: 0,
running: 1,
completed: 0,
failed: 0,
canceled: 0,
},
"charge-card": {
pending: 2,
running: 0,
completed: 0,
failed: 0,
canceled: 0,
},
});

await teardown(backend);
});

test("updates counts when workflow runs transition statuses", async () => {
const backend = await setup();

Expand Down Expand Up @@ -2574,11 +2607,15 @@ export function testBackend(options: TestBackendOptions): void {
/**
* Create a pending workflow run for tests.
* @param b - Backend
* @param workflowName - Workflow name
* @returns Created workflow run
*/
async function createPendingWorkflowRun(b: Backend) {
async function createPendingWorkflowRun(
b: Backend,
workflowName: string = randomUUID(),
) {
return await b.createWorkflowRun({
workflowName: randomUUID(),
workflowName,
version: null,
idempotencyKey: null,
input: null,
Expand Down