Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class InputManager(
case (uri, partitioning) =>
new InputPortMaterializationReaderThread(
uri = uri,
portId = portId,
inputMessageQueue = this.inputMessageQueue,
workerActorId = this.actorId,
partitioning = partitioning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ class DefaultResourceAllocator(
val toWorkerActorIds =
operatorConfigs(globalPortId.opId).workerConfigs.map(_.workerId)
val fromVirtualThreadActorIds = toWorkerActorIds.map(toWorkerActorId =>
getFromActorIdForInputPortStorage(inputMatUri.toString, toWorkerActorId)
getFromActorIdForInputPortStorage(
inputMatUri.toString,
globalPortId.portId,
toWorkerActorId
)
)
// Extract the input port partitionInfo defined in the physicalOp, defaulting to UnknownPartition.
val inputPortPartitionInfo = region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.texera.amber.core.virtualidentity.{
ChannelIdentity,
EmbeddedControlMessageIdentity
}
import org.apache.texera.amber.core.workflow.PortIdentity
import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.toPartitioner
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.{
NO_ALIGNMENT,
Expand All @@ -55,6 +56,7 @@ import scala.collection.mutable.ArrayBuffer

class InputPortMaterializationReaderThread(
uri: URI,
portId: PortIdentity,
inputMessageQueue: LinkedBlockingQueue[DPInputQueueElement],
workerActorId: ActorVirtualIdentity,
partitioning: Partitioning
Expand All @@ -65,7 +67,7 @@ class InputPortMaterializationReaderThread(
private lazy val channelId = {
// A unique channel between this thread (dummy actor) and the worker actor.
val fromActorId: ActorVirtualIdentity =
getFromActorIdForInputPortStorage(uri.toString, workerActorId)
getFromActorIdForInputPortStorage(uri.toString, portId, workerActorId)
ChannelIdentity(fromActorId, workerActorId, isControl = false)
}
private val partitioner = toPartitioner(partitioning, workerActorId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ trait AssignPortHandler {
dp.inputManager.addPort(msg.portId, schema, inputPortURIs, partitionings)
inputPortURIStrs.foreach { uriStr =>
val toActorId = ctx.receiver
val fromActorId = getFromActorIdForInputPortStorage(uriStr, toActorId)
val fromActorId = getFromActorIdForInputPortStorage(uriStr, msg.portId, toActorId)
val channelId =
ChannelIdentity(fromWorkerId = fromActorId, toWorkerId = toActorId, isControl = false)
// Same as AddInputChannelHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
import org.apache.texera.amber.operator.TestOperators
import org.apache.texera.amber.operator.difference.DifferenceOpDesc
import org.apache.texera.amber.operator.split.SplitOpDesc
import org.apache.texera.amber.operator.udf.python.{
DualInputPortsPythonUDFOpDescV2,
Expand Down Expand Up @@ -334,4 +335,42 @@ class ExpansionGreedyScheduleGeneratorSpec extends AnyFlatSpec with MockFactory
}
}

"RegionPlanGenerator" should "generate a runnable schedule for csv->difference with both ports from same csv" in {
val csv = TestOperators.headerlessSmallCsvScanOpDesc()
val diff = new DifferenceOpDesc()
val workflow = buildWorkflow(
List(csv, diff),
List(
LogicalLink(
csv.operatorIdentifier,
PortIdentity(),
diff.operatorIdentifier,
PortIdentity()
),
LogicalLink(
csv.operatorIdentifier,
PortIdentity(),
diff.operatorIdentifier,
PortIdentity(1)
)
),
new WorkflowContext()
)

val (schedule, _) = new ExpansionGreedyScheduleGenerator(
workflow.context,
workflow.physicalPlan
).generate()

val levels = schedule.levelSets
// Self-link to both ports must be broken into multiple region levels via materialization,
// otherwise the upstream csv blocks waiting on the dependent right port and execution deadlocks.
assert(
levels.size > 1,
s"expected multiple region levels (csv + materialized read), got ${levels.size} levels"
)
val regionList = levels.values.flatten.toList
assert(regionList.nonEmpty, "scheduler must produce at least one region")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.texera.amber.core.virtualidentity.{
PhysicalOpIdentity,
WorkflowIdentity
}
import org.apache.texera.amber.core.workflow.PortIdentity

import scala.util.matching.Regex

Expand Down Expand Up @@ -95,14 +96,26 @@ object VirtualIdentityUtils {
/**
* An input port materialization reader thread mimics the behavior of an upstream worker.
* Each thread has a virtual actor id. This method creates such a virtual actor id.
* @param storageURIStr The materialization location to read from.
*
* The destination port id is part of the identity so that when one upstream URI feeds
* multiple input ports of the same downstream worker (e.g. a single source connected to
* both inputs of Difference), each (uri, port, worker) triple still produces a distinct
* channel, preventing FIFO sequence collisions and end-of-channel markers from being
* routed to the wrong port.
*
* @param storageURIStr The materialization location to read from.
* @param toPortId The downstream input port the reader feeds.
* @param toWorkerActorId The worker actor that the thread belongs to.
* @return
*/
def getFromActorIdForInputPortStorage(
storageURIStr: String,
toPortId: PortIdentity,
toWorkerActorId: ActorVirtualIdentity
): ActorVirtualIdentity = {
ActorVirtualIdentity(MATERIALIZATION_READER_ACTOR_PREFIX + storageURIStr + toWorkerActorId.name)
ActorVirtualIdentity(
MATERIALIZATION_READER_ACTOR_PREFIX + storageURIStr +
s"_port${toPortId.id}${if (toPortId.internal) "i" else ""}_" +
toWorkerActorId.name
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.texera.amber.core.virtualidentity.{
PhysicalOpIdentity,
WorkflowIdentity
}
import org.apache.texera.amber.core.workflow.PortIdentity
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -152,12 +153,29 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers {

// ----- getFromActorIdForInputPortStorage -----

"getFromActorIdForInputPortStorage" should "prefix MATERIALIZATION_READER_ to the storage URI plus actor name" in {
"getFromActorIdForInputPortStorage" should "prefix MATERIALIZATION_READER_ and include the destination port id" in {
val toWorker = ActorVirtualIdentity("Worker:WF1-myOp-main-0")
val virtualReader = VirtualIdentityUtils.getFromActorIdForInputPortStorage(
"iceberg:/warehouse/x",
PortIdentity(0),
toWorker
)
virtualReader.name shouldBe "MATERIALIZATION_READER_iceberg:/warehouse/xWorker:WF1-myOp-main-0"
virtualReader.name shouldBe
"MATERIALIZATION_READER_iceberg:/warehouse/x_port0_Worker:WF1-myOp-main-0"
}

it should "produce distinct ids for the same uri+worker but different port ids" in {
val toWorker = ActorVirtualIdentity("Worker:WF1-myOp-main-0")
val left = VirtualIdentityUtils.getFromActorIdForInputPortStorage(
"iceberg:/warehouse/x",
PortIdentity(0),
toWorker
)
val right = VirtualIdentityUtils.getFromActorIdForInputPortStorage(
"iceberg:/warehouse/x",
PortIdentity(1),
toWorker
)
left should not be right
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DifferenceOpDesc extends LogicalOp {
OperatorGroupConstants.SET_GROUP,
inputPorts = List(
InputPort(PortIdentity(), displayName = "left"),
InputPort(PortIdentity(1), displayName = "right")
InputPort(PortIdentity(1), displayName = "right", dependencies = List(PortIdentity()))
),
outputPorts = List(OutputPort(blocking = true))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,19 @@ class DifferenceOpExecSpec extends AnyFlatSpec with BeforeAndAfter {
opExec.close()
}

it should "return empty set when same tuples are used as both inputs (self-difference)" in {
opExec.open()
counter = 0
val tuples = (1 to 5).map(_ => tuple()).toList

tuples.foreach(t => opExec.processTuple(t, input1))
assert(opExec.onFinish(input1).isEmpty)

tuples.foreach(t => opExec.processTuple(t, input2))
val outputTuples = opExec.onFinish(input2).toSet
assert(outputTuples.isEmpty)

opExec.close()
}

}
Loading