From 8996f7c8b50469a15a32470608fad1d42fb432e4 Mon Sep 17 00:00:00 2001 From: jbiskur Date: Mon, 4 May 2026 19:54:04 +0100 Subject: [PATCH] feat(pump): add pumpGroup to register() for splitting event types across pumps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same (flowType, pumpGroup) shares one pump; different pumpGroup values on the same flowType run on independent pumps with isolated state cursors, processor concurrency, and restart backoff. Defaults to "default" — fully back-compat. Resilience: per-group exponential-backoff restart loop now keeps retrying when the restart attempt itself throws synchronously (previously the restart catch handler only logged and gave up). - types: pumpGroup on PathwayContract; PumpStateManagerFactory takes (flowType, pumpGroup); byPumpGroup on PumpConcurrencyConfig. - builder: pumpGroups Record next to subscribed; non-empty validation; buildPumpRegistrations() emits pumpGroup; resetPump accepts legacy string[] or new { flowTypes?, pumpGroups? }. - pump: composite-key grouping; startPumpForGroup; scheduleRestart with dedupe + retry-on-synchronous-failure; registeredPumpGroups getter; legacy single-arg state factory back-compat (one warning per pump). - postgres state: composite PK (flow_type, pump_group); idempotent ALTER TABLE migration in ensureInitialized; existing rows land under pump_group='default'. - pulse identity suffixed with ::flowType::pumpGroup. Tests: pathway-pump-grouping (new), pathway-pump (extended composite factory + byPumpGroup resolution + legacy factory warning), restart (FakeTime resilience), postgres-pump-state (composite-PK + migration). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 72 +++++- deno.json | 2 +- deno.lock | 9 +- src/pathways/builder.ts | 64 ++++- src/pathways/pump/pathway-pump.ts | 324 +++++++++++++++++++------ src/pathways/pump/state.ts | 65 +++-- src/pathways/pump/types.ts | 32 ++- src/pathways/types.ts | 15 ++ tests/pathway-pump-grouping.test.ts | 151 ++++++++++++ tests/pathway-pump-restart.test.ts | 72 +++++- tests/pathway-pump.test.ts | 352 +++++++++++++++++++--------- tests/postgres-pump-state.test.ts | 115 +++++++++ 12 files changed, 1045 insertions(+), 228 deletions(-) create mode 100644 tests/pathway-pump-grouping.test.ts create mode 100644 tests/postgres-pump-state.test.ts diff --git a/README.md b/README.md index b127efb..883098f 100644 --- a/README.md +++ b/README.md @@ -228,10 +228,10 @@ const pathways = new PathwaysBuilder({ ### Pump Concurrency Control how many events each pump processes in parallel via `startPump({ concurrency })`. Accepts a number (shared -default) or a `PumpConcurrencyConfig` with per-flow-type overrides: +default) or a `PumpConcurrencyConfig` with per-flow-type and per-pump-group overrides: ```typescript -// Shared default across every flow type +// Shared default across every pump await pathways.startPump({ concurrency: 4 }) // Per-flow-type overrides — unlisted flow types fall back to `default` (or 1) @@ -242,12 +242,72 @@ await pathways.startPump({ orders: 8, audit: 1, }, + // Optional: per-(flowType, pumpGroup) override. Wins over byFlowType. + // Key format: `${flowType}::${pumpGroup}`. + byPumpGroup: { + "orders::hot": 16, + }, }, }) ``` -Omit `concurrency` to keep the default of 1 per flow type. `startPump()` also accepts a per-call `autoProvision` -override (same shape as the builder-level option) for overriding provisioning behavior at a specific call site. +Omit `concurrency` to keep the default of 1 per pump. 
`startPump()` also accepts a per-call `autoProvision` override +(same shape as the builder-level option) for overriding provisioning behavior at a specific call site. + +> **Note**: this resolves to `processor.concurrency` on the underlying data pump, which is the in-flight batch width — +> not parallel handler invocations. Resolution order per pump: `byPumpGroup["${flowType}::${pumpGroup}"]` → +> `byFlowType[flowType]` → `default`. + +### Splitting a flow type across multiple pumps + +By default, every event type registered against the same `flowType` shares one pump. For high-throughput event types +that would otherwise starve their cold neighbours, register them with a distinct `pumpGroup` so they run on an isolated +pump with their own state cursor, processor concurrency, and restart backoff: + +```typescript +// 8 cold event types share the default pump for orders.0 +for (const eventType of ["paid", "fulfilled", "cancelled", "refunded", "archived", "audited", "tagged", "noted"]) { + pathways.register({ flowType: "orders.0", eventType, schema }) +} + +// 2 hot event types run on a separate "hot" pump — same flow type, isolated pump +pathways.register({ flowType: "orders.0", eventType: "placed.0", schema, pumpGroup: "hot" }) +pathways.register({ flowType: "orders.0", eventType: "shipped.0", schema, pumpGroup: "hot" }) + +await pathways.startPump({ + concurrency: { default: 2, byPumpGroup: { "orders.0::hot": 16 } }, +}) +``` + +What this gives you per pump group: + +- **Isolated state cursor.** The Postgres `pathway_pump_state` table now has a composite primary key + `(flow_type, pump_group)`; existing rows are auto-migrated into `pump_group='default'` on first use. +- **Independent processor concurrency** via `byPumpGroup`. 
+- **Independent restart backoff.** A failure in the `hot` pump does not reset the cold pump's attempt counter, and the + restart loop keeps retrying with exponential backoff (capped at 30s) until the pump comes back — including when the + restart attempt itself throws synchronously. +- **Per-group pulse identity.** When pulse reporting is configured, each pump emits pulses under + `${pathwayId}::${flowType}::${pumpGroup}` so the control plane can distinguish the health of each group. + +**Caveats (v2.4):** + +- The WebSocket notifier subscribes at `flowType` scope, so two pump groups on the same flow type receive identical + notifications and each pulls. Isolation is at processor + state, not bandwidth — checks are cheap, so this is usually + fine. If you need bandwidth isolation, use distinct flow types instead. +- Cluster mode keeps a single global leader; per-pump-group leadership is not yet supported. +- Downstream consumers that mirror `pathway_pump_state` in their own Drizzle schema MUST add a + `pump_group TEXT NOT NULL DEFAULT 'default'` column and update the primary key to `(flow_type, pump_group)` before + running `drizzle-kit push`, otherwise drizzle will try to drop the new column. + +Custom `PumpStateManagerFactory` implementations should accept a second `pumpGroup` argument: + +```typescript +const factory: PumpStateManagerFactory = (flowType, pumpGroup) => createMyStateManager(flowType, pumpGroup) +``` + +Legacy single-argument factories continue to work but share state across pump groups on the same flow type — a +deprecation warning is logged once per pump. ### Registering Pathways @@ -277,6 +337,10 @@ pathways.register({ writable: true, // Optional, default is true maxRetries: 3, // Optional, default is 3 retryDelayMs: 500, // Optional, default is 500 + // Optional: isolate this event type onto a dedicated pump for the same flow type. + // Same (flowType, pumpGroup) pair shares one pump; different pumpGroup → different pumps. 
+ // Omit (or pass "default") for the legacy single-pump-per-flowType behavior. + pumpGroup: "hot", }) ``` diff --git a/deno.json b/deno.json index 8f7c823..2453c3a 100644 --- a/deno.json +++ b/deno.json @@ -18,7 +18,7 @@ "test:watch": "deno test -A --watch", "postgres:start": "deno run -A bin/start-postgres.ts", "postgres:stop": "deno run -A bin/stop-postgres.ts", - "test:postgres": "deno run -A bin/start-postgres.ts && (deno test -A tests/postgres-pathway-state.test.ts || (deno run -A bin/stop-postgres.ts && exit 1)) && deno run -A bin/stop-postgres.ts" + "test:postgres": "deno run -A bin/start-postgres.ts && (deno test -A tests/postgres-pathway-state.test.ts tests/postgres-pump-state.test.ts || (deno run -A bin/stop-postgres.ts && exit 1)) && deno run -A bin/stop-postgres.ts" }, "imports": { "@deno/dnt": "jsr:@deno/dnt@^0.41.3", diff --git a/deno.lock b/deno.lock index f835212..028dcaa 100644 --- a/deno.lock +++ b/deno.lock @@ -393,14 +393,21 @@ "https://deno.land/std@0.224.0/assert/unimplemented.ts": "8c55a5793e9147b4f1ef68cd66496b7d5ba7a9e7ca30c6da070c1a58da723d73", "https://deno.land/std@0.224.0/assert/unreachable.ts": "5ae3dbf63ef988615b93eb08d395dda771c96546565f9e521ed86f6510c29e19", "https://deno.land/std@0.224.0/async/delay.ts": "f90dd685b97c2f142b8069082993e437b1602b8e2561134827eeb7c12b95c499", + "https://deno.land/std@0.224.0/data_structures/_binary_search_node.ts": "ce1da11601fef0638df4d1e53c377f791f96913383277389286b390685d76c07", + "https://deno.land/std@0.224.0/data_structures/_red_black_node.ts": "4af8d3c5ac5f119d8058269259c46ea22ead567246cacde04584a83e43a9d2ea", + "https://deno.land/std@0.224.0/data_structures/binary_search_tree.ts": "2dd43d97ce5f5a4bdba11b075eb458db33e9143f50997b0eebf02912cb44f5d5", + "https://deno.land/std@0.224.0/data_structures/comparators.ts": "17dfa68bf1550edadbfdd453a06f9819290bcb534c9945b5cec4b30242cff475", + "https://deno.land/std@0.224.0/data_structures/red_black_tree.ts": 
"2222be0c46842fc932e2c8589a66dced9e6eae180914807c5c55d1aa4c8c1b9b", "https://deno.land/std@0.224.0/fmt/colors.ts": "508563c0659dd7198ba4bbf87e97f654af3c34eb56ba790260f252ad8012e1c5", "https://deno.land/std@0.224.0/http/server.ts": "f9313804bf6467a1704f45f76cb6cd0a3396a3b31c316035e6a4c2035d1ea514", "https://deno.land/std@0.224.0/internal/diff.ts": "6234a4b493ebe65dc67a18a0eb97ef683626a1166a1906232ce186ae9f65f4e6", "https://deno.land/std@0.224.0/internal/format.ts": "0a98ee226fd3d43450245b1844b47003419d34d210fa989900861c79820d21c2", "https://deno.land/std@0.224.0/internal/mod.ts": "534125398c8e7426183e12dc255bb635d94e06d0f93c60a297723abe69d3b22e", "https://deno.land/std@0.224.0/testing/_test_suite.ts": "f10a8a6338b60c403f07a76f3f46bdc9f1e1a820c0a1decddeb2949f7a8a0546", + "https://deno.land/std@0.224.0/testing/_time.ts": "fefd1ff35b50a410db9b0e7227e05163e1b172c88afd0d2071df0125958c3ff3", "https://deno.land/std@0.224.0/testing/bdd.ts": "3e4de4ff6d8f348b5574661cef9501b442046a59079e201b849d0e74120d476b", - "https://deno.land/std@0.224.0/testing/mock.ts": "a963181c2860b6ba3eb60e08b62c164d33cf5da7cd445893499b2efda20074db" + "https://deno.land/std@0.224.0/testing/mock.ts": "a963181c2860b6ba3eb60e08b62c164d33cf5da7cd445893499b2efda20074db", + "https://deno.land/std@0.224.0/testing/time.ts": "7119072a198e9913da0d21106b1f05a90a4c05b07075529770ff0e2a9eb5eaba" }, "workspace": { "dependencies": [ diff --git a/src/pathways/builder.ts b/src/pathways/builder.ts index 0a96975..ba979e8 100644 --- a/src/pathways/builder.ts +++ b/src/pathways/builder.ts @@ -385,6 +385,7 @@ export class PathwaysBuilder< private readonly inputSchemas: Record = {} as Record private readonly writable: Record = {} as Record private readonly subscribed: Record = {} as Record + private readonly pumpGroups: Record = {} as Record private readonly timeouts: Record = {} as Record private readonly maxRetries: Record = {} as Record private readonly retryDelays: Record = {} as Record @@ -829,6 +830,18 @@ export class 
PathwaysBuilder< * exists, avoiding back-pressure on the shared pump queue. */ subscribe?: boolean + /** + * Optional pump group. Event types sharing a `(flowType, pumpGroup)` pair + * land on the same data pump; different `pumpGroup` values within one + * `flowType` run on independent pumps with isolated state cursors and + * processor concurrency. Omit (or pass `"default"`) for the legacy + * single-pump-per-flowType behavior. + * + * NOTE: the WebSocket notifier is `flowType`-scoped, so two groups on the + * same `flowType` receive identical notifications and each pulls; isolation + * is at processor + state, not bandwidth. + */ + pumpGroup?: string maxRetries?: number retryDelayMs?: number isFilePathway?: FP @@ -844,6 +857,12 @@ export class PathwaysBuilder< const path = `${contract.flowType}/${contract.eventType}` as PathwayKey const writable = contract.writable ?? true const subscribe = contract.subscribe ?? true + if (contract.pumpGroup !== undefined && contract.pumpGroup.trim() === "") { + throw new Error( + `Pathway ${path} has an empty pumpGroup — pumpGroup must be a non-empty string when set`, + ) + } + const pumpGroup = contract.pumpGroup?.trim() ?? 
"default" this.logger.debug(`Registering pathway`, { pathway: path, @@ -851,6 +870,7 @@ export class PathwaysBuilder< eventType: contract.eventType, writable, subscribe, + pumpGroup, isFilePathway: contract.isFilePathway, timeoutMs: contract.timeoutMs, maxRetries: contract.maxRetries, @@ -900,6 +920,7 @@ export class PathwaysBuilder< } this.writable[path] = writable this.subscribed[path] = subscribe + this.pumpGroups[path] = pumpGroup // Store provisioning descriptions if (contract.description !== undefined) { @@ -915,6 +936,7 @@ export class PathwaysBuilder< eventType: contract.eventType, writable, subscribe, + pumpGroup, isFilePathway: contract.isFilePathway, }) @@ -1464,7 +1486,7 @@ export class PathwaysBuilder< return } - const registrations = this.buildSubscribedRegistrations() + const registrations = this.buildPumpRegistrations() await this.pathwayPump.start(registrations) this.logger.info("Pump started", { @@ -1472,6 +1494,24 @@ export class PathwaysBuilder< }) } + /** + * Returns the subset of registrations the in-process pump should subscribe to, + * enriched with the resolved `pumpGroup` for each pathway. Only used by the + * pump — provisioning still uses {@link buildRegistrations} (no pumpGroup). + */ + private buildPumpRegistrations(): Array<{ flowType: string; eventType: string; pumpGroup: string }> { + return Object.keys(this.pathways) + .filter((key) => this.subscribed[key as keyof TPathway] !== false) + .map((key) => { + const [flowType, eventType] = key.split("/") + return { + flowType, + eventType, + pumpGroup: this.pumpGroups[key as keyof TPathway] ?? "default", + } + }) + } + private async stopLeaderRuntime(): Promise { this.stopCommandPoller() @@ -1890,21 +1930,35 @@ export class PathwaysBuilder< * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state * and restarts from the live position. To replay from the very beginning, * pass the first time bucket explicitly. 
+ * @param filter - Optional filter narrowing which pumps to reset. Accepts: + * - `string[]` (legacy): treated as `flowTypes`. + * - `{ flowTypes?, pumpGroups? }`: matches pumps that satisfy every supplied + * criterion (intersection). + * Cluster mode does not yet propagate the filter — it is logged and + * the leader resets all pumps. */ - async resetPump(position?: PumpState, flowTypes?: string[]): Promise { + async resetPump( + position?: PumpState, + filter?: string[] | { flowTypes?: string[]; pumpGroups?: string[] }, + ): Promise { if (!this.pathwayPump) { throw new Error("Pump not started — call startPump() first") } + const normalized = Array.isArray(filter) ? { flowTypes: filter } : filter + if (this.clusterManager) { - if (flowTypes?.length) { - this.logger.warn("flowTypes filter is not supported in cluster mode reset — resetting all flow types") + if (normalized?.flowTypes?.length || normalized?.pumpGroups?.length) { + this.logger.warn( + "Reset filter is not supported in cluster mode — resetting all pumps", + { filter: normalized }, + ) } await this.clusterManager.requestReset(position) return [...this.pathwayPump.registeredFlowTypes] } - return await this.pathwayPump.reset(position, flowTypes) + return await this.pathwayPump.reset(position, normalized) } /** diff --git a/src/pathways/pump/pathway-pump.ts b/src/pathways/pump/pathway-pump.ts index 2d0e2af..fb9a93f 100644 --- a/src/pathways/pump/pathway-pump.ts +++ b/src/pathways/pump/pathway-pump.ts @@ -11,11 +11,25 @@ import type { } from "./types.ts" /** - * Registered pathway info needed for pump grouping + * Registered pathway info needed for pump grouping. + * `pumpGroup` defaults to `"default"` when omitted. */ interface PathwayRegistration { flowType: string eventType: string + pumpGroup?: string +} + +/** + * Filter for {@link PathwayPump.reset}. Each criterion narrows which pumps are reset. + * - Omit both → reset every pump. + * - `flowTypes` → reset every pump whose flow type matches. 
+ * - `pumpGroups` → reset every pump whose pump group matches. + * - Both → reset pumps that match BOTH (intersection). + */ +export interface PumpResetFilter { + flowTypes?: string[] + pumpGroups?: string[] } // deno-lint-ignore no-explicit-any @@ -27,34 +41,58 @@ type DataPumpConstructor = any const RESTART_BASE_MS = 1_000 const RESTART_MAX_MS = 30_000 +const DEFAULT_PUMP_GROUP = "default" + +/** + * Composite key uniquely identifying a pump within a builder: `${flowType}::${pumpGroup}`. + * Used as the map key for `pumps`, `stateManagers`, `restartAttempts`, and `groupMeta`. + */ +function groupKey(flowType: string, pumpGroup: string): string { + return `${flowType}::${pumpGroup}` +} + /** * Normalize the user-facing `concurrency` option into a `Required`. * * Accepts: - * - `undefined` → `{ default: 1, byFlowType: {} }` - * - `number` → `{ default: n, byFlowType: {} }` - * - object → shallow copy, `default` falls back to `1`, `byFlowType` to `{}` + * - `undefined` → `{ default: 1, byFlowType: {}, byPumpGroup: {} }` + * - `number` → `{ default: n, byFlowType: {}, byPumpGroup: {} }` + * - object → shallow copy, `default` falls back to `1`, others to `{}` */ function normalizeConcurrency( concurrency: PathwayPumpOptions["concurrency"], ): Required { if (typeof concurrency === "number") { - return { default: concurrency, byFlowType: {} } + return { default: concurrency, byFlowType: {}, byPumpGroup: {} } } if (concurrency && typeof concurrency === "object") { return { default: concurrency.default ?? 1, byFlowType: { ...(concurrency.byFlowType ?? {}) }, + byPumpGroup: { ...(concurrency.byPumpGroup ?? {}) }, } } - return { default: 1, byFlowType: {} } + return { default: 1, byFlowType: {}, byPumpGroup: {} } +} + +interface GroupMeta { + flowType: string + pumpGroup: string + eventTypes: string[] } /** * PathwayPump orchestrates data pump instances for auto-fetching events from Flowcore. 
* - * Groups registered pathways by flowType and creates one FlowcoreDataPump per flowType group. - * Events are routed to PathwaysBuilder.process() for handling. + * Groups registered pathways by `(flowType, pumpGroup)` and creates one FlowcoreDataPump + * per group. Within one `flowType`, multiple `pumpGroup`s give independent state cursors, + * processor concurrency, and restart backoff. Events are routed to PathwaysBuilder.process() + * for handling. + * + * Resilience: per-group restarts on error use exponential backoff and keep retrying + * indefinitely (capped at {@link RESTART_MAX_MS}). A failure during a restart attempt + * does NOT stop further attempts — the loop continues until the pump is explicitly stopped + * or the restart eventually succeeds. */ export class PathwayPump { private readonly stateManagerFactory: PumpStateManagerFactory @@ -63,6 +101,8 @@ export class PathwayPump { private readonly maxRedeliveryCount: number private readonly concurrency: Required private readonly logger: Logger + private readonly stateManagerFactoryArity: number + private legacyFactoryWarningEmitted = false private pulseConfig?: { url: string intervalMs?: number @@ -75,7 +115,8 @@ export class PathwayPump { private stateManagers: Map = new Map() private running = false private restartAttempts: Map = new Map() - private flowTypeEventTypes: Map = new Map() + private restartTimers: Map> = new Map() + private groupMeta: Map = new Map() private dataPumpConstructor: DataPumpConstructor = null // Required config from PathwaysBuilder @@ -89,6 +130,7 @@ export class PathwayPump { constructor(options: PathwayPumpOptions, logger?: Logger) { this.stateManagerFactory = options.stateManagerFactory + this.stateManagerFactoryArity = options.stateManagerFactory.length this.notifier = options.notifier ?? { type: "websocket" } this.bufferSize = options.bufferSize ?? 1000 this.maxRedeliveryCount = options.maxRedeliveryCount ?? 
3 @@ -116,7 +158,7 @@ export class PathwayPump { /** * Start pumps for the given pathway registrations. - * Groups by flowType and creates one pump per group. + * Groups by `(flowType, pumpGroup)` and creates one pump per group. */ async start(pathways: PathwayRegistration[]): Promise { if (this.running) return @@ -126,16 +168,24 @@ export class PathwayPump { this.running = true - // Group pathways by flowType - const flowTypeGroups = new Map() + const groups = new Map() for (const pw of pathways) { - const eventTypes = flowTypeGroups.get(pw.flowType) ?? [] - eventTypes.push(pw.eventType) - flowTypeGroups.set(pw.flowType, eventTypes) + const pumpGroup = pw.pumpGroup ?? DEFAULT_PUMP_GROUP + const key = groupKey(pw.flowType, pumpGroup) + const existing = groups.get(key) + if (existing) { + existing.eventTypes.push(pw.eventType) + } else { + groups.set(key, { flowType: pw.flowType, pumpGroup, eventTypes: [pw.eventType] }) + } } this.logger.info("Starting data pumps", { - flowTypes: [...flowTypeGroups.keys()], + groups: [...groups.values()].map((g) => ({ + flowType: g.flowType, + pumpGroup: g.pumpGroup, + eventTypes: g.eventTypes.length, + })), totalPathways: pathways.length, }) @@ -143,18 +193,58 @@ export class PathwayPump { const { FlowcoreDataPump } = await import("@flowcore/data-pump") this.dataPumpConstructor = FlowcoreDataPump - for (const [flowType, eventTypes] of flowTypeGroups) { - this.flowTypeEventTypes.set(flowType, eventTypes) - await this.startPumpForFlowType(flowType, eventTypes) + for (const meta of groups.values()) { + this.groupMeta.set(groupKey(meta.flowType, meta.pumpGroup), meta) + await this.startPumpForGroup(meta) } } /** - * Start (or restart) a pump for a specific flow type. + * Resolve a state manager for a `(flowType, pumpGroup)` pair, falling back to + * the legacy single-arg factory shape when the user-supplied factory has arity 1. 
*/ - private async startPumpForFlowType(flowType: string, eventTypes: string[]): Promise { - const stateManager = this.stateManagerFactory(flowType) - this.stateManagers.set(flowType, stateManager) + private resolveStateManager(flowType: string, pumpGroup: string): PumpStateManager { + if (this.stateManagerFactoryArity <= 1) { + if (!this.legacyFactoryWarningEmitted && pumpGroup !== DEFAULT_PUMP_GROUP) { + this.logger.warn( + "PumpStateManagerFactory has legacy single-arg signature; pump groups on the same flowType " + + "will share state. Update the factory to accept (flowType, pumpGroup) for proper isolation.", + { flowType, pumpGroup }, + ) + this.legacyFactoryWarningEmitted = true + } + // Cast through unknown to permit legacy single-arg invocation. + return (this.stateManagerFactory as unknown as (flowType: string) => PumpStateManager)(flowType) + } + return this.stateManagerFactory(flowType, pumpGroup) + } + + /** + * Resolve effective concurrency for one pump. + * Order: `byPumpGroup` → `byFlowType` → `default`. + */ + private resolveConcurrency(flowType: string, pumpGroup: string): number { + const composite = this.concurrency.byPumpGroup[`${flowType}::${pumpGroup}`] + if (composite !== undefined) return composite + const perFlow = this.concurrency.byFlowType[flowType] + if (perFlow !== undefined) return perFlow + return this.concurrency.default + } + + /** + * Start (or restart) a pump for a specific (flowType, pumpGroup) group. + * + * On error from the underlying pump, schedules an exponential-backoff restart + * scoped to this group only. Restart attempts continue indefinitely (capped at + * {@link RESTART_MAX_MS}); a synchronous failure during a restart attempt does + * not stop the loop — it schedules another attempt. 
+ */ + private async startPumpForGroup(meta: GroupMeta): Promise { + const { flowType, pumpGroup, eventTypes } = meta + const key = groupKey(flowType, pumpGroup) + + const stateManager = this.resolveStateManager(flowType, pumpGroup) + this.stateManagers.set(key, stateManager) const notifierOptions = this.buildNotifierOptions(flowType, eventTypes) @@ -168,7 +258,7 @@ export class PathwayPump { }, stateManager, processor: { - concurrency: this.concurrency.byFlowType[flowType] ?? this.concurrency.default, + concurrency: this.resolveConcurrency(flowType, pumpGroup), handler: async (events: FlowcoreEvent[]) => { for (const event of events) { const pathway = `${event.flowType}/${event.eventType}` @@ -180,22 +270,20 @@ export class PathwayPump { maxRedeliveryCount: this.maxRedeliveryCount, notifier: notifierOptions, logger: { - debug: (msg: string, meta?: Record) => this.logger.debug(msg, meta), - info: (msg: string, meta?: Record) => this.logger.info(msg, meta), - warn: (msg: string, meta?: Record) => this.logger.warn(msg, meta), - error: (msg: string | Error, meta?: Record) => - this.logger.error( - msg instanceof Error ? msg.message : msg, - meta, - ), + debug: (msg: string, m?: Record) => this.logger.debug(msg, m), + info: (msg: string, m?: Record) => this.logger.info(msg, m), + warn: (msg: string, m?: Record) => this.logger.warn(msg, m), + error: (msg: string | Error, m?: Record) => + this.logger.error(msg instanceof Error ? msg.message : msg, m), }, } if (this.pulseConfig) { + const baseId = this.pulseConfig.pathwayId ?? "unknown" pumpOptions.pulse = { url: this.pulseConfig.url, intervalMs: this.pulseConfig.intervalMs, - pathwayId: this.pulseConfig.pathwayId ?? 
"unknown", + pathwayId: `${baseId}::${flowType}::${pumpGroup}`, successLogLevel: this.pulseConfig.successLogLevel, failureLogLevel: this.pulseConfig.failureLogLevel, } @@ -204,32 +292,72 @@ export class PathwayPump { // deno-lint-ignore no-explicit-any const pump = await this.dataPumpConstructor.create(pumpOptions as any) - this.pumps.set(flowType, pump) + this.pumps.set(key, pump) await pump.start((error?: Error) => { if (error) { - this.logger.error(`Data pump error for flowType ${flowType}`, error, { flowType }) + this.logger.error(`Data pump error`, error, { flowType, pumpGroup }) if (!this.running) return - const attempts = (this.restartAttempts.get(flowType) ?? 0) + 1 - this.restartAttempts.set(flowType, attempts) - const delay = Math.min(RESTART_BASE_MS * Math.pow(2, attempts - 1), RESTART_MAX_MS) - this.logger.warn(`Restarting pump for flowType ${flowType} in ${delay}ms (attempt ${attempts})`) - setTimeout(async () => { - if (!this.running) return - try { - await this.startPumpForFlowType(flowType, eventTypes) - } catch (restartError) { - this.logger.error( - `Failed to restart pump for flowType ${flowType}`, - restartError instanceof Error ? restartError : new Error(String(restartError)), - ) - } - }, delay) + this.scheduleRestart(meta) } }) - this.restartAttempts.set(flowType, 0) - this.logger.info("Data pump started", { flowType, eventTypes }) + // Successful start: reset the per-group attempt counter. + this.restartAttempts.set(key, 0) + this.logger.info("Data pump started", { flowType, pumpGroup, eventTypes }) + } + + /** + * Schedule a restart for one pump group with capped exponential backoff. + * Multiple restart triggers for the same group within the backoff window are deduped. + * A synchronous failure inside the scheduled restart re-arms another attempt — the + * loop continues until the group is stopped or a restart succeeds. 
+ */ + private scheduleRestart(meta: GroupMeta): void { + const key = groupKey(meta.flowType, meta.pumpGroup) + if (this.restartTimers.has(key)) return + + const attempts = (this.restartAttempts.get(key) ?? 0) + 1 + this.restartAttempts.set(key, attempts) + const delay = Math.min(RESTART_BASE_MS * Math.pow(2, attempts - 1), RESTART_MAX_MS) + this.logger.warn( + `Restarting pump in ${delay}ms (attempt ${attempts})`, + { flowType: meta.flowType, pumpGroup: meta.pumpGroup, delay, attempts }, + ) + + const timer = setTimeout(async () => { + this.restartTimers.delete(key) + if (!this.running) return + + // Stop and discard any prior pump instance for this group. + const previous = this.pumps.get(key) + if (previous) { + try { + await previous.stop() + } catch (stopErr) { + this.logger.warn(`Failed to stop pump before restart — continuing`, { + flowType: meta.flowType, + pumpGroup: meta.pumpGroup, + error: stopErr instanceof Error ? stopErr.message : String(stopErr), + }) + } + this.pumps.delete(key) + } + + try { + await this.startPumpForGroup(meta) + } catch (restartError) { + this.logger.error( + `Failed to restart pump`, + restartError instanceof Error ? 
restartError : new Error(String(restartError)), + { flowType: meta.flowType, pumpGroup: meta.pumpGroup, attempts }, + ) + if (this.running) { + this.scheduleRestart(meta) + } + } + }, delay) + this.restartTimers.set(key, timer) } /** @@ -241,14 +369,21 @@ export class PathwayPump { this.logger.info("Stopping data pumps") - for (const [flowType, pump] of this.pumps) { + for (const timer of this.restartTimers.values()) { + clearTimeout(timer) + } + this.restartTimers.clear() + + for (const [key, pump] of this.pumps) { + const meta = this.groupMeta.get(key) try { await pump.stop() - this.logger.info("Data pump stopped", { flowType }) + this.logger.info("Data pump stopped", { flowType: meta?.flowType, pumpGroup: meta?.pumpGroup }) } catch (err) { this.logger.error( - `Error stopping pump for ${flowType}`, + `Error stopping pump`, err instanceof Error ? err : new Error(String(err)), + { flowType: meta?.flowType, pumpGroup: meta?.pumpGroup }, ) } } @@ -256,52 +391,68 @@ export class PathwayPump { this.pumps.clear() this.stateManagers.clear() this.restartAttempts.clear() - this.flowTypeEventTypes.clear() + this.groupMeta.clear() } /** - * Reset all pumps to a specific position, or clear state and bounce if no position given. + * Reset pumps to a specific position, or clear state and bounce if no position given. * Uses @flowcore/data-pump's restart() to reposition the cursor without recreating instances. * + * Filter accepts: + * - `string[]` → legacy: filter by flow type names + * - `{ flowTypes?, pumpGroups? }` → narrow to matching `(flowType, pumpGroup)` pumps + * + * Both are supported for back-compat; the array form is equivalent to `{ flowTypes }`. + * * @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state * and restarts pumps (pump will start from live position). * To replay from the very beginning, pass the first time bucket explicitly. + * @returns Array of `${flowType}::${pumpGroup}` keys for pumps that were reset. 
*/ - async reset(position?: PumpState, flowTypes?: string[]): Promise { + async reset(position?: PumpState, filter?: string[] | PumpResetFilter): Promise { if (!this.running) { throw new Error("PathwayPump is not running — cannot reset") } - this.logger.info("Resetting data pumps", { position, flowTypes }) + const normalized: PumpResetFilter | undefined = Array.isArray(filter) ? { flowTypes: filter } : filter - const resetFlowTypes: string[] = [] + this.logger.info("Resetting data pumps", { position, filter: normalized }) - for (const [flowType, pump] of this.pumps) { - if (flowTypes && !flowTypes.includes(flowType)) continue + const reset: string[] = [] + + for (const [key, pump] of this.pumps) { + const meta = this.groupMeta.get(key) + if (!meta) continue + if (normalized?.flowTypes && !normalized.flowTypes.includes(meta.flowType)) continue + if (normalized?.pumpGroups && !normalized.pumpGroups.includes(meta.pumpGroup)) continue try { if (position) { await pump.restart({ timeBucket: position.timeBucket, eventId: position.eventId }) } else { - // Clear persisted state then restart pump from live - const stateManager = this.stateManagers.get(flowType) + const stateManager = this.stateManagers.get(key) if (stateManager?.clearState) { await stateManager.clearState() } await pump.restart({ timeBucket: new Date().toISOString().replace(/[-:T]/g, "").slice(0, 14) }) } - resetFlowTypes.push(flowType) - this.logger.info("Data pump reset", { flowType, position }) + reset.push(key) + this.logger.info("Data pump reset", { + flowType: meta.flowType, + pumpGroup: meta.pumpGroup, + position, + }) } catch (err) { this.logger.error( - `Error resetting pump for ${flowType}`, + `Error resetting pump`, err instanceof Error ? 
err : new Error(String(err)), + { flowType: meta.flowType, pumpGroup: meta.pumpGroup }, ) throw err } } - return resetFlowTypes + return reset } async setPulseConfig(pulseConfig: NonNullable): Promise { @@ -311,28 +462,37 @@ export class PathwayPump { return } - const flowTypeGroups = [...this.flowTypeEventTypes.entries()] + const groups = [...this.groupMeta.values()] const existingPumps = [...this.pumps.entries()] - for (const [flowType, pump] of existingPumps) { + for (const [key, pump] of existingPumps) { + const meta = this.groupMeta.get(key) try { await pump.stop() - this.logger.info("Data pump stopped for pulse reconfiguration", { flowType }) + this.logger.info("Data pump stopped for pulse reconfiguration", { + flowType: meta?.flowType, + pumpGroup: meta?.pumpGroup, + }) } catch (err) { this.logger.error( - `Error stopping pump for ${flowType} during pulse reconfiguration`, + `Error stopping pump during pulse reconfiguration`, err instanceof Error ? err : new Error(String(err)), + { flowType: meta?.flowType, pumpGroup: meta?.pumpGroup }, ) throw err } } + for (const timer of this.restartTimers.values()) { + clearTimeout(timer) + } + this.restartTimers.clear() this.pumps.clear() this.stateManagers.clear() this.restartAttempts.clear() - for (const [flowType, eventTypes] of flowTypeGroups) { - await this.startPumpForFlowType(flowType, eventTypes) + for (const meta of groups) { + await this.startPumpForGroup(meta) } } @@ -340,8 +500,22 @@ export class PathwayPump { return this.running } + /** + * Unique flow types currently driven by at least one pump (back-compat with pre-2.4 API). + */ get registeredFlowTypes(): string[] { - return [...this.pumps.keys()] + const set = new Set() + for (const meta of this.groupMeta.values()) { + set.add(meta.flowType) + } + return [...set] + } + + /** + * All `(flowType, pumpGroup)` pairs currently driven by a pump. 
+ */ + get registeredPumpGroups(): Array<{ flowType: string; pumpGroup: string }> { + return [...this.groupMeta.values()].map((m) => ({ flowType: m.flowType, pumpGroup: m.pumpGroup })) } // deno-lint-ignore no-explicit-any diff --git a/src/pathways/pump/state.ts b/src/pathways/pump/state.ts index 54ca621..4ab14a3 100644 --- a/src/pathways/pump/state.ts +++ b/src/pathways/pump/state.ts @@ -2,9 +2,11 @@ import { PostgresJsAdapter } from "../postgres/index.ts" import type { PostgresAdapter } from "../postgres/index.ts" import type { PostgresPumpStateConfig, PumpState, PumpStateManager, PumpStateManagerFactory } from "./types.ts" +const DEFAULT_PUMP_GROUP = "default" + /** * PostgreSQL-backed pump state manager. - * Stores per-flowType pump position (timeBucket + eventId) for resume support. + * Stores per-`(flowType, pumpGroup)` pump position (`timeBucket` + `eventId`) for resume support. */ class PostgresPumpStateManager implements PumpStateManager { private initialized = false @@ -12,26 +14,59 @@ class PostgresPumpStateManager implements PumpStateManager { constructor( private readonly adapter: PostgresAdapter, private readonly flowType: string, + private readonly pumpGroup: string, private readonly tableName: string, ) {} + /** + * Idempotent schema bootstrap + migration: + * 1. Create the table with the new composite-PK shape if missing. + * 2. If a pre-existing single-column-PK table is found, add the `pump_group` + * column (defaulting existing rows to `"default"`) and swap the primary key + * to the composite `(flow_type, pump_group)`. `CREATE TABLE IF NOT EXISTS` + * is a no-op on an existing table, so it WILL NOT upgrade a legacy schema on its own — this + * method runs the migration explicitly on first use.
+ */ private async ensureInitialized(): Promise { if (this.initialized) return await this.adapter.execute(` CREATE TABLE IF NOT EXISTS ${this.tableName} ( - flow_type TEXT PRIMARY KEY, + flow_type TEXT NOT NULL, + pump_group TEXT NOT NULL DEFAULT 'default', time_bucket TEXT NOT NULL, - event_id TEXT + event_id TEXT, + PRIMARY KEY (flow_type, pump_group) ) `) + await this.adapter.execute( + `ALTER TABLE ${this.tableName} ADD COLUMN IF NOT EXISTS pump_group TEXT NOT NULL DEFAULT 'default'`, + ) + await this.adapter.execute(` + DO $$ + DECLARE + pk_cols int; + BEGIN + SELECT count(*) INTO pk_cols + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + WHERE tc.table_name = '${this.tableName}' + AND tc.constraint_type = 'PRIMARY KEY'; + IF pk_cols = 1 THEN + EXECUTE 'ALTER TABLE ${this.tableName} DROP CONSTRAINT ${this.tableName}_pkey'; + EXECUTE 'ALTER TABLE ${this.tableName} ADD PRIMARY KEY (flow_type, pump_group)'; + END IF; + END $$; + `) this.initialized = true } async getState(): Promise { await this.ensureInitialized() const result = await this.adapter.query>( - `SELECT time_bucket, event_id FROM ${this.tableName} WHERE flow_type = $1`, - [this.flowType], + `SELECT time_bucket, event_id FROM ${this.tableName} WHERE flow_type = $1 AND pump_group = $2`, + [this.flowType, this.pumpGroup], ) if (!Array.isArray(result) || result.length === 0) return null @@ -45,26 +80,26 @@ class PostgresPumpStateManager implements PumpStateManager { async setState(state: PumpState): Promise { await this.ensureInitialized() await this.adapter.execute( - `INSERT INTO ${this.tableName} (flow_type, time_bucket, event_id) - VALUES ($1, $2, $3) - ON CONFLICT (flow_type) DO UPDATE - SET time_bucket = $2, event_id = $3`, - [this.flowType, state.timeBucket, state.eventId ?? 
null], + `INSERT INTO ${this.tableName} (flow_type, pump_group, time_bucket, event_id) + VALUES ($1, $2, $3, $4) + ON CONFLICT (flow_type, pump_group) DO UPDATE + SET time_bucket = $3, event_id = $4`, + [this.flowType, this.pumpGroup, state.timeBucket, state.eventId ?? null], ) } async clearState(): Promise { await this.ensureInitialized() await this.adapter.execute( - `DELETE FROM ${this.tableName} WHERE flow_type = $1`, - [this.flowType], + `DELETE FROM ${this.tableName} WHERE flow_type = $1 AND pump_group = $2`, + [this.flowType, this.pumpGroup], ) } } /** * Creates a factory function that produces PostgreSQL-backed pump state managers. - * Each flowType gets its own state row in the shared table. + * Each `(flowType, pumpGroup)` gets its own state row in the shared table. * * The adapter is created once and shared across all state managers. */ @@ -77,7 +112,7 @@ export async function createPostgresPumpStateManagerFactory( const adapter = new PostgresJsAdapter(pgConfig) await adapter.connect() - return (flowType: string): PumpStateManager => { - return new PostgresPumpStateManager(adapter, flowType, table) + return (flowType: string, pumpGroup: string = DEFAULT_PUMP_GROUP): PumpStateManager => { + return new PostgresPumpStateManager(adapter, flowType, pumpGroup, table) } } diff --git a/src/pathways/pump/types.ts b/src/pathways/pump/types.ts index a08eecf..279632e 100644 --- a/src/pathways/pump/types.ts +++ b/src/pathways/pump/types.ts @@ -24,14 +24,25 @@ export interface AutoProvisionConfig { /** * Concurrency settings for event processing per pump. * - * @property default Default concurrency applied to every flow type. Default: 1. - * @property byFlowType Per-flow-type overrides keyed by `flowType` name. + * Resolution order (first hit wins) per pump: + * 1. `byPumpGroup["${flowType}::${pumpGroup}"]` + * 2. `byFlowType[flowType]` + * 3. 
`default` (or 1) + * + * NOTE: this resolves to `processor.concurrency` on `@flowcore/data-pump`, which is + * the in-flight batch width — not parallel handler invocations. */ export interface PumpConcurrencyConfig { - /** Default concurrency applied to every flow type. Default: 1. */ + /** Default concurrency applied to every pump. Default: 1. */ default?: number - /** Per-flow-type overrides keyed by `flowType` name. */ + /** Per-flow-type overrides keyed by `flowType` name. Used when no pump-group override matches. */ byFlowType?: Record + /** + * Per-(flowType, pumpGroup) override. Key format: `${flowType}::${pumpGroup}`. + * Wins over `byFlowType`. Use this to tune a hot pump group separately from + * the default group on the same `flowType`. + */ + byPumpGroup?: Record } /** @@ -71,9 +82,18 @@ export interface PathwayPumpOptions { } /** - * Factory function that creates a state manager for a given flowType + * Factory function that creates a state manager for a given pump. + * + * The factory is invoked once per `(flowType, pumpGroup)` pair when the pump starts, + * so each pump gets an isolated state cursor. + * + * Back-compat: factories with arity `1` (legacy single `flowType` argument) are still + * accepted at runtime — `PathwayPump` detects the arity and falls back to passing + * only `flowType`, logging a one-time deprecation warning. Such factories will share + * state across pump groups on the same flowType, so prefer the two-argument form when + * using `pumpGroup`. 
*/ -export type PumpStateManagerFactory = (flowType: string) => PumpStateManager +export type PumpStateManagerFactory = (flowType: string, pumpGroup: string) => PumpStateManager /** * State manager interface compatible with @flowcore/data-pump's FlowcoreDataPumpStateManager diff --git a/src/pathways/types.ts b/src/pathways/types.ts index 5cea749..6f1b0ee 100644 --- a/src/pathways/types.ts +++ b/src/pathways/types.ts @@ -96,6 +96,21 @@ export interface PathwayContract + buildPumpRegistrations(): PumpRegistration[] +} + +// deno-lint-ignore no-explicit-any +function inspect(builder: PathwaysBuilder): BuilderInternals { + return builder as unknown as BuilderInternals +} + +const schema = z.object({ id: z.string() }) + +Deno.test({ + name: "PathwaysBuilder.register — pumpGroup", + sanitizeResources: false, + sanitizeOps: false, + fn: async (t) => { + await t.step("defaults pumpGroup to 'default' when omitted", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowOrders, + eventType: eventPlaced, + schema, + }) + + const internals = inspect(builder) + assertEquals(internals.pumpGroups[pathPlaced], "default") + }) + + await t.step("persists explicit pumpGroup value", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowOrders, + eventType: eventPlaced, + schema, + pumpGroup: "hot", + }) + + const internals = inspect(builder) + assertEquals(internals.pumpGroups[pathPlaced], "hot") + }) + + await t.step("trims whitespace in pumpGroup", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowOrders, + eventType: eventPlaced, + schema, + pumpGroup: " hot ", + }) + + const internals = inspect(builder) + assertEquals(internals.pumpGroups[pathPlaced], "hot") + }) + + await t.step("rejects empty pumpGroup", () => { + assertThrows( + () => { + new PathwaysBuilder(baseOpts).register({ + flowType: flowOrders, + eventType: eventPlaced, + schema, + pumpGroup: "", + }) + }, + Error, + "pumpGroup 
must be a non-empty string", + ) + }) + + await t.step("rejects whitespace-only pumpGroup", () => { + assertThrows( + () => { + new PathwaysBuilder(baseOpts).register({ + flowType: flowOrders, + eventType: eventPlaced, + schema, + pumpGroup: " ", + }) + }, + Error, + "pumpGroup must be a non-empty string", + ) + }) + + await t.step( + "buildPumpRegistrations emits pumpGroup and produces two distinct entries for two groups on one flow type", + () => { + const builder = new PathwaysBuilder(baseOpts) + .register({ flowType: flowOrders, eventType: eventPlaced, schema }) + .register({ + flowType: flowOrders, + eventType: eventPlacedFast, + schema, + pumpGroup: "hot", + }) + + const regs = inspect(builder).buildPumpRegistrations() + const sorted = [...regs].sort((a, b) => a.eventType.localeCompare(b.eventType)) + assertEquals(sorted, [ + { flowType: flowOrders, eventType: eventPlaced, pumpGroup: "default" }, + { flowType: flowOrders, eventType: eventPlacedFast, pumpGroup: "hot" }, + ]) + }, + ) + + await t.step("buildPumpRegistrations excludes subscribe: false pathways", () => { + const builder = new PathwaysBuilder(baseOpts) + .register({ flowType: flowOrders, eventType: eventPlaced, schema }) + .register({ + flowType: flowOrders, + eventType: eventPlacedFast, + schema, + pumpGroup: "hot", + subscribe: false, + }) + + const regs = inspect(builder).buildPumpRegistrations() + assertEquals(regs.length, 1) + assertEquals(regs[0].eventType, eventPlaced) + }) + + await t.step("explicit 'default' pumpGroup is accepted as a no-op alias", () => { + const builder = new PathwaysBuilder(baseOpts).register({ + flowType: flowOrders, + eventType: eventPlaced, + schema, + pumpGroup: "default", + }) + + const internals = inspect(builder) + assertEquals(internals.pumpGroups[pathPlaced], "default") + }) + }, +}) diff --git a/tests/pathway-pump-restart.test.ts b/tests/pathway-pump-restart.test.ts index 89aa03b..7404abb 100644 --- a/tests/pathway-pump-restart.test.ts +++ 
b/tests/pathway-pump-restart.test.ts @@ -1,4 +1,5 @@ import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { FakeTime } from "https://deno.land/std@0.224.0/testing/time.ts" import { PathwayPump } from "../src/pathways/pump/pathway-pump.ts" import type { PumpState, PumpStateManager, PumpStateManagerFactory } from "../src/pathways/pump/types.ts" @@ -14,11 +15,12 @@ class InMemoryPumpStateManager implements PumpStateManager { function createInMemoryStateFactory(): PumpStateManagerFactory { const managers = new Map() - return (flowType: string) => { - if (!managers.has(flowType)) { - managers.set(flowType, new InMemoryPumpStateManager()) + return (flowType: string, pumpGroup: string) => { + const key = `${flowType}::${pumpGroup}` + if (!managers.has(key)) { + managers.set(key, new InMemoryPumpStateManager()) } - return managers.get(flowType)! + return managers.get(key)! } } @@ -89,5 +91,67 @@ Deno.test({ assertEquals(restartAttempts.size, 0) }) + + await t.step("restart loop keeps retrying when startPumpForGroup itself throws", async () => { + const time = new FakeTime() + try { + const pump = new PathwayPump({ + stateManagerFactory: createInMemoryStateFactory(), + notifier: { type: "poller", pollerIntervalMs: 60_000 }, + }, { + debug: () => {}, + info: () => {}, + warn: () => {}, + error: () => {}, + }) + pump.configure({ + tenant: "t", + dataCore: "dc", + apiKey: "k", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const internal = pump as unknown as { + running: boolean + startPumpForGroup: (meta: unknown) => Promise + scheduleRestart: (meta: unknown) => void + restartTimers: Map + restartAttempts: Map + } + + let attempts = 0 + internal.running = true + internal.startPumpForGroup = () => { + attempts++ + // First three attempts blow up — emulating a sticky failure + // (e.g. CP unreachable, DB credentials wrong, transient network outage). 
+ if (attempts <= 3) { + return Promise.reject(new Error("synthetic startup failure")) + } + return Promise.resolve() + } + + const meta = { flowType: "orders", pumpGroup: "default", eventTypes: ["placed"] } + internal.scheduleRestart(meta) + + // Each failing attempt schedules the next with exponential backoff (1s, 2s, 4s...). + // Tick incrementally so each scheduled timer + its async callback gets a chance to run + // before the next tick — tickAsync only fires timers already in queue at call time. + for (let i = 0; i < 10 && attempts < 4; i++) { + await time.tickAsync(40_000) + } + + assertEquals( + attempts >= 4, + true, + `restart loop must keep retrying after synchronous failures (got ${attempts} attempts)`, + ) + // Once a restart succeeds, no more timers should be queued. + assertEquals(internal.restartTimers.size, 0) + } finally { + time.restore() + } + }) }, }) diff --git a/tests/pathway-pump.test.ts b/tests/pathway-pump.test.ts index fe61d40..4e087c5 100644 --- a/tests/pathway-pump.test.ts +++ b/tests/pathway-pump.test.ts @@ -20,14 +20,31 @@ class InMemoryPumpStateManager implements PumpStateManager { function createInMemoryStateFactory(): PumpStateManagerFactory { const managers = new Map() - return (flowType: string) => { - if (!managers.has(flowType)) { - managers.set(flowType, new InMemoryPumpStateManager()) + return (flowType: string, pumpGroup: string) => { + const key = `${flowType}::${pumpGroup}` + if (!managers.has(key)) { + managers.set(key, new InMemoryPumpStateManager()) } - return managers.get(flowType)! + return managers.get(key)! 
} } +interface GroupMeta { + flowType: string + pumpGroup: string + eventTypes: string[] +} + +interface InternalPump { + dataPumpConstructor: { + create(options: Record): Promise<{ start(cb?: unknown): Promise }> + } + startPumpForGroup(meta: GroupMeta): Promise + running: boolean + pumps: Map }> + groupMeta: Map +} + Deno.test({ name: "PathwayPump Tests", sanitizeResources: false, @@ -91,47 +108,23 @@ Deno.test({ assertEquals(state!.eventId, "evt-1") }) - await t.step("InMemoryStateFactory - creates separate managers per flowType", () => { + await t.step("InMemoryStateFactory - creates separate managers per (flowType, pumpGroup)", () => { const factory = createInMemoryStateFactory() - const mgr1 = factory("user") - const mgr2 = factory("order") - const mgr1Again = factory("user") - - // Same instance for same flowType - assertEquals(mgr1, mgr1Again) - - // Different instances for different flowTypes - assertEquals(mgr1 !== mgr2, true) + const userDefault = factory("user", "default") + const userHot = factory("user", "hot") + const userDefaultAgain = factory("user", "default") - mgr1.setState({ timeBucket: "20260319120000" }) - assertEquals(mgr2.getState(), null) - }) - - await t.step("should group pathways by flowType", () => { - // Test the grouping logic conceptually - const pathways = [ - { flowType: "user", eventType: "created" }, - { flowType: "user", eventType: "updated" }, - { flowType: "order", eventType: "placed" }, - { flowType: "order", eventType: "shipped" }, - { flowType: "payment", eventType: "received" }, - ] + // Same instance for same (flowType, pumpGroup) + assertEquals(userDefault, userDefaultAgain) + // Different instances for different pumpGroups on the same flowType + assertEquals(userDefault !== userHot, true) - const groups = new Map() - for (const pw of pathways) { - const eventTypes = groups.get(pw.flowType) ?? 
[] - eventTypes.push(pw.eventType) - groups.set(pw.flowType, eventTypes) - } - - assertEquals(groups.size, 3) - assertEquals(groups.get("user"), ["created", "updated"]) - assertEquals(groups.get("order"), ["placed", "shipped"]) - assertEquals(groups.get("payment"), ["received"]) + userDefault.setState({ timeBucket: "20260319120000" }) + assertEquals(userHot.getState(), null) }) - await t.step("concurrency defaults to 1 per flow type when unset", async () => { + await t.step("concurrency defaults to 1 per pump when unset", async () => { const factory = createInMemoryStateFactory() const pump = new PathwayPump({ stateManagerFactory: factory, @@ -146,32 +139,25 @@ Deno.test({ processEvent: async (_pathway: string, _event: FlowcoreEvent) => {}, }) - // Bypass the dynamic `@flowcore/data-pump` import by invoking the per-flowType - // bootstrap directly with a stubbed constructor — same pattern as the setPulseConfig test. - const createdConcurrencies: Record = {} - const internal = pump as unknown as { - dataPumpConstructor: { - create(options: Record): Promise<{ start(cb?: unknown): Promise }> - } - startPumpForFlowType(flowType: string, eventTypes: string[]): Promise - } + const created: Record = {} + const internal = pump as unknown as InternalPump internal.dataPumpConstructor = { create: (options: Record) => { const dataSource = options.dataSource as { flowType: string } const processor = options.processor as { concurrency: number } - createdConcurrencies[dataSource.flowType] = processor.concurrency + created[dataSource.flowType] = processor.concurrency return Promise.resolve({ start: async () => {} }) }, } - await internal.startPumpForFlowType("user", ["created"]) - await internal.startPumpForFlowType("order", ["placed"]) + await internal.startPumpForGroup({ flowType: "user", pumpGroup: "default", eventTypes: ["created"] }) + await internal.startPumpForGroup({ flowType: "order", pumpGroup: "default", eventTypes: ["placed"] }) - 
assertEquals(createdConcurrencies.user, 1) - assertEquals(createdConcurrencies.order, 1) + assertEquals(created.user, 1) + assertEquals(created.order, 1) }) - await t.step("numeric concurrency sets a shared default for every flow type", async () => { + await t.step("numeric concurrency sets a shared default for every pump", async () => { const factory = createInMemoryStateFactory() const pump = new PathwayPump({ stateManagerFactory: factory, @@ -187,35 +173,34 @@ Deno.test({ processEvent: async () => {}, }) - const createdConcurrencies: Record = {} - const internal = pump as unknown as { - dataPumpConstructor: { - create(options: Record): Promise<{ start(cb?: unknown): Promise }> - } - startPumpForFlowType(flowType: string, eventTypes: string[]): Promise - } + const created: Record = {} + const internal = pump as unknown as InternalPump internal.dataPumpConstructor = { create: (options: Record) => { const dataSource = options.dataSource as { flowType: string } const processor = options.processor as { concurrency: number } - createdConcurrencies[dataSource.flowType] = processor.concurrency + created[dataSource.flowType] = processor.concurrency return Promise.resolve({ start: async () => {} }) }, } - await internal.startPumpForFlowType("user", ["created"]) - await internal.startPumpForFlowType("order", ["placed"]) + await internal.startPumpForGroup({ flowType: "user", pumpGroup: "default", eventTypes: ["created"] }) + await internal.startPumpForGroup({ flowType: "order", pumpGroup: "default", eventTypes: ["placed"] }) - assertEquals(createdConcurrencies.user, 4) - assertEquals(createdConcurrencies.order, 4) + assertEquals(created.user, 4) + assertEquals(created.order, 4) }) - await t.step("per-flow-type overrides win; missing ones fall back to default", async () => { + await t.step("byPumpGroup wins over byFlowType wins over default", async () => { const factory = createInMemoryStateFactory() const pump = new PathwayPump({ stateManagerFactory: factory, notifier: { 
type: "poller", pollerIntervalMs: 1000 }, - concurrency: { default: 2, byFlowType: { orders: 5 } }, + concurrency: { + default: 2, + byFlowType: { orders: 5 }, + byPumpGroup: { "orders::hot": 9 }, + }, }) pump.configure({ @@ -226,27 +211,179 @@ Deno.test({ processEvent: async () => {}, }) - const createdConcurrencies: Record = {} - const internal = pump as unknown as { - dataPumpConstructor: { - create(options: Record): Promise<{ start(cb?: unknown): Promise }> - } - startPumpForFlowType(flowType: string, eventTypes: string[]): Promise - } + const seenConcurrencies: number[] = [] + const internal = pump as unknown as InternalPump internal.dataPumpConstructor = { create: (options: Record) => { - const dataSource = options.dataSource as { flowType: string } const processor = options.processor as { concurrency: number } - createdConcurrencies[dataSource.flowType] = processor.concurrency + seenConcurrencies.push(processor.concurrency) + return Promise.resolve({ start: async () => {} }) + }, + } + + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "hot", eventTypes: ["placed.fast"] }) + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "default", eventTypes: ["placed"] }) + await internal.startPumpForGroup({ flowType: "users", pumpGroup: "default", eventTypes: ["created"] }) + + assertEquals( + seenConcurrencies, + [9, 5, 2], + "byPumpGroup wins, then byFlowType, then default", + ) + }) + + await t.step("pulse pathwayId is suffixed with ::flowType::pumpGroup", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + pulse: { url: "http://cp.test", pathwayId: "p-123" }, + }) + + pump.configure({ + tenant: "t", + dataCore: "dc", + apiKey: "k", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const seen: string[] = [] + const internal = pump as unknown as InternalPump + 
internal.dataPumpConstructor = { + create: (options: Record) => { + const pulse = options.pulse as { pathwayId: string } + seen.push(pulse.pathwayId) + return Promise.resolve({ start: async () => {} }) + }, + } + + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "hot", eventTypes: ["placed.fast"] }) + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "default", eventTypes: ["placed"] }) + + assertEquals(seen, ["p-123::orders::hot", "p-123::orders::default"]) + }) + + await t.step("notifier dataSource.eventTypes is restricted to the group's subset", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }) + + pump.configure({ + tenant: "t", + dataCore: "dc", + apiKey: "k", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const seenNotifierEventTypes: string[][] = [] + const internal = pump as unknown as InternalPump + internal.dataPumpConstructor = { + create: (options: Record) => { + const notifier = options.notifier as { dataSource: { eventTypes: string[] } } + seenNotifierEventTypes.push([...notifier.dataSource.eventTypes]) return Promise.resolve({ start: async () => {} }) }, } - await internal.startPumpForFlowType("orders", ["placed"]) - await internal.startPumpForFlowType("users", ["created"]) + await internal.startPumpForGroup({ + flowType: "orders", + pumpGroup: "hot", + eventTypes: ["placed.fast", "shipped.fast"], + }) + await internal.startPumpForGroup({ + flowType: "orders", + pumpGroup: "default", + eventTypes: ["placed", "shipped"], + }) + + assertEquals(seenNotifierEventTypes[0], ["placed.fast", "shipped.fast"]) + assertEquals(seenNotifierEventTypes[1], ["placed", "shipped"]) + }) + + await t.step("registeredPumpGroups exposes every (flowType, pumpGroup) pair", async () => { + const factory = createInMemoryStateFactory() + const pump = new PathwayPump({ + 
stateManagerFactory: factory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }) + + pump.configure({ + tenant: "t", + dataCore: "dc", + apiKey: "k", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const internal = pump as unknown as InternalPump & { groupMeta: Map } + internal.dataPumpConstructor = { + create: (_options: Record) => Promise.resolve({ start: async () => {} }), + } + + // Bypass start() to avoid the dynamic import of @flowcore/data-pump (which would + // overwrite our stubbed dataPumpConstructor and try to authenticate against a real CP). + const groupsToStart: GroupMeta[] = [ + { flowType: "orders", pumpGroup: "default", eventTypes: ["placed"] }, + { flowType: "orders", pumpGroup: "hot", eventTypes: ["placed.fast"] }, + { flowType: "users", pumpGroup: "default", eventTypes: ["created"] }, + ] + for (const meta of groupsToStart) { + internal.groupMeta.set(`${meta.flowType}::${meta.pumpGroup}`, meta) + await internal.startPumpForGroup(meta) + } - assertEquals(createdConcurrencies.orders, 5) - assertEquals(createdConcurrencies.users, 2) + const groups = pump.registeredPumpGroups.map((g) => `${g.flowType}::${g.pumpGroup}`).sort() + assertEquals(groups, ["orders::default", "orders::hot", "users::default"]) + // Unique flow types preserved for back-compat. + assertEquals([...pump.registeredFlowTypes].sort(), ["orders", "users"]) + }) + + await t.step("legacy single-arg state factory is accepted with a deprecation warning", async () => { + const created = new Map() + // Arity 1 — old factory signature. + const legacyFactory = (flowType: string): PumpStateManager => { + if (!created.has(flowType)) { + created.set(flowType, new InMemoryPumpStateManager()) + } + return created.get(flowType)! 
+ } + + const warns: Array<{ msg: string; meta?: Record }> = [] + const pump = new PathwayPump({ + stateManagerFactory: legacyFactory as unknown as PumpStateManagerFactory, + notifier: { type: "poller", pollerIntervalMs: 1000 }, + }, { + debug: () => {}, + info: () => {}, + warn: (msg: string, meta?: Record) => { + warns.push({ msg, meta }) + }, + error: () => {}, + }) + + pump.configure({ + tenant: "t", + dataCore: "dc", + apiKey: "k", + baseUrl: "https://api.flowcore.io", + processEvent: async () => {}, + }) + + const internal = pump as unknown as InternalPump + internal.dataPumpConstructor = { + create: (_options: Record) => Promise.resolve({ start: async () => {} }), + } + + // Two pump groups on same flow type with a legacy factory → warning fires once. + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "hot", eventTypes: ["placed.fast"] }) + await internal.startPumpForGroup({ flowType: "orders", pumpGroup: "default", eventTypes: ["placed"] }) + + const legacyWarns = warns.filter((w) => w.msg.includes("legacy single-arg signature")) + assertEquals(legacyWarns.length, 1, "deprecation warning should be emitted exactly once") }) await t.step("setPulseConfig recreates running pumps with the new pulse configuration", async () => { @@ -264,47 +401,26 @@ Deno.test({ processEvent: async (_pathway: string, _event: FlowcoreEvent) => {}, }) - const stoppedFlowTypes: string[] = [] - const createdFlowTypes: string[] = [] - const createdPulsePathwayIds: string[] = [] - ;(pump as unknown as { - running: boolean - pumps: Map }> - flowTypeEventTypes: Map - dataPumpConstructor: { - create(options: Record): Promise<{ start(cb?: unknown): Promise }> - } - }).running = true - ;(pump as unknown as { pumps: Map }> }).pumps = new Map([ - ["user", { - stop: async () => { - stoppedFlowTypes.push("user") - }, - }], - ["order", { - stop: async () => { - stoppedFlowTypes.push("order") - }, - }], + const internal = pump as unknown as InternalPump + internal.running = true 
+ internal.pumps = new Map([ + ["user::default", { stop: async () => {} }], + ["order::default", { stop: async () => {} }], ]) - ;(pump as unknown as { flowTypeEventTypes: Map }).flowTypeEventTypes = new Map([ - ["user", ["created", "updated"]], - ["order", ["placed"]], + internal.groupMeta = new Map([ + ["user::default", { flowType: "user", pumpGroup: "default", eventTypes: ["created", "updated"] }], + ["order::default", { flowType: "order", pumpGroup: "default", eventTypes: ["placed"] }], ]) - ;(pump as unknown as { - dataPumpConstructor: { - create(options: Record): Promise<{ start(cb?: unknown): Promise }> - } - }).dataPumpConstructor = { + + const createdFlowTypes: string[] = [] + const createdPulsePathwayIds: string[] = [] + internal.dataPumpConstructor = { create: async (options: Record) => { const dataSource = options.dataSource as { flowType: string } const pulse = options.pulse as { pathwayId: string } createdFlowTypes.push(dataSource.flowType) createdPulsePathwayIds.push(pulse.pathwayId) - - return { - start: async () => {}, - } + return { start: async () => {} } }, } @@ -313,9 +429,11 @@ Deno.test({ pathwayId: "pathway-123", }) - assertEquals(stoppedFlowTypes.sort(), ["order", "user"]) assertEquals(createdFlowTypes.sort(), ["order", "user"]) - assertEquals(createdPulsePathwayIds, ["pathway-123", "pathway-123"]) + assertEquals( + createdPulsePathwayIds.sort(), + ["pathway-123::order::default", "pathway-123::user::default"], + ) assertEquals(pump.isRunning, true) assertEquals(pump.registeredFlowTypes.sort(), ["order", "user"]) }) diff --git a/tests/postgres-pump-state.test.ts b/tests/postgres-pump-state.test.ts new file mode 100644 index 0000000..7c4a152 --- /dev/null +++ b/tests/postgres-pump-state.test.ts @@ -0,0 +1,115 @@ +import { assertEquals, assertExists } from "https://deno.land/std@0.224.0/assert/mod.ts" +import { PostgresJsAdapter } from "../src/pathways/postgres/index.ts" +import { createPostgresPumpStateManagerFactory } from 
"../src/pathways/pump/state.ts"

// Connection settings for the Postgres instance these integration tests talk to.
// Each value is read from a POSTGRES_* environment variable, with a local fallback.
const config = {
  host: Deno.env.get("POSTGRES_HOST") || "localhost",
  // Explicit radix: the port is always parsed as a base-10 integer.
  port: Number.parseInt(Deno.env.get("POSTGRES_PORT") || "5432", 10),
  user: Deno.env.get("POSTGRES_USER") || "postgres",
  password: Deno.env.get("POSTGRES_PASSWORD") || "postgres",
  database: Deno.env.get("POSTGRES_DB") || "pathway_test",
}

// Scratch tables owned by this test file; each step drops its table before and after use.
const TABLE_NEW = "pump_state_new_test"
const TABLE_MIGRATE = "pump_state_migrate_test"

/**
 * Runs `body` against a connected adapter with `tableName` guaranteed absent on entry.
 *
 * The table is dropped before `body` runs (so a leftover table from an aborted run cannot
 * leak into the step) and again afterwards. The adapter is disconnected even when the
 * final DROP fails.
 *
 * @param tableName Scratch table to clear before and after `body`.
 * @param body Step logic; receives the connected adapter.
 */
async function withScratchTable(
  tableName: string,
  body: (adapter: PostgresJsAdapter) => Promise<void>,
): Promise<void> {
  const adapter = new PostgresJsAdapter(config)
  await adapter.connect()
  try {
    await adapter.execute(`DROP TABLE IF EXISTS ${tableName}`)
    await body(adapter)
  } finally {
    try {
      await adapter.execute(`DROP TABLE IF EXISTS ${tableName}`)
    } finally {
      // Nested finally: a failing cleanup DROP must not skip the disconnect.
      await adapter.disconnect()
    }
  }
}

// Integration test: needs a reachable Postgres matching `config`.
// NOTE(review): the sanitizers are disabled presumably because the Postgres client keeps
// sockets/timers alive past the end of the test — confirm.
Deno.test({
  name: "PostgresPumpStateManager — composite-PK schema + migration",
  sanitizeResources: false,
  sanitizeOps: false,
  fn: async (t) => {
    await t.step("greenfield schema accepts composite (flow_type, pump_group)", async () => {
      await withScratchTable(TABLE_NEW, async () => {
        // The factory is created only after the stale-table drop so the step really starts greenfield.
        const factory = await createPostgresPumpStateManagerFactory({ ...config, tableName: TABLE_NEW })

        const ordersHot = factory("orders.0", "hot")
        const ordersDefault = factory("orders.0", "default")

        await ordersHot.setState({ timeBucket: "20260101000000", eventId: "evt-hot-1" })
        await ordersDefault.setState({ timeBucket: "20260101000010", eventId: "evt-def-1" })

        const hotState = await ordersHot.getState()
        const defaultState = await ordersDefault.getState()
        assertExists(hotState)
        assertExists(defaultState)
        assertEquals(hotState!.eventId, "evt-hot-1")
        assertEquals(defaultState!.eventId, "evt-def-1")

        // Independent updates do not bleed across groups.
        await ordersHot.setState({ timeBucket: "20260101000100", eventId: "evt-hot-2" })
        const refreshedDefault = await ordersDefault.getState()
        assertEquals(refreshedDefault!.eventId, "evt-def-1")
      })
    })

    await t.step(
      "migrates a pre-existing single-PK table without losing rows; existing rows land under pump_group='default'",
      async () => {
        await withScratchTable(TABLE_MIGRATE, async (adapter) => {
          // Simulate a pre-2.4 schema with single-column PK.
          await adapter.execute(`
            CREATE TABLE ${TABLE_MIGRATE} (
              flow_type TEXT PRIMARY KEY,
              time_bucket TEXT NOT NULL,
              event_id TEXT
            )
          `)
          await adapter.execute(
            `INSERT INTO ${TABLE_MIGRATE} (flow_type, time_bucket, event_id)
             VALUES ($1, $2, $3)`,
            ["legacy.0", "20251231000000", "evt-legacy-1"],
          )

          const factory = await createPostgresPumpStateManagerFactory({
            ...config,
            tableName: TABLE_MIGRATE,
          })

          // First call triggers the idempotent migration.
          const legacyDefault = factory("legacy.0", "default")
          const state = await legacyDefault.getState()

          assertExists(state, "row inserted under pre-migration schema must be preserved as 'default' group")
          assertEquals(state!.eventId, "evt-legacy-1")
          assertEquals(state!.timeBucket, "20251231000000")

          // New pump group on same flow type lives in its own row, not colliding.
          const legacyHot = factory("legacy.0", "hot")
          const hotStateBefore = await legacyHot.getState()
          assertEquals(hotStateBefore, null)
          await legacyHot.setState({ timeBucket: "20260601000000", eventId: "evt-hot-1" })
          const hotStateAfter = await legacyHot.getState()
          assertExists(hotStateAfter)
          assertEquals(hotStateAfter!.eventId, "evt-hot-1")

          // Default group untouched.
          const refreshedDefault = await legacyDefault.getState()
          assertEquals(refreshedDefault!.eventId, "evt-legacy-1")

          // Verify the table is now composite-PK by inspecting information_schema.
          // The lookup is pinned to the current schema and joined on table_name as well, so a
          // same-named table or constraint elsewhere in the database cannot inflate the count.
          // NOTE(review): the type argument was garbled in this copy of the patch; it is
          // reconstructed from the `pkCols[0].count` usage below — confirm against the
          // signature of PostgresJsAdapter.query.
          const pkCols = await adapter.query<Array<{ count: string }>>(
            `SELECT count(*)::text AS count
               FROM information_schema.table_constraints tc
               JOIN information_schema.key_column_usage kcu
                 ON tc.constraint_name = kcu.constraint_name
                AND tc.table_schema = kcu.table_schema
                AND tc.table_name = kcu.table_name
              WHERE tc.table_name = $1
                AND tc.table_schema = current_schema()
                AND tc.constraint_type = 'PRIMARY KEY'`,
            [TABLE_MIGRATE],
          )
          assertEquals(pkCols[0].count, "2", "primary key must now span (flow_type, pump_group)")
        })
      },
    )
  },
})