
Fast lanes: dedicated worker capacity for named queues #1294

Merged
josephjclark merged 25 commits into release/next from fastlanes on Mar 13, 2026

Conversation

@stuartc (Member) commented Mar 10, 2026

Short Description

Adds the ability to subscribe worker capacity to named queues via --workloops, guaranteeing dedicated capacity for latency-sensitive workloads (e.g., sync webhooks). This PR combines the two Kit-side fast lanes issues.

  Main Process (ws-worker)
 ├── Workloop 1 (manual>*:2)  ─┐
 ├── Workloop 2 (fast_lane:1) ─┼── all run in the main process as async loops
 ├── Workloop 3 (*:5)         ─┘
 │
 └── Engine (single instance, shared by all lanes)
     └── Child Process Pool (capacity = sum of all lanes/slots)
         ├── Child 1 (forked) → Worker Thread (per task)
         ├── Child 2 (forked) → Worker Thread (per task)
         ├── ...on demand, reused after each task
         └── Child N

Fixes #1288, fixes #1289

Feature Design

At present, the worker has a single workloop (or claim loop), which repeatedly calls Lightning on a backoff to ask for more work. In this architecture, Lightning is responsible for prioritising which work gets scheduled next.

The problem with this architecture is that Lightning cannot reserve capacity: it simply hands off the most important work available right now. If something very high priority comes in, like a synchronous workflow request, while all the workers happen to be busy, that work has to wait.

To resolve this, Lightning has three work queues: a fast lane for sync requests, a manual queue for user triggered runs, and a default queue for everything else.

Workers mirror this architecture by having several independent workloops. Each workloop makes claim requests to Lightning, specifying a prioritised list of channels to pull from, and has its own independent backoff and capacity.

Now a worker can pull work from the manual and default queues in the app, but reserve capacity for fast lane work, ensuring that those priority workflows get pulled ASAP.

Note that the prioritised queues allow for work stealing. The * queue will pull anything - so if there's nothing in the manual runs queue, work will be pulled from the default queue. It'll even pull from the fast lane if work is queued up there.

The precise configuration of queues and workloops is purely config driven.
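As a rough illustration, parsing such a config string might look like the sketch below. The `parseWorkloops` and `WorkloopConfig` names come from this PR, but the implementation here is illustrative only, not the actual parser:

```typescript
// Hypothetical sketch: parse a --workloops string like
// "fast_lane:1 manual>*:4" into typed config objects.
interface WorkloopConfig {
  queues: string[]; // prioritised queue names; '*' matches any queue
  capacity: number; // dedicated slots for this workloop
}

function parseWorkloops(spec: string): WorkloopConfig[] {
  return spec
    .trim()
    .split(/\s+/)
    .map((group) => {
      // each group is "<queue>[><queue>...]:<capacity>"
      const match = group.match(/^(.+):(\d+)$/);
      if (!match) throw new Error(`Invalid workloop group: ${group}`);
      const queues = match[1].split('>');
      const capacity = parseInt(match[2], 10);
      if (capacity < 1) throw new Error(`Capacity must be >= 1: ${group}`);
      return { queues, capacity };
    });
}
```

So `"fast_lane:1 manual>*:4"` would yield one single-slot loop pinned to `fast_lane` and one four-slot loop that prefers `manual` but falls back to any queue.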

Benefits of this approach:

  • Workers can reserve capacity for priority work - like synchronous requests
  • Workers can still request from queues in a prioritised way, taking from the manual runs queue before the scheduled work queue.

Implementation Details

This PR is composed of two feature branches:

#1291 — --workloops CLI option and slot group configuration parsing (kit#1288)

  • Adds --workloops CLI option (and WORKER_WORKLOOPS env var), mutually exclusive with --capacity
  • Parses slot group definitions like --workloops "fast_lane:1 manual>*:4" into typed WorkloopConfig[] objects
  • Validates queue names, slot counts, wildcard placement, and duplicate queue assignments
  • Adds queues field to ClaimPayload in @openfn/lexicon
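The validation rules in the bullets above might look something like this sketch (the specific rules, wildcard placement, name syntax, duplicate detection, are inferred from the bullets, not lifted from the PR's actual code):

```typescript
// Hypothetical validation of a single queue chain like ['manual', '*'].
// Returns an error message, or null if the chain is valid.
function validateChain(queues: string[]): string | null {
  const seen = new Set<string>();
  for (let i = 0; i < queues.length; i++) {
    const q = queues[i];
    // a wildcard anywhere but last would shadow later queues
    if (q === '*' && i !== queues.length - 1)
      return 'wildcard must be the last queue in a chain';
    // allow snake_case names or the bare wildcard
    if (!/^[a-z_][a-z0-9_]*$|^\*$/.test(q)) return `invalid queue name: ${q}`;
    if (seen.has(q)) return `duplicate queue: ${q}`;
    seen.add(q);
  }
  return null;
}
```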

#1293 — Per-group workloops and queue-aware claiming (kit#1289)

  • Each slot group gets its own independent workloop, replacing the single shared loop
  • Each group tracks its own active runs and capacity independently
  • Claims are scoped to the group's queue list
  • The Lightning join payload now includes a queues map so the server knows the slot distribution
  • Default behavior (no --workloops) is preserved with a single manual>* group
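A rough sketch of what one such independent workloop could look like is below. This is illustrative only: the `claim`/`execute` signatures, the backoff numbers, and the `isStopped` callback are assumptions, not the PR's actual implementation:

```typescript
// A claim function asks Lightning for work on a prioritised queue list
// and resolves with a run id, or null if nothing was available.
type Claim = (queues: string[]) => Promise<string | null>;

// One independent loop per workloop group: tracks its own active runs,
// claims only when under its own capacity, and backs off on empty claims.
async function runWorkloop(
  queues: string[],
  capacity: number,
  claim: Claim,
  execute: (runId: string) => Promise<void>,
  isStopped: () => boolean
) {
  let active = 0;
  let backoffMs = 10;
  while (!isStopped()) {
    if (active < capacity) {
      const runId = await claim(queues); // claim scoped to this group's queues
      if (runId) {
        backoffMs = 10; // reset backoff on a successful claim
        active++;
        execute(runId).finally(() => {
          active--;
        });
        continue; // immediately try to fill the next free slot
      }
    }
    await new Promise((r) => setTimeout(r, backoffMs));
    backoffMs = Math.min(backoffMs * 2, 5000); // exponential backoff, capped
  }
}
```

Several of these loops can run concurrently in the main process, each with its own `queues`, `capacity`, and backoff state, all feeding the one shared engine.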

Additional fixes included

  • Fix CLI args via pnpm: pnpm v7+ passes -- through to process.argv, causing yargs to ignore all flags. Stripped the leading -- before parsing.
  • Fix Promise.any type error: Added ES2021 to ws-worker's tsconfig lib for Promise.any support.
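The pnpm fix described above amounts to something like the following (a hypothetical sketch; the real code may differ):

```typescript
// pnpm v7+ forwards the '--' separator into process.argv, so yargs
// treats everything after it as positional arguments. Removing the
// first '--' restores normal flag parsing whether the worker is
// invoked via pnpm, npm, or directly.
function stripSeparator(argv: string[]): string[] {
  const i = argv.indexOf('--');
  return i === -1 ? argv : [...argv.slice(0, i), ...argv.slice(i + 1)];
}
```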

QA Notes

  • Test with --workloops "fast_lane:2 *:3" — should show two separate workloops in logs
  • Test without --workloops — should behave identically to before (single manual>* group)
  • Verify --capacity and --workloops cannot be used together (should throw)
  • Lightning-side queue support (issues #4500, #4501) is required for end-to-end queue routing; without it, Lightning will ignore the queue preferences and return runs from any queue

AI Usage

  • I have used Claude Code
  • I have used another model
  • I have not used AI

stuartc added 11 commits March 10, 2026 11:00
Introduce queue-aware slot allocation to support fast lanes. Workers can
now dedicate specific slots to specific queues via a new `--queues` flag
(e.g. `--queues "fast_lane:1 manual,*:4"`).

- Add parseQueues() with SlotGroup type and full validation
- Add --queues CLI option with WORKER_QUEUES env var support
- Enforce mutual exclusivity between --queues and --capacity
- Derive effectiveCapacity from slot groups in start.ts
- Add queues field to ClaimPayload in lexicon
- Remove unreachable ?? 5 fallback in start.ts (capacity always has default)
- Add TODO(#1289) for passing slotGroups to createWorker
- Warn on duplicate queue names within a preference chain
- Warn on identical queue configurations across slot groups
- Add test for WORKER_QUEUES + WORKER_CAPACITY env var mutual exclusivity
capacity always receives a default value via setArg, so the type
should reflect that rather than requiring a runtime fallback.
Each slot group now gets its own independent workloop, tracks its own
active runs and capacity, and sends queue-scoped claims to Lightning.
The join payload includes a queues map so Lightning knows the slot
distribution. Default behavior (no --queues) is preserved with a single
manual,* group.
The pattern of computing pending claims and comparing against maxSlots
was duplicated 4 times across server.ts. Extracts it into a single
groupHasCapacity() function in parse-queues.ts with 5 unit tests.
Fold run-to-group tracking assertions into the existing execute test,
remove the redundant workloop-stop test (already covered), and drop
the now-covered todo.
pnpm v7+ passes the '--' separator through to process.argv, causing
yargs to treat all subsequent flags as positional arguments. Strip a
leading '--' before parsing so --queues and other flags work correctly
whether invoked via pnpm, npm, or directly.
The per-group workloops use Promise.any which requires ES2021 or later
in the TypeScript lib setting.
@github-project-automation github-project-automation bot moved this to New Issues in Core Mar 10, 2026
@stuartc stuartc self-assigned this Mar 10, 2026
@stuartc stuartc requested a review from josephjclark March 10, 2026 12:56
@stuartc stuartc marked this pull request as ready for review March 10, 2026 12:57
stuartc added 3 commits March 10, 2026 15:08
Pipes AVA TAP output through tap-xunit to generate JUnit XML,
enabling test results to display in CircleCI's Tests UI.
Each package now produces its own XML file instead of piping
combined pnpm -r output through tap-xunit, which was corrupted
by pnpm's interleaved logging.
@josephjclark (Collaborator) left a comment

A few initial comments (I'll spend more time on this tomorrow)

My big question is: how is memory managed here?

On main you set the memory limit per worker and a fixed capacity - say 500mb per child process and a capacity of 2. The worker promises it won't use up more than 1 gb of memory (although we know from experience this isn't infallible, but that's a different discussion)

But here, if that architecture diagram is accurate, I'm essentially enabling 5 concurrent runs per worker now. How are we ensuring that we don't run out of memory?

Remove redundant env var guard wrapping an already-skipped test.
AVA now sees the test as registered but skipped instead of finding
zero tests and emitting a warning that causes a non-zero exit.
@stuartc (Member, Author) commented Mar 11, 2026

A few initial comments (I'll spend more time on this tomorrow)

My big question is: how is memory managed here?

On main you set the memory limit per worker and a fixed capacity - say 500mb per child process and a capacity of 2. The worker promises it won't use up more than 1 gb of memory (although we know from experience this isn't infallible, but that's a different discussion)

But here, if that architecture diagram is accurate, I'm essentially enabling 5 concurrent runs per worker now. How are we ensuring that we don't run out of memory?

The per-run memory limit (--run-memory) is unchanged and still enforced per child process. The total memory ceiling is sum(lane capacities) x runMemory. We could add a log on startup showing the total potential memory to make this explicit?
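For illustration, with the example lanes above (2 + 1 + 5 = 8 slots) and a 500 MB per-run limit, that ceiling works out to 8 × 500 MB = 4 GB. A hypothetical helper for the suggested startup log might look like:

```typescript
// Hypothetical: total memory ceiling = sum of lane capacities
// multiplied by the per-run memory limit (--run-memory), in MB.
function memoryCeilingMb(laneCapacities: number[], runMemoryMb: number): number {
  return laneCapacities.reduce((sum, c) => sum + c, 0) * runMemoryMb;
}

// e.g. lanes manual>*:2, fast_lane:1, *:5 at 500 MB per run:
// memoryCeilingMb([2, 1, 5], 500) === 4000
```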

So the default "manual,*:5" is the same behaviour as before, the only difference is that a "queue preference" is sent on the claim to inform Lightning. We're shifting the ordering preference to the worker (in vanilla lightning claim queries).

Edit: I agree the diagram skims over the "where does concurrency live"

  Main Process (ws-worker)
 ├── Workloop 1 (manual,*:2)  ─┐
 ├── Workloop 2 (fast_lane:1) ─┼── all run in the main process as async loops
 ├── Workloop 3 (*:5)         ─┘
 │
 └── Engine (single instance, shared by all lanes)
     └── Child Process Pool (capacity = sum of all lanes/slots)
         ├── Child 1 (forked) → Worker Thread (per task)
         ├── Child 2 (forked) → Worker Thread (per task)
         ├── ...on demand, reused after each task
         └── Child N

Does that make more sense?

Move group from ClaimOptions to a required positional parameter,
making the function signature clearer about its dependencies.
@josephjclark (Collaborator) commented

So in this diagram

  Main Process (ws-worker)
 ├── Workloop 1 (manual,*:2)  ─┐
 ├── Workloop 2 (fast_lane:1) ─┼── all run in the main process as async loops
 ├── Workloop 3 (*:5)         ─┘
 │
 └── Engine (single instance, shared by all lanes)
     └── Child Process Pool (capacity = sum of all lanes/slots)
         ├── Child 1 (forked) → Worker Thread (per task)
         ├── Child 2 (forked) → Worker Thread (per task)
         ├── ...on demand, reused after each task
         └── Child N

the effective capacity is 8? The worker's memory must be greater than 8 * runMemory

I think at the moment we run with a capacity of 2, just to ward against lost runs (less of a problem now than when we configured this). This means we run well under capacity, but at least it's safe.

It's unrelated to this story but I'd really like to implement dynamic memory. Tell the worker its maximum memory (ie 2gb) and it'll track how much it has reserved, and maybe include a memory limit in claims ("hi lightning, can I have 1 workflow with no more than 256mb limit? Thanks bro"). Of course that story is a lot more complex in this lanes architecture because presumably you need to reserve memory per lane. Like don't block the fast lane by over-claiming the default lane. I don't know how we'd manage that.

Was there a discussion around creating multiple lanes in a worker, vs creating a worker with a single dedicated lane ? Like some config that says "give me 1 worker capacity 5 for fast lane, and 2 workers capacity 2 for everything else"

@stuartc (Member, Author) commented Mar 11, 2026

So in this diagram

  Main Process (ws-worker)
 ├── Workloop 1 (manual,*:2)  ─┐
 ├── Workloop 2 (fast_lane:1) ─┼── all run in the main process as async loops
 ├── Workloop 3 (*:5)         ─┘
 │
 └── Engine (single instance, shared by all lanes)
     └── Child Process Pool (capacity = sum of all lanes/slots)
         ├── Child 1 (forked) → Worker Thread (per task)
         ├── Child 2 (forked) → Worker Thread (per task)
         ├── ...on demand, reused after each task
         └── Child N

the effective capacity is 8? The worker's memory must be greater than 8 * runMemory

I think at the moment we run with a capacity of 2, just to ward against lost runs (less of a problem now than when we configured this). This means we run well under capacity, but at least it's safe.

Yeah, that's correct; the diagram was more to illustrate the Workloop-to-Engine connection.

It's unrelated to this story but I'd really like to implement dynamic memory. Tell the worker its maximum memory (ie 2gb) and it'll track how much it has reserved, and maybe include a memory limit in claims ("hi lightning, can I have 1 workflow with no more than 256mb limit? Thanks bro"). Of course that story is a lot more complex in this lanes architecture because presumably you need to reserve memory per lane. Like don't block the fast lane by over-claiming the default lane. I don't know how we'd manage that.

I could argue we are closer to being able to do this, at least from a query perspective. Sure, the config format is going to be finicky. I drew "inspiration" from how fstab mounts and Docker container network and volume mounts (when using NFS) express their config, which should immediately make anyone shudder, but it's a heck of a lot better than jamming 20-40 characters of JSON inside a shell argument or env var.

Anyway, if the workloop is able to (atomically) make enquiries about how much room we have, we now have the means to tell Lightning something about what we want. So closer...?

Was there a discussion around creating multiple lanes in a worker, vs creating a worker with a single dedicated lane ? Like some config that says "give me 1 worker capacity 5 for fast lane, and 2 workers capacity 2 for everything else"

Yes and no: the requirement was specifically for limited-resource on-prem deployments, after checking how two of our deployments have been done. There was a massive trade-off in at least one of them: in order to have a dedicated worker, they would effectively have one worker for manual and normal runs and one worker for synchronous jobs, cutting the max capacity roughly in half. I landed on letting a single worker have multiple lanes.

stuartc added 3 commits March 12, 2026 09:00
- Rename --queues CLI option to --workloops (env: WORKER_WORKLOOPS)
- Change queue separator from comma to angle bracket (fast_lane>*:4)
- Rename SlotGroup → WorkloopConfig, maxSlots → capacity
- Merge RuntimeSlotGroup + Workloop into single Workloop interface
  with stub stop/isStopped that get overwritten by startWorkloop()
- Rename parse-queues.ts → parse-workloops.ts
- Update all imports, types, variable names, and tests
Move Workloop, createWorkloop, and workloopHasCapacity out of
parse-workloops.ts (pure parsing) into api/workloop.ts (runtime).

startWorkloop now returns a WorkloopHandle { stop, isStopped } instead
of mutating the Workloop object. ServerApp stores handles in a
workloopHandles Map, keeping Workloop as pure state with no lifecycle
methods. Also fixes syntax error on claim.ts L81 and updates error
message expectations in tests.
The test:ci scripts were piping TAP directly to tap-xunit with no
visible output. Adding tee /dev/stderr forks the stream so test
names and pass/fail status appear in CI logs in real time.
@stuartc stuartc requested a review from josephjclark March 12, 2026 11:13
@josephjclark (Collaborator) commented

reminder to me: does this all work OK with the wake up events?

@josephjclark (Collaborator) commented

QA note: runs great against main with default queue config 👍

* worker: factor claim signature

Put required arguments first, optional args after

* refactor to remove createWorkloop and WorkloopConfig

Since the Workloop interface is just a static object, we should just create it at source in the parser. This removes some confusing typings and helps simplify the code. This might enable a further refactor

* fix default capacity to restore tests

* refactor Workloop into its own class

A pretty big change that seriously simplifies the workloop handling. Each workloop is its own instance, created straight from the string config, which manages its own state

* update docs

* remove app.openClaims, which is not useful anymore

* formatting

* types
@josephjclark (Collaborator) commented

Subject to the -- CLI thing (which I'm struggling to understand and will speak to Stu about tomorrow) I'm happy with this. I'll approve and likely merge in the morning.

I've tested against lightning main and it works great. Haven't tested with the actual queues but I presume that'll be done on staging

@josephjclark josephjclark changed the base branch from main to release/next March 13, 2026 09:20
@josephjclark josephjclark merged commit 07457d7 into release/next Mar 13, 2026
1 of 2 checks passed
@github-project-automation github-project-automation bot moved this from New Issues to Done in Core Mar 13, 2026
@josephjclark josephjclark deleted the fastlanes branch March 13, 2026 10:30

Development

Successfully merging this pull request may close these issues.

  • Implement per-group workloops and queue-aware claiming
  • Worker: Add --queues option and slot group configuration parsing

2 participants