Merged
72 changes: 68 additions & 4 deletions README.md
Expand Up @@ -228,10 +228,10 @@ const pathways = new PathwaysBuilder({
### Pump Concurrency

Control how many events each pump processes in parallel via `startPump({ concurrency })`. Accepts a number (shared
default) or a `PumpConcurrencyConfig` with per-flow-type overrides:
default) or a `PumpConcurrencyConfig` with per-flow-type and per-pump-group overrides:

```typescript
// Shared default across every flow type
// Shared default across every pump
await pathways.startPump({ concurrency: 4 })

// Per-flow-type overrides — unlisted flow types fall back to `default` (or 1)
Expand All @@ -242,12 +242,72 @@ await pathways.startPump({
orders: 8,
audit: 1,
},
// Optional: per-(flowType, pumpGroup) override. Wins over byFlowType.
// Key format: `${flowType}::${pumpGroup}`.
byPumpGroup: {
"orders::hot": 16,
},
},
})
```

Omit `concurrency` to keep the default of 1 per flow type. `startPump()` also accepts a per-call `autoProvision`
override (same shape as the builder-level option) for overriding provisioning behavior at a specific call site.
Omit `concurrency` to keep the default of 1 per pump. `startPump()` also accepts a per-call `autoProvision` override
(same shape as the builder-level option) for overriding provisioning behavior at a specific call site.

> **Note**: this resolves to `processor.concurrency` on the underlying data pump, which is the in-flight batch width —
> not parallel handler invocations. Resolution order per pump: `byPumpGroup["${flowType}::${pumpGroup}"]` →
> `byFlowType[flowType]` → `default`.
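
The resolution order in the note above can be sketched as a small pure function. This is an illustration only: `SketchConcurrencyConfig` and `resolveConcurrency` are hypothetical names, and the config shape is inferred from the README text rather than copied from the library's `PumpConcurrencyConfig` type.

```typescript
// Hypothetical stand-in for the library's PumpConcurrencyConfig (shape inferred from the README).
interface SketchConcurrencyConfig {
  default?: number
  byFlowType?: Record<string, number>
  byPumpGroup?: Record<string, number>
}

// Resolve the concurrency for one pump:
// byPumpGroup["flowType::pumpGroup"], then byFlowType[flowType], then default, then 1.
function resolveConcurrency(
  concurrency: number | SketchConcurrencyConfig | undefined,
  flowType: string,
  pumpGroup: string,
): number {
  if (concurrency === undefined) return 1
  // A bare number is the shared default for every pump.
  if (typeof concurrency === "number") return concurrency
  return concurrency.byPumpGroup?.[`${flowType}::${pumpGroup}`] ??
    concurrency.byFlowType?.[flowType] ??
    concurrency.default ??
    1
}
```

With `{ default: 2, byFlowType: { orders: 8 }, byPumpGroup: { "orders::hot": 16 } }`, the `hot` group on `orders` resolves to 16, any other group on `orders` to 8, and every other flow type to 2.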

### Splitting a flow type across multiple pumps

By default, every event type registered against the same `flowType` shares one pump. For high-throughput event types
that would otherwise starve their cold neighbours, register them with a distinct `pumpGroup` so they run on an isolated
pump with their own state cursor, processor concurrency, and restart backoff:

```typescript
// 8 cold event types share the default pump for orders.0
for (const eventType of ["paid", "fulfilled", "cancelled", "refunded", "archived", "audited", "tagged", "noted"]) {
pathways.register({ flowType: "orders.0", eventType, schema })
}

// 2 hot event types run on a separate "hot" pump — same flow type, isolated pump
pathways.register({ flowType: "orders.0", eventType: "placed.0", schema, pumpGroup: "hot" })
pathways.register({ flowType: "orders.0", eventType: "shipped.0", schema, pumpGroup: "hot" })

await pathways.startPump({
concurrency: { default: 2, byPumpGroup: { "orders.0::hot": 16 } },
})
```

What this gives you per pump group:

- **Isolated state cursor.** The Postgres `pathway_pump_state` table now has a composite primary key
`(flow_type, pump_group)`; existing rows are auto-migrated into `pump_group='default'` on first use.
- **Independent processor concurrency** via `byPumpGroup`.
- **Independent restart backoff.** A failure in the `hot` pump does not reset the cold pump's attempt counter, and the
restart loop keeps retrying with exponential backoff (capped at 30s) until the pump comes back — including when the
restart attempt itself throws synchronously.
- **Per-group pulse identity.** When pulse reporting is configured, each pump emits pulses under
`${pathwayId}::${flowType}::${pumpGroup}` so the control plane can distinguish the health of each group.
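
As a rough illustration of the independent restart backoff described above, the sketch below keeps one attempt counter per `(flowType, pumpGroup)` pair. Only the 30s cap comes from the text; the 1s base delay, the doubling factor, and all function names are assumptions made for this example, not the library's implementation.

```typescript
const MAX_BACKOFF_MS = 30_000 // the 30s cap stated in the README
const BASE_BACKOFF_MS = 1_000 // assumed starting delay, not taken from the library

// Exponential backoff: base * 2^attempt, capped at MAX_BACKOFF_MS.
function backoffDelayMs(attempt: number): number {
  return Math.min(BASE_BACKOFF_MS * 2 ** attempt, MAX_BACKOFF_MS)
}

// One attempt counter per pump, so a failing "hot" pump never touches the cold pump's counter.
const attempts = new Map<string, number>()

// Returns the delay before the next restart attempt and bumps that pump's counter.
function nextRestartDelayMs(flowType: string, pumpGroup: string): number {
  const key = `${flowType}::${pumpGroup}`
  const attempt = attempts.get(key) ?? 0
  attempts.set(key, attempt + 1)
  return backoffDelayMs(attempt)
}

// Clears the counter once the pump is running again.
function markHealthy(flowType: string, pumpGroup: string): void {
  attempts.delete(`${flowType}::${pumpGroup}`)
}
```

Keying the counter by the same `flowType::pumpGroup` string used for `byPumpGroup` keeps the two groups on `orders.0` fully separate: repeated failures in `hot` lengthen only the `hot` delay.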

**Caveats (v2.4):**

- The WebSocket notifier subscribes at `flowType` scope, so two pump groups on the same flow type receive identical
notifications and each pulls. Isolation is at processor + state, not bandwidth — checks are cheap, so this is usually
fine. If you need bandwidth isolation, use distinct flow types instead.
- Cluster mode keeps a single global leader; per-pump-group leadership is not yet supported.
- Downstream consumers that mirror `pathway_pump_state` in their own Drizzle schema MUST add a
`pump_group TEXT NOT NULL DEFAULT 'default'` column and update the primary key to `(flow_type, pump_group)` before
running `drizzle-kit push`, otherwise Drizzle will try to drop the new column.

Custom `PumpStateManagerFactory` implementations should accept a second `pumpGroup` argument:

```typescript
const factory: PumpStateManagerFactory = (flowType, pumpGroup) => createMyStateManager(flowType, pumpGroup)
```

Legacy single-argument factories continue to work but share state across pump groups on the same flow type — a
deprecation warning is logged once per pump.
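
To make the difference concrete, here is a minimal in-memory sketch of a two-argument factory. The `SketchPumpState` and `SketchStateManager` types and the synchronous `getState`/`setState` methods are assumptions for illustration; the library's real state manager interface is not shown in this diff and may differ (for example, it may be async).

```typescript
// Hypothetical minimal types for this sketch only.
type SketchPumpState = { timeBucket: string; eventId?: string }

interface SketchStateManager {
  getState(): SketchPumpState | null
  setState(state: SketchPumpState): void
}

// Cursor storage keyed by flow type AND pump group, so each group keeps its own position.
const cursors = new Map<string, SketchPumpState>()

function createInMemoryStateManager(flowType: string, pumpGroup = "default"): SketchStateManager {
  const key = `${flowType}::${pumpGroup}`
  return {
    getState: () => cursors.get(key) ?? null,
    setState: (state) => {
      cursors.set(key, state)
    },
  }
}

// Two-argument factory: forwards the pump group so state stays isolated per group.
const sketchFactory = (flowType: string, pumpGroup?: string): SketchStateManager =>
  createInMemoryStateManager(flowType, pumpGroup)
```

A factory that ignores `pumpGroup` would build the same key for every group on a flow type, which is the shared-state behavior the deprecation warning refers to.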

### Registering Pathways

Expand Down Expand Up @@ -277,6 +337,10 @@ pathways.register({
writable: true, // Optional, default is true
maxRetries: 3, // Optional, default is 3
retryDelayMs: 500, // Optional, default is 500
// Optional: isolate this event type onto a dedicated pump for the same flow type.
// Same (flowType, pumpGroup) pair shares one pump; different pumpGroup → different pumps.
// Omit (or pass "default") for the legacy single-pump-per-flowType behavior.
pumpGroup: "hot",
})
```
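
The pump assignment rule in the comments above can be pictured as grouping registrations by their `(flowType, pumpGroup)` pair. The sketch below is a hypothetical helper (`groupByPump` and `SketchRegistration` are not library names) and makes no claim about how the pump groups registrations internally.

```typescript
// Hypothetical registration shape: only the fields that decide pump assignment.
type SketchRegistration = { flowType: string; eventType: string; pumpGroup?: string }

// Group event types by "flowType::pumpGroup"; a missing pumpGroup counts as "default".
function groupByPump(registrations: SketchRegistration[]): Map<string, string[]> {
  const pumps = new Map<string, string[]>()
  for (const { flowType, eventType, pumpGroup } of registrations) {
    const key = `${flowType}::${pumpGroup ?? "default"}`
    const eventTypes = pumps.get(key) ?? []
    eventTypes.push(eventType)
    pumps.set(key, eventTypes)
  }
  return pumps
}

const pumps = groupByPump([
  { flowType: "orders.0", eventType: "paid" },
  { flowType: "orders.0", eventType: "placed.0", pumpGroup: "hot" },
  { flowType: "orders.0", eventType: "shipped.0", pumpGroup: "hot" },
])
```

Here `pumps` has two entries: `orders.0::default` holding `paid`, and `orders.0::hot` holding `placed.0` and `shipped.0`.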

2 changes: 1 addition & 1 deletion deno.json
Expand Up @@ -18,7 +18,7 @@
"test:watch": "deno test -A --watch",
"postgres:start": "deno run -A bin/start-postgres.ts",
"postgres:stop": "deno run -A bin/stop-postgres.ts",
"test:postgres": "deno run -A bin/start-postgres.ts && (deno test -A tests/postgres-pathway-state.test.ts || (deno run -A bin/stop-postgres.ts && exit 1)) && deno run -A bin/stop-postgres.ts"
"test:postgres": "deno run -A bin/start-postgres.ts && (deno test -A tests/postgres-pathway-state.test.ts tests/postgres-pump-state.test.ts || (deno run -A bin/stop-postgres.ts && exit 1)) && deno run -A bin/stop-postgres.ts"
},
"imports": {
"@deno/dnt": "jsr:@deno/dnt@^0.41.3",
9 changes: 8 additions & 1 deletion deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 59 additions & 5 deletions src/pathways/builder.ts
Expand Up @@ -385,6 +385,7 @@ export class PathwaysBuilder<
private readonly inputSchemas: Record<keyof TPathway, AnyZodObject> = {} as Record<keyof TPathway, AnyZodObject>
private readonly writable: Record<keyof TPathway, boolean> = {} as Record<keyof TPathway, boolean>
private readonly subscribed: Record<keyof TPathway, boolean> = {} as Record<keyof TPathway, boolean>
private readonly pumpGroups: Record<keyof TPathway, string> = {} as Record<keyof TPathway, string>
private readonly timeouts: Record<keyof TPathway, number> = {} as Record<keyof TPathway, number>
private readonly maxRetries: Record<keyof TPathway, number> = {} as Record<keyof TPathway, number>
private readonly retryDelays: Record<keyof TPathway, number> = {} as Record<keyof TPathway, number>
Expand Down Expand Up @@ -829,6 +830,18 @@ export class PathwaysBuilder<
* exists, avoiding back-pressure on the shared pump queue.
*/
subscribe?: boolean
/**
* Optional pump group. Event types sharing a `(flowType, pumpGroup)` pair
* land on the same data pump; different `pumpGroup` values within one
* `flowType` run on independent pumps with isolated state cursors and
* processor concurrency. Omit (or pass `"default"`) for the legacy
* single-pump-per-flowType behavior.
*
* NOTE: the WebSocket notifier is `flowType`-scoped, so two groups on the
* same `flowType` receive identical notifications and each pulls; isolation
* is at processor + state, not bandwidth.
*/
pumpGroup?: string
maxRetries?: number
retryDelayMs?: number
isFilePathway?: FP
Expand All @@ -844,13 +857,20 @@ export class PathwaysBuilder<
const path = `${contract.flowType}/${contract.eventType}` as PathwayKey<F, E>
const writable = contract.writable ?? true
const subscribe = contract.subscribe ?? true
if (contract.pumpGroup !== undefined && contract.pumpGroup.trim() === "") {
throw new Error(
`Pathway ${path} has an empty pumpGroup — pumpGroup must be a non-empty string when set`,
)
}
const pumpGroup = contract.pumpGroup?.trim() ?? "default"

this.logger.debug(`Registering pathway`, {
pathway: path,
flowType: contract.flowType,
eventType: contract.eventType,
writable,
subscribe,
pumpGroup,
isFilePathway: contract.isFilePathway,
timeoutMs: contract.timeoutMs,
maxRetries: contract.maxRetries,
Expand Down Expand Up @@ -900,6 +920,7 @@ export class PathwaysBuilder<
}
this.writable[path] = writable
this.subscribed[path] = subscribe
this.pumpGroups[path] = pumpGroup

// Store provisioning descriptions
if (contract.description !== undefined) {
Expand All @@ -915,6 +936,7 @@ export class PathwaysBuilder<
eventType: contract.eventType,
writable,
subscribe,
pumpGroup,
isFilePathway: contract.isFilePathway,
})

Expand Down Expand Up @@ -1464,14 +1486,32 @@ export class PathwaysBuilder<
return
}

const registrations = this.buildSubscribedRegistrations()
const registrations = this.buildPumpRegistrations()
await this.pathwayPump.start(registrations)

this.logger.info("Pump started", {
pathways: registrations.length,
})
}

/**
* Returns the subset of registrations the in-process pump should subscribe to,
* enriched with the resolved `pumpGroup` for each pathway. Only used by the
* pump — provisioning still uses {@link buildRegistrations} (no pumpGroup).
*/
private buildPumpRegistrations(): Array<{ flowType: string; eventType: string; pumpGroup: string }> {
return Object.keys(this.pathways)
.filter((key) => this.subscribed[key as keyof TPathway] !== false)
.map((key) => {
const [flowType, eventType] = key.split("/")
return {
flowType,
eventType,
pumpGroup: this.pumpGroups[key as keyof TPathway] ?? "default",
}
})
}

private async stopLeaderRuntime(): Promise<void> {
this.stopCommandPoller()

Expand Down Expand Up @@ -1890,21 +1930,35 @@ export class PathwaysBuilder<
* @param position - Target position { timeBucket, eventId? }. If omitted, clears persisted state
* and restarts from the live position. To replay from the very beginning,
* pass the first time bucket explicitly.
* @param filter - Optional filter narrowing which pumps to reset. Accepts:
* - `string[]` (legacy): treated as `flowTypes`.
* - `{ flowTypes?, pumpGroups? }`: matches pumps that satisfy every supplied
* criterion (intersection).
* Cluster mode does not yet propagate the filter — it is logged and
* the leader resets all pumps.
*/
async resetPump(position?: PumpState, flowTypes?: string[]): Promise<string[]> {
async resetPump(
position?: PumpState,
filter?: string[] | { flowTypes?: string[]; pumpGroups?: string[] },
): Promise<string[]> {
if (!this.pathwayPump) {
throw new Error("Pump not started — call startPump() first")
}

const normalized = Array.isArray(filter) ? { flowTypes: filter } : filter

if (this.clusterManager) {
if (flowTypes?.length) {
this.logger.warn("flowTypes filter is not supported in cluster mode reset — resetting all flow types")
if (normalized?.flowTypes?.length || normalized?.pumpGroups?.length) {
this.logger.warn(
"Reset filter is not supported in cluster mode — resetting all pumps",
{ filter: normalized },
)
}
await this.clusterManager.requestReset(position)
return [...this.pathwayPump.registeredFlowTypes]
}

return await this.pathwayPump.reset(position, flowTypes)
return await this.pathwayPump.reset(position, normalized)
}

/**