Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils
* for each Computing Unit
*
* It supports:
* - Creating and initializing isolated Python environments
* - Creating and initializing isolated Python environments (with system packages)
* - Installing user defined packages
* - Streaming pip output logs back to the caller
*
* Each PVE is stored under:
Expand All @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils

object PveManager {

case class PvePackageResponse(
pveName: String,
userPackages: Seq[String]
)

private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs")

private def cuidDir(cuid: Int, pveName: String): Path = {
Expand All @@ -66,6 +72,32 @@ object PveManager {
Process(Seq(python, "-m", "pip", "freeze")).!!.split("\n").map(_.trim).filter(_.nonEmpty).toSeq
}

private def runPipInstall(
python: String,
args: Seq[String],
queue: BlockingQueue[String]
): Int = {
Process(
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"--no-input"
) ++ args,
None,
pipEnv.toSeq: _*
).!(
ProcessLogger(
out => queue.put(s"[pip] $out"),
err => queue.put(s"[pip][ERR] $err")
)
)
}

/**
* Creates a new PVE for a CU.
*
Expand Down Expand Up @@ -101,7 +133,6 @@ object PveManager {

val venvDirPath = pveDir(cuid, pveName).toAbsolutePath
val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val envVars = pipEnv

val createVenvPython = PythonUtils.getPythonExecutable

Expand All @@ -121,43 +152,19 @@ object PveManager {
return
}

if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}")
return
}

if (!Files.exists(operatorRequirementsPath)) {
queue.put(
s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}"
)
return
}

Comment thread
kunwp1 marked this conversation as resolved.
queue.put(
s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}"
)

val installReqCode = Process(
val installReqCode = runPipInstall(
python,
Seq(
python,
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"-r",
requirementsPath.toString,
"-r",
operatorRequirementsPath.toString
),
None,
envVars.toSeq: _*
).!(
ProcessLogger(
out => queue.put(s"[pip] $out"),
err => queue.put(s"[pip][ERR] $err")
)
queue
)

queue.put(s"[PVE] requirements install finished with exit code $installReqCode")
Expand All @@ -170,7 +177,8 @@ object PveManager {
queue.put(s"[PVE] Created new environment for cuid = $cuid")
}

def getEnvironments(cuid: Int): List[String] = {
// returns list of PVE names and corresponding user packages for a given CU
def getEnvironments(cuid: Int): List[PvePackageResponse] = {

val cuPath = VenvRoot.resolve(cuid.toString)

Expand All @@ -185,7 +193,27 @@ object PveManager {
.iterator()
.asScala
.filter(path => Files.isDirectory(path))
.map(path => path.getFileName.toString)
.map { path =>
val pveName = path.getFileName.toString
val metadataPath = path.resolve("user-packages.txt")

val userPackages =
if (Files.exists(metadataPath)) {
Files
.readAllLines(metadataPath)
.asScala
.map(_.trim)
.filter(_.nonEmpty)
.toSeq
} else {
Seq()
}

PvePackageResponse(
pveName = pveName,
userPackages = userPackages
)
}
.toList
} finally {
stream.close()
Expand All @@ -212,4 +240,75 @@ object PveManager {
stream.close()
}
}

/**
* Installs user requested Python packages into the PVE.
*
* 1. Executes pip install for each package
* 2. Updates user metadata file
* 3. Streams logs back via queue
*/
def installUserPackages(
packages: List[String],
cuid: Int,
queue: BlockingQueue[String],
pveName: String
): Unit = {

val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val envVars = pipEnv

if (!Files.exists(Paths.get(python))) {
queue.put(s"[PVE][ERR] Python executable not found for PVE: $python")
return
}

val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt")

var installedPackages =
if (Files.exists(metadataPath)) {
Files
.readAllLines(metadataPath)
.asScala
.map(_.trim)
.filter(_.nonEmpty)
.toSet
} else {
Set[String]()
}

packages.foreach { pkg =>
val trimmedPkg = pkg.trim

if (trimmedPkg.nonEmpty) {
queue.put(s"[PVE] Installing package: $trimmedPkg")

val code = runPipInstall(
python,
Seq(trimmedPkg),
queue
)

queue.put(s"[pip] install($trimmedPkg) finished with exit code $code")

if (code != 0) {
queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg")
return
}

installedPackages = installedPackages + trimmedPkg
Comment thread
SarahAsad23 marked this conversation as resolved.

Files.write(
metadataPath,
installedPackages.toSeq.sorted.asJava
)
}
}

queue.put("[PVE] Final user package list:")

installedPackages.toSeq.sorted.foreach { pkg =>
queue.put(s"[user-package] $pkg")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.util
@Consumes(Array(MediaType.APPLICATION_JSON))
class PveResource {
// --------------------------------------------------
// Get installed packages
// Get system packages
// --------------------------------------------------
@GET
@Path("/system")
Expand All @@ -45,7 +45,7 @@ class PveResource {
}

// --------------------------------------------------
// Fetch PVEs
// Fetch PVEs and Installed User Packages
// --------------------------------------------------
@GET
@Path("/pves")
Expand All @@ -54,9 +54,10 @@ class PveResource {
try {
PveManager
.getEnvironments(cuid)
.map { pveName =>
.map { pve =>
Map(
"pveName" -> pveName.asInstanceOf[Object]
"pveName" -> pve.pveName.asInstanceOf[Object],
"userPackages" -> pve.userPackages.asJava.asInstanceOf[Object]
).asJava
}
.asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/**
* WebSocket endpoint for PVE creation that streams pip installation logs
* to the frontend in real time. The environment setup runs asynchronously,
* and output is pushed to the client until completion.
* WebSocket endpoint for PVE creation and user pacakge installation that streams
* pip installation logs to the frontend in real time. The environment setup runs
* asynchronously, and output is pushed to the client until completion.
*/

@ServerEndpoint("/wsapi/pve")
Expand All @@ -42,12 +42,33 @@ class PveWebsocketResource {
val cuid = params.get("cuid").get(0).toInt
val pveName = params.get("pveName").get(0)
val isLocal = params.get("isLocal").get(0).toBoolean
val action = params.getOrDefault("action", java.util.List.of("create")).get(0)

val queue = new LinkedBlockingQueue[String]()

Future {
try {
PveManager.createNewPve(cuid, queue, pveName, isLocal)
action match {
case "create" =>
PveManager.createNewPve(cuid, queue, pveName, isLocal)

case "install" =>
val packages =
params
.getOrDefault("packages", java.util.List.of("[]"))
.get(0)
.stripPrefix("[")
.stripSuffix("]")
.split(",")
.toList
.map(_.replace("\"", "").trim)
.filter(_.nonEmpty)

PveManager.installUserPackages(packages, cuid, queue, pveName)

case _ =>
queue.put(s"[ERR] Unknown action: $action")
}
} catch {
case e: Exception =>
queue.put(s"[ERR] ${e.getMessage}")
Expand All @@ -61,7 +82,6 @@ class PveWebsocketResource {

while (!done && session.isOpen) {
val line = queue.take()

session.getBasicRemote.sendText(line)

if (line == "__DONE__") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,37 @@ class PveResourceSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach
Files.exists(pythonPath) shouldBe true
Files.exists(pipPath) shouldBe true

PveManager.getEnvironments(testCuid) should contain(testPveName)
PveManager.getEnvironments(testCuid).map(_.pveName) should contain(testPveName)
}

"PveManager" should "install a user package and list it for the PVE" in {
PveManager.createNewPve(testCuid, queue, testPveName, isLocal = true)

val packageName = "colorama"
val packageVersion = "0.4.6"
val packageSpec = s"$packageName==$packageVersion"

queue.clear()

PveManager.installUserPackages(
List(packageSpec),
testCuid,
queue,
testPveName
)

val logs = queueText()

logs should not include "[PVE][ERR]"
logs should include(s"[PVE] Installing package: $packageSpec")
logs should include(s"[user-package] $packageSpec")

val pve = PveManager
.getEnvironments(testCuid)
.find(_.pveName == testPveName)

pve should not be empty
pve.get.userPackages should contain(packageSpec)
}

"PveManager" should "delete all PVEs for a computing unit" in {
Expand Down
Loading