Callback for workflow update support #9614
Quinn-With-Two-Ns wants to merge 6 commits into temporalio:main
bergundy left a comment
I think we need just one more round here. For when updates are already completed, let's make sure to generate the new link type we discussed server-side.
func (l *Library) Components() []*chasm.RegistrableComponent {
    return []*chasm.RegistrableComponent{
        chasm.NewRegistrableComponent[*Workflow](chasm.WorkflowComponentName),
        chasm.NewRegistrableComponent[*WorkflowUpdate](chasm.WorkflowUpdateComponentName),
Given that workflow update is tightly coupled to workflows, it makes total sense to put them in the same library.
*workflowpb.UpdateState

// MSPointer is a special in-memory field for accessing the underlying mutable state.
chasm.MSPointer
This was only supposed to be embedded in the top level Workflow component but I can see why you'd want to access it here. No strong opinion because either way this would be a workaround. I wonder though if you need to embed this or if it'd be better to make it a named field.
It was embedded in the Workflow component, so I embedded it here as well.
If it's not embedded, then it would also need to be an exported field; otherwise CHASM tree deserialization will not work. To keep a similar convention, embedding is probably OK here.
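The exported-field point can be illustrated with plain Go reflection. This is a minimal sketch that assumes the deserializer populates fields reflectively (an assumption drawn from the comment above, not verified against the CHASM code); `MSPointer`, `embedded`, and `named` are hypothetical stand-in types.

```go
package main

import (
	"fmt"
	"reflect"
)

// MSPointer is a stand-in for chasm.MSPointer (hypothetical, for illustration only).
type MSPointer struct{ ID string }

// embedded embeds the type, so the implicit field name is the exported "MSPointer".
type embedded struct {
	MSPointer
}

// named uses an unexported named field of the same type.
type named struct {
	ms MSPointer
}

// settableFields reports, per field name, whether reflection may set the field.
func settableFields(ptr any) map[string]bool {
	v := reflect.ValueOf(ptr).Elem()
	out := make(map[string]bool, v.NumField())
	for i := 0; i < v.NumField(); i++ {
		out[v.Type().Field(i).Name] = v.Field(i).CanSet()
	}
	return out
}

func main() {
	fmt.Println(settableFields(&embedded{})) // map[MSPointer:true]
	fmt.Println(settableFields(&named{}))    // map[ms:false]
}
```

An embedded field takes the name of its type, so embedding an exported type yields an exported field without further naming decisions; a named field would have to be capitalized to get the same effect.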
)
MaxCallbacksPerUpdateID = NewNamespaceIntSetting(
    "system.maxCallbacksPerUpdateID",
    32,
I think limiting all of the workflow callbacks, regardless of what component they're attached to makes more sense than a per component limit due to the fact that the entire tree needs to be loaded into memory when mutable state is accessed today.
I limited all workflow callbacks as well. I added this per-update limit to keep one update from using up the entire callback limit on a workflow.
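The two limits discussed in this thread can be sketched as a pair of checks. This is an illustrative sketch only: `checkLimits`, the error values, and the deliberately small limit constants are hypothetical and do not reflect the PR's actual names or defaults.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative limits only; the real values come from dynamic config.
const (
	maxPerWorkflow = 10
	maxPerUpdate   = 4
)

var (
	errWorkflowLimit = errors.New("workflow callback limit exceeded")
	errUpdateLimit   = errors.New("per-update callback limit exceeded")
)

// checkLimits validates attaching n callbacks to updateID. perUpdate holds the
// callbacks already attached to each update; workflowLevel counts callbacks
// attached to the workflow itself.
func checkLimits(perUpdate map[string]int, workflowLevel int, updateID string, n int) error {
	total := workflowLevel
	for _, c := range perUpdate {
		total += c
	}
	if total+n > maxPerWorkflow {
		return errWorkflowLimit
	}
	if perUpdate[updateID]+n > maxPerUpdate {
		return errUpdateLimit
	}
	return nil
}

func main() {
	perUpdate := map[string]int{"update-a": 3}
	fmt.Println(checkLimits(perUpdate, 2, "update-a", 1)) // <nil>
	fmt.Println(checkLimits(perUpdate, 2, "update-a", 2)) // per-update callback limit exceeded
	fmt.Println(checkLimits(perUpdate, 6, "update-b", 2)) // workflow callback limit exceeded
}
```

The workflow-wide check bounds what has to be loaded into memory, while the per-update check keeps a single update from consuming the whole budget.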
stephanos left a comment
Only made it half-way through so far; but figured I can send my first review comments now.
links []*commonpb.Link,
identity string,
priority *commonpb.Priority,
workflowUpdateOptions map[string]*historypb.WorkflowExecutionOptionsUpdatedEventAttributes_WorkflowUpdateOptionsUpdate,
I know it's not wrong, but ... WorkflowUpdateOptionsUpdate 😬
(non-blocking; just noticing)
Yeah I agree
a453230 to 09ac27a
// - The event will be written atomically with acceptance
// If the Update struct is lost (registry cleared), the abort mechanism fires
// registryClearedErr on the caller's future, prompting an immediate retry.
if u.state == stateAdmitted || u.state == stateSent {
Added handling for stateAdmitted. It should be the same as stateSent but returns false, nil since, IIUC, the caller still needs to create the speculative WFT at this stage.
09ac27a to 9de5339
Made some updates to bring this to latest. Only logical changes are on the top commit -- handling
    sigs.k8s.io/yaml v1.6.0 // indirect
)

replace go.temporal.io/api => github.com/long-nt-tran/api-go v0.0.0-20260424225344-d14c0ccb7110
Fork replace directive in go.mod must not merge
High Severity
A replace directive in go.mod redirects go.temporal.io/api to a personal fork (github.com/long-nt-tran/api-go). The PR description acknowledges this depends on a pending API PR, but this directive will break the build or introduce untested dependencies if merged to main. The corresponding go.sum entries for the fork are also included.
Reviewed by Cursor Bugbot for commit 9de5339.
this will be removed once api PR temporalio/api#742 is merged, ignore
9de5339 to 4b0915d
8551a4f to 3ae1202
Squashed these commits, left for posterity:
- Add Nexus Workflow Update
- Update from rebase
- Fix sent state
- Cleanup
- Fix lint
- Fix more CI
- fix
- Review clean up
- Try suggestions from the review skill
- Fix some tests
- Add TODO for rejected event
- Remove .omc from gitignore
- Respond to PR comments
- Add NS Capability for this feature
- Respond to PR comments
- Update API
3ae1202 to 2ce7339
2ce7339 to 239cb1f
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 239cb1f.
        Noop:               false,
        CreateWorkflowTask: false,
    }, nil
}
Dedup return from AttachCallbacks skips workflow task creation
Medium Severity
When AttachCallbacks detects a duplicate requestID while in stateAdmitted, it returns (true, nil). The caller in api.go interprets callbacksAttached=true as "work is done" and returns early with Noop: false, CreateWorkflowTask: false, bypassing the speculative workflow task creation path. Since stateAdmitted means the update hasn't been dispatched to a worker yet, a workflow task is still needed. The first AttachCallbacks call correctly returns (false, nil), but a retry with the same requestID would hit the dedup path and skip WT creation, potentially leaving the update stuck.
Reviewed by Cursor Bugbot for commit 239cb1f.
not sure this is a bug? I think if we don't return true, nil on the second time, we may create 2 WFT for this update IIUC -- reviewers please chime in
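The disagreement in this thread is easier to follow with a toy model of the return values. This sketch is hypothetical and much simpler than the real state machine: `update`, `attachCallbacks`, and the `pending` map are stand-ins that only reproduce the behavior as described in the comments above (first call in stateAdmitted returns false, a retry with the same request ID returns true).

```go
package main

import "fmt"

type state int

const (
	stateAdmitted state = iota
	stateSent
)

// update is a toy stand-in for the real Update state machine.
type update struct {
	state   state
	pending map[string]bool // request IDs buffered so far
}

// attachCallbacks returns true when the caller may return early without
// creating a workflow task, as described in the review thread.
func (u *update) attachCallbacks(requestID string) (done bool) {
	if u.pending[requestID] {
		return true // duplicate request ID: dedup path
	}
	u.pending[requestID] = true
	// In stateAdmitted the caller still has to create the speculative workflow task.
	return u.state != stateAdmitted
}

func main() {
	u := &update{state: stateAdmitted, pending: map[string]bool{}}
	fmt.Println(u.attachCallbacks("req-1")) // false: caller goes on to create the workflow task
	fmt.Println(u.attachCallbacks("req-1")) // true: retry takes the dedup path and returns early
}
```

Whether the second result is a bug depends on whether the first caller is guaranteed to have created the workflow task before the retry arrives, which is the question left open for reviewers.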
## What changed?
When we set the Nexus callback URL in test_env.go, the dynamic config override is still tied to the test's lifetime, not the cluster's lifetime, so a subsequent test that reuses this cluster will not have that override. Moving the override to onebox.go (similar pattern to #9918) so this default lives for the lifetime of the cluster.
## Why?
Ran into an issue with the task token not being set in #9614; this solves it. Breaking the fix out into a separate PR for ease of review and checking this in first.
## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)

## What changed?
Added a `createExternalNexusServer(...)` which sets up an external Nexus endpoint with a user-provided handler and listens on a provided address. This is used in nexus_workflow_test.go and will be used more in #9614.
Opportunistically did a couple more drive-by refactors/consistency fixes, specifically:
* Force user to provide `ctx` into the endpoint creation functions instead of making a new `ctx`
* Use `env.Context()` instead of `testcore.NewContext()` in all suites that I touched here
## Why?
Pulling changes out of #9614 into targeted PRs to reduce load on reviewers.
## How did you test it?
- [ ] built
- [ ] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
    // TODO (alex-update): This method is noop because we don't currently write rejections to the history.
    return nil
func (ms *MutableStateImpl) RejectWorkflowExecutionUpdate(updateID string, wfFailure *failurepb.Failure) error {
    if !ms.chasmCallbacksEnabled() {
elsewhere we use chasmEnabled; why use chasmCallbacksEnabled here? if it's intentional, a comment would be helpful.
// but update callbacks must fire now because the update was aborted on the old run.
func (w *Workflow) ProcessAllUpdateCloseCallbacks(ctx chasm.MutableContext) error {
    for _, updateField := range w.Updates {
        if err := callback.ScheduleStandbyCallbacks(ctx, updateField.Get(ctx).Callbacks); err != nil {
Is my understanding correct that once this returns an error, the entire attempt to schedule callbacks is aborted and retried from the top? We wouldn't want partial results.
if len(wf.Updates) == 0 {
    return nil
}
Is this a meaningful perf optimization?
ce, errCE := ms.GetCompletionEvent(ctx)
if errors.Is(errCE, ErrMissingWorkflowCompletionEvent) {
    return nexusrpc.CompleteOperationOptions{}, err
} else if errCE != nil {
    return nexusrpc.CompleteOperationOptions{}, errCE
}
Could we add some (short) comments here? I find it quite hard to follow why these branches exist exactly this way. I found the existing comment only adds to my confusion ("we need to respond to all callbacks"?).
if len(u.pendingCallbacks) > 0 {
    // Invariant: buffer must be cleared before reject. If we reach here,
    // there is a bug in the caller (onRejectionMsg should clear the buffer).
    return serviceerror.NewInternalf(
let's use softassert package here as it will return that internal error and highlight this invariant in test envs
}
u.instrumentation.countRejectionMsg()
return u.reject(rej.Failure, effects)
// Flush any callbacks buffered during stateAdmitted or stateSent into CHASM
nit: let's add newline above here
if len(u.pendingCallbacks) == 0 {
    return nil
}
is this a meaningful early return? the next line is the for loop and would take care of this already.
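For reference, ranging over a nil or empty slice in Go runs zero iterations, so a length guard placed directly before a loop does not change behavior. A small sketch with a hypothetical `flush` helper:

```go
package main

import "fmt"

// flush counts how many buffered items it would process. No length guard is
// needed: ranging over a nil or empty slice executes the body zero times.
func flush(pending []string) int {
	n := 0
	for range pending {
		n++
	}
	return n
}

func main() {
	fmt.Println(flush(nil))                // 0
	fmt.Println(flush([]string{}))         // 0
	fmt.Println(flush([]string{"a", "b"})) // 2
}
```

The guard would only matter if code after the loop had side effects that should be skipped for an empty buffer.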
// complete the update state machine. The API caller is notified of the
// rejection via WaitLifecycleStage regardless; only async CHASM callbacks
// are dropped, which is acceptable when the host workflow is already gone.
if eventStore.CanAddEvent() {
Would it be a better design to move the eventStore.CanAddEvent() check into flushPendingCallbacks? (flushPendingCallbacks should then always apply u.pendingCallbacks = nil)
That seems better to me as we can encapsulate this special case in there.
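A rough sketch of what that encapsulation could look like, under stated assumptions: `eventStore`, `memStore`, and the simplified `update` type below are hypothetical stand-ins, and the real method writes history events rather than appending strings.

```go
package main

import "fmt"

// eventStore is a hypothetical, minimal stand-in for the PR's EventStore.
type eventStore interface {
	CanAddEvent() bool
	AddEvent(requestID string) error
}

// memStore is an in-memory eventStore used only for this sketch.
type memStore struct {
	open   bool
	events []string
}

func (s *memStore) CanAddEvent() bool { return s.open }

func (s *memStore) AddEvent(id string) error {
	s.events = append(s.events, id)
	return nil
}

type update struct {
	pendingCallbacks []string // buffered request IDs (simplified)
}

// flushPendingCallbacks owns the CanAddEvent special case and always clears
// the buffer, whether or not anything could be written.
func (u *update) flushPendingCallbacks(store eventStore) error {
	defer func() { u.pendingCallbacks = nil }()
	if !store.CanAddEvent() {
		return nil // host workflow already gone: buffered callbacks are dropped
	}
	for _, id := range u.pendingCallbacks {
		if err := store.AddEvent(id); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	closed := &memStore{open: false}
	u := &update{pendingCallbacks: []string{"req-1"}}
	_ = u.flushPendingCallbacks(closed)
	fmt.Println(len(closed.events), len(u.pendingCallbacks)) // 0 0

	open := &memStore{open: true}
	u = &update{pendingCallbacks: []string{"req-1", "req-2"}}
	_ = u.flushPendingCallbacks(open)
	fmt.Println(len(open.events), len(u.pendingCallbacks)) // 2 0
}
```

With the check inside the method, callers no longer need to know about the closed-workflow case, and the "buffer is empty after flush" invariant holds on every path.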
// flushPendingCallbacks writes one WorkflowExecutionOptionsUpdatedEvent per
// buffered AttachCallbacks callback, skipping any whose requestID is already persisted.
// Called from onAcceptanceMsg after the acceptance event has been written.
This line makes it sound like it's only called from onAcceptanceMsg but that's not true.
func (u *Update) AttachCallbacks(
    req *updatepb.Request,
    eventStore EventStore,
) (bool, error) {
I have a hard time reasoning about this bool return here. After some back and forth, my recommendation is to use a named return, i.e. (wroteHistoryEvent bool, err error) or wroteEvent, to make it easier to understand. I'd also remove references to "workflow task" here since that's not something this method should care about IMO.
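A minimal sketch of the named-return suggestion. The `attach` function and its `persisted` map are hypothetical, and the meaning given to the bool here is illustrative rather than the PR's actual semantics; the point is only that the result name documents the meaning at the signature.

```go
package main

import (
	"errors"
	"fmt"
)

// attach is a hypothetical stand-in for AttachCallbacks. The named results
// state what the bool means without changing behavior.
func attach(requestID string, persisted map[string]bool) (wroteEvent bool, err error) {
	if requestID == "" {
		return false, errors.New("request ID is required")
	}
	if persisted[requestID] {
		return false, nil // already persisted: nothing new written
	}
	persisted[requestID] = true
	return true, nil
}

func main() {
	persisted := map[string]bool{}
	fmt.Println(attach("req-1", persisted)) // true <nil>
	fmt.Println(attach("req-1", persisted)) // false <nil>
}
```

Callers can then decide for themselves what a written event implies, for example whether a workflow task is needed, which keeps that concern out of the method.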
        Noop:               false,
        CreateWorkflowTask: false,
    }, nil
}

    "go.temporal.io/server/common/nexus/nexusrpc"
)

type WorkflowUpdate struct {
I think it's not yet clearly documented anywhere that the semantics of a rejected Update with vs without callbacks are different now. If an update has callbacks it will incur a write on rejection now where it didn't before.
I'd update docs/architecture/workflow-update.md and add comments across the update package to clarify that (e.g. pendingCallbacks, onRejectionMsg).
> If an update has callbacks it will incur a write on rejection now where it didn't before.
That shouldn't be true; the callback is not normally used for rejection (the speculative update case). It should only be used if the update was durably admitted.
I see onRejectionMsg has eventStore EventStore and if the update is in "sent" or "admitted" (ie before "acceptance") it invokes flushPendingCallbacks which - if there are callbacks - invokes AddWorkflowExecutionOptionsUpdatedEvent which AFAICT is a write? And then RejectWorkflowExecutionUpdate also adds another one, no?
) (bool, error) {
    // Only attach callbacks if the request actually has something to attach.
    // This preserves existing behavior for callers that don't set callbacks.
    if len(req.GetCompletionCallbacks()) == 0 && req.GetRequestId() == "" {
I think this is wrong; when looking into the update rejection I saw that if no callbacks are given, but a request ID is set, it falls into the u.pendingCallbacks = append case and appends an empty pendingCallback AFAICT. Maybe this should be || or return an error earlier.
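The difference between the two guards for the case raised here (request ID set, no callbacks) can be checked in isolation. `skipWithAnd` and `skipWithOr` are hypothetical helpers that mirror only the boolean condition, not the surrounding method.

```go
package main

import "fmt"

// skipWithAnd mirrors the guard in the diff: skip only when BOTH are absent.
func skipWithAnd(numCallbacks int, requestID string) bool {
	return numCallbacks == 0 && requestID == ""
}

// skipWithOr is the suggested alternative: skip when EITHER is absent.
func skipWithOr(numCallbacks int, requestID string) bool {
	return numCallbacks == 0 || requestID == ""
}

func main() {
	// Request ID set but no callbacks: the case raised in the comment.
	fmt.Println(skipWithAnd(0, "req-1")) // false: falls through to the append path
	fmt.Println(skipWithOr(0, "req-1"))  // true: skipped
}
```

Note that `||` also skips requests that carry callbacks but no request ID, so returning an error for that combination, as the comment suggests, may be the safer variant.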


What changed?
Added support for Nexus workflow update completion callbacks via CHASM. This allows a Nexus caller to be notified when a workflow update completes by attaching completion callbacks to the update request.
Why?
Nexus operations that target workflow updates need a way to receive completion notifications. Without this, a Nexus caller that sends an update has no async mechanism to learn when the update finishes. Completion callbacks enable the same async notification pattern that already exists for workflow-level Nexus operations.
How did you test it?
Potential risks
Touches speculative workflow updates, which are always hard to reason about. Tried to compensate with lots of test coverage.
Note: Needs this API PR https://github.com/temporalio/api/pull/742/changes
Note
High Risk
High risk because it changes workflow update state-machine behavior and mutable-state/event interfaces to persist and fire per-update callbacks (including rejection handling), which impacts core history execution and callback delivery semantics.
Overview
Adds per-update completion callbacks (for Nexus) backed by CHASM by introducing a `WorkflowUpdate` component with persisted `UpdateState` (including rejection failure) and storing update-scoped callback maps under the root workflow.
Extends the update protocol/state machine to accept `AttachCallbacks`, buffer callback attachments pre-acceptance, persist them via `WorkflowExecutionOptionsUpdated` events, dedupe by `request_id`, and ensure callbacks fire on update completion, rejection, and workflow close/continue-as-new/retry paths.
Plumbs new dynamic config gates/limits (`EnableWorkflowUpdateCallbacks`, `MaxCallbacksPerUpdateID`), adds update-callback visibility in `DescribeWorkflowExecution`, updates Nexus completion retrieval for updates (`GetNexusUpdateCompletion`), and adjusts API responses to include appropriate links for accepted vs rejected updates.
Reviewed by Cursor Bugbot for commit 239cb1f. Bugbot is set up for automated code reviews on this repo.