feat: queue input for running automation flows#268
feat: queue input for running automation flows#268mason5052 wants to merge 3 commits intovxcontrol:mainfrom
Conversation
Signed-off-by: mason5052 <ehehwnwjs5052@gmail.com>
There was a problem hiding this comment.
Pull request overview
This PR enables submitting follow-up instructions to an automation flow while it is Running by queueing input server-side and adjusting the UI to allow submission without interrupting the current task.
Changes:
- Backend: switch flow input from an unbuffered handoff to a bounded buffered queue and process provider switches at consumption time.
- Frontend: allow message submission while
Running, keepStopvisible, and add queue-focused helper/placeholder text. - Backend tests: add queue-focused unit tests covering enqueue, full-queue error, early error propagation, cancel, and stop-drain behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| frontend/src/providers/flow-provider.tsx | Removes provider override from automation message submission payload. |
| frontend/src/features/flows/messages/flow-automation-messages.tsx | Enables input submission while Running, keeps Stop available, adds queue messaging. |
| frontend/src/features/flows/flow-form.tsx | Adds props to allow input while loading and show both Submit/Stop buttons as needed. |
| backend/pkg/controller/flow.go | Implements bounded input queueing, stop-time discard/drain handling, and defers provider switching to processing time. |
| backend/pkg/controller/flow_test.go | Adds unit tests for queued input behavior and stop-drain semantics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| fw.taskMX.Lock() | ||
| defer fw.taskMX.Unlock() | ||
| fw.discard.Store(true) | ||
| defer fw.discard.Store(false) |
There was a problem hiding this comment.
Stop() drains fw.input after waiting for the current task to stop, but PutInput() does not check fw.discard (the “stopping” flag). That means callers can still enqueue while Stop() is in progress, and drainPendingInput() can take arbitrarily long (or effectively never finish under a steady stream of inputs). Consider rejecting/enforcing backpressure in PutInput() when fw.discard is true (e.g., return a stopped/stopping error) so Stop() is bounded and callers get a consistent response during shutdown.
| defer fw.discard.Store(false) |
| if err := ctx.Err(); err != nil { | ||
| close(flin.done) | ||
| return fmt.Errorf("flow %d input processing timeout: %w", fw.flowCtx.FlowID, err) | ||
| } |
There was a problem hiding this comment.
The error returned when ctx.Err() != nil is labeled as an "input processing timeout", but ctx.Err() may also be context.Canceled. This can produce misleading error text for clients and logs. Consider wording that reflects both cancellation and deadline (or branch on errors.Is(err, context.DeadlineExceeded) vs canceled).
| select { | ||
| case err := <-resultCh: | ||
| require.NoError(t, err) | ||
| case <-time.After(flowInputTimeout + 500*time.Millisecond): | ||
| t.Fatal("PutInput did not return after the queue handshake timeout") | ||
| } |
There was a problem hiding this comment.
This test hard-codes a wait of flowInputTimeout (currently 1s), which makes the suite slower and can introduce flakiness under load. Consider making the handshake timeout injectable/overridable in tests (or using a much smaller test-only timeout) so the behavior is still covered without a 1s wall-clock dependency.
Signed-off-by: mason5052 <ehehwnwjs5052@gmail.com>
Signed-off-by: mason5052 <ehehwnwjs5052@gmail.com>
Summary
RunningStopavailable in the UI and add queue-focused backend testsProblem
Issue #192 asks for tighter assistant and automation integration. A smaller but immediately useful part of that request is use case 2: submit new instructions while automation is already running without interrupting the current task.
Today PentAGI blocks that path in two places.
Runningand only offersStopSolution
Implement the queueing path for running automation flows without expanding scope into assistant handoff or file injection yet.
Runningand showsStopalongsideSubmitThis intentionally targets use case 2 from #192. Assistant-generated plans, structured handoff, and file or data-source injection can still follow in later work.
User Impact
Users can add follow-up instructions to a running automation flow without interrupting the active task. The current task continues, the new instruction is queued for the next handoff point, and
Stopremains available when they do want to cancel execution.Test Plan
go test ./pkg/controller/...npx eslint src/features/flows/flow-form.tsx src/features/flows/messages/flow-automation-messages.tsx src/providers/flow-provider.tsxnpm run buildgit diff --checkNotes
npm run lintstill fails on this Windows host because of pre-existing unrelated frontend lint errors in files such assrc/components/shared/markdown.tsx,src/components/ui/textarea.tsx, and several existing flow/search components. The files changed in this PR pass the targetedeslintcommand above.