Implement per-group workloops and queue-aware claiming #1293
Merged
One test still failing
Casing in the yaml files was causing issues
Worker: use New batch logging API
tests: deploy tests
…flag-and-an-openfn-project-clean-command feat: openfn project clean
fix package lock
1782d26 to 9015b38
Introduce queue-aware slot allocation to support fast lanes. Workers can now dedicate specific slots to specific queues via a new `--queues` flag (e.g. `--queues "fast_lane:1 manual,*:4"`).
- Add `parseQueues()` with `SlotGroup` type and full validation
- Add `--queues` CLI option with `WORKER_QUEUES` env var support
- Enforce mutual exclusivity between `--queues` and `--capacity`
- Derive `effectiveCapacity` from slot groups in `start.ts`
- Add `queues` field to `ClaimPayload` in lexicon
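To make the `--queues` format concrete, here is a minimal sketch of how a value such as `"fast_lane:1 manual,*:4"` could be parsed into slot groups and how an effective capacity could be derived from them. The names `SlotGroupSketch`, `parseQueuesSketch` and `effectiveCapacitySketch` are hypothetical, and the validation rules shown are assumptions; the PR's actual `parseQueues()` may validate differently.

```typescript
// Hypothetical shape; the PR's real SlotGroup type may carry other fields.
interface SlotGroupSketch {
  queues: string[]; // preference chain, e.g. ["manual", "*"]
  maxSlots: number; // slots dedicated to this chain
}

// Parse "<queues>:<count> <queues>:<count> ..." into slot groups.
// Groups are separated by whitespace; queues within a group by commas.
function parseQueuesSketch(input: string): SlotGroupSketch[] {
  const tokens = input.trim().split(/\s+/).filter(Boolean);
  if (tokens.length === 0) {
    throw new Error("queues value must not be empty");
  }
  return tokens.map((token) => {
    const sep = token.lastIndexOf(":");
    if (sep <= 0) {
      throw new Error(`Invalid slot group "${token}": expected <queues>:<count>`);
    }
    const queues = token.slice(0, sep).split(",");
    const count = token.slice(sep + 1);
    if (queues.some((q) => q.length === 0)) {
      throw new Error(`Invalid slot group "${token}": empty queue name`);
    }
    if (!/^[1-9]\d*$/.test(count)) {
      throw new Error(`Invalid slot group "${token}": count must be a positive integer`);
    }
    return { queues, maxSlots: Number(count) };
  });
}

// Assumption: the effective capacity is the sum of all group slot counts.
function effectiveCapacitySketch(groups: SlotGroupSketch[]): number {
  return groups.reduce((sum, group) => sum + group.maxSlots, 0);
}
```

Under these assumptions, `"fast_lane:1 manual,*:4"` yields two groups, one slot reserved for `fast_lane` and four slots that prefer `manual` before falling back to `*`.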
- Remove unreachable `?? 5` fallback in `start.ts` (capacity always has default)
- Add TODO(#1289) for passing `slotGroups` to `createWorker`
- Warn on duplicate queue names within a preference chain
- Warn on identical queue configurations across slot groups
- Add test for `WORKER_QUEUES` + `WORKER_CAPACITY` env var mutual exclusivity
capacity always receives a default value via setArg, so the type should reflect that rather than requiring a runtime fallback.
Each slot group now gets its own independent workloop, tracks its own active runs and capacity, and sends queue-scoped claims to Lightning. The join payload includes a queues map so Lightning knows the slot distribution. Default behavior (no --queues) is preserved with a single manual,* group.
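The independence of the workloops can be illustrated with a small sketch. It is not the PR's `workloop.ts`: the `RuntimeSlotGroup` field types, the `toRuntimeGroup` and `startWorkloop` helpers, the `ClaimFn` signature and the backoff constants are all assumptions made for illustration. Each loop owns its own `stopped` flag and backoff delay, so stopping one group leaves the others running.

```typescript
// Hypothetical runtime shape; field names follow the PR description,
// but their exact types are assumptions.
interface RuntimeSlotGroup {
  id: string; // queue chain joined with ",", e.g. "manual,*"
  queues: string[];
  maxSlots: number;
  activeRuns: Set<string>;
  openClaims: Record<string, number>;
}

// Hypothetical claim function: resolves to true if a run was claimed.
type ClaimFn = (group: RuntimeSlotGroup) => Promise<boolean>;

interface Workloop {
  stop: () => void;
  done: Promise<void>;
}

function toRuntimeGroup(config: { queues: string[]; maxSlots: number }): RuntimeSlotGroup {
  return {
    id: config.queues.join(","),
    queues: config.queues,
    maxSlots: config.maxSlots,
    activeRuns: new Set<string>(),
    openClaims: {},
  };
}

// Start an independent, backoff-driven loop for a single group.
function startWorkloop(
  group: RuntimeSlotGroup,
  claim: ClaimFn,
  minBackoff = 10,
  maxBackoff = 1000
): Workloop {
  let stopped = false;
  let wake: (() => void) | undefined;

  // Sleep that can be cut short by stop().
  const sleep = (ms: number) =>
    new Promise<void>((resolve) => {
      const timer = setTimeout(resolve, ms);
      wake = () => {
        clearTimeout(timer);
        resolve();
      };
    });

  const done = (async () => {
    let delay = minBackoff;
    while (!stopped) {
      const claimed = await claim(group);
      // Reset the delay after a successful claim, otherwise back off.
      delay = claimed ? minBackoff : Math.min(delay * 2, maxBackoff);
      if (!stopped) await sleep(delay);
    }
  })();

  return {
    stop: () => {
      stopped = true;
      wake?.();
    },
    done,
  };
}
```

A worker following this sketch would call `startWorkloop` once per slot group and keep the returned handle on the group, so that a specific group's loop can be stopped or restarted without touching the rest.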
The pattern of computing pending claims and comparing against maxSlots was duplicated 4 times across server.ts. Extracts it into a single groupHasCapacity() function in parse-queues.ts with 5 unit tests.
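As an illustration of the extracted check, a capacity helper could look like the sketch below. The `GroupState` shape is an assumption (in particular, modelling `openClaims` as a map from claim id to the number of runs requested), and `groupHasCapacitySketch` is a hypothetical name, not the PR's `groupHasCapacity()`.

```typescript
// Hypothetical state shape; only the field names come from the PR text.
interface GroupState {
  maxSlots: number;
  activeRuns: Set<string>; // ids of runs currently executing
  openClaims: Record<string, number>; // claim id -> runs requested (assumed)
}

// A group has capacity while active runs plus pending claims
// stay below its slot limit.
function groupHasCapacitySketch(group: GroupState): boolean {
  const pendingClaims = Object.values(group.openClaims).reduce(
    (sum, requested) => sum + requested,
    0
  );
  return group.activeRuns.size + pendingClaims < group.maxSlots;
}
```

Counting pending claims alongside active runs keeps a group from requesting more work than it could accept if every outstanding claim were answered with a run.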
Fold run-to-group tracking assertions into the existing execute test, remove the redundant workloop-stop test (already covered), and drop the now-covered todo.
pnpm v7+ passes the '--' separator through to process.argv, causing yargs to treat all subsequent flags as positional arguments. Strip a leading '--' before parsing so --queues and other flags work correctly whether invoked via pnpm, npm, or directly.
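The fix amounts to something like the following sketch. The helper name `stripLeadingSeparator` is hypothetical, and it assumes the input has already had the node executable and script path removed (for example via `process.argv.slice(2)`).

```typescript
// Drop a single leading "--" so the flags after it are parsed as options.
// A "--" that appears later in the list is left untouched, since there it
// still marks the start of positional arguments.
function stripLeadingSeparator(args: string[]): string[] {
  return args[0] === "--" ? args.slice(1) : args;
}
```

Only the first element is inspected, so direct invocations without a separator pass through unchanged.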
The per-group workloops use Promise.any which requires ES2021 or later in the TypeScript lib setting.
53ccdb9 to 85a8acc
Short Description
Implements per-group workloops and queue-aware claiming for the fast lanes feature. Each slot group now gets its own independent workloop with its own queue preference chain, capacity tracking, and backoff. Claims include the `queues` field and the join payload reports the slot distribution to Lightning.

Fixes #1289
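For illustration, the two payloads could be assembled as in the sketch below. Only the `queues?: string[]` claim field and the shape of the join `queues` map are taken from this PR; the `demand` field, the type names and the helper names are assumptions.

```typescript
// Hypothetical claim payload; only `queues?: string[]` is from the PR.
interface ClaimPayloadSketch {
  demand: number; // assumed: how many runs this claim asks for
  queues?: string[]; // preference chain of the claiming group
}

interface GroupConfig {
  queues: string[];
  maxSlots: number;
}

// Build a claim scoped to one slot group.
function buildClaimPayload(group: GroupConfig, demand: number): ClaimPayloadSketch {
  return { demand, queues: [...group.queues] };
}

// Build the informational join map, keyed by the comma-joined chain.
// Assumption: groups with identical chains are summed into one entry.
function buildJoinQueues(groups: GroupConfig[]): Record<string, number> {
  const result: Record<string, number> = {};
  for (const group of groups) {
    const key = group.queues.join(",");
    result[key] = (result[key] ?? 0) + group.maxSlots;
  }
  return result;
}
```

With one `fast_lane` slot and four `manual,*` slots, `buildJoinQueues` produces `{ "fast_lane": 1, "manual,*": 4 }`, matching the example map in the implementation details.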
Implementation Details
This builds on #1288 (CLI parsing) and wires slot groups through the entire claim lifecycle:
- `parse-queues.ts`: `SlotGroup` config type and `RuntimeSlotGroup` with runtime state (`activeRuns`, `openClaims`, `workloop`). The `groupHasCapacity()` helper consolidates the repeated capacity-check pattern.
- `claim.ts`: Claims are now scoped to a group: capacity checks use per-group tracking, the `queues` array is sent in every claim payload, and claimed runs are associated with their owning group via `app.runGroupMap`.
- `workloop.ts`: Each group runs its own independent backoff-driven workloop. Stopping one group's workloop does not affect others.
- `server.ts`: `resumeWorkloop(group?)` selectively restarts workloops. `work-available` triggers claims for all groups with free capacity. Run completion cleans up from the owning group and resumes only that group's workloop.
- `worker-queue.ts`: The join payload includes an informational `queues` map (`{ "fast_lane": 1, "manual,*": 4 }`) for future WorkerPresence tracking.
- Lexicon: adds `queues?: string[]` to the `ClaimPayload` type.
- Without the `--queues` flag, the worker defaults to `manual,*:<capacity>`. Old Lightning ignores the new `queues` field; old workers without it get Lightning's default `["manual", "*"]`.

QA Notes
- `parse-queues.test.ts` (parsing, validation, `groupHasCapacity`), `claim.test.ts` (per-group capacity, payload shape, run-to-group tracking), `workloop.test.ts` (independence of group workloops), `worker-queue.test.ts` (queues in join payload), `cli.test.ts` (`--queues` / `WORKER_QUEUES` / mutual exclusivity with `--capacity`)
- `--queues "fast_lane:1 manual,*:4"`: should work identically to current behavior, since Lightning ignores unknown claim fields

AI Usage