Skip to content

Callback for workflow update support#9614

Open
Quinn-With-Two-Ns wants to merge 6 commits intotemporalio:mainfrom
Quinn-With-Two-Ns:nexus-workflow-update
Open

Callback for workflow update support#9614
Quinn-With-Two-Ns wants to merge 6 commits intotemporalio:mainfrom
Quinn-With-Two-Ns:nexus-workflow-update

Conversation

@Quinn-With-Two-Ns
Copy link
Copy Markdown
Contributor

@Quinn-With-Two-Ns Quinn-With-Two-Ns commented Mar 21, 2026

What changed?

Added support for Nexus workflow update completion callbacks via CHASM. This allows a Nexus caller to be notified when a workflow update completes by attaching completion callbacks to the update request.

Why?

Nexus operations that target workflow updates need a way to receive completion notifications. Without this, a Nexus caller that sends an update has no async mechanism to learn when the update finishes. Completion callbacks enable the same async notification pattern that already exists for workflow-level Nexus operations.

How did you test it?

  • built
  • run locally and tested manually
  • covered by existing tests
  • added new unit test(s)
  • added new functional test(s)

Potential risks

Touches speculative workflow updates, they are always hard to reason about. Tried to compensate with lots of test coverage.

Note: Needs this API PR https://github.com/temporalio/api/pull/742/changes


Note

High Risk
High risk because it changes workflow update state-machine behavior and mutable-state/event interfaces to persist and fire per-update callbacks (including rejection handling), which impacts core history execution and callback delivery semantics.

Overview
Adds per-update completion callbacks (for Nexus) backed by CHASM by introducing a WorkflowUpdate component with persisted UpdateState (including rejection failure) and storing update-scoped callback maps under the root workflow.

Extends the update protocol/state machine to accept AttachCallbacks, buffer callback attachments pre-acceptance, persist them via WorkflowExecutionOptionsUpdated events, dedupe by request_id, and ensure callbacks fire on update completion, rejection, and workflow close/continue-as-new/retry paths.

Plumbs new dynamic config gates/limits (EnableWorkflowUpdateCallbacks, MaxCallbacksPerUpdateID), adds update-callback visibility in DescribeWorkflowExecution, updates Nexus completion retrieval for updates (GetNexusUpdateCompletion), and adjusts API responses to include appropriate links for accepted vs rejected updates.

Reviewed by Cursor Bugbot for commit 239cb1f. Bugbot is set up for automated code reviews on this repo. Configure here.

Comment thread common/dynamicconfig/constants.go Outdated
Comment thread chasm/tree.go
Copy link
Copy Markdown
Member

@bergundy bergundy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need just one more round here. For when updates are already completed, let's make sure to generate the new link type we discussed server-side.

Comment thread chasm/lib/workflow/library.go Outdated
func (l *Library) Components() []*chasm.RegistrableComponent {
return []*chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Workflow](chasm.WorkflowComponentName),
chasm.NewRegistrableComponent[*WorkflowUpdate](chasm.WorkflowUpdateComponentName),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that workflow update is tightly coupled to workflows, it makes total sense to put them in the same library.

*workflowpb.UpdateState

// MSPointer is a special in-memory field for accessing the underlying mutable state.
chasm.MSPointer
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was only supposed to be embedded in the top level Workflow component but I can see why you'd want to access it here. No strong opinion because either way this would be a workaround. I wonder though if you need to embed this or if it'd be better to make it a named field.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was embed in the workflow component so I made it embed here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's not embedded then it would also need to be an exported field otherwise CHASM tree deserialization will not work. Probably to keep similar convention embedding is ok here

