From 369380564b6a1f67cf7c9a49b0b5197db2970683 Mon Sep 17 00:00:00 2001 From: Ian Duncan Date: Tue, 10 Mar 2026 10:52:40 +0100 Subject: [PATCH] feat(sdk): add BuildId newtype and implement remaining pending tests --- sdk/src/Temporal/Common.hs | 6 ++++ sdk/src/Temporal/Worker.hs | 5 ++-- sdk/src/Temporal/Workflow/Types.hs | 2 +- sdk/src/Temporal/Workflow/Worker.hs | 2 +- sdk/test/PendingFeaturesSpec.hs | 4 --- sdk/test/ReplaySpec.hs | 39 +++++++++++++++++++++++++ sdk/test/UpdateSpec.hs | 25 ++++++++++++++++ sdk/test/WorkflowSpec.hs | 45 +++++++++++++++++++++++++++-- 8 files changed, 118 insertions(+), 10 deletions(-) diff --git a/sdk/src/Temporal/Common.hs b/sdk/src/Temporal/Common.hs index c6300b69..f23e04d2 100644 --- a/sdk/src/Temporal/Common.hs +++ b/sdk/src/Temporal/Common.hs @@ -92,6 +92,12 @@ newtype TaskQueue = TaskQueue {rawTaskQueue :: Text} deriving newtype (Eq, Ord, Show, Hashable, IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey) +-- | Identifies the exact code version running in a worker. Useful for versioned task queues. +newtype BuildId = BuildId {rawBuildId :: Text} + deriving stock (Lift) + deriving newtype (Eq, Ord, Show, Hashable, IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey) + + newtype ActivityType = ActivityType {rawActivityType :: Text} deriving stock (Data, Lift) deriving newtype (Eq, Ord, Show, Hashable, IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey) diff --git a/sdk/src/Temporal/Worker.hs b/sdk/src/Temporal/Worker.hs index 9217fa9a..11908cb1 100644 --- a/sdk/src/Temporal/Worker.hs +++ b/sdk/src/Temporal/Worker.hs @@ -86,6 +86,7 @@ module Temporal.Worker ( setGracefulShutdownPeriodMillis, addInterceptors, setPayloadProcessor, + BuildId (..), WorkflowId (..), ) where @@ -298,8 +299,8 @@ in the Temporal UI. The buildId is used to identify the exact version of the cod its dependencies. In e.g. Nix, the executable path in the Nix store would be a useful buildId. -} -setBuildId :: Text -> ConfigM actEnv () -setBuildId bid = modifyCore $ \conf -> +setBuildId :: BuildId -> ConfigM actEnv () +setBuildId (BuildId bid) = modifyCore $ \conf -> conf { Core.buildId = bid } diff --git a/sdk/src/Temporal/Workflow/Types.hs b/sdk/src/Temporal/Workflow/Types.hs index 4ddf44f1..77c52bc2 100644 --- a/sdk/src/Temporal/Workflow/Types.hs +++ b/sdk/src/Temporal/Workflow/Types.hs @@ -23,7 +23,7 @@ import Temporal.SearchAttributes data Info = Info { historyLength :: {-# UNPACK #-} !Word32 , attempt :: {-# UNPACK #-} !Int - , buildId :: !Text + , buildId :: !BuildId , continuedRunId :: !(Maybe RunId) , cronSchedule :: !(Maybe Text) , executionTimeout :: !(Maybe Duration) diff --git a/sdk/src/Temporal/Workflow/Worker.hs b/sdk/src/Temporal/Workflow/Worker.hs index 1f23b39e..721a7241 100644 --- a/sdk/src/Temporal/Workflow/Worker.hs +++ b/sdk/src/Temporal/Workflow/Worker.hs @@ -244,7 +244,7 @@ handleActivation activation = inSpan' "handleActivation" (defaultSpanArguments { Temporal.WorkflowInstance.Info { historyLength = activation ^. Activation.historyLength , attempt = fromIntegral $ initializeWorkflow ^. Activation.attempt - , buildId = Core.buildId $ Core.getWorkerConfig workerCore + , buildId = BuildId $ Core.buildId $ Core.getWorkerConfig workerCore , taskQueue = worker.workerTaskQueue , workflowId = WorkflowId $ initializeWorkflow ^. Activation.workflowId , workflowType = initializeWorkflow ^. Activation.workflowType . to WorkflowType diff --git a/sdk/test/PendingFeaturesSpec.hs b/sdk/test/PendingFeaturesSpec.hs index bc3c7a4f..28512a60 100644 --- a/sdk/test/PendingFeaturesSpec.hs +++ b/sdk/test/PendingFeaturesSpec.hs @@ -102,13 +102,9 @@ spec = do it "task failure fails replay" $ const pending describe "Update handler executing activity (Py/TS)" $ do - it "update timeout or cancel" $ const pending - it "update separate handle" $ const pending it "updateWithStart" $ const pending describe "Worker Lifecycle (Py/TS)" $ do - it "worker allows heartbeating after shutdown" $ const pending - it "worker fails unknown namespace" $ const pending it "worker connection replacement" $ const pending it "worker exposes abort controller equivalent" $ const pending it "worker heartbeat management" $ const pending diff --git a/sdk/test/ReplaySpec.hs b/sdk/test/ReplaySpec.hs index 2a67593b..d509ecfb 100644 --- a/sdk/test/ReplaySpec.hs +++ b/sdk/test/ReplaySpec.hs @@ -1,7 +1,10 @@ module ReplaySpec where +import Control.Exception (SomeException) +import qualified Control.Monad.Catch as Catch import Control.Monad (void, when) import Data.Either (isLeft, isRight) +import Data.ProtoLens (defMessage) import qualified Data.Text as Text import RequireCallStack (provideCallStack) import System.Directory (getTemporaryDirectory, removeFile) @@ -183,3 +186,39 @@ tests = describe "Workflow Replay" $ do case result of Left err -> err.message `shouldSatisfy` (not . Text.null) Right () -> expectationFailure "Expected replay failure" + + specify "empty history replay fails" $ \TestEnv {..} -> do + let workflow :: W.ProvidedWorkflow (W.Workflow ()) + workflow = W.provideWorkflow JSON "replay-empty-wf" $ pure () + conf = provideCallStack $ configure () workflow baseConf + emptyHistory = defMessage + result <- Catch.try @_ @SomeException (runReplayHistory globalRuntime conf emptyHistory) + result `shouldSatisfy` \case + Left _ -> True + Right (Left _) -> True + Right (Right ()) -> False + + specify "task failure fails replay" $ \TestEnv {..} -> do + let workflow :: W.ProvidedWorkflow (W.Workflow ()) + workflow = W.provideWorkflow JSON "replay-task-fail-wf" $ provideCallStack $ do + W.sleep $ milliseconds 10 + _ <- W.executeActivity replayActivityDef.reference (W.defaultStartActivityOptions $ W.StartToClose $ seconds 3) + pure () + defs = (replayActivityDef, workflow) + conf = provideCallStack $ configure () defs baseConf + + history <- withWorker conf $ do + uuid <- uuidText + let opts = defaultStartOptsWithTimeout taskQueue (seconds 10) + useClient $ do + wfHandle <- C.start workflow (W.WorkflowId uuid) opts + C.waitWorkflowResult wfHandle + C.fetchHistory wfHandle + + let throwingWorkflow :: W.ProvidedWorkflow (W.Workflow ()) + throwingWorkflow = W.provideWorkflow JSON "replay-task-fail-wf" $ provideCallStack $ do + error "deliberate task failure during replay" + throwingConf = provideCallStack $ configure () throwingWorkflow baseConf + + result <- runReplayHistory globalRuntime throwingConf history + result `shouldSatisfy` isLeft diff --git a/sdk/test/UpdateSpec.hs b/sdk/test/UpdateSpec.hs index 11bb96ed..4a840c9c 100644 --- a/sdk/test/UpdateSpec.hs +++ b/sdk/test/UpdateSpec.hs @@ -267,6 +267,31 @@ updateTests = describe "Update" $ do ur `shouldBe` 42 wr `shouldBe` 42 + it "startUpdate and waitUpdateResult work as separate steps" $ \TestEnv {..} -> do + let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do baseConf + withWorker conf $ do + let opts = defaultStartOptsWithTimeout taskQueue (seconds 10) + updateOpts = C.UpdateOptions {updateId = "separate-handle", updateHeaders = mempty} + (ur, wr) <- useClient do + h <- C.start UpdateWithoutValidator "separate-handle-wf" opts + updateHandle <- C.startUpdate h testUpdate updateOpts 42 + updateResult <- C.waitUpdateResult updateHandle + wfResult <- C.waitWorkflowResult h + pure (updateResult, wfResult) + ur `shouldBe` 42 + wr `shouldBe` 42 + + it "startUpdate fire-and-forget does not block" $ \TestEnv {..} -> do + let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do baseConf + withWorker conf $ do + let opts = defaultStartOptsWithTimeout taskQueue (seconds 10) + updateOpts = C.UpdateOptions {updateId = "fire-forget", updateHeaders = mempty} + wr <- useClient do + h <- C.start UpdateWithoutValidator "fire-forget-wf" opts + _ <- C.startUpdate h testUpdate updateOpts 99 + C.waitWorkflowResult h + wr `shouldBe` 99 + updateInterceptorTests :: SpecWith PortNumber updateInterceptorTests = do diff --git a/sdk/test/WorkflowSpec.hs b/sdk/test/WorkflowSpec.hs index f83d0137..0ce8b041 100644 --- a/sdk/test/WorkflowSpec.hs +++ b/sdk/test/WorkflowSpec.hs @@ -5,6 +5,7 @@ module WorkflowSpec where import Conduit import Control.Concurrent (threadDelay) import Control.Exception (SomeException, bracket) +import Data.IORef import Control.Exception.Annotated (checkpoint) import qualified Control.Monad.Catch as Catch import Control.Monad.Logger (logInfoN) @@ -711,7 +712,7 @@ tests = do useClient (C.execute wf.reference "buildIdWorkflow" opts) `shouldReturn` 1 specify "build ID is visible in workflow info" $ \TestEnv {..} -> do - let workflow :: W.Workflow Text + let workflow :: W.Workflow BuildId workflow = do i <- W.info pure i.buildId @@ -721,7 +722,7 @@ tests = do setBuildId "my-build-abc" withWorker conf $ do let opts = defaultStartOpts taskQueue - useClient (C.execute wf.reference "buildIdInfoWf" opts) `shouldReturn` "my-build-abc" + useClient (C.execute wf.reference "buildIdInfoWf" opts) `shouldReturn` BuildId "my-build-abc" describe "Memo operations (Py/TS: memo access)" $ do specify "getMemoValues returns initial memos (Py: test_workflow_memo)" $ \TestEnv {..} -> do @@ -986,3 +987,43 @@ tests = do _ <- useClient (C.start wf.reference "shutdownCancelsAct" opts) shutdown worker pure () + + specify "worker fails unknown namespace" $ \TestEnv {..} -> do + let workflow :: W.Workflow () + workflow = pure () + wf = W.provideWorkflow defaultCodec "unknownNsWf" workflow + conf = configure () wf $ do + baseConf + setNamespace "this-namespace-does-not-exist-oogabooga" + result <- Catch.try @_ @SomeException (startWorker coreClient conf >>= shutdown) + result `shouldSatisfy` \case + Left _ -> True + Right _ -> False + + specify "worker allows heartbeating after shutdown" $ \TestEnv {..} -> do + heartbeatAfterShutdown <- newIORef False + let act :: A.Activity () () + act = A.withHeartbeat defaultCodec $ \hb (_ :: A.Activity () (Maybe ())) -> do + let loop :: A.Activity () () + loop = do + hb () + liftIO $ threadDelay 200_000 + loop + loop + actDef = A.provideActivity defaultCodec "hbAfterShutdownAct" act + workflow :: MyWorkflow () + workflow = + W.executeActivity actDef.reference + (W.defaultStartActivityOptions $ W.StartToClose $ seconds 30) + { W.heartbeatTimeout = Just $ seconds 5 } + wf = W.provideWorkflow defaultCodec "hbAfterShutdownWf" workflow + conf = configure () (wf, actDef) $ do + baseConf + setGracefulShutdownPeriodMillis 1000 + worker <- startWorker coreClient conf + let opts = defaultStartOptsWithTimeout taskQueue (seconds 15) + _ <- useClient (C.start wf.reference "hbAfterShutdown" opts) + liftIO $ threadDelay 500_000 + shutdown worker + writeIORef heartbeatAfterShutdown True + readIORef heartbeatAfterShutdown `shouldReturn` True