diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala index 0399e386ba7..bf88fa6112c 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveManager.scala @@ -32,7 +32,8 @@ import org.apache.texera.amber.config.PythonUtils * for each Computing Unit * * It supports: - * - Creating and initializing isolated Python environments + * - Creating and initializing isolated Python environments (with system packages) + * - Installing user defined packages * - Streaming pip output logs back to the caller * * Each PVE is stored under: @@ -41,6 +42,11 @@ import org.apache.texera.amber.config.PythonUtils object PveManager { + case class PvePackageResponse( + pveName: String, + userPackages: Seq[String] + ) + private val VenvRoot: Path = Paths.get("/tmp/texera-pve/venvs") private def cuidDir(cuid: Int, pveName: String): Path = { @@ -66,6 +72,32 @@ object PveManager { Process(Seq(python, "-m", "pip", "freeze")).!!.split("\n").map(_.trim).filter(_.nonEmpty).toSeq } + private def runPipInstall( + python: String, + args: Seq[String], + queue: BlockingQueue[String] + ): Int = { + Process( + Seq( + python, + "-u", + "-m", + "pip", + "install", + "--progress-bar", + "off", + "--no-input" + ) ++ args, + None, + pipEnv.toSeq: _* + ).!( + ProcessLogger( + out => queue.put(s"[pip] $out"), + err => queue.put(s"[pip][ERR] $err") + ) + ) + } + /** * Creates a new PVE for a CU. * @@ -101,7 +133,6 @@ object PveManager { val venvDirPath = pveDir(cuid, pveName).toAbsolutePath val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString - val envVars = pipEnv val createVenvPython = PythonUtils.getPythonExecutable @@ -121,43 +152,19 @@ object PveManager { return } - if (!Files.exists(requirementsPath)) { - queue.put(s"[PVE][ERR] requirements.txt not found at ${requirementsPath.toAbsolutePath}") - return - } - - if (!Files.exists(operatorRequirementsPath)) { - queue.put( - s"[PVE][ERR] operator-requirements.txt not found at ${operatorRequirementsPath.toAbsolutePath}" - ) - return - } - queue.put( - s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and ${operatorRequirementsPath.toAbsolutePath}" + s"[PVE] Installing requirements from ${requirementsPath.toAbsolutePath} and operator requirements from ${operatorRequirementsPath.toAbsolutePath}" ) - val installReqCode = Process( + val installReqCode = runPipInstall( + python, Seq( - python, - "-u", - "-m", - "pip", - "install", - "--progress-bar", - "off", "-r", requirementsPath.toString, "-r", operatorRequirementsPath.toString ), - None, - envVars.toSeq: _* - ).!( - ProcessLogger( - out => queue.put(s"[pip] $out"), - err => queue.put(s"[pip][ERR] $err") - ) + queue ) queue.put(s"[PVE] requirements install finished with exit code $installReqCode") @@ -170,7 +177,8 @@ object PveManager { queue.put(s"[PVE] Created new environment for cuid = $cuid") } - def getEnvironments(cuid: Int): List[String] = { + // returns list of PVE names and corresponding user packages for a given CU + def getEnvironments(cuid: Int): List[PvePackageResponse] = { val cuPath = VenvRoot.resolve(cuid.toString) @@ -185,7 +193,27 @@ object PveManager { .iterator() .asScala .filter(path => Files.isDirectory(path)) - .map(path => path.getFileName.toString) + .map { path => + val pveName = path.getFileName.toString + val metadataPath = path.resolve("user-packages.txt") + + val userPackages = + if (Files.exists(metadataPath)) { + Files + .readAllLines(metadataPath) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSeq + } else { + Seq() + } + + PvePackageResponse( + pveName = pveName, + userPackages = userPackages + ) + } .toList } finally { stream.close() @@ -212,4 +240,75 @@ object PveManager { stream.close() } } + + /** + * Installs user requested Python packages into the PVE. + * + * 1. Executes pip install for each package + * 2. Updates user metadata file + * 3. Streams logs back via queue + */ + def installUserPackages( + packages: List[String], + cuid: Int, + queue: BlockingQueue[String], + pveName: String + ): Unit = { + + val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString + val envVars = pipEnv + + if (!Files.exists(Paths.get(python))) { + queue.put(s"[PVE][ERR] Python executable not found for PVE: $python") + return + } + + val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt") + + var installedPackages = + if (Files.exists(metadataPath)) { + Files + .readAllLines(metadataPath) + .asScala + .map(_.trim) + .filter(_.nonEmpty) + .toSet + } else { + Set[String]() + } + + packages.foreach { pkg => + val trimmedPkg = pkg.trim + + if (trimmedPkg.nonEmpty) { + queue.put(s"[PVE] Installing package: $trimmedPkg") + + val code = runPipInstall( + python, + Seq(trimmedPkg), + queue + ) + + queue.put(s"[pip] install($trimmedPkg) finished with exit code $code") + + if (code != 0) { + queue.put(s"[PVE][ERR] Failed to install package: $trimmedPkg") + return + } + + installedPackages = installedPackages + trimmedPkg + + Files.write( + metadataPath, + installedPackages.toSeq.sorted.asJava + ) + } + } + + queue.put("[PVE] Final user package list:") + + installedPackages.toSeq.sorted.foreach { pkg => + queue.put(s"[user-package] $pkg") + } + } } diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala index 1040fd64ea4..0a058ed6f5c 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResource.scala @@ -28,7 +28,7 @@ import java.util @Consumes(Array(MediaType.APPLICATION_JSON)) class PveResource { // -------------------------------------------------- - // Get installed packages + // Get system packages // -------------------------------------------------- @GET @Path("/system") @@ -45,7 +45,7 @@ class PveResource { } // -------------------------------------------------- - // Fetch PVEs + // Fetch PVEs and Installed User Packages // -------------------------------------------------- @GET @Path("/pves") @@ -54,9 +54,10 @@ class PveResource { try { PveManager .getEnvironments(cuid) - .map { pveName => + .map { pve => Map( - "pveName" -> pveName.asInstanceOf[Object] + "pveName" -> pve.pveName.asInstanceOf[Object], + "userPackages" -> pve.userPackages.asJava.asInstanceOf[Object] ).asJava } .asJava diff --git a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala index b93d1bfde03..7b46640626e 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveWebsocketResource.scala @@ -26,9 +26,9 @@ import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global /** - * WebSocket endpoint for PVE creation that streams pip installation logs - * to the frontend in real time. The environment setup runs asynchronously, - * and output is pushed to the client until completion. + * WebSocket endpoint for PVE creation and user pacakge installation that streams + * pip installation logs to the frontend in real time. The environment setup runs + * asynchronously, and output is pushed to the client until completion. */ @ServerEndpoint("/wsapi/pve") @@ -42,12 +42,33 @@ class PveWebsocketResource { val cuid = params.get("cuid").get(0).toInt val pveName = params.get("pveName").get(0) val isLocal = params.get("isLocal").get(0).toBoolean + val action = params.getOrDefault("action", java.util.List.of("create")).get(0) val queue = new LinkedBlockingQueue[String]() Future { try { - PveManager.createNewPve(cuid, queue, pveName, isLocal) + action match { + case "create" => + PveManager.createNewPve(cuid, queue, pveName, isLocal) + + case "install" => + val packages = + params + .getOrDefault("packages", java.util.List.of("[]")) + .get(0) + .stripPrefix("[") + .stripSuffix("]") + .split(",") + .toList + .map(_.replace("\"", "").trim) + .filter(_.nonEmpty) + + PveManager.installUserPackages(packages, cuid, queue, pveName) + + case _ => + queue.put(s"[ERR] Unknown action: $action") + } } catch { case e: Exception => queue.put(s"[ERR] ${e.getMessage}") @@ -61,7 +82,6 @@ class PveWebsocketResource { while (!done && session.isOpen) { val line = queue.take() - session.getBasicRemote.sendText(line) if (line == "__DONE__") { diff --git a/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala b/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala index a093cf1ad2f..fa79ddd86c1 100644 --- a/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala +++ b/amber/src/test/scala/org/apache/texera/web/resource/pythonvirtualenvironment/PveResourceSpec.scala @@ -64,7 +64,37 @@ class PveResourceSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach Files.exists(pythonPath) shouldBe true Files.exists(pipPath) shouldBe true - PveManager.getEnvironments(testCuid) should contain(testPveName) + PveManager.getEnvironments(testCuid).map(_.pveName) should contain(testPveName) + } + + "PveManager" should "install a user package and list it for the PVE" in { + PveManager.createNewPve(testCuid, queue, testPveName, isLocal = true) + + val packageName = "colorama" + val packageVersion = "0.4.6" + val packageSpec = s"$packageName==$packageVersion" + + queue.clear() + + PveManager.installUserPackages( + List(packageSpec), + testCuid, + queue, + testPveName + ) + + val logs = queueText() + + logs should not include "[PVE][ERR]" + logs should include(s"[PVE] Installing package: $packageSpec") + logs should include(s"[user-package] $packageSpec") + + val pve = PveManager + .getEnvironments(testCuid) + .find(_.pveName == testPveName) + + pve should not be empty + pve.get.userPackages should contain(packageSpec) } "PveManager" should "delete all PVEs for a computing unit" in { diff --git a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html index b742c71581c..6f16073073f 100644 --- a/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html +++ b/frontend/src/app/workspace/component/power-button/computing-unit-selection.component.html @@ -480,7 +480,7 @@