Comment thread chasm/lib/workflow/workflow_update.go Outdated
Comment thread chasm/workflow.go Outdated
)
MaxCallbacksPerUpdateID = NewNamespaceIntSetting(
"system.maxCallbacksPerUpdateID",
32,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think limiting all of the workflow callbacks, regardless of what component they're attached to makes more sense than a per component limit due to the fact that the entire tree needs to be loaded into memory when mutable state is accessed today.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also limited all workflow callbacks as well. I added this limit as well to keep one update from using up all the callbacks limit on a workflow.

Comment thread tests/nexus_workflow_update_test.go Outdated
Comment thread tests/nexus_workflow_update_test.go Outdated
Comment thread tests/nexus_workflow_update_test.go Outdated
Comment thread tests/nexus_workflow_update_test.go Outdated
Comment thread service/history/workflow/update/update.go
Copy link
Copy Markdown
Contributor

@stephanos stephanos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only made it half-way through so far; but figured I can send my first review comments now.

Comment thread .gitignore Outdated
Comment thread service/history/workflow/update/export_test.go
Comment thread tests/update_workflow_sdk_test.go
Comment thread tests/update_workflow_sdk_test.go Outdated
links []*commonpb.Link,
identity string,
priority *commonpb.Priority,
workflowUpdateOptions map[string]*historypb.WorkflowExecutionOptionsUpdatedEventAttributes_WorkflowUpdateOptionsUpdate,
Copy link
Copy Markdown
Contributor

@stephanos stephanos Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's not wrong, but ... WorkflowUpdateOptionsUpdate 😬

(non-blocking; just noticing)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree

Comment thread service/history/interfaces/mutable_state.go Outdated
Comment thread service/history/interfaces/mutable_state.go Outdated
@long-nt-tran long-nt-tran force-pushed the nexus-workflow-update branch 7 times, most recently from a453230 to 09ac27a Compare April 27, 2026 14:58
// - The event will be written atomically with acceptance
// If the Update struct is lost (registry cleared), the abort mechanism fires
// registryClearedErr on the caller's future, prompting an immediate retry.
if u.state == stateAdmitted || u.state == stateSent {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added handling for stateAdmitted, should be same as stateSent but returns false, nil since IIUC caller still needs to create the speculative WFT at this stage

@long-nt-tran long-nt-tran force-pushed the nexus-workflow-update branch from 09ac27a to 9de5339 Compare April 27, 2026 16:05
@long-nt-tran
Copy link
Copy Markdown
Contributor

Made some updates to bring this to latest main, I squashed the base PR to first commit and group each type of change into a subsequent commit for ease of review.

Only logical changes are on on the top commit -- handling stateAdmitted and flushing callbacks to CHASM store before rejecting, and added some more unit tests to test nexus cases + backlinking.

cc @bergundy @Quinn-With-Two-Ns @stephanos

@long-nt-tran long-nt-tran marked this pull request as ready for review April 27, 2026 16:34
@long-nt-tran long-nt-tran requested review from a team as code owners April 27, 2026 16:34
Comment thread go.mod Outdated
sigs.k8s.io/yaml v1.6.0 // indirect
)

replace go.temporal.io/api => github.com/long-nt-tran/api-go v0.0.0-20260424225344-d14c0ccb7110
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fork replace directive in go.mod must not merge

High Severity

A replace directive in go.mod redirects go.temporal.io/api to a personal fork (github.com/long-nt-tran/api-go). The PR description acknowledges this depends on a pending API PR, but this directive will break the build or introduce untested dependencies if merged to main. The corresponding go.sum entries for the fork are also included.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9de5339. Configure here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be removed once api PR temporalio/api#742 is merged, ignore

Comment thread service/history/workflow/mutable_state_impl.go
Comment thread service/history/workflow/update/update.go
@long-nt-tran long-nt-tran force-pushed the nexus-workflow-update branch from 9de5339 to 4b0915d Compare April 27, 2026 17:52
Comment thread service/history/api/updateworkflow/api.go
@long-nt-tran long-nt-tran force-pushed the nexus-workflow-update branch 2 times, most recently from 8551a4f to 3ae1202 Compare April 27, 2026 20:22
Comment thread chasm/lib/workflow/workflow.go
Squashed these commits, left for posterity:
- Add Nexus Workflow Update
- Update from rebase
- Fix sent state
- Cleanup
- Fix lint
- Fix more CI
- fix
- Review clean up
- Try suggestions from the review skill
- Fix some tests
- Add TODO for rejected event
- Remove .omc from gitignore
- Respond to PR comments
- Add NS Capability for this feature
- Respond to PR comments
- Update API
@long-nt-tran long-nt-tran force-pushed the nexus-workflow-update branch from 3ae1202 to 2ce7339 Compare April 28, 2026 02:13
Comment thread service/history/workflow/mutable_state_impl.go
@long-nt-tran long-nt-tran force-pushed the nexus-workflow-update branch from 2ce7339 to 239cb1f Compare April 28, 2026 02:35
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Fix All in Cursor

Reviewed by Cursor Bugbot for commit 239cb1f. Configure here.

Noop: false,
CreateWorkflowTask: false,
}, nil
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dedup return from AttachCallbacks skips workflow task creation

