Skip to content

feat: queue input for running automation flows#268

Open
mason5052 wants to merge 3 commits intovxcontrol:mainfrom
mason5052:codex/issue-192-automation-queue
Open

feat: queue input for running automation flows#268
mason5052 wants to merge 3 commits intovxcontrol:mainfrom
mason5052:codex/issue-192-automation-queue

Conversation

@mason5052
Copy link
Copy Markdown
Contributor

Summary

  • allow automation flows to accept follow-up input while status is Running
  • queue submitted input in the backend so it is delivered at the next task boundary instead of requiring an interrupt
  • keep Stop available in the UI and add queue-focused backend tests

Problem

Issue #192 asks for tighter assistant and automation integration. A smaller but immediately useful part of that request is use case 2: submit new instructions while automation is already running without interrupting the current task.

Today PentAGI blocks that path in two places.

  • the automation UI hides submit while the flow is Running and only offers Stop
  • the backend input path uses an unbuffered handoff, so follow-up instructions are not handled as a reliable queue

Solution

Implement the queueing path for running automation flows without expanding scope into assistant handoff or file injection yet.

  • flow input now uses a bounded FIFO queue with explicit queue-full errors and tests for enqueue, early error, cancel, and stop-drain behavior
  • provider switching now happens when queued input is actually processed, so queued messages follow the current flow execution path safely
  • the automation form keeps provider changes disabled, but now allows message submission while Running and shows Stop alongside Submit
  • queue-focused helper text explains that new instructions are applied at the next task boundary

This intentionally targets use case 2 from #192. Assistant-generated plans, structured handoff, and file or data-source injection can still follow in later work.

User Impact

Users can add follow-up instructions to a running automation flow without interrupting the active task. The current task continues, the new instruction is queued for the next handoff point, and Stop remains available when they do want to cancel execution.

Test Plan

  • go test ./pkg/controller/...
  • npx eslint src/features/flows/flow-form.tsx src/features/flows/messages/flow-automation-messages.tsx src/providers/flow-provider.tsx
  • npm run build
  • git diff --check

Notes

npm run lint still fails on this Windows host because of pre-existing unrelated frontend lint errors in files such as src/components/shared/markdown.tsx, src/components/ui/textarea.tsx, and several existing flow/search components. The files changed in this PR pass the targeted eslint command above.

Signed-off-by: mason5052 <ehehwnwjs5052@gmail.com>
Copilot AI review requested due to automatic review settings April 16, 2026 00:59
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR enables submitting follow-up instructions to an automation flow while it is Running by queueing input server-side and adjusting the UI to allow submission without interrupting the current task.

Changes:

  • Backend: switch flow input from an unbuffered handoff to a bounded buffered queue and process provider switches at consumption time.
  • Frontend: allow message submission while Running, keep Stop visible, and add queue-focused helper/placeholder text.
  • Backend tests: add queue-focused unit tests covering enqueue, full-queue error, early error propagation, cancel, and stop-drain behavior.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
frontend/src/providers/flow-provider.tsx Removes provider override from automation message submission payload.
frontend/src/features/flows/messages/flow-automation-messages.tsx Enables input submission while Running, keeps Stop available, adds queue messaging.
frontend/src/features/flows/flow-form.tsx Adds props to allow input while loading and show both Submit/Stop buttons as needed.
backend/pkg/controller/flow.go Implements bounded input queueing, stop-time discard/drain handling, and defers provider switching to processing time.
backend/pkg/controller/flow_test.go Adds unit tests for queued input behavior and stop-drain semantics.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread backend/pkg/controller/flow.go Outdated
fw.taskMX.Lock()
defer fw.taskMX.Unlock()
fw.discard.Store(true)
defer fw.discard.Store(false)
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop() drains fw.input after waiting for the current task to stop, but PutInput() does not check fw.discard (the “stopping” flag). That means callers can still enqueue while Stop() is in progress, and drainPendingInput() can take arbitrarily long (or effectively never finish under a steady stream of inputs). Consider rejecting/enforcing backpressure in PutInput() when fw.discard is true (e.g., return a stopped/stopping error) so Stop() is bounded and callers get a consistent response during shutdown.

Suggested change
defer fw.discard.Store(false)

Copilot uses AI. Check for mistakes.
Comment on lines +587 to 590
if err := ctx.Err(); err != nil {
close(flin.done)
return fmt.Errorf("flow %d input processing timeout: %w", fw.flowCtx.FlowID, err)
}
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned when ctx.Err() != nil is labeled as an "input processing timeout", but ctx.Err() may also be context.Canceled. This can produce misleading error text for clients and logs. Consider wording that reflects both cancellation and deadline (or branch on errors.Is(err, context.DeadlineExceeded) vs canceled).

Copilot uses AI. Check for mistakes.
Comment on lines +41 to +46
select {
case err := <-resultCh:
require.NoError(t, err)
case <-time.After(flowInputTimeout + 500*time.Millisecond):
t.Fatal("PutInput did not return after the queue handshake timeout")
}
Copy link

Copilot AI Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test hard-codes a wait of flowInputTimeout (currently 1s), which makes the suite slower and can introduce flakiness under load. Consider making the handshake timeout injectable/overridable in tests (or using a much smaller test-only timeout) so the behavior is still covered without a 1s wall-clock dependency.

Copilot uses AI. Check for mistakes.
Signed-off-by: mason5052 <ehehwnwjs5052@gmail.com>
Signed-off-by: mason5052 <ehehwnwjs5052@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants