diff --git a/Test/DurableTask.Core.Tests/ContinueAsNewTraceBehaviorTests.cs b/Test/DurableTask.Core.Tests/ContinueAsNewTraceBehaviorTests.cs new file mode 100644 index 000000000..a5b2a4560 --- /dev/null +++ b/Test/DurableTask.Core.Tests/ContinueAsNewTraceBehaviorTests.cs @@ -0,0 +1,486 @@ +// --------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// --------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Tests +{ + using DurableTask.Core.Command; + using DurableTask.Core.History; + using DurableTask.Core.Tracing; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Runtime.Serialization.Json; + using System.Threading; + using System.Threading.Tasks; + + /// + /// Tests for the ContinueAsNew fresh-trace feature. + /// + /// Background: Long-running periodic orchestrations that use ContinueAsNew accumulate all + /// generations into a single distributed trace, making individual cycles hard to observe. + /// This feature adds an opt-in option + /// that starts the next generation in a fresh trace while preserving the default lineage + /// behavior for existing users. + /// + /// The trace identity lifecycle: + /// 1. Orchestrator calls ContinueAsNew(version, input, options) with StartNewTrace. + /// 2. Dispatcher creates the next ExecutionStartedEvent with GenerateNewTrace = true + /// and does NOT copy the old ParentTraceContext. + /// 3. TraceHelper.StartTraceActivityForOrchestrationExecution sees GenerateNewTrace, + /// creates a fresh root producer span, stores its identity in ParentTraceContext, + /// and resets GenerateNewTrace = false. + /// 4. On subsequent replays, GenerateNewTrace is false and the persisted ParentTraceContext + /// identity is used — ensuring stable span identity across replays. + /// + [TestClass] + public class ContinueAsNewTraceBehaviorTests + { + private ActivityListener? listener; + + [TestInitialize] + public void Setup() + { + // Set up an ActivityListener so System.Diagnostics.Activity spans are actually created. + listener = new ActivityListener + { + ShouldListenTo = source => source.Name == "DurableTask.Core", + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + }; + ActivitySource.AddActivityListener(listener); + } + + [TestCleanup] + public void Cleanup() + { + DistributedTraceActivity.Current?.Stop(); + DistributedTraceActivity.Current = null; + listener?.Dispose(); + } + + #region ExecutionStartedEvent.GenerateNewTrace property + + [TestMethod] + public void GenerateNewTrace_DefaultIsFalse() + { + // A new event should default to false so existing behavior is unchanged. + var evt = new ExecutionStartedEvent(-1, "input"); + Assert.IsFalse(evt.GenerateNewTrace); + } + + [TestMethod] + public void GenerateNewTrace_CopyConstructorPreservesValue() + { + var original = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = true, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + var copy = new ExecutionStartedEvent(original); + Assert.IsTrue(copy.GenerateNewTrace, "Copy constructor should preserve GenerateNewTrace = true"); + } + + [TestMethod] + public void GenerateNewTrace_SurvivesJsonSerialization() + { + // GenerateNewTrace must survive serialization because the event is persisted + // to storage and must signal TraceHelper on the first execution. + var original = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = true, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + var serializer = new DataContractJsonSerializer(typeof(ExecutionStartedEvent)); + using var stream = new MemoryStream(); + serializer.WriteObject(stream, original); + + stream.Position = 0; + var deserialized = (ExecutionStartedEvent?)serializer.ReadObject(stream); + + Assert.IsNotNull(deserialized); + Assert.IsTrue(deserialized.GenerateNewTrace, "GenerateNewTrace should survive JSON round-trip"); + } + + [TestMethod] + public void GenerateNewTrace_FalseByDefault_BackwardCompatible() + { + // An event serialized without GenerateNewTrace should deserialize with false. + // This simulates loading a pre-upgrade event from storage. + var oldEvent = new ExecutionStartedEvent(-1, "input") + { + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + var serializer = new DataContractJsonSerializer(typeof(ExecutionStartedEvent)); + using var stream = new MemoryStream(); + serializer.WriteObject(stream, oldEvent); + + stream.Position = 0; + var deserialized = (ExecutionStartedEvent?)serializer.ReadObject(stream); + + Assert.IsNotNull(deserialized); + Assert.IsFalse(deserialized.GenerateNewTrace, "Pre-upgrade events should default to false"); + } + + #endregion + + #region Tag isolation — GenerateNewTrace does NOT leak through tags + + [TestMethod] + public void GenerateNewTrace_DoesNotAppearInTags() + { + // The property-based approach should never pollute the customer-facing Tags dictionary. + var evt = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = true, + Tags = new Dictionary { { "user-tag", "value" } }, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + Assert.IsFalse(evt.Tags.ContainsKey("MS_CreateTrace"), + "GenerateNewTrace should use a typed property, not a tag"); + Assert.IsFalse(evt.Tags.ContainsKey("GenerateNewTrace"), + "GenerateNewTrace should not appear as a tag"); + } + + [TestMethod] + public void GenerateNewTrace_DoesNotLeakThroughTagCloning() + { + // This is the key test for the tag-leak bug found during review. + // GenerateNewTrace is a property, not a tag, so it cannot leak through tag cloning. + var genNTags = new Dictionary { { "app-tag", "hello" } }; + + // Simulate dispatcher creating Gen N+1's event with StartNewTrace + var genN1Event = new ExecutionStartedEvent(-1, "input") + { + Tags = new Dictionary(genNTags), // clone of Gen N's tags + GenerateNewTrace = true, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + // Simulate Gen N+1 doing a default ContinueAsNew (no StartNewTrace). + // Dispatcher clones Gen N+1's tags but sets GenerateNewTrace from the action (false). + var genN2Tags = new Dictionary(genN1Event.Tags); + var genN2Event = new ExecutionStartedEvent(-1, "input") + { + Tags = genN2Tags, + GenerateNewTrace = false, // from the action, not inherited + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec2" }, + Name = "TestOrch", + }; + + Assert.IsFalse(genN2Event.GenerateNewTrace, + "GenerateNewTrace must not leak to subsequent generations through tag cloning"); + Assert.AreEqual(1, genN2Event.Tags.Count, "Only application tags should be present"); + } + + #endregion + + #region OrchestrationCompleteOrchestratorAction + + [TestMethod] + public void Action_ContinueAsNewTraceBehavior_DefaultIsPreserve() + { + var action = new OrchestrationCompleteOrchestratorAction(); + Assert.AreEqual(ContinueAsNewTraceBehavior.PreserveTraceContext, action.ContinueAsNewTraceBehavior); + } + + [TestMethod] + public void Action_ContinueAsNewTraceBehavior_CanBeSetToStartNewTrace() + { + var action = new OrchestrationCompleteOrchestratorAction + { + ContinueAsNewTraceBehavior = ContinueAsNewTraceBehavior.StartNewTrace, + }; + Assert.AreEqual(ContinueAsNewTraceBehavior.StartNewTrace, action.ContinueAsNewTraceBehavior); + } + + #endregion + + #region TraceHelper — GenerateNewTrace consumption and trace creation + + [TestMethod] + public void TraceHelper_GenerateNewTrace_CreatesNewRootTrace() + { + // When GenerateNewTrace=true and no ParentTraceContext, TraceHelper should create + // a fresh producer span (new root trace) and then create the orchestration span. + var startEvent = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = true, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + Activity? activity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); + + Assert.IsNotNull(activity, "Should create an orchestration activity for fresh trace"); + Assert.IsFalse(startEvent.GenerateNewTrace, "GenerateNewTrace should be reset after consumption"); + Assert.IsNotNull(startEvent.ParentTraceContext, "ParentTraceContext should be set by the producer span"); + Assert.IsNotNull(startEvent.ParentTraceContext.Id, "Durable Id should be stored for replay"); + Assert.IsNotNull(startEvent.ParentTraceContext.SpanId, "Durable SpanId should be stored for replay"); + + activity.Stop(); + DistributedTraceActivity.Current = null; + } + + [TestMethod] + public void TraceHelper_GenerateNewTrace_ReplayUsesPersistedIdentity() + { + // Simulates: first execution creates a fresh trace and persists identity. + // Subsequent replay loads the event with GenerateNewTrace=false and persisted + // Id/SpanId. The orchestration span should restore the same identity. + var startEvent = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = true, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + // First execution — creates fresh trace + Activity? firstActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); + Assert.IsNotNull(firstActivity); + + string firstTraceId = firstActivity.TraceId.ToString(); + string firstSpanId = firstActivity.SpanId.ToString(); + + firstActivity.Stop(); + DistributedTraceActivity.Current = null; + + // Simulate replay — GenerateNewTrace was reset, Id/SpanId persisted + Assert.IsFalse(startEvent.GenerateNewTrace); + + Activity? replayActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); + Assert.IsNotNull(replayActivity); + Assert.AreEqual(firstTraceId, replayActivity.TraceId.ToString(), + "Replay should use the same trace ID from the persisted identity"); + Assert.AreEqual(firstSpanId, replayActivity.SpanId.ToString(), + "Replay should use the same span ID from the persisted identity"); + + replayActivity.Stop(); + DistributedTraceActivity.Current = null; + } + + [TestMethod] + public void TraceHelper_PreserveTrace_NullParentReturnsNull() + { + // Default behavior: GenerateNewTrace=false and no ParentTraceContext → no activity. + var startEvent = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = false, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + Activity? activity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); + Assert.IsNull(activity, "No activity should be created when there's no parent trace context"); + } + + [TestMethod] + public void TraceHelper_GenerateNewTrace_ProducesNewTraceId_NotInheritedFromAmbient() + { + // Verify the fresh trace gets a genuinely new trace ID, not inherited from ambient. + using var ambientActivity = new Activity("ambient-parent"); + ambientActivity.SetIdFormat(ActivityIdFormat.W3C); + ambientActivity.Start(); + string ambientTraceId = ambientActivity.TraceId.ToString(); + + var startEvent = new ExecutionStartedEvent(-1, "input") + { + GenerateNewTrace = true, + OrchestrationInstance = new OrchestrationInstance { InstanceId = "test", ExecutionId = "exec1" }, + Name = "TestOrch", + }; + + // Stop ambient before calling TraceHelper (mirrors real dispatcher behavior + // where the previous generation's activity is stopped before the next starts) + ambientActivity.Stop(); + + Activity? activity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); + Assert.IsNotNull(activity); + Assert.AreNotEqual(ambientTraceId, activity.TraceId.ToString(), + "Fresh trace should NOT inherit the ambient trace ID"); + + activity.Stop(); + DistributedTraceActivity.Current = null; + } + + #endregion + + #region TaskOrchestrationContext — ContinueAsNew overloads + + [TestMethod] + public void Context_ContinueAsNew_WithStartNewTrace_SetsTraceBehavior() + { + var instance = new OrchestrationInstance { InstanceId = "test", ExecutionId = Guid.NewGuid().ToString() }; + var context = new TestableTaskOrchestrationContext(instance, TaskScheduler.Default); + + context.ContinueAsNew(null, "test-input", new ContinueAsNewOptions + { + TraceBehavior = ContinueAsNewTraceBehavior.StartNewTrace, + }); + + // Verify the pending action has the correct trace behavior + var actions = context.GetActions(); + Assert.AreEqual(1, actions.Count); + var completeAction = (OrchestrationCompleteOrchestratorAction)actions[0]; + Assert.AreEqual(OrchestrationStatus.ContinuedAsNew, completeAction.OrchestrationStatus); + Assert.AreEqual(ContinueAsNewTraceBehavior.StartNewTrace, completeAction.ContinueAsNewTraceBehavior); + } + + [TestMethod] + public void Context_ContinueAsNew_Default_PreservesTrace() + { + var instance = new OrchestrationInstance { InstanceId = "test", ExecutionId = Guid.NewGuid().ToString() }; + var context = new TestableTaskOrchestrationContext(instance, TaskScheduler.Default); + + context.ContinueAsNew("input"); + + var actions = context.GetActions(); + Assert.AreEqual(1, actions.Count); + var completeAction = (OrchestrationCompleteOrchestratorAction)actions[0]; + Assert.AreEqual(ContinueAsNewTraceBehavior.PreserveTraceContext, completeAction.ContinueAsNewTraceBehavior); + } + + [TestMethod] + public void Context_ContinueAsNew_WithVersion_SetsTraceBehavior() + { + var instance = new OrchestrationInstance { InstanceId = "test", ExecutionId = Guid.NewGuid().ToString() }; + var context = new TestableTaskOrchestrationContext(instance, TaskScheduler.Default); + + context.ContinueAsNew("2.0", "test-input", new ContinueAsNewOptions + { + TraceBehavior = ContinueAsNewTraceBehavior.StartNewTrace, + }); + + var actions = context.GetActions(); + Assert.AreEqual(1, actions.Count); + var completeAction = (OrchestrationCompleteOrchestratorAction)actions[0]; + Assert.AreEqual("2.0", completeAction.NewVersion); + Assert.AreEqual(ContinueAsNewTraceBehavior.StartNewTrace, completeAction.ContinueAsNewTraceBehavior); + } + + [TestMethod] + public void Context_ContinueAsNew_LastCallWins() + { + // When ContinueAsNew is called multiple times, the last call's options win. + var instance = new OrchestrationInstance { InstanceId = "test", ExecutionId = Guid.NewGuid().ToString() }; + var context = new TestableTaskOrchestrationContext(instance, TaskScheduler.Default); + + context.ContinueAsNew(null, "input1", new ContinueAsNewOptions + { + TraceBehavior = ContinueAsNewTraceBehavior.StartNewTrace, + }); + + // Second call with default behavior should overwrite + context.ContinueAsNew(null, "input2", new ContinueAsNewOptions + { + TraceBehavior = ContinueAsNewTraceBehavior.PreserveTraceContext, + }); + + var actions = context.GetActions(); + Assert.AreEqual(1, actions.Count); + var completeAction = (OrchestrationCompleteOrchestratorAction)actions[0]; + Assert.AreEqual(ContinueAsNewTraceBehavior.PreserveTraceContext, completeAction.ContinueAsNewTraceBehavior, + "Last ContinueAsNew call should win"); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void Context_ContinueAsNew_NullOptions_Throws() + { + var instance = new OrchestrationInstance { InstanceId = "test", ExecutionId = Guid.NewGuid().ToString() }; + var context = new TestableTaskOrchestrationContext(instance, TaskScheduler.Default); + context.ContinueAsNew(null, "input", (ContinueAsNewOptions)null!); + } + + #endregion + + #region Base class — NotSupportedException for unsupported implementations + + [TestMethod] + [ExpectedException(typeof(NotSupportedException))] + public void BaseClass_ContinueAsNewWithOptions_ThrowsNotSupported() + { + var ctx = new MinimalOrchestrationContext(); + ctx.ContinueAsNew("1.0", "input", new ContinueAsNewOptions()); + } + + #endregion + + #region ContinueAsNewOptions defaults + + [TestMethod] + public void ContinueAsNewOptions_DefaultTraceBehavior_IsPreserve() + { + var options = new ContinueAsNewOptions(); + Assert.AreEqual(ContinueAsNewTraceBehavior.PreserveTraceContext, options.TraceBehavior); + } + + #endregion + + #region Test helpers + + /// + /// A minimal OrchestrationContext subclass that does NOT override the options overload. + /// Used to verify the base class throws NotSupportedException. + /// + private class MinimalOrchestrationContext : OrchestrationContext + { + public override void ContinueAsNew(object input) { } + public override void ContinueAsNew(string newVersion, object input) { } + public override Task CreateSubOrchestrationInstance(string name, string version, string instanceId, object input) + => throw new NotImplementedException(); + public override Task CreateSubOrchestrationInstance(string name, string version, string instanceId, object input, IDictionary tags) + => throw new NotImplementedException(); + public override Task CreateSubOrchestrationInstance(string name, string version, object input) + => throw new NotImplementedException(); + public override Task ScheduleTask(string name, string version, params object[] parameters) + => throw new NotImplementedException(); + public override Task CreateTimer(DateTime fireAt, T state) + => throw new NotImplementedException(); + public override Task CreateTimer(DateTime fireAt, T state, CancellationToken cancelToken) + => throw new NotImplementedException(); + public override void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData) + => throw new NotImplementedException(); + } + + /// + /// A testable TaskOrchestrationContext that exposes the pending actions. + /// ContinueAsNew is stored internally until CompleteOrchestration is called, + /// at which point it becomes visible through OrchestratorActions. + /// + private class TestableTaskOrchestrationContext : TaskOrchestrationContext + { + public TestableTaskOrchestrationContext(OrchestrationInstance instance, TaskScheduler scheduler) + : base(instance, scheduler) + { + CurrentUtcDateTime = DateTime.UtcNow; + } + + public IReadOnlyList GetActions() + { + // Trigger the completion path that moves continueAsNew into the actions map + CompleteOrchestration("result", null, OrchestrationStatus.Completed); + return OrchestratorActions.ToList(); + } + + public override Task ScheduleTask(string name, string version, params object[] parameters) + => base.ScheduleTask(name, version, parameters); + + public override Task CreateTimer(DateTime fireAt, T state, CancellationToken cancelToken) + => Task.FromResult(state); + } + + #endregion + } +} diff --git a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs index 54abd5225..6b5d0316b 100644 --- a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs +++ b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs @@ -61,5 +61,12 @@ public class OrchestrationCompleteOrchestratorAction : OrchestratorAction /// Gets a collection of tags associated with the completion action. /// public IDictionary Tags { get; } = new Dictionary(); + + /// + /// Gets or sets how distributed tracing should behave for the next ContinueAsNew generation. + /// Defaults to . + /// + public ContinueAsNewTraceBehavior ContinueAsNewTraceBehavior { get; set; } = + ContinueAsNewTraceBehavior.PreserveTraceContext; } } \ No newline at end of file diff --git a/src/DurableTask.Core/ContinueAsNewOptions.cs b/src/DurableTask.Core/ContinueAsNewOptions.cs new file mode 100644 index 000000000..38e833466 --- /dev/null +++ b/src/DurableTask.Core/ContinueAsNewOptions.cs @@ -0,0 +1,46 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core +{ + /// + /// Configures how an orchestration continues as new. + /// + public sealed class ContinueAsNewOptions + { + /// + /// Gets or sets how distributed tracing should behave for the next generation. + /// The default is , + /// which keeps the next generation in the same distributed trace. + /// + public ContinueAsNewTraceBehavior TraceBehavior { get; set; } = + ContinueAsNewTraceBehavior.PreserveTraceContext; + } + + /// + /// Describes how distributed tracing should behave for the next ContinueAsNew generation. + /// + public enum ContinueAsNewTraceBehavior + { + /// + /// Preserve the current trace lineage across generations. This is the default. + /// + PreserveTraceContext = 0, + + /// + /// Start the next generation in a fresh distributed trace. Useful for long-running + /// periodic orchestrations where each cycle should be independently observable. + /// + StartNewTrace = 1, + } +} diff --git a/src/DurableTask.Core/History/ExecutionStartedEvent.cs b/src/DurableTask.Core/History/ExecutionStartedEvent.cs index 59c6b8202..b68b2c48e 100644 --- a/src/DurableTask.Core/History/ExecutionStartedEvent.cs +++ b/src/DurableTask.Core/History/ExecutionStartedEvent.cs @@ -71,6 +71,7 @@ internal ExecutionStartedEvent(ExecutionStartedEvent other) Correlation = other.Correlation; ScheduledStartTime = other.ScheduledStartTime; Generation = other.Generation; + GenerateNewTrace = other.GenerateNewTrace; } /// @@ -133,6 +134,15 @@ internal ExecutionStartedEvent(ExecutionStartedEvent other) [DataMember] public int? Generation { get; set; } + /// + /// When true, indicates that this execution should start a fresh distributed trace + /// rather than inheriting the trace context from the previous generation. + /// This flag is consumed once by the trace infrastructure and reset to false after + /// the new trace is created, so that subsequent replays use the persisted trace identity. + /// + [DataMember] + public bool GenerateNewTrace { get; set; } + // Used for Continue-as-New scenarios internal void SetParentTraceContext(ExecutionStartedEvent parent) { diff --git a/src/DurableTask.Core/OrchestrationContext.cs b/src/DurableTask.Core/OrchestrationContext.cs index 63179f485..1f405a6ec 100644 --- a/src/DurableTask.Core/OrchestrationContext.cs +++ b/src/DurableTask.Core/OrchestrationContext.cs @@ -451,6 +451,33 @@ public abstract Task CreateSubOrchestrationInstance(string name, string ve /// public abstract void ContinueAsNew(string newVersion, object input); + /// + /// Checkpoint the orchestration instance by completing the current execution in the ContinueAsNew + /// state and creating a new execution of this instance with the specified input parameter. + /// This overload allows the caller to customize how the next generation behaves, such as + /// starting a fresh distributed trace. + /// + /// + /// New version of the orchestration to start. Pass null to keep the current version. + /// + /// + /// Input to the new execution of this instance. This is the same type as the one used to start + /// the first execution of this orchestration instance. + /// + /// + /// Options that customize the next generation. + /// + /// + /// Thrown if the current implementation does not support + /// . Override this method in a derived class to add support. + /// + public virtual void ContinueAsNew(string newVersion, object input, ContinueAsNewOptions options) + { + throw new NotSupportedException( + $"This {GetType().Name} implementation does not support ContinueAsNewOptions. " + + "Override this method in a derived class to add support."); + } + /// /// Create a proxy client class to schedule remote TaskActivities via a strongly typed interface. /// diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index 4972e6fcd..d9ec5b6ab 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -249,15 +249,25 @@ public override void SendEvent(OrchestrationInstance orchestrationInstance, stri public override void ContinueAsNew(object input) { - ContinueAsNew(null, input); + ContinueAsNewCore(null, input, new ContinueAsNewOptions()); } public override void ContinueAsNew(string newVersion, object input) { - ContinueAsNewCore(newVersion, input); + ContinueAsNewCore(newVersion, input, new ContinueAsNewOptions()); } - void ContinueAsNewCore(string newVersion, object input) + public override void ContinueAsNew(string newVersion, object input, ContinueAsNewOptions options) + { + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + + ContinueAsNewCore(newVersion, input, options); + } + + void ContinueAsNewCore(string newVersion, object input, ContinueAsNewOptions options) { string serializedInput = this.MessageDataConverter.SerializeInternal(input); @@ -265,7 +275,8 @@ void ContinueAsNewCore(string newVersion, object input) { Result = serializedInput, OrchestrationStatus = OrchestrationStatus.ContinuedAsNew, - NewVersion = newVersion + NewVersion = newVersion, + ContinueAsNewTraceBehavior = options.TraceBehavior, }; } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index c85536793..faf31cf10 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -26,6 +26,7 @@ namespace DurableTask.Core using System; using System.Collections.Generic; using System.Diagnostics; + using System.Globalization; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -641,8 +642,20 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext; }); - // Copy the distributed trace context, if any - continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent); + // Copy the distributed trace context to preserve lineage, unless + // the next generation was explicitly requested to start a fresh trace. + if (!continueAsNewExecutionStarted!.GenerateNewTrace) + { + continueAsNewExecutionStarted.SetParentTraceContext(runtimeState.ExecutionStartedEvent); + } + else + { + // Stamp the request time so the producer span created by TraceHelper + // uses an accurate start time instead of the dequeue time. + continueAsNewExecutionStarted.Tags ??= new Dictionary(); + continueAsNewExecutionStarted.Tags[OrchestrationTags.RequestTime] = + DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture); + } runtimeState = new OrchestrationRuntimeState(); runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); @@ -1055,10 +1068,13 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt InstanceId = runtimeState.OrchestrationInstance!.InstanceId, ExecutionId = Guid.NewGuid().ToString("N") }, - Tags = runtimeState.Tags, + // Clone tags to avoid mutating the current generation's tag dictionary + Tags = runtimeState.Tags != null ? new Dictionary(runtimeState.Tags) : null, ParentInstance = runtimeState.ParentInstance, Name = runtimeState.Name, - Version = completeOrchestratorAction.NewVersion ?? runtimeState.Version + Version = completeOrchestratorAction.NewVersion ?? runtimeState.Version, + // Signal that the next generation should start a fresh distributed trace + GenerateNewTrace = completeOrchestratorAction.ContinueAsNewTraceBehavior == ContinueAsNewTraceBehavior.StartNewTrace, }; taskMessage.OrchestrationInstance = startedEvent.OrchestrationInstance; diff --git a/src/DurableTask.Core/Tracing/TraceHelper.cs b/src/DurableTask.Core/Tracing/TraceHelper.cs index 9c45b2889..b8ad17de7 100644 --- a/src/DurableTask.Core/Tracing/TraceHelper.cs +++ b/src/DurableTask.Core/Tracing/TraceHelper.cs @@ -95,9 +95,12 @@ public class TraceHelper return null; } - if (startEvent.Tags != null && startEvent.Tags.ContainsKey(OrchestrationTags.CreateTraceForNewOrchestration)) + // When GenerateNewTrace is set, create a fresh root trace for this orchestration. + // The flag is consumed once and reset so that subsequent replays use the + // persisted trace identity rather than creating yet another new trace. + if (startEvent.GenerateNewTrace) { - startEvent.Tags.Remove(OrchestrationTags.CreateTraceForNewOrchestration); + startEvent.GenerateNewTrace = false; // Note that if we create the trace activity for starting a new orchestration here, then its duration will be longer since its end time will be set to once we // start processing the orchestration rather than when the request for a new orchestration is committed to storage. using var activityForNewOrchestration = StartActivityForNewOrchestration(startEvent);