Medium Severity

When AttachCallbacks detects a duplicate requestID while in stateAdmitted, it returns (true, nil). The caller in api.go interprets callbacksAttached=true as "work is done" and returns early with Noop: false, CreateWorkflowTask: false, bypassing the speculative workflow task creation path. Since stateAdmitted means the update hasn't been dispatched to a worker yet, a workflow task is still needed. The first AttachCallbacks call correctly returns (false, nil), but a retry with the same requestID would hit the dedup path and skip WT creation, potentially leaving the update stuck.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 239cb1f. Configure here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure this is a bug? I think if we don't return true, nil on the second time, we may create 2 WFT for this update IIUC -- reviewers please chime in

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment thread tests/testcore/test_cluster_pool.go
long-nt-tran added a commit that referenced this pull request Apr 28, 2026
## What changed?

When we set Nexus callback URL in test_env.go, the dynamic config
override is still tied to the test's lifetime, not the cluster's
lifetime, so a subsequent test that reuse this cluster will not have
that override. Moving the override to onebox.go (similar pattern to
#9918) so this default lives
for the lifetime of the cluster.

## Why?

Ran into issue with task token not set in
#9614, this solves it.
Breaking the fix in a separate PR for ease of review + checking this in
first.

## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
long-nt-tran added a commit that referenced this pull request Apr 29, 2026
## What changed?

Added a `createExternalNexusServer(...)` which sets up an external Nexus
endpoint with user-provided handler and listens on a provided address.
This is used in nexus_workflow_test.go and will be used more in
#9614

Opportunistically did a couple more drive-by refactors/consistency
fixes, specifically:
* Force user to provide `ctx` into the endpoint creation functions
instead of making a new `ctx`
* Use `env.Context()` instead of `testcore.NewContext()` in all suites
that I touched here

## Why?

Pulling changes out of #9614
into targeted PRs to reduce load on reviewers.

## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
Comment thread tests/nexus_workflow_update_test.go
// TODO (alex-update): This method is noop because we don't currently write rejections to the history.
return nil
func (ms *MutableStateImpl) RejectWorkflowExecutionUpdate(updateID string, wfFailure *failurepb.Failure) error {
if !ms.chasmCallbacksEnabled() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elsewhere we use chasmEnabled; why use chasmCallbacksEnabled here? if it's intentional, a comment would be helpful.

Comment thread chasm/lib/workflow/workflow.go
Comment thread service/history/workflow/mutable_state_impl.go
// but update callbacks must fire now because the update was aborted on the old run.
func (w *Workflow) ProcessAllUpdateCloseCallbacks(ctx chasm.MutableContext) error {
for _, updateField := range w.Updates {
if err := callback.ScheduleStandbyCallbacks(ctx, updateField.Get(ctx).Callbacks); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understanding correct that once this returns an error; the entire attempt to schedule callbacks is aborted and retried from the top? We wouldn't want partial results.

Comment on lines +7087 to +7089
if len(wf.Updates) == 0 {
return nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a meaningful perf optimization?

Comment thread service/history/workflow/mutable_state_impl.go
Comment on lines +751 to +756
ce, errCE := ms.GetCompletionEvent(ctx)
if errors.Is(errCE, ErrMissingWorkflowCompletionEvent) {
return nexusrpc.CompleteOperationOptions{}, err
} else if errCE != nil {
return nexusrpc.CompleteOperationOptions{}, errCE
}
Copy link
Copy Markdown
Contributor

@stephanos stephanos May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add some (short) comments here? I find it quite hard to follow why these branches exist exactly this way. The existing comment only adds to my confusion I found ("we need to respond to all callbacks"?).

Comment thread service/history/api/updateworkflow/api.go
Comment thread service/history/api/updateworkflow/api.go
if len(u.pendingCallbacks) > 0 {
// Invariant: buffer must be cleared before reject. If we reach here,
// there is a bug in the caller (onRejectionMsg should clear the buffer).
return serviceerror.NewInternalf(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use softassert package here as it will return that internal error and highlight this invariant in test envs

}
u.instrumentation.countRejectionMsg()
return u.reject(rej.Failure, effects)
// Flush any callbacks buffered during stateAdmitted or stateSent into CHASM
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's add newline above here

Comment on lines +494 to +496
if len(u.pendingCallbacks) == 0 {
return nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a meaningful early return? the next line is the for loop and would take care of this already.

// complete the update state machine. The API caller is notified of the
// rejection via WaitLifecycleStage regardless; only async CHASM callbacks
// are dropped, which is acceptable when the host workflow is already gone.
if eventStore.CanAddEvent() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be a better design to move the eventStore.CanAddEvent() check into flushPendingCallbacks? (flushPendingCallbacks should then always apply u.pendingCallbacks = nil)

That seems better to me as we can encapsulate this special case in there.


// flushPendingCallbacks writes one WorkflowExecutionOptionsUpdatedEvent per
// buffered AttachCallbacks callback, skipping any whose requestID is already persisted.
// Called from onAcceptanceMsg after the acceptance event has been written.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line makes it sound like it's only called from onAcceptanceMsg but that's not true.

Comment thread service/history/workflow/update/update.go
Comment thread service/history/workflow/update/update.go
func (u *Update) AttachCallbacks(
req *updatepb.Request,
eventStore EventStore,
) (bool, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a hard time reasoning about this bool return here. After some back and forth my recommendation is to use a named return, ie (wroteHistoryEvent bool, err error) or wroteEvent here to make it easier to understand. I'd also remove references to "workflow task" here since that's not sth this method should care about IMO.

Noop: false,
CreateWorkflowTask: false,
}, nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"go.temporal.io/server/common/nexus/nexusrpc"
)

type WorkflowUpdate struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not yet clearly documented anywhere that the semantics of a rejected Update with vs without callbacks are different now. If an update has callbacks it will incur a write on rejection now where it didn't before.

I'd update docs/architecture/workflow-update.md and add comments across the update package to clarify that (e.g. pendingCallbacks, onRejectionMsg).

Copy link
Copy Markdown
Contributor Author

@Quinn-With-Two-Ns Quinn-With-Two-Ns May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an update has callbacks it will incur a write on rejection now where it didn't before.

That shouldn't be true, the callback is not used for rejection normally (speculative update case). It should only used if the update was durably admitted

Copy link
Copy Markdown
Contributor

@stephanos stephanos May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see onRejectionMsg has eventStore EventStore and if the update is in "sent" or "admitted" (ie before "acceptance") it invokes flushPendingCallbacks which - if there are callbacks - invokes AddWorkflowExecutionOptionsUpdatedEvent which AFAICT is a write? And then RejectWorkflowExecutionUpdate also adds another one, no?

Comment thread service/history/workflow/mutable_state_impl.go
) (bool, error) {
// Only attach callbacks if the request actually has something to attach.
// This preserves existing behavior for callers that don't set callbacks.
if len(req.GetCompletionCallbacks()) == 0 && req.GetRequestId() == "" {
Copy link
Copy Markdown
Contributor

@stephanos stephanos May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is wrong; when looking into the update rejection I saw that if no callbacks are given, but a request ID is set, it falls into the u.pendingCallbacks = append case and appends an empty pendingCallback AFAICT. Maybe this should be || or return an error earlier.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants