From ca3e520bf995915eb9cd8b39b0662ce826d395fd Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Wed, 6 May 2026 23:36:39 -0700 Subject: [PATCH 1/2] add port dependency to DifferenceOpDesc to prevent self-join deadlock --- .../operator/difference/DifferenceOpDesc.scala | 2 +- .../difference/DifferenceOpExecSpec.scala | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/difference/DifferenceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/difference/DifferenceOpDesc.scala index c12f057a25b..ce4de985df5 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/difference/DifferenceOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/difference/DifferenceOpDesc.scala @@ -57,7 +57,7 @@ class DifferenceOpDesc extends LogicalOp { OperatorGroupConstants.SET_GROUP, inputPorts = List( InputPort(PortIdentity(), displayName = "left"), - InputPort(PortIdentity(1), displayName = "right") + InputPort(PortIdentity(1), displayName = "right", dependencies = List(PortIdentity())) ), outputPorts = List(OutputPort(blocking = true)) ) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpExecSpec.scala index 6819ed5259c..080436a4843 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpExecSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/difference/DifferenceOpExecSpec.scala @@ -149,4 +149,19 @@ class DifferenceOpExecSpec extends AnyFlatSpec with BeforeAndAfter { opExec.close() } + it should "return empty set when same tuples are used as both inputs (self-difference)" in { + opExec.open() + counter = 0 + val tuples = (1 to 5).map(_ => tuple()).toList + + tuples.foreach(t => opExec.processTuple(t, input1)) + assert(opExec.onFinish(input1).isEmpty) + + tuples.foreach(t => opExec.processTuple(t, input2)) + val outputTuples = opExec.onFinish(input2).toSet + assert(outputTuples.isEmpty) + + opExec.close() + } + } From aa8d85241f2e1f5baf44e399e7d40e45d98dc112 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Fri, 8 May 2026 01:38:09 -0700 Subject: [PATCH 2/2] include port id in materialization reader actor id to fix self-join deadlock --- .../messaginglayer/InputManager.scala | 1 + .../resourcePolicies/ResourceAllocator.scala | 6 ++- ...InputPortMaterializationReaderThread.scala | 4 +- .../promisehandlers/AssignPortHandler.scala | 2 +- ...ExpansionGreedyScheduleGeneratorSpec.scala | 39 +++++++++++++++++++ .../amber/util/VirtualIdentityUtils.scala | 19 +++++++-- .../amber/util/VirtualIdentityUtilsSpec.scala | 22 ++++++++++- 7 files changed, 85 insertions(+), 8 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/InputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/InputManager.scala index 4e297829431..c68dbcb2e57 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/InputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/InputManager.scala @@ -78,6 +78,7 @@ class InputManager( case (uri, partitioning) => new InputPortMaterializationReaderThread( uri = uri, + portId = portId, inputMessageQueue = this.inputMessageQueue, workerActorId = this.actorId, partitioning = partitioning diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala index 2953ece0b34..a3970ad1609 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourceAllocator.scala @@ -105,7 +105,11 @@ class DefaultResourceAllocator( val toWorkerActorIds = operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId) val fromVirtualThreadActorIds = toWorkerActorIds.map(toWorkerActorId => - getFromActorIdForInputPortStorage(inputMatUri.toString, toWorkerActorId) + getFromActorIdForInputPortStorage( + inputMatUri.toString, + globalPortId.portId, + toWorkerActorId + ) ) // Extract the input port partitionInfo defined in the physicalOp, defaulting to UnknownPartition. val inputPortPartitionInfo = region diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala index 10fbbc44a2c..c6bb79c9600 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala @@ -29,6 +29,7 @@ import org.apache.texera.amber.core.virtualidentity.{ ChannelIdentity, EmbeddedControlMessageIdentity } +import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{ NO_ALIGNMENT, @@ -55,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer class InputPortMaterializationReaderThread( uri: URI, + portId: PortIdentity, inputMessageQueue: LinkedBlockingQueue[DPInputQueueElement], workerActorId: ActorVirtualIdentity, partitioning: Partitioning @@ -65,7 +67,7 @@ class InputPortMaterializationReaderThread( private lazy val channelId = { // A unique channel between this thread (dummy actor) and the worker actor. val fromActorId: ActorVirtualIdentity = - getFromActorIdForInputPortStorage(uri.toString, workerActorId) + getFromActorIdForInputPortStorage(uri.toString, portId, workerActorId) ChannelIdentity(fromActorId, workerActorId, isControl = false) } private val partitioner = toPartitioner(partitioning, workerActorId) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index fe959733abb..a05b14752b2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -49,7 +49,7 @@ trait AssignPortHandler { dp.inputManager.addPort(msg.portId, schema, inputPortURIs, partitionings) inputPortURIStrs.foreach { uriStr => val toActorId = ctx.receiver - val fromActorId = getFromActorIdForInputPortStorage(uriStr, toActorId) + val fromActorId = getFromActorIdForInputPortStorage(uriStr, msg.portId, toActorId) val channelId = ChannelIdentity(fromWorkerId = fromActorId, toWorkerId = toActorId, isControl = false) // Same as AddInputChannelHandler diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala index e720b9c6cb5..e1463c29621 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGeneratorSpec.scala @@ -23,6 +23,7 @@ import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow import org.apache.texera.amber.operator.TestOperators +import org.apache.texera.amber.operator.difference.DifferenceOpDesc import org.apache.texera.amber.operator.split.SplitOpDesc import org.apache.texera.amber.operator.udf.python.{ DualInputPortsPythonUDFOpDescV2, @@ -334,4 +335,42 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory } } + "RegionPlanGenerator" should "generate a runnable schedule for csv->difference with both ports from same csv" in { + val csv = TestOperators.headerlessSmallCsvScanOpDesc() + val diff = new DifferenceOpDesc() + val workflow = buildWorkflow( + List(csv, diff), + List( + LogicalLink( + csv.operatorIdentifier, + PortIdentity(), + diff.operatorIdentifier, + PortIdentity() + ), + LogicalLink( + csv.operatorIdentifier, + PortIdentity(), + diff.operatorIdentifier, + PortIdentity(1) + ) + ), + new WorkflowContext() + ) + + val (schedule, _) = new ExpansionGreedyScheduleGenerator( + workflow.context, + workflow.physicalPlan + ).generate() + + val levels = schedule.levelSets + // Self-link to both ports must be broken into multiple region levels via materialization, + // otherwise the upstream csv blocks waiting on the dependent right port and execution deadlocks. + assert( + levels.size > 1, + s"expected multiple region levels (csv + materialized read), got ${levels.size} levels" + ) + val regionList = levels.values.flatten.toList + assert(regionList.nonEmpty, "scheduler must produce at least one region") + } + } diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 031c4b8c7f9..84ccbfc1157 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -25,6 +25,7 @@ import org.apache.texera.amber.core.virtualidentity.{ PhysicalOpIdentity, WorkflowIdentity } +import org.apache.texera.amber.core.workflow.PortIdentity import scala.util.matching.Regex @@ -95,14 +96,26 @@ object VirtualIdentityUtils { /** * An input port materialization reader thread mimics the behavior of an upstream worker. * Each thread has a virtual actor id. This method creates such a virtual actor id. - * @param storageURIStr The materialization location to read from. + * + * The destination port id is part of the identity so that when one upstream URI feeds + * multiple input ports of the same downstream worker (e.g. a single source connected to + * both inputs of Difference), each (uri, port, worker) triple still produces a distinct + * channel, preventing FIFO sequence collisions and end-of-channel markers from being + * routed to the wrong port. + * + * @param storageURIStr The materialization location to read from. + * @param toPortId The downstream input port the reader feeds. * @param toWorkerActorId The worker actor that the thread belongs to. - * @return */ def getFromActorIdForInputPortStorage( storageURIStr: String, + toPortId: PortIdentity, toWorkerActorId: ActorVirtualIdentity ): ActorVirtualIdentity = { - ActorVirtualIdentity(MATERIALIZATION_READER_ACTOR_PREFIX + storageURIStr + toWorkerActorId.name) + ActorVirtualIdentity( + MATERIALIZATION_READER_ACTOR_PREFIX + storageURIStr + + s"_port${toPortId.id}${if (toPortId.internal) "i" else ""}_" + + toWorkerActorId.name + ) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index 4da024dd53b..b9053ea8be6 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -25,6 +25,7 @@ import org.apache.texera.amber.core.virtualidentity.{ PhysicalOpIdentity, WorkflowIdentity } +import org.apache.texera.amber.core.workflow.PortIdentity import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -152,12 +153,29 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { // ----- getFromActorIdForInputPortStorage ----- - "getFromActorIdForInputPortStorage" should "prefix MATERIALIZATION_READER_ to the storage URI plus actor name" in { + "getFromActorIdForInputPortStorage" should "prefix MATERIALIZATION_READER_ and include the destination port id" in { val toWorker = ActorVirtualIdentity("Worker:WF1-myOp-main-0") val virtualReader = VirtualIdentityUtils.getFromActorIdForInputPortStorage( "iceberg:/warehouse/x", + PortIdentity(0), toWorker ) - virtualReader.name shouldBe "MATERIALIZATION_READER_iceberg:/warehouse/xWorker:WF1-myOp-main-0" + virtualReader.name shouldBe + "MATERIALIZATION_READER_iceberg:/warehouse/x_port0_Worker:WF1-myOp-main-0" + } + + it should "produce distinct ids for the same uri+worker but different port ids" in { + val toWorker = ActorVirtualIdentity("Worker:WF1-myOp-main-0") + val left = VirtualIdentityUtils.getFromActorIdForInputPortStorage( + "iceberg:/warehouse/x", + PortIdentity(0), + toWorker + ) + val right = VirtualIdentityUtils.getFromActorIdForInputPortStorage( + "iceberg:/warehouse/x", + PortIdentity(1), + toWorker + ) + left should not be right } }