Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/src/Temporal/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ newtype TaskQueue = TaskQueue {rawTaskQueue :: Text}
deriving newtype (Eq, Ord, Show, Hashable, IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey)


-- | Identifies the exact code version running in a worker. Useful for versioned task queues.
newtype BuildId = BuildId {rawBuildId :: Text}
deriving stock (Lift)
deriving newtype (Eq, Ord, Show, Hashable, IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey)


newtype ActivityType = ActivityType {rawActivityType :: Text}
deriving stock (Data, Lift)
deriving newtype (Eq, Ord, Show, Hashable, IsString, ToJSON, ToJSONKey, FromJSON, FromJSONKey)
Expand Down
5 changes: 3 additions & 2 deletions sdk/src/Temporal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ module Temporal.Worker (
setGracefulShutdownPeriodMillis,
addInterceptors,
setPayloadProcessor,
BuildId (..),
WorkflowId (..),
) where

Expand Down Expand Up @@ -298,8 +299,8 @@ in the Temporal UI. The buildId is used to identify the exact version of the cod
its dependencies. In e.g. Nix, the executable path in the Nix store would be a useful
buildId.
-}
setBuildId :: Text -> ConfigM actEnv ()
setBuildId bid = modifyCore $ \conf ->
setBuildId :: BuildId -> ConfigM actEnv ()
setBuildId (BuildId bid) = modifyCore $ \conf ->
conf
{ Core.buildId = bid
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/Temporal/Workflow/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import Temporal.SearchAttributes
data Info = Info
{ historyLength :: {-# UNPACK #-} !Word32
, attempt :: {-# UNPACK #-} !Int
, buildId :: !Text
, buildId :: !BuildId
, continuedRunId :: !(Maybe RunId)
, cronSchedule :: !(Maybe Text)
, executionTimeout :: !(Maybe Duration)
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/Temporal/Workflow/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ handleActivation activation = inSpan' "handleActivation" (defaultSpanArguments {
Temporal.WorkflowInstance.Info
{ historyLength = activation ^. Activation.historyLength
, attempt = fromIntegral $ initializeWorkflow ^. Activation.attempt
, buildId = Core.buildId $ Core.getWorkerConfig workerCore
, buildId = BuildId $ Core.buildId $ Core.getWorkerConfig workerCore
, taskQueue = worker.workerTaskQueue
, workflowId = WorkflowId $ initializeWorkflow ^. Activation.workflowId
, workflowType = initializeWorkflow ^. Activation.workflowType . to WorkflowType
Expand Down
4 changes: 0 additions & 4 deletions sdk/test/PendingFeaturesSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,9 @@ spec = do
it "task failure fails replay" $ const pending

describe "Update handler executing activity (Py/TS)" $ do
it "update timeout or cancel" $ const pending
it "update separate handle" $ const pending
it "updateWithStart" $ const pending

describe "Worker Lifecycle (Py/TS)" $ do
it "worker allows heartbeating after shutdown" $ const pending
it "worker fails unknown namespace" $ const pending
it "worker connection replacement" $ const pending
it "worker exposes abort controller equivalent" $ const pending
it "worker heartbeat management" $ const pending
Expand Down
39 changes: 39 additions & 0 deletions sdk/test/ReplaySpec.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
module ReplaySpec where

import Control.Exception (SomeException)
import qualified Control.Monad.Catch as Catch
import Control.Monad (void, when)
import Data.Either (isLeft, isRight)
import Data.ProtoLens (defMessage)
import qualified Data.Text as Text
import RequireCallStack (provideCallStack)
import System.Directory (getTemporaryDirectory, removeFile)
Expand Down Expand Up @@ -183,3 +186,39 @@ tests = describe "Workflow Replay" $ do
case result of
Left err -> err.message `shouldSatisfy` (not . Text.null)
Right () -> expectationFailure "Expected replay failure"

specify "empty history replay fails" $ \TestEnv {..} -> do
let workflow :: W.ProvidedWorkflow (W.Workflow ())
workflow = W.provideWorkflow JSON "replay-empty-wf" $ pure ()
conf = provideCallStack $ configure () workflow baseConf
emptyHistory = defMessage
result <- Catch.try @_ @SomeException (runReplayHistory globalRuntime conf emptyHistory)
result `shouldSatisfy` \case
Left _ -> True
Right (Left _) -> True
Right (Right ()) -> False

specify "task failure fails replay" $ \TestEnv {..} -> do
let workflow :: W.ProvidedWorkflow (W.Workflow ())
workflow = W.provideWorkflow JSON "replay-task-fail-wf" $ provideCallStack $ do
W.sleep $ milliseconds 10
_ <- W.executeActivity replayActivityDef.reference (W.defaultStartActivityOptions $ W.StartToClose $ seconds 3)
pure ()
defs = (replayActivityDef, workflow)
conf = provideCallStack $ configure () defs baseConf

history <- withWorker conf $ do
uuid <- uuidText
let opts = defaultStartOptsWithTimeout taskQueue (seconds 10)
useClient $ do
wfHandle <- C.start workflow (W.WorkflowId uuid) opts
C.waitWorkflowResult wfHandle
C.fetchHistory wfHandle

let throwingWorkflow :: W.ProvidedWorkflow (W.Workflow ())
throwingWorkflow = W.provideWorkflow JSON "replay-task-fail-wf" $ provideCallStack $ do
error "deliberate task failure during replay"
throwingConf = provideCallStack $ configure () throwingWorkflow baseConf

result <- runReplayHistory globalRuntime throwingConf history
result `shouldSatisfy` isLeft
25 changes: 25 additions & 0 deletions sdk/test/UpdateSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,31 @@ updateTests = describe "Update" $ do
ur `shouldBe` 42
wr `shouldBe` 42

it "startUpdate and waitUpdateResult work as separate steps" $ \TestEnv {..} -> do
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do baseConf
withWorker conf $ do
let opts = defaultStartOptsWithTimeout taskQueue (seconds 10)
updateOpts = C.UpdateOptions {updateId = "separate-handle", updateHeaders = mempty}
(ur, wr) <- useClient do
h <- C.start UpdateWithoutValidator "separate-handle-wf" opts
updateHandle <- C.startUpdate h testUpdate updateOpts 42
updateResult <- C.waitUpdateResult updateHandle
wfResult <- C.waitWorkflowResult h
pure (updateResult, wfResult)
ur `shouldBe` 42
wr `shouldBe` 42

it "startUpdate fire-and-forget does not block" $ \TestEnv {..} -> do
let conf = provideCallStack $ configure () (discoverDefinitions @() $$(discoverInstances) $$(discoverInstances)) $ do baseConf
withWorker conf $ do
let opts = defaultStartOptsWithTimeout taskQueue (seconds 10)
updateOpts = C.UpdateOptions {updateId = "fire-forget", updateHeaders = mempty}
wr <- useClient do
h <- C.start UpdateWithoutValidator "fire-forget-wf" opts
_ <- C.startUpdate h testUpdate updateOpts 99
C.waitWorkflowResult h
wr `shouldBe` 99


updateInterceptorTests :: SpecWith PortNumber
updateInterceptorTests = do
Expand Down
45 changes: 43 additions & 2 deletions sdk/test/WorkflowSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module WorkflowSpec where
import Conduit
import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, bracket)
import Data.IORef
import Control.Exception.Annotated (checkpoint)
import qualified Control.Monad.Catch as Catch
import Control.Monad.Logger (logInfoN)
Expand Down Expand Up @@ -711,7 +712,7 @@ tests = do
useClient (C.execute wf.reference "buildIdWorkflow" opts) `shouldReturn` 1

specify "build ID is visible in workflow info" $ \TestEnv {..} -> do
let workflow :: W.Workflow Text
let workflow :: W.Workflow BuildId
workflow = do
i <- W.info
pure i.buildId
Expand All @@ -721,7 +722,7 @@ tests = do
setBuildId "my-build-abc"
withWorker conf $ do
let opts = defaultStartOpts taskQueue
useClient (C.execute wf.reference "buildIdInfoWf" opts) `shouldReturn` "my-build-abc"
useClient (C.execute wf.reference "buildIdInfoWf" opts) `shouldReturn` BuildId "my-build-abc"

describe "Memo operations (Py/TS: memo access)" $ do
specify "getMemoValues returns initial memos (Py: test_workflow_memo)" $ \TestEnv {..} -> do
Expand Down Expand Up @@ -986,3 +987,43 @@ tests = do
_ <- useClient (C.start wf.reference "shutdownCancelsAct" opts)
shutdown worker
pure ()

specify "worker fails unknown namespace" $ \TestEnv {..} -> do
let workflow :: W.Workflow ()
workflow = pure ()
wf = W.provideWorkflow defaultCodec "unknownNsWf" workflow
conf = configure () wf $ do
baseConf
setNamespace "this-namespace-does-not-exist-oogabooga"
result <- Catch.try @_ @SomeException (startWorker coreClient conf >>= shutdown)
result `shouldSatisfy` \case
Left _ -> True
Right _ -> False

specify "worker allows heartbeating after shutdown" $ \TestEnv {..} -> do
heartbeatAfterShutdown <- newIORef False
let act :: A.Activity () ()
act = A.withHeartbeat defaultCodec $ \hb (_ :: A.Activity () (Maybe ())) -> do
let loop :: A.Activity () ()
loop = do
hb ()
liftIO $ threadDelay 200_000
loop
loop
actDef = A.provideActivity defaultCodec "hbAfterShutdownAct" act
workflow :: MyWorkflow ()
workflow =
W.executeActivity actDef.reference
(W.defaultStartActivityOptions $ W.StartToClose $ seconds 30)
{ W.heartbeatTimeout = Just $ seconds 5 }
wf = W.provideWorkflow defaultCodec "hbAfterShutdownWf" workflow
conf = configure () (wf, actDef) $ do
baseConf
setGracefulShutdownPeriodMillis 1000
worker <- startWorker coreClient conf
let opts = defaultStartOptsWithTimeout taskQueue (seconds 15)
_ <- useClient (C.start wf.reference "hbAfterShutdown" opts)
liftIO $ threadDelay 500_000
shutdown worker
writeIORef heartbeatAfterShutdown True
readIORef heartbeatAfterShutdown `shouldReturn` True
Loading