Fast lanes: dedicated worker capacity for named queues#1294
josephjclark merged 25 commits into release/next from
Conversation
Introduce queue-aware slot allocation to support fast lanes. Workers can now dedicate specific slots to specific queues via a new `--queues` flag (e.g. `--queues "fast_lane:1 manual,*:4"`).

- Add parseQueues() with SlotGroup type and full validation
- Add --queues CLI option with WORKER_QUEUES env var support
- Enforce mutual exclusivity between --queues and --capacity
- Derive effectiveCapacity from slot groups in start.ts
- Add queues field to ClaimPayload in lexicon
- Remove unreachable `?? 5` fallback in start.ts (capacity always has a default)
- Add TODO(#1289) for passing slotGroups to createWorker
- Warn on duplicate queue names within a preference chain
- Warn on identical queue configurations across slot groups
- Add test for WORKER_QUEUES + WORKER_CAPACITY env var mutual exclusivity
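The commits above can be pictured with a minimal sketch of the parsing step. This is a hypothetical reconstruction, not the actual parseQueues() implementation: it assumes groups are space-separated, queue names within a group are comma-separated, and the capacity follows a colon, matching the `"fast_lane:1 manual,*:4"` example.

```typescript
// Hypothetical sketch of queue-spec parsing, assuming the
// "queue1,queue2:capacity" format with space-separated groups.
type SlotGroup = {
  queues: string[]; // prioritised queue names; '*' matches anything
  maxSlots: number; // slots dedicated to this group
};

export function parseQueues(spec: string): SlotGroup[] {
  return spec
    .trim()
    .split(/\s+/)
    .map((group) => {
      const [queuePart, slotPart] = group.split(':');
      const maxSlots = Number(slotPart);
      if (!queuePart || !Number.isInteger(maxSlots) || maxSlots < 1) {
        throw new Error(`Invalid queue spec: "${group}"`);
      }
      return { queues: queuePart.split(','), maxSlots };
    });
}

// parseQueues('fast_lane:1 manual,*:4')
// → [{ queues: ['fast_lane'], maxSlots: 1 },
//    { queues: ['manual', '*'], maxSlots: 4 }]
```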
capacity always receives a default value via setArg, so the type should reflect that rather than requiring a runtime fallback.
Each slot group now gets its own independent workloop, tracks its own active runs and capacity, and sends queue-scoped claims to Lightning. The join payload includes a queues map so Lightning knows the slot distribution. Default behavior (no --queues) is preserved with a single manual,* group.
The pattern of computing pending claims and comparing against maxSlots was duplicated 4 times across server.ts. Extracts it into a single groupHasCapacity() function in parse-queues.ts with 5 unit tests.
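A sketch of what that extracted check plausibly looks like. The RuntimeGroup field names here are assumptions for illustration; the source only tells us the check compares pending claims against maxSlots:

```typescript
// Hypothetical shape of the extracted capacity check: a group has
// capacity when its active runs plus in-flight claims are below maxSlots.
type RuntimeGroup = {
  maxSlots: number;
  activeRuns: Set<string>; // run ids currently executing in this group
  pendingClaims: number;   // claims sent to Lightning, not yet answered
};

export function groupHasCapacity(group: RuntimeGroup): boolean {
  return group.activeRuns.size + group.pendingClaims < group.maxSlots;
}
```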
Fold run-to-group tracking assertions into the existing execute test, remove the redundant workloop-stop test (already covered), and drop the now-covered todo.
pnpm v7+ passes the '--' separator through to process.argv, causing yargs to treat all subsequent flags as positional arguments. Strip a leading '--' before parsing so --queues and other flags work correctly whether invoked via pnpm, npm, or directly.
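The fix described above amounts to a one-line normalisation before yargs sees the arguments. A minimal sketch, assuming argv has already had the node binary and script path sliced off:

```typescript
// pnpm v7+ forwards the '--' separator into process.argv;
// strip it so yargs parses the flags that follow it normally.
export function stripSeparator(argv: string[]): string[] {
  return argv[0] === '--' ? argv.slice(1) : argv;
}

// `pnpm start -- --queues "fast_lane:1"` arrives as ['--', '--queues', 'fast_lane:1']
// stripSeparator → ['--queues', 'fast_lane:1']
```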
The per-group workloops use Promise.any which requires ES2021 or later in the TypeScript lib setting.
Pipes AVA TAP output through tap-xunit to generate JUnit XML, enabling test results to display in CircleCI's Tests UI.
Each package now produces its own XML file instead of piping combined pnpm -r output through tap-xunit, which was corrupted by pnpm's interleaved logging.
josephjclark
left a comment
A few initial comments (I'll spend more time on this tomorrow)
My big question is: how is memory managed here?
On main you set the memory limit per worker and a fixed capacity - say 500mb per child process and a capacity of 2. The worker promises it won't use up more than 1 gb of memory (although we know from experience this isn't infallible, but that's a different discussion)
But here, if that architecture diagram is accurate, I'm essentially enabling 5 concurrent runs per worker now. How are we ensuring that we don't run out of memory?
Remove redundant env var guard wrapping an already-skipped test. AVA now sees the test as registered but skipped instead of finding zero tests and emitting a warning that causes a non-zero exit.
The per-run memory limit (--run-memory) is unchanged and still enforced per child process. The total memory ceiling is sum(lane capacities) × runMemory. We could add a log on startup showing the total potential memory to make this explicit?

Edit: I agree the diagram skims over the "where does concurrency live" question. Does that make more sense?
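A worked sketch of that ceiling arithmetic. The lane capacities and per-run limit below are hypothetical values for illustration, not the project's defaults:

```typescript
// Total memory ceiling = sum of lane capacities × per-run memory limit.
export function memoryCeilingMb(
  laneCapacities: number[],
  runMemoryMb: number
): number {
  return laneCapacities.reduce((a, b) => a + b, 0) * runMemoryMb;
}

// e.g. fast_lane:1 plus manual,*:4 at 500 MB per run:
// memoryCeilingMb([1, 4], 500) → 2500
```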
Move group from ClaimOptions to a required positional parameter, making the function signature clearer about its dependencies.
So in this diagram the effective capacity is 8? The worker's memory must be greater than…

I think at the moment we run with a capacity of 2, just to ward against lost runs (less of a problem now than when we configured this). This means we run well under capacity, but at least it's safe.

It's unrelated to this story, but I'd really like to implement dynamic memory. Tell the worker its maximum memory (i.e. 2GB) and it'll track how much it has reserved, and maybe include a memory limit in claims ("hi lightning, can I have 1 workflow with no more than 256mb limit? Thanks bro"). Of course that story is a lot more complex in this lanes architecture because presumably you need to reserve memory per lane. Like, don't block the fast lane by over-claiming the default lane. I don't know how we'd manage that.

Was there a discussion around creating multiple lanes in a worker, vs creating a worker with a single dedicated lane? Like some config that says "give me 1 worker capacity 5 for fast lane, and 2 workers capacity 2 for everything else"
Yeah, that's correct; the diagram was more to illustrate the Workloop-to-Engine connection.
I could argue we are closer to being able to do this, at least from a query perspective. Sure, the config format is going to be finicky. I drew "inspiration" for the config from how fstab and docker handle network and volume mounts (when using NFS). Which should immediately make anyone shudder, but yeah, it's a heck of a lot better than jamming 20-40 chars of JSON inside a shell argument or env var. Anyway, if the workloop is able to (atomically) make enquiries about how much room we have, we now have the means to tell Lightning something about what we want. So closer...?
Yes and no. The requirement was specifically for limited-resource on-prem deployments, after checking how two of our deployments have been done. There was a massive trade-off, at least in one of them: in order to have a dedicated worker, they would effectively have 1 worker for manual and normal runs, and 1 worker for synchronous jobs, cutting the max capacity in half. I landed on having the worker be able to have multiple lanes.
- Rename --queues CLI option to --workloops (env: WORKER_WORKLOOPS)
- Change queue separator from comma to angle bracket (fast_lane>*:4)
- Rename SlotGroup → WorkloopConfig, maxSlots → capacity
- Merge RuntimeSlotGroup + Workloop into single Workloop interface with stub stop/isStopped that get overwritten by startWorkloop()
- Rename parse-queues.ts → parse-workloops.ts
- Update all imports, types, variable names, and tests
Move Workloop, createWorkloop, and workloopHasCapacity out of
parse-workloops.ts (pure parsing) into api/workloop.ts (runtime).
startWorkloop now returns a WorkloopHandle { stop, isStopped } instead
of mutating the Workloop object. ServerApp stores handles in a
workloopHandles Map, keeping Workloop as pure state with no lifecycle
methods. Also fixes syntax error on claim.ts L81 and updates error
message expectations in tests.
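A minimal sketch of the handle shape this commit describes: startWorkloop returns lifecycle controls instead of mutating the Workloop object. The setInterval-based loop body here is a stand-in for illustration, not the real claim logic:

```typescript
export type WorkloopHandle = {
  stop: () => void;
  isStopped: () => boolean;
};

// Returns a handle with its own lifecycle state, keeping the
// workloop config object itself as pure, immutable state.
export function startWorkloop(
  tick: () => void,
  intervalMs: number
): WorkloopHandle {
  let stopped = false;
  const timer = setInterval(() => {
    if (!stopped) tick();
  }, intervalMs);
  return {
    stop: () => {
      stopped = true;
      clearInterval(timer);
    },
    isStopped: () => stopped,
  };
}
```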
The test:ci scripts were piping TAP directly to tap-xunit with no visible output. Adding tee /dev/stderr forks the stream so test names and pass/fail status appear in CI logs in real time.
reminder to me: does this all work OK with the wake up events?
QA note: runs great against main with default queue config 👍
* worker: factor claim signature

  Put required arguments first, optional args after

* refactor to remove createWorkloop and WorkloopConfig

  Since the Workloop interface is just a static object, we should just create it at source in the parser. This removes some confusing typings and helps simplify the code. This might enable a further refactor

* fix default capacity to restore tests

* refactor Workloop into its own class

  A pretty big change that seriously simplifies the workloop handling. Each workloop is its own instance, created straight from the string config, which manages its own state

* update docs
* remove app.openClaims, which is not useful anymore
* formatting
* types
I've tested against lightning main and it works great. Haven't tested with the actual queues but I presume that'll be done on staging.
Short Description
Adds the ability to subscribe worker capacity to named queues via
--workloops, guaranteeing dedicated capacity for latency-sensitive workloads (e.g., sync webhooks). This PR combines the two Kit-side fast lanes issues.

Fixes #1288, fixes #1289
Feature Design
At present, the worker has a single workloop (or claim loop), which repeatedly calls lightning on a backoff and asks for more work. In this architecture, Lightning is responsible for prioritising which work gets scheduled next.
The problem with this architecture is that Lightning cannot reserve capacity: it'll just hand off the most important work right now to be done. But if something super high priority comes in, like a synchronous workflow request, and if all the workers happen to be busy, the work has to wait.
To resolve this, Lightning has three work queues: a fast lane for sync requests, a manual queue for user triggered runs, and a default queue for everything else.
Workers mirror this architecture by having several independent workloops. Each workloop makes claim requests to lightning and specifies a prioritised list of channels to pull from. Each workloop has its own independent backoff and capacity.
Now a worker can pull work from the manual and default queues in the app, but reserve capacity for fast lane work, ensuring that those priority workflows get pulled ASAP.
Note that the prioritised queues allow for work stealing. The `*` queue will pull anything: so if there's nothing in the manual runs queue, work will be pulled from the default queue. It'll even pull from the fast lane if work is queued up there.

The precise configuration of queues and workloops is purely config driven.
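As an illustration of the prioritised claims described above, here are sketched claim payloads for a two-workloop configuration. Only the `queues` field is confirmed by this PR; the `demand` field name is an assumption for illustration:

```typescript
// Illustrative claim payloads: each workloop sends its own
// prioritised queue list with every claim.
type ClaimPayload = {
  demand: number;   // how many runs this claim may return (assumed name)
  queues: string[]; // prioritised queue names; '*' is the catch-all
};

// Fast-lane workloop: only ever pulls sync work
const fastLaneClaim: ClaimPayload = { demand: 1, queues: ['fast_lane'] };

// Default workloop: prefers manual runs, steals from anywhere via '*'
const defaultClaim: ClaimPayload = { demand: 1, queues: ['manual', '*'] };
```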
Benefits of this approach:
Implementation Details
This PR is composed of two feature branches:
#1291 —
--workloopsCLI option and slot group configuration parsing (kit#1288)--workloopsCLI option (andWORKER_WORKLOOPSenv var), mutually exclusive with--capacity--workloops "fast_lane:1 manual>*:4"into typedWorkloopConfig[]objectsqueuesfield toClaimPayloadin@openfn/lexicon#1293 — Per-group workloops and queue-aware claiming (kit#1289)
  - Join payload includes a `queues` map so the server knows the slot distribution
  - Default behavior (no `--workloops`) is preserved with a single `manual>*` group

Additional fixes included
- pnpm passes `--` through to `process.argv`, causing yargs to ignore all flags. Stripped the leading `--` before parsing.
- `Promise.any` type error: Added `ES2021` to ws-worker's tsconfig `lib` for `Promise.any` support.

QA Notes
- `--workloops "fast_lane:2 *:3"` — should show two separate workloops in logs
- No `--workloops` — should behave identically to before (single `manual>*` group)
- `--capacity` and `--workloops` cannot be used together (should throw)

AI Usage