From 45a2420f7a0311db13cdb32392c3aadd306e36a7 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Tue, 12 May 2026 16:29:49 -0400 Subject: [PATCH 1/2] feat(bootstrapper): run phase items in parallel when possible MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make all shared state in `Bootstrapper` and `BootstrapRequirementResolver` thread-safe, then restructure the `bootstrap()` main loop to process work in uniform batches: collect consecutive same-phase items → run via `ThreadPoolExecutor` → restore results to stack in original order. `BootstrapPhase.can_parallelize` controls whether a batch collects multiple items or just one. All work goes through the executor; batch size determines effective parallelism. Phases that install packages are marked as needing to be run in serial to ensure that they are processed one at a time after all dependencies are actually built. This allows most work to happen in parallel, at least somewhat, based on the content of the work item stack. Pass `ctx.settings.max_jobs` as `max_workers` to `ThreadPoolExecutor` so the number of parallel threads respects the `--jobs` flag. `None` (no flag given) uses the executor's default; a positive int caps the thread count. Thread-safety changes: - Use contextvars.ContextVar for the `why` dependency chain stack. ContextVar is safe across both threads and asyncio contexts. ThreadPoolExecutor does not copy the submitting thread's context into each task, so every task re-initializes its own `why` stack from the work item's `why_snapshot` before dispatching the phase. - `self._state_lock` added to `Bootstrapper`; held at all check-then-set write sites: `_phase_start` (seen dedup), `_add_to_build_order`, `_add_to_graph`, `_handle_phase_error` (multiple-versions block), `_record_test_mode_failure`, and progressbar calls in `_handle_build_requirements`. - Add `_wheel_dir_lock` to `Bootstrapper.__init__` and hold it around the operations that can race. 
- Extract `UPDATE_BUILD_SEQUENCE` as a separate phase so it can be run serially to ensure items are added in the right order. - The `bootstrap` command always sets the worker pool size to 1, so even though tasks run in a thread they are serialized. - Register an `add_done_callback` on each submitted future so the progress bar updates as individual items complete rather than waiting for the entire batch. Serial phases update inline after each item. The callback holds `_state_lock` around progress bar calls for thread safety. - Add `GET_BUILD_DEPS` as a new parallel phase after the existing `PREPARE_BUILD` phase. `PREPARE_BUILD` now only installs build system deps (must be serial to avoid races between packages sharing build deps). `GET_BUILD_DEPS` queries build backend and sdist deps, which are independent per-package and safe to run concurrently. - Split `INSTALL_BUILD_DEPS` phase out of the `BUILD` phase to install build backend/sdist deps into the build environment; guaranteed safe because only one package runs this at a time, so every dep's wheel is in the mirror before installation. 
Closes: #1149 Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Doug Hellmann --- e2e/ci_bootstrap_parallel_suite.sh | 1 + ...st_bootstrap_parallel_multiple_versions.sh | 97 ++++ src/fromager/bootstrapper.py | 422 +++++++++++++----- src/fromager/commands/bootstrap.py | 3 + tests/test_bootstrapper.py | 374 +++++++++++++++- tests/test_bootstrapper_iterative.py | 7 +- 6 files changed, 789 insertions(+), 115 deletions(-) create mode 100755 e2e/test_bootstrap_parallel_multiple_versions.sh diff --git a/e2e/ci_bootstrap_parallel_suite.sh b/e2e/ci_bootstrap_parallel_suite.sh index 42051443..5dbd018e 100755 --- a/e2e/ci_bootstrap_parallel_suite.sh +++ b/e2e/ci_bootstrap_parallel_suite.sh @@ -15,5 +15,6 @@ test_section "bootstrap parallel tests" run_test "bootstrap_parallel" run_test "bootstrap_parallel_git_url" run_test "bootstrap_parallel_git_url_tag" +run_test "bootstrap_parallel_multiple_versions" finish_suite diff --git a/e2e/test_bootstrap_parallel_multiple_versions.sh b/e2e/test_bootstrap_parallel_multiple_versions.sh new file mode 100755 index 00000000..8e78a21c --- /dev/null +++ b/e2e/test_bootstrap_parallel_multiple_versions.sh @@ -0,0 +1,97 @@ +#!/bin/bash +# -*- indent-tabs-mode: nil; tab-width: 2; sh-indentation: 2; -*- + +# Test bootstrap-parallel with --multiple-versions flag. +# Bootstraps flit-core 3.0.0 through 3.12.0 inclusive (many versions) with +# only 5 parallel workers to verify that batches larger than the worker pool +# are handled correctly. +# +# Note: flit-core <3.10.0 uses ast.Str which was removed in Python 3.14 and +# will fail during GET_BUILD_DEPS; those failures are expected and handled +# gracefully by the bootstrapper. The test only asserts that the versions +# that *can* build (>=3.10.0) are present in the output. 
+ +SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +source "$SCRIPTDIR/common.sh" + +# Constrain flit-core (its own build backend) to the same range so the +# resolver never pulls in a version outside 3.0.0–3.12.0 as a build dep. +constraints_file=$(mktemp) +trap 'rm -f "$constraints_file"; on_exit' EXIT +cat > "$constraints_file" <<EOF +flit-core>=3.0.0,<=3.12.0 +setuptools==82.0.1 +poetry-core==2.4.0 +tomli==2.4.1 +toml==0.10.2 +EOF + +fromager \ + --log-file="$OUTDIR/bootstrap.log" \ + --error-log-file="$OUTDIR/fromager-errors.log" \ + --sdists-repo="$OUTDIR/sdists-repo" \ + --wheels-repo="$OUTDIR/wheels-repo" \ + --work-dir="$OUTDIR/work-dir" \ + --constraints-file="$constraints_file" \ + bootstrap-parallel \ + --multiple-versions \ + --max-workers 5 \ + 'flit-core>=3.0.0,<=3.12.0' + +echo "Checking for flit-core wheels..." +find "$OUTDIR/wheels-repo/downloads/" -name 'flit_core-*.whl' | sort + +# Verify that a representative sample of versions that can build on this +# Python (>=3.10.0, which dropped ast.Str) were bootstrapped. +# flit-core 3.0.0 through 3.12.0 includes many releases; 3.10.0–3.12.0 +# are the ones compatible with Python 3.14+. 
+EXPECTED_VERSIONS="3.10.0 3.11.0 3.12.0" +MISSING_VERSIONS="" + +for version in $EXPECTED_VERSIONS; do + if find "$OUTDIR/wheels-repo/downloads/" -name "flit_core-${version}-*.whl" | grep -q .; then + echo "found wheel for flit-core $version" + else + echo "missing wheel for flit-core $version" 1>&2 + MISSING_VERSIONS="$MISSING_VERSIONS $version" + fi +done + +if [ -n "$MISSING_VERSIONS" ]; then + echo "" 1>&2 + echo "ERROR: Missing expected versions:$MISSING_VERSIONS" 1>&2 + echo "The --multiple-versions flag should have bootstrapped all matching versions" 1>&2 + echo "" 1>&2 + echo "Found flit-core wheels:" 1>&2 + find "$OUTDIR/wheels-repo/downloads/" -name 'flit_core-*.whl' 1>&2 + exit 1 +fi + +# Verify that the phases expected to parallelize each logged at least one +# "starting parallel batch" line, confirming the parallel code path ran. +# +# Note: "resolve" is intentionally excluded — with a single top-level +# requirement there is only one resolve item so that phase always runs serially. +PARALLEL_PHASES="start prepare-source get-build-deps build process-install-deps complete" +PHASES_WITHOUT_PARALLEL_BATCH="" + +echo "" +echo "Checking that parallel phases ran at least one parallel batch..." 
+for phase in $PARALLEL_PHASES; do + if grep -q "starting parallel batch: phase=${phase} " "$OUTDIR/bootstrap.log" 2>/dev/null; then + echo "phase=${phase}: OK (parallel batch confirmed)" + else + echo "phase=${phase}: MISSING parallel batch" 1>&2 + PHASES_WITHOUT_PARALLEL_BATCH="${PHASES_WITHOUT_PARALLEL_BATCH} ${phase}" + fi +done + +if [ -n "$PHASES_WITHOUT_PARALLEL_BATCH" ]; then + echo "" 1>&2 + echo "ERROR: these phases never ran a parallel batch:${PHASES_WITHOUT_PARALLEL_BATCH}" 1>&2 + exit 1 +fi + +echo "OK: all parallel phases ran at least one parallel batch" +echo "" +echo "SUCCESS: All sampled flit-core versions (3.10.0–3.12.0) were bootstrapped (from 3.0.0–3.12.0 range with 5 workers)" diff --git a/src/fromager/bootstrapper.py b/src/fromager/bootstrapper.py index 3560395f..60be5660 100644 --- a/src/fromager/bootstrapper.py +++ b/src/fromager/bootstrapper.py @@ -1,6 +1,8 @@ from __future__ import annotations +import concurrent.futures import contextlib +import contextvars import dataclasses import datetime import json @@ -10,6 +12,7 @@ import pathlib import shutil import tempfile +import threading import typing import zipfile from enum import StrEnum @@ -88,16 +91,21 @@ class BootstrapPhase(StrEnum): """Processing phases for iterative bootstrap. All packages: RESOLVE -> START -> ... - Source packages: ... -> PREPARE_SOURCE -> PREPARE_BUILD -> BUILD - -> PROCESS_INSTALL_DEPS -> COMPLETE. - Prebuilt packages: ... -> PREPARE_SOURCE -> PROCESS_INSTALL_DEPS -> COMPLETE. + Source packages: ... -> PREPARE_SOURCE -> PREPARE_BUILD (serial) -> GET_BUILD_DEPS + -> INSTALL_BUILD_DEPS (serial) -> BUILD + -> UPDATE_BUILD_SEQUENCE -> PROCESS_INSTALL_DEPS -> COMPLETE. + Prebuilt packages: ... -> PREPARE_SOURCE -> UPDATE_BUILD_SEQUENCE + -> PROCESS_INSTALL_DEPS -> COMPLETE. 
""" RESOLVE = "resolve" START = "start" PREPARE_SOURCE = "prepare-source" PREPARE_BUILD = "prepare-build" + GET_BUILD_DEPS = "get-build-deps" + INSTALL_BUILD_DEPS = "install-build-deps" BUILD = "build" + UPDATE_BUILD_SEQUENCE = "update-build-sequence" PROCESS_INSTALL_DEPS = "process-install-deps" COMPLETE = "complete" @@ -106,6 +114,28 @@ def tracks_why(self) -> bool: """Whether this phase pushes onto the dependency-chain (why) stack.""" return self not in (BootstrapPhase.RESOLVE, BootstrapPhase.START) + @property + def can_parallelize(self) -> bool: + """Whether items in this phase can be processed in parallel. + + Phases that install packages into build environments must be serial to + ensure a dependency's wheel exists in the mirror before any package that + needs it runs its install step. Those phases are PREPARE_BUILD (installs + build system deps) and INSTALL_BUILD_DEPS (installs build backend/sdist + deps). UPDATE_BUILD_SEQUENCE is also serial to serialise writes to + build-order.json. All other phases, including BUILD itself, are safe to + parallelize. 
+ """ + return self in ( + BootstrapPhase.RESOLVE, + BootstrapPhase.START, + BootstrapPhase.PREPARE_SOURCE, + BootstrapPhase.GET_BUILD_DEPS, + BootstrapPhase.BUILD, + BootstrapPhase.PROCESS_INSTALL_DEPS, + BootstrapPhase.COMPLETE, + ) + @dataclasses.dataclass class WorkItem: @@ -144,6 +174,11 @@ class WorkItem: build_sdist_deps: set[Requirement] = dataclasses.field(default_factory=set) +_why_ctxvar: contextvars.ContextVar[ + list[tuple[RequirementType, Requirement, Version]] +] = contextvars.ContextVar("why", default=None) # type: ignore[arg-type] + + class Bootstrapper: def __init__( self, @@ -154,6 +189,7 @@ def __init__( sdist_only: bool = False, test_mode: bool = False, multiple_versions: bool = False, + max_workers: int | None = None, ) -> None: if test_mode and sdist_only: raise ValueError( @@ -161,13 +197,15 @@ def __init__( ) self.ctx = ctx + self.ctx.enable_parallel_builds() self.progressbar = progressbar or progress.Progressbar(None) self.prev_graph = prev_graph self.cache_wheel_server_url = cache_wheel_server_url or ctx.wheel_server_url self.sdist_only = sdist_only self.test_mode = test_mode self.multiple_versions = multiple_versions - self.why: list[tuple[RequirementType, Requirement, Version]] = [] + self._max_workers = max_workers + _why_ctxvar.set([]) # Delegate resolution to BootstrapRequirementResolver self._resolver = bootstrap_requirement_resolver.BootstrapRequirementResolver( @@ -192,10 +230,35 @@ def __init__( # Track failed packages in test mode (list of typed dicts for JSON export) self.failed_packages: list[FailureRecord] = [] + # Lock protecting all shared mutable state accessed from worker threads: + # _seen_requirements, _build_requirements, _build_stack, failed_packages, + # _failed_versions, ctx.dependency_graph, and progress bar calls that + # originate from worker threads (e.g. via _handle_build_requirements). + self._state_lock = threading.Lock() + + # Lock protecting wheel directory operations (wheels_build / wheels_downloads). 
+ # Prevents TOCTOU races between _find_cached_wheel (which inspects wheels_build) + # and _download_prebuilt (which calls update_wheel_mirror, moving files out of + # wheels_build). + self._wheel_dir_lock = threading.Lock() + # Track failed versions in multiple_versions mode # Maps (package_name, version) -> exception info self._failed_versions: list[tuple[str, str, Exception]] = [] + @property + def why(self) -> list[tuple[RequirementType, Requirement, Version]]: + """Per-task dependency chain stack. Thread-safe via contextvars.""" + result = _why_ctxvar.get() + if result is None: + result = [] + _why_ctxvar.set(result) + return result + + @why.setter + def why(self, value: list[tuple[RequirementType, Requirement, Version]]) -> None: + _why_ctxvar.set(value) + def resolve_and_add_top_level( self, req: Requirement, @@ -369,32 +432,83 @@ def bootstrap(self, req: Requirement, req_type: RequirementType) -> None: ) ] - # Main iterative DFS loop - while stack: - item = stack.pop() + def run_one(item: WorkItem) -> list[WorkItem]: + # Initialize this thread's why stack from the item's snapshot. self.why = list(item.why_snapshot) - with req_ctxvar_context(item.req), self._track_why(item): try: - new_items = self._dispatch_phase(item) + return self._dispatch_phase(item) except Exception as err: - new_items = self._handle_phase_error(item, err) + return self._handle_phase_error(item, err) - # Progress bar: count new RESOLVE-phase items as new dependencies - new_dep_count = sum( - 1 for it in new_items if it.phase == BootstrapPhase.RESOLVE - ) - if new_dep_count > 0: - self.progressbar.update_total(new_dep_count) - if not new_items: - self.progressbar.update() + # Main iterative loop — always batch all consecutive same-phase items. + # All items run through the shared thread pool regardless of phase. 
+ # `can_parallelize` controls batch size: serial phases pop only the + # single top item (so other phases can interleave); parallel phases + # collect all consecutive same-phase items at the top of the stack. + try: + with concurrent.futures.ThreadPoolExecutor( + max_workers=self._max_workers + ) as executor: + while stack: + top_phase = stack[-1].phase + + if not top_phase.can_parallelize: + # Serial: only process the single top item so other phases can interleave + batch = [stack.pop()] + else: + # Parallel: collect all consecutive same-phase items + batch = [] + i = len(stack) - 1 + while i >= 0 and stack[i].phase == top_phase: + batch.append(stack[i]) + i -= 1 + del stack[i + 1 :] - # Phase handlers return [continuation, *new_deps] so extend() - # naturally puts new deps on top of the stack (processed first). - stack.extend(new_items) + logger.info( + "starting %s batch: phase=%s items=%d", + "parallel" if top_phase.can_parallelize else "serial", + top_phase.value, + len(batch), + ) - # Restore why stack for the caller - self.why = saved_why + def _update_progress( + fut: concurrent.futures.Future[list[WorkItem]], + ) -> None: + try: + result_items = fut.result() + except Exception: + return # errors are handled by f.result() in the main thread + with self._state_lock: + new_deps = sum( + 1 + for it in result_items + if it.phase == BootstrapPhase.RESOLVE + ) + if new_deps > 0: + self.progressbar.update_total(new_deps) + if not result_items: + self.progressbar.update() + + futures: list[concurrent.futures.Future[list[WorkItem]]] = [] + for item in batch: + f = executor.submit(run_one, item) + f.add_done_callback(_update_progress) + futures.append(f) + try: + all_new_items = [f.result() for f in futures] + except Exception: + for f in futures: + f.cancel() + raise + + # Restore results to stack in original order. + # batch[0] was topmost; reversing puts its results back on top. 
+ for new_items in reversed(all_new_items): + stack.extend(new_items) + finally: + # Restore why stack for the caller + self.why = saved_why # In multiple versions mode, report any failures for this requirement if self.multiple_versions and self._failed_versions: @@ -455,15 +569,16 @@ def _record_test_mode_failure( else: logger.error("%s: %s", msg, err, exc_info=True) - self.failed_packages.append( - { - "package": str(req.name), - "version": version, - "exception_type": err.__class__.__name__, - "exception_message": str(err), - "failure_type": failure_type, - } - ) + with self._state_lock: + self.failed_packages.append( + { + "package": str(req.name), + "version": version, + "exception_type": err.__class__.__name__, + "exception_message": str(err), + "failure_type": failure_type, + } + ) @property def _explain(self) -> str: @@ -515,7 +630,11 @@ def _build_wheel( version=resolved_version, build_env=build_env, ) - server.update_wheel_mirror(self.ctx) + # Hold _wheel_dir_lock around update_wheel_mirror, which moves the + # built wheel out of wheels_build, to prevent TOCTOU races with + # _find_cached_wheel. + with self._wheel_dir_lock: + server.update_wheel_mirror(self.ctx) # When we update the mirror, the built file moves to the # downloads directory. wheel_filename = self.ctx.wheels_downloads / built_filename.name @@ -600,7 +719,8 @@ def _handle_build_requirements( (_resolve_version_from_git_url -> _get_version_from_package_metadata). The main iterative bootstrap loop handles build deps via phase handlers. 
""" - self.progressbar.update_total(len(build_dependencies)) + with self._state_lock: + self.progressbar.update_total(len(build_dependencies)) for dep in self._sort_requirements(build_dependencies): with req_ctxvar_context(dep): @@ -609,7 +729,8 @@ def _handle_build_requirements( saved_why = list(self.why) self.bootstrap(req=dep, req_type=build_type) self.why = saved_why - self.progressbar.update() + with self._state_lock: + self.progressbar.update() def _download_prebuilt( self, @@ -623,8 +744,11 @@ def _download_prebuilt( wheel_filename = wheels.download_wheel(req, wheel_url, self.ctx.wheels_prebuilt) unpack_dir = self._create_unpack_dir(req, resolved_version) # Update the wheel mirror so pre-built wheels are indexed - # and available to subsequent builds that need them as dependencies - server.update_wheel_mirror(self.ctx) + # and available to subsequent builds that need them as dependencies. + # Hold _wheel_dir_lock around update_wheel_mirror, which moves files out of + # wheels_build, to prevent TOCTOU races with _find_cached_wheel. + with self._wheel_dir_lock: + server.update_wheel_mirror(self.ctx) return (wheel_filename, unpack_dir) def _find_cached_wheel( @@ -645,9 +769,12 @@ def _find_cached_wheel( """ # Check if we have previously built a wheel and still have it on the # local filesystem. - cached_wheel, unpacked = self._look_for_existing_wheel( - req, resolved_version, self.ctx.wheels_build - ) + # Hold _wheel_dir_lock while inspecting wheels_build to prevent a concurrent + # _download_prebuilt from moving wheels out from under us via update_wheel_mirror. + with self._wheel_dir_lock: + cached_wheel, unpacked = self._look_for_existing_wheel( + req, resolved_version, self.ctx.wheels_build + ) if cached_wheel: return cached_wheel, unpacked @@ -938,7 +1065,8 @@ def _download_wheel_from_cache( if self.cache_wheel_server_url != self.ctx.wheel_server_url: # Only update the local server if we actually downloaded # something from a different server. 
- server.update_wheel_mirror(self.ctx) + with self._wheel_dir_lock: + server.update_wheel_mirror(self.ctx) logger.info("found built wheel on cache server") unpack_dir = self._unpack_metadata_from_wheel( req, resolved_version, cached_wheel @@ -1176,17 +1304,18 @@ def _add_to_graph( # Update the dependency graph after we determine that this requirement is # useful but before we determine if it is redundant so that we capture all # edges to use for building a valid constraints file. - self.ctx.dependency_graph.add_dependency( - parent_name=canonicalize_name(parent_req.name) if parent_req else None, - parent_version=parent_version, - req_type=req_type, - req=req, - req_version=req_version, - download_url=download_url, - pre_built=pbi.pre_built, - constraint=self.ctx.constraints.get_constraint(req.name), - ) - self.ctx.write_to_graph_to_file() + with self._state_lock: + self.ctx.dependency_graph.add_dependency( + parent_name=canonicalize_name(parent_req.name) if parent_req else None, + parent_version=parent_version, + req_type=req_type, + req=req, + req_version=req_version, + download_url=download_url, + pre_built=pbi.pre_built, + constraint=self.ctx.constraints.get_constraint(req.name), + ) + self.ctx.write_to_graph_to_file() def _sort_requirements( self, @@ -1246,28 +1375,29 @@ def _add_to_build_order( # value, included in the _resolved_key() output, can confuse # that so we ignore itand build our own key using just the # name and version. 
- key = (canonicalize_name(req.name), str(version)) - if key in self._build_requirements: - return - logger.info(f"adding {key} to build order") - self._build_requirements.add(key) - info = { - "req": str(req), - "constraint": str(constraint) if constraint else "", - "dist": canonicalize_name(req.name), - "version": str(version), - "prebuilt": prebuilt, - "source_url": source_url, - "source_url_type": str(source_type), - } - if req.url: - info["source_url"] = req.url - self._build_stack.append(info) - with open(self._build_order_filename, "w") as f: - # Set default=str because the why value includes - # Requirement and Version instances that can't be - # converted to JSON without help. - json.dump(self._build_stack, f, indent=2, default=str) + with self._state_lock: + key = (canonicalize_name(req.name), str(version)) + if key in self._build_requirements: + return + logger.info(f"adding {key} to build order") + self._build_requirements.add(key) + info = { + "req": str(req), + "constraint": str(constraint) if constraint else "", + "dist": canonicalize_name(req.name), + "version": str(version), + "prebuilt": prebuilt, + "source_url": source_url, + "source_url_type": str(source_type), + } + if req.url: + info["source_url"] = req.url + self._build_stack.append(info) + with open(self._build_order_filename, "w") as f: + # Set default=str because the why value includes + # Requirement and Version instances that can't be + # converted to JSON without help. 
+ json.dump(self._build_stack, f, indent=2, default=str) # ---- Iterative bootstrap: phase handlers and helpers ---- @@ -1384,14 +1514,17 @@ def _phase_start(self, item: WorkItem) -> list[WorkItem]: self.sdist_only and not self._processing_build_requirement(item.req_type) ) - if self._has_been_seen(item.req, item.resolved_version, item.build_sdist_only): - logger.debug( - f"redundant {item.req_type} dependency {item.req} " - f"({item.resolved_version}, sdist_only={item.build_sdist_only}) " - f"for {self._explain}" - ) - return [] - self._mark_as_seen(item.req, item.resolved_version, item.build_sdist_only) + with self._state_lock: + if self._has_been_seen( + item.req, item.resolved_version, item.build_sdist_only + ): + logger.debug( + f"redundant {item.req_type} dependency {item.req} " + f"({item.resolved_version}, sdist_only={item.build_sdist_only}) " + f"for {self._explain}" + ) + return [] + self._mark_as_seen(item.req, item.resolved_version, item.build_sdist_only) logger.info( f"new {item.req_type} dependency {item.req} " @@ -1406,7 +1539,7 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: """PREPARE_SOURCE phase: download source or prebuilt, get build system deps. Returns: - Prebuilt: [item] advanced to PROCESS_INSTALL_DEPS (skip build phases). + Prebuilt: [item] advanced to UPDATE_BUILD_SEQUENCE (skip build phases). Source: [item advanced to PREPARE_BUILD, *build_system_dep_items]. 
""" assert item.resolved_version is not None @@ -1434,7 +1567,7 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: build_env=None, source_type=SourceType.PREBUILT, ) - item.phase = BootstrapPhase.PROCESS_INSTALL_DEPS + item.phase = BootstrapPhase.UPDATE_BUILD_SEQUENCE return [item] # Source build path @@ -1491,10 +1624,13 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: return [item] + dep_items def _phase_prepare_build(self, item: WorkItem) -> list[WorkItem]: - """PREPARE_BUILD phase: install system deps, get backend/sdist deps. + """PREPARE_BUILD phase: install build system deps into build env (serial). + + Runs one package at a time so build_system_deps' wheels are guaranteed + present before installation. Advances to GET_BUILD_DEPS. Returns: - [item advanced to BUILD, *backend_dep_items, *sdist_dep_items]. + [item advanced to GET_BUILD_DEPS]. """ assert item.resolved_version is not None assert item.build_env is not None @@ -1503,6 +1639,22 @@ def _phase_prepare_build(self, item: WorkItem) -> list[WorkItem]: # Install build system deps (their wheels exist from DFS processing) item.build_env.install(item.build_system_deps) + item.phase = BootstrapPhase.GET_BUILD_DEPS + return [item] + + def _phase_get_build_deps(self, item: WorkItem) -> list[WorkItem]: + """GET_BUILD_DEPS phase: query build backend/sdist deps (parallel-safe). + + Build system is now installed; each package has its own build env so + these queries are independent and can run concurrently. + + Returns: + [item advanced to BUILD, *backend_dep_items, *sdist_dep_items]. 
+ """ + assert item.resolved_version is not None + assert item.build_env is not None + assert item.sdist_root_dir is not None + # Get build backend dependencies item.build_backend_deps = dependencies.get_build_backend_dependencies( ctx=self.ctx, @@ -1535,24 +1687,42 @@ def _phase_prepare_build(self, item: WorkItem) -> list[WorkItem]: ) dep_items = backend_items + sdist_items - item.phase = BootstrapPhase.BUILD + item.phase = BootstrapPhase.INSTALL_BUILD_DEPS return [item] + dep_items - def _phase_build(self, item: WorkItem) -> list[WorkItem]: - """BUILD phase: install remaining deps, build wheel/sdist. + def _phase_install_build_deps(self, item: WorkItem) -> list[WorkItem]: + """INSTALL_BUILD_DEPS phase: install build backend/sdist deps (serial). + + Runs one package at a time so each dep's wheel is guaranteed present in + the mirror before installation. Advances to BUILD. Returns: - [item] advanced to PROCESS_INSTALL_DEPS. + [item] advanced to BUILD. """ assert item.resolved_version is not None assert item.build_env is not None assert item.sdist_root_dir is not None - # Install backend+sdist deps if disjoint from system deps remaining_deps = item.build_backend_deps | item.build_sdist_deps if remaining_deps.isdisjoint(item.build_system_deps): item.build_env.install(remaining_deps) + item.phase = BootstrapPhase.BUILD + return [item] + + def _phase_build(self, item: WorkItem) -> list[WorkItem]: + """BUILD phase: build wheel/sdist (parallel-safe). + + Build deps are already installed; each package has its own build + environment so builds are independent and can run concurrently. + + Returns: + [item] advanced to UPDATE_BUILD_SEQUENCE. 
+ """ + assert item.resolved_version is not None + assert item.build_env is not None + assert item.sdist_root_dir is not None + wheel_filename, sdist_filename = self._do_build( req=item.req, resolved_version=item.resolved_version, @@ -1573,11 +1743,35 @@ def _phase_build(self, item: WorkItem) -> list[WorkItem]: source_type=source_type, ) + item.phase = BootstrapPhase.UPDATE_BUILD_SEQUENCE + return [item] + + def _phase_update_build_sequence(self, item: WorkItem) -> list[WorkItem]: + """UPDATE_BUILD_SEQUENCE phase: record this package in the build order. + + Returns: + [item] advanced to PROCESS_INSTALL_DEPS. + """ + assert item.resolved_version is not None + assert item.source_url is not None + assert item.build_result is not None + + pbi = self.ctx.package_build_info(item.req) + constraint = self.ctx.constraints.get_constraint(item.req.name) + self._add_to_build_order( + req=item.req, + version=item.resolved_version, + source_url=item.source_url, + source_type=item.build_result.source_type, + prebuilt=pbi.pre_built, + constraint=constraint, + ) + item.phase = BootstrapPhase.PROCESS_INSTALL_DEPS return [item] def _phase_process_install_deps(self, item: WorkItem) -> list[WorkItem]: - """PROCESS_INSTALL_DEPS phase: hooks, extract deps, build order. + """PROCESS_INSTALL_DEPS phase: hooks and extract install deps. Returns: [item advanced to COMPLETE, *install_dep_items]. 
@@ -1635,17 +1829,6 @@ def _phase_process_install_deps(self, item: WorkItem) -> list[WorkItem]: ", ".join(sorted(str(r) for r in install_dependencies)), ) - pbi = self.ctx.package_build_info(item.req) - constraint = self.ctx.constraints.get_constraint(item.req.name) - self._add_to_build_order( - req=item.req, - version=item.resolved_version, - source_url=item.source_url, - source_type=item.build_result.source_type, - prebuilt=pbi.pre_built, - constraint=constraint, - ) - dep_items = self._create_unresolved_work_items( install_dependencies, RequirementType.INSTALL, @@ -1680,8 +1863,14 @@ def _dispatch_phase(self, item: WorkItem) -> list[WorkItem]: return self._phase_prepare_source(item) case BootstrapPhase.PREPARE_BUILD: return self._phase_prepare_build(item) + case BootstrapPhase.GET_BUILD_DEPS: + return self._phase_get_build_deps(item) + case BootstrapPhase.INSTALL_BUILD_DEPS: + return self._phase_install_build_deps(item) case BootstrapPhase.BUILD: return self._phase_build(item) + case BootstrapPhase.UPDATE_BUILD_SEQUENCE: + return self._phase_update_build_sequence(item) case BootstrapPhase.PROCESS_INSTALL_DEPS: return self._phase_process_install_deps(item) case BootstrapPhase.COMPLETE: @@ -1713,6 +1902,8 @@ def _handle_phase_error( in ( BootstrapPhase.PREPARE_SOURCE, BootstrapPhase.PREPARE_BUILD, + BootstrapPhase.GET_BUILD_DEPS, + BootstrapPhase.INSTALL_BUILD_DEPS, BootstrapPhase.BUILD, ) and not item.pbi_pre_built @@ -1726,7 +1917,7 @@ def _handle_phase_error( ) if fallback is not None: item.build_result = fallback - item.phase = BootstrapPhase.PROCESS_INSTALL_DEPS + item.phase = BootstrapPhase.UPDATE_BUILD_SEQUENCE return [item] self._record_test_mode_failure( item.req, str(item.resolved_version), err, "bootstrap" @@ -1737,20 +1928,25 @@ def _handle_phase_error( if self.multiple_versions: assert item.resolved_version is not None pkg_name = canonicalize_name(item.req.name) - self._failed_versions.append((pkg_name, str(item.resolved_version), err)) 
logger.warning( f"{item.req.name}=={item.resolved_version}: " f"failed to bootstrap during {item.phase} phase: " f"{type(err).__name__}: {err}" ) - self.ctx.dependency_graph.remove_dependency(pkg_name, item.resolved_version) - self._seen_requirements.discard( - self._resolved_key(item.req, item.resolved_version, "sdist") - ) - self._seen_requirements.discard( - self._resolved_key(item.req, item.resolved_version, "wheel") - ) - self.ctx.write_to_graph_to_file() + with self._state_lock: + self._failed_versions.append( + (pkg_name, str(item.resolved_version), err) + ) + self.ctx.dependency_graph.remove_dependency( + pkg_name, item.resolved_version + ) + self._seen_requirements.discard( + self._resolved_key(item.req, item.resolved_version, "sdist") + ) + self._seen_requirements.discard( + self._resolved_key(item.req, item.resolved_version, "wheel") + ) + self.ctx.write_to_graph_to_file() return [] # Normal mode: fail-fast diff --git a/src/fromager/commands/bootstrap.py b/src/fromager/commands/bootstrap.py index 6b4252cd..4fff6dae 100644 --- a/src/fromager/commands/bootstrap.py +++ b/src/fromager/commands/bootstrap.py @@ -130,6 +130,7 @@ def bootstrap( multiple_versions: bool, max_release_age: int | None, toplevel: list[str], + max_workers: int | None = 1, ) -> None: """Compute and build the dependencies of a set of requirements recursively @@ -198,6 +199,7 @@ def bootstrap( sdist_only=sdist_only, test_mode=test_mode, multiple_versions=multiple_versions, + max_workers=max_workers, ) # Pre-resolution phase: Resolve all top-level dependencies before recursive @@ -556,6 +558,7 @@ def bootstrap_parallel( sdist_only=True, skip_constraints=skip_constraints, multiple_versions=multiple_versions, + max_workers=max_workers, max_release_age=max_release_age, toplevel=toplevel, ) diff --git a/tests/test_bootstrapper.py b/tests/test_bootstrapper.py index 339b2deb..d3c8f63e 100644 --- a/tests/test_bootstrapper.py +++ b/tests/test_bootstrapper.py @@ -1,5 +1,6 @@ import json import 
pathlib +import unittest.mock from unittest.mock import Mock, patch import pytest @@ -294,7 +295,7 @@ def test_phase_build_produces_source_build_result(tmp_context: WorkContext) -> N result_items = bt._phase_build(item) assert len(result_items) == 1 - assert result_items[0].phase == bootstrapper.BootstrapPhase.PROCESS_INSTALL_DEPS + assert result_items[0].phase == bootstrapper.BootstrapPhase.UPDATE_BUILD_SEQUENCE result = result_items[0].build_result assert isinstance(result, bootstrapper.SourceBuildResult) @@ -305,6 +306,45 @@ def test_phase_build_produces_source_build_result(tmp_context: WorkContext) -> N assert result.source_type == SourceType.SDIST +def test_phase_update_build_sequence_advances_to_process_install_deps( + tmp_context: WorkContext, +) -> None: + """Verify _phase_update_build_sequence records build order and advances phase.""" + bt = bootstrapper.Bootstrapper(tmp_context) + bt.why = [] + + req = Requirement("test-package") + version = Version("1.0.0") + mock_wheel = tmp_context.work_dir / "test_package-1.0.0-py3-none-any.whl" + mock_sdist_root = tmp_context.work_dir / "test-package-1.0.0" / "test-package-1.0.0" + mock_sdist_root.parent.mkdir(parents=True, exist_ok=True) + + item = bootstrapper.WorkItem( + req=req, + req_type=RequirementType.TOP_LEVEL, + source_url="https://pypi.test/simple/test-package", + resolved_version=version, + phase=bootstrapper.BootstrapPhase.UPDATE_BUILD_SEQUENCE, + why_snapshot=[], + build_result=bootstrapper.SourceBuildResult( + wheel_filename=mock_wheel, + sdist_filename=None, + unpack_dir=mock_sdist_root.parent, + sdist_root_dir=mock_sdist_root, + build_env=None, + source_type=SourceType.SDIST, + ), + ) + + with bt._track_why(item): + result_items = bt._phase_update_build_sequence(item) + + assert len(result_items) == 1 + assert result_items[0].phase == bootstrapper.BootstrapPhase.PROCESS_INSTALL_DEPS + key = (canonicalize_name(req.name), str(version)) + assert key in bt._build_requirements + + def 
test_multiple_versions_continues_on_error(tmp_context: WorkContext) -> None: """Test that multiple versions mode continues when one version fails.""" bt = bootstrapper.Bootstrapper(tmp_context, multiple_versions=True) @@ -549,3 +589,335 @@ def test_cache_lookup_no_cache_url_returns_none(tmp_context: WorkContext) -> Non ) assert result == (None, None) + + +def test_phase_can_parallelize(tmp_context: WorkContext) -> None: + """PREPARE_BUILD and UPDATE_BUILD_SEQUENCE are serial; all other phases parallelize.""" + parallelizable = ( + bootstrapper.BootstrapPhase.RESOLVE, + bootstrapper.BootstrapPhase.START, + bootstrapper.BootstrapPhase.PREPARE_SOURCE, + bootstrapper.BootstrapPhase.GET_BUILD_DEPS, + bootstrapper.BootstrapPhase.BUILD, + bootstrapper.BootstrapPhase.PROCESS_INSTALL_DEPS, + bootstrapper.BootstrapPhase.COMPLETE, + ) + for phase in parallelizable: + assert phase.can_parallelize is True, f"{phase} should be parallelizable" + for phase in bootstrapper.BootstrapPhase: + if phase not in parallelizable: + assert phase.can_parallelize is False, f"{phase} should be serial" + + +def test_bootstrap_parallel_resolve_returns_ordered_results( + tmp_context: WorkContext, +) -> None: + """Two RESOLVE items run in parallel; results appear in original stack order.""" + bt = bootstrapper.Bootstrapper(tmp_context, max_workers=2) + + pkg_b = bootstrapper.WorkItem( + req=Requirement("pkg-b"), + req_type=RequirementType.INSTALL, + phase=bootstrapper.BootstrapPhase.RESOLVE, + why_snapshot=[], + ) + pkg_c = bootstrapper.WorkItem( + req=Requirement("pkg-c"), + req_type=RequirementType.INSTALL, + phase=bootstrapper.BootstrapPhase.RESOLVE, + why_snapshot=[], + ) + + dispatched: list[str] = [] + + def dispatch_side_effect( + item: bootstrapper.WorkItem, + ) -> list[bootstrapper.WorkItem]: + dispatched.append(item.req.name) + if item.req.name == "pkg-a": + # Return two sibling RESOLVE items so they batch and run concurrently + return [pkg_b, pkg_c] + return [] + + with patch.object(bt, 
"_dispatch_phase", side_effect=dispatch_side_effect): + bt.bootstrap(req=Requirement("pkg-a"), req_type=RequirementType.TOP_LEVEL) + + # pkg-a dispatched first; pkg-b and pkg-c dispatched concurrently after + assert dispatched[0] == "pkg-a" + assert set(dispatched[1:]) == {"pkg-b", "pkg-c"} + + +def test_bootstrap_parallel_resolve_test_mode_failure_records_and_continues( + tmp_context: WorkContext, +) -> None: + """In test mode, a failed RESOLVE item is recorded and siblings succeed.""" + bt = bootstrapper.Bootstrapper(tmp_context, test_mode=True, max_workers=2) + + pkg_b = bootstrapper.WorkItem( + req=Requirement("pkg-b"), + req_type=RequirementType.INSTALL, + phase=bootstrapper.BootstrapPhase.RESOLVE, + why_snapshot=[], + ) + pkg_c = bootstrapper.WorkItem( + req=Requirement("pkg-c"), + req_type=RequirementType.INSTALL, + phase=bootstrapper.BootstrapPhase.RESOLVE, + why_snapshot=[], + ) + + def dispatch_side_effect( + item: bootstrapper.WorkItem, + ) -> list[bootstrapper.WorkItem]: + if item.req.name == "pkg-a": + return [pkg_b, pkg_c] + if item.req.name == "pkg-b": + raise ValueError("simulated resolution failure") + return [] # pkg-c succeeds with no further work + + with patch.object(bt, "_dispatch_phase", side_effect=dispatch_side_effect): + bt.bootstrap(req=Requirement("pkg-a"), req_type=RequirementType.TOP_LEVEL) + + # pkg-b failure recorded; pkg-c succeeded and is not in failed list + assert len(bt.failed_packages) == 1 + assert bt.failed_packages[0]["package"] == "pkg-b" + + +def test_bootstrap_loop_uses_threadpool_for_single_item_phases( + tmp_context: WorkContext, +) -> None: + """All phases run through the shared ThreadPoolExecutor.""" + import concurrent.futures + + bt = bootstrapper.Bootstrapper(tmp_context) + submit_calls: list[object] = [] + + class TrackingExecutor(concurrent.futures.ThreadPoolExecutor): + def submit( + self, fn: object, /, *args: object, **kwargs: object + ) -> concurrent.futures.Future: # type: ignore[override] + 
submit_calls.append(fn) + return super().submit(fn, *args, **kwargs) # type: ignore[arg-type] + + with patch.object( + bt._resolver, + "resolve", + return_value=[("https://pkg.test/p-1.0.tar.gz", Version("1.0"))], + ): + with patch.object(bt, "_has_been_seen", return_value=True): + with patch( + "fromager.bootstrapper.concurrent.futures.ThreadPoolExecutor", + TrackingExecutor, + ): + bt.bootstrap(req=Requirement("pkg"), req_type=RequirementType.TOP_LEVEL) + + # At least one item was submitted to the executor + assert len(submit_calls) >= 1 + + +def test_update_build_sequence_precedes_its_process_install_deps( + tmp_context: WorkContext, +) -> None: + """Each UPDATE_BUILD_SEQUENCE item runs before its paired PROCESS_INSTALL_DEPS item. + + With single-item serial batching, UPDATE_BUILD_SEQUENCE items interleave + with PROCESS_INSTALL_DEPS items from other packages (UBS-a, PID-a, UBS-b, + PID-b rather than UBS-a, UBS-b, PID-a, PID-b). The per-item ordering + guarantee (each UBS precedes its own PID) still holds. + """ + bt = bootstrapper.Bootstrapper(tmp_context) + dispatch_log: list[bootstrapper.BootstrapPhase] = [] + + req_a = Requirement("pkg-a") + req_b = Requirement("pkg-b") + ubs_phase = bootstrapper.BootstrapPhase.UPDATE_BUILD_SEQUENCE + pid_phase = bootstrapper.BootstrapPhase.PROCESS_INSTALL_DEPS + + # resolved_version must be set on items whose phase has tracks_why=True + # (all phases except RESOLVE and START) so that _track_why doesn't assert-fail. 
+ def make_ubs(req: Requirement) -> bootstrapper.WorkItem: + return bootstrapper.WorkItem( + req=req, + req_type=RequirementType.TOP_LEVEL, + phase=ubs_phase, + why_snapshot=[], + resolved_version=Version("1.0"), + ) + + def make_pid(req: Requirement) -> bootstrapper.WorkItem: + return bootstrapper.WorkItem( + req=req, + req_type=RequirementType.TOP_LEVEL, + phase=pid_phase, + why_snapshot=[], + resolved_version=Version("1.0"), + ) + + pid_a = make_pid(req_a) + pid_b = make_pid(req_b) + ubs_a = make_ubs(req_a) + ubs_b = make_ubs(req_b) + + def mock_dispatch(item: bootstrapper.WorkItem) -> list[bootstrapper.WorkItem]: + dispatch_log.append(item.phase) + if item.phase == bootstrapper.BootstrapPhase.RESOLVE: + # Return two UBS items so both land on the stack together + return [ubs_a, ubs_b] + if item.phase == ubs_phase: + return [pid_a] if item.req.name == "pkg-a" else [pid_b] + # PID phase + return [] + + with patch.object(bt, "_dispatch_phase", side_effect=mock_dispatch): + bt.bootstrap(req=req_a, req_type=RequirementType.TOP_LEVEL) + + ubs_indices = [i for i, p in enumerate(dispatch_log) if p == ubs_phase] + pid_indices = [i for i, p in enumerate(dispatch_log) if p == pid_phase] + + assert len(ubs_indices) == 2, f"Expected 2 UBS dispatches, got {ubs_indices}" + assert len(pid_indices) == 2, f"Expected 2 PID dispatches, got {pid_indices}" + + # Per-item ordering: the n-th UBS must precede the n-th PID it produces. + # With single-item serial batching the pairs interleave (UBS-b, PID-b, + # UBS-a, PID-a) so we compare sorted pairs rather than requiring a global + # all-UBS-before-all-PID ordering. 
+ for ubs_idx, pid_idx in zip(sorted(ubs_indices), sorted(pid_indices), strict=True): + assert ubs_idx < pid_idx, ( + f"UBS dispatch at {ubs_idx} should precede its paired PID at {pid_idx}" + ) + + +def test_bootstrap_parallel_process_install_deps( + tmp_context: WorkContext, +) -> None: + """Multiple PROCESS_INSTALL_DEPS items are dispatched concurrently.""" + bt = bootstrapper.Bootstrapper(tmp_context) + dispatch_log: list[bootstrapper.BootstrapPhase] = [] + + req_a = Requirement("pkg-a") + req_b = Requirement("pkg-b") + pid_phase = bootstrapper.BootstrapPhase.PROCESS_INSTALL_DEPS + + def make_pid(req: Requirement) -> bootstrapper.WorkItem: + return bootstrapper.WorkItem( + req=req, + req_type=RequirementType.TOP_LEVEL, + phase=pid_phase, + why_snapshot=[], + resolved_version=Version("1.0"), + ) + + pid_a = make_pid(req_a) + pid_b = make_pid(req_b) + + def mock_dispatch(item: bootstrapper.WorkItem) -> list[bootstrapper.WorkItem]: + dispatch_log.append(item.phase) + if item.phase == bootstrapper.BootstrapPhase.RESOLVE: + # Return two PID items directly so both land on the stack together + return [pid_a, pid_b] + # PID phase + return [] + + with patch.object(bt, "_dispatch_phase", side_effect=mock_dispatch): + bt.bootstrap(req=req_a, req_type=RequirementType.TOP_LEVEL) + + pid_dispatches = [p for p in dispatch_log if p == pid_phase] + assert len(pid_dispatches) == 2, f"Expected 2 PID dispatches, got {pid_dispatches}" + + +def test_bootstrap_parallel_complete( + tmp_context: WorkContext, +) -> None: + """Multiple COMPLETE items are dispatched concurrently.""" + bt = bootstrapper.Bootstrapper(tmp_context) + dispatch_log: list[bootstrapper.BootstrapPhase] = [] + + req_a = Requirement("pkg-a") + req_b = Requirement("pkg-b") + complete_phase = bootstrapper.BootstrapPhase.COMPLETE + + def make_complete(req: Requirement) -> bootstrapper.WorkItem: + return bootstrapper.WorkItem( + req=req, + req_type=RequirementType.TOP_LEVEL, + phase=complete_phase, + why_snapshot=[], + 
resolved_version=Version("1.0"), + ) + + complete_a = make_complete(req_a) + complete_b = make_complete(req_b) + + def mock_dispatch(item: bootstrapper.WorkItem) -> list[bootstrapper.WorkItem]: + dispatch_log.append(item.phase) + if item.phase == bootstrapper.BootstrapPhase.RESOLVE: + # Return two COMPLETE items directly so both land on the stack together + return [complete_a, complete_b] + # COMPLETE phase + return [] + + with patch.object(bt, "_dispatch_phase", side_effect=mock_dispatch): + bt.bootstrap(req=req_a, req_type=RequirementType.TOP_LEVEL) + + complete_dispatches = [p for p in dispatch_log if p == complete_phase] + assert len(complete_dispatches) == 2, ( + f"Expected 2 COMPLETE dispatches, got {complete_dispatches}" + ) + + +def test_find_cached_wheel_holds_wheel_dir_lock_during_build_lookup( + tmp_context: WorkContext, +) -> None: + """_find_cached_wheel holds _wheel_dir_lock while inspecting wheels_build.""" + bt = bootstrapper.Bootstrapper(tmp_context) + lock_was_held: list[bool] = [] + original = bt._look_for_existing_wheel + + def spy( + req: Requirement, + version: Version, + search_in: pathlib.Path, + ) -> tuple[pathlib.Path | None, pathlib.Path | None]: + if search_in == tmp_context.wheels_build: + acquired = bt._wheel_dir_lock.acquire(blocking=False) + lock_was_held.append(not acquired) + if acquired: + bt._wheel_dir_lock.release() + return original(req, version, search_in) + + with unittest.mock.patch.object(bt, "_look_for_existing_wheel", side_effect=spy): + bt._find_cached_wheel(Requirement("test-package"), Version("1.0.0")) + + assert lock_was_held == [True] + + +@unittest.mock.patch("fromager.server.update_wheel_mirror") +@unittest.mock.patch("fromager.wheels.download_wheel") +def test_download_prebuilt_holds_wheel_dir_lock_around_mirror_update( + mock_download: unittest.mock.Mock, + mock_mirror: unittest.mock.Mock, + tmp_context: WorkContext, +) -> None: + """_download_prebuilt holds _wheel_dir_lock while calling update_wheel_mirror.""" + 
bt = bootstrapper.Bootstrapper(tmp_context) + mock_download.return_value = pathlib.Path( + "/fake/test_package-1.0.0-py3-none-any.whl" + ) + lock_was_held: list[bool] = [] + + def spy_mirror(ctx: object) -> None: + acquired = bt._wheel_dir_lock.acquire(blocking=False) + lock_was_held.append(not acquired) + if acquired: + bt._wheel_dir_lock.release() + + mock_mirror.side_effect = spy_mirror + + bt._download_prebuilt( + req=Requirement("test-package"), + req_type=RequirementType.TOP_LEVEL, + resolved_version=Version("1.0.0"), + wheel_url="https://pkg.test/test_package-1.0.0-py3-none-any.whl", + ) + + assert lock_was_held == [True] diff --git a/tests/test_bootstrapper_iterative.py b/tests/test_bootstrapper_iterative.py index 1bf16de9..e1acaf25 100644 --- a/tests/test_bootstrapper_iterative.py +++ b/tests/test_bootstrapper_iterative.py @@ -477,6 +477,7 @@ class TestDispatchPhase: (BootstrapPhase.PREPARE_SOURCE, "_phase_prepare_source"), (BootstrapPhase.PREPARE_BUILD, "_phase_prepare_build"), (BootstrapPhase.BUILD, "_phase_build"), + (BootstrapPhase.UPDATE_BUILD_SEQUENCE, "_phase_update_build_sequence"), (BootstrapPhase.PROCESS_INSTALL_DEPS, "_phase_process_install_deps"), (BootstrapPhase.COMPLETE, "_phase_complete"), ], @@ -555,7 +556,7 @@ def test_build_phase_test_mode_fallback_success( assert len(result) == 1 assert result[0] is item assert item.build_result is mock_fallback - assert item.phase == BootstrapPhase.PROCESS_INSTALL_DEPS + assert item.phase == BootstrapPhase.UPDATE_BUILD_SEQUENCE assert len(bt.failed_packages) == 0 def test_build_phase_test_mode_fallback_failure( @@ -697,6 +698,9 @@ def tracking_dispatch(item: WorkItem) -> list[WorkItem]: build_env=None, source_type=SourceType.SDIST, ) + item.phase = BootstrapPhase.UPDATE_BUILD_SEQUENCE + return [item] + if item.phase == BootstrapPhase.UPDATE_BUILD_SEQUENCE: item.phase = BootstrapPhase.PROCESS_INSTALL_DEPS return [item] if item.phase == BootstrapPhase.PROCESS_INSTALL_DEPS: @@ -722,6 +726,7 @@ def 
tracking_dispatch(item: WorkItem) -> list[WorkItem]: BootstrapPhase.PREPARE_SOURCE, BootstrapPhase.PREPARE_BUILD, BootstrapPhase.BUILD, + BootstrapPhase.UPDATE_BUILD_SEQUENCE, BootstrapPhase.PROCESS_INSTALL_DEPS, BootstrapPhase.COMPLETE, ] From 825467867957728a31233139407400a596ae5581 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Thu, 14 May 2026 16:14:39 -0400 Subject: [PATCH 2/2] try adding wait states --- e2e/test_rust_vendor.sh | 2 + src/fromager/bootstrapper.py | 134 ++++++++++++++++++++++++++++++++--- tests/test_bootstrapper.py | 7 +- 3 files changed, 132 insertions(+), 11 deletions(-) diff --git a/e2e/test_rust_vendor.sh b/e2e/test_rust_vendor.sh index 52d849e0..3ad07d22 100755 --- a/e2e/test_rust_vendor.sh +++ b/e2e/test_rust_vendor.sh @@ -15,6 +15,8 @@ VERSION="1.6.0" # Bootstrap the test project fromager \ $NETWORK_ISOLATION \ + --log-file="$OUTDIR/bootstrap.log" \ + --error-log-file="$OUTDIR/fromager-errors.log" \ --sdists-repo="$OUTDIR/sdists-repo" \ --wheels-repo="$OUTDIR/wheels-repo" \ --work-dir="$OUTDIR/work-dir" \ diff --git a/src/fromager/bootstrapper.py b/src/fromager/bootstrapper.py index 60be5660..94bfa8e2 100644 --- a/src/fromager/bootstrapper.py +++ b/src/fromager/bootstrapper.py @@ -91,7 +91,9 @@ class BootstrapPhase(StrEnum): """Processing phases for iterative bootstrap. All packages: RESOLVE -> START -> ... - Source packages: ... -> PREPARE_SOURCE -> PREPARE_BUILD (serial) -> GET_BUILD_DEPS + Source packages: ... -> PREPARE_SOURCE -> WAIT_BUILD_SYSTEM_DEPS (serial barrier) + -> PREPARE_BUILD (serial) -> GET_BUILD_DEPS + -> WAIT_BUILD_DEPS (serial barrier) -> INSTALL_BUILD_DEPS (serial) -> BUILD -> UPDATE_BUILD_SEQUENCE -> PROCESS_INSTALL_DEPS -> COMPLETE. Prebuilt packages: ... 
-> PREPARE_SOURCE -> UPDATE_BUILD_SEQUENCE @@ -101,8 +103,10 @@ class BootstrapPhase(StrEnum): RESOLVE = "resolve" START = "start" PREPARE_SOURCE = "prepare-source" + WAIT_BUILD_SYSTEM_DEPS = "wait-build-system-deps" PREPARE_BUILD = "prepare-build" GET_BUILD_DEPS = "get-build-deps" + WAIT_BUILD_DEPS = "wait-build-deps" INSTALL_BUILD_DEPS = "install-build-deps" BUILD = "build" UPDATE_BUILD_SEQUENCE = "update-build-sequence" @@ -118,13 +122,19 @@ def tracks_why(self) -> bool: def can_parallelize(self) -> bool: """Whether items in this phase can be processed in parallel. - Phases that install packages into build environments must be serial to - ensure a dependency's wheel exists in the mirror before any package that - needs it runs its install step. Those phases are PREPARE_BUILD (installs - build system deps) and INSTALL_BUILD_DEPS (installs build backend/sdist - deps). UPDATE_BUILD_SEQUENCE is also serial to serialise writes to - build-order.json. All other phases, including BUILD itself, are safe to - parallelize. + Parallel phases: RESOLVE, START, PREPARE_SOURCE, GET_BUILD_DEPS, BUILD, + PROCESS_INSTALL_DEPS, COMPLETE. + + Serial phases and their reasons: + - WAIT_BUILD_SYSTEM_DEPS: barrier between parallel PREPARE_SOURCE and + PREPARE_BUILD; ensures all discovered build-system deps are fully built + before any package installs them. See _phase_wait_build_system_deps. + - PREPARE_BUILD: installs build-system deps into the build env. + - WAIT_BUILD_DEPS: barrier between parallel GET_BUILD_DEPS and + INSTALL_BUILD_DEPS; ensures all discovered build-backend/sdist deps are + fully built before any package installs them. See _phase_wait_build_deps. + - INSTALL_BUILD_DEPS: installs build-backend/sdist deps into the build env. + - UPDATE_BUILD_SEQUENCE: serialises writes to build-order.json. 
""" return self in ( BootstrapPhase.RESOLVE, @@ -451,6 +461,7 @@ def run_one(item: WorkItem) -> list[WorkItem]: max_workers=self._max_workers ) as executor: while stack: + self._write_stack_snapshot(stack) top_phase = stack[-1].phase if not top_phase.can_parallelize: @@ -1620,7 +1631,11 @@ def _phase_prepare_source(self, item: WorkItem) -> list[WorkItem]: item.resolved_version, ) - item.phase = BootstrapPhase.PREPARE_BUILD + item.phase = ( + BootstrapPhase.WAIT_BUILD_SYSTEM_DEPS + if dep_items + else BootstrapPhase.PREPARE_BUILD + ) return [item] + dep_items def _phase_prepare_build(self, item: WorkItem) -> list[WorkItem]: @@ -1687,9 +1702,59 @@ def _phase_get_build_deps(self, item: WorkItem) -> list[WorkItem]: ) dep_items = backend_items + sdist_items - item.phase = BootstrapPhase.INSTALL_BUILD_DEPS + item.phase = ( + BootstrapPhase.WAIT_BUILD_DEPS + if dep_items + else BootstrapPhase.INSTALL_BUILD_DEPS + ) return [item] + dep_items + def _phase_wait_build_system_deps(self, item: WorkItem) -> list[WorkItem]: + """WAIT_BUILD_SYSTEM_DEPS phase: serial barrier after build-system-dep discovery. + + WHY THIS PHASE EXISTS + --------------------- + PREPARE_SOURCE runs in parallel: multiple packages simultaneously download + their source archives and read their build-system requirements. Each package + that discovers a new build-system dep pushes a RESOLVE work item for that dep + onto the LIFO stack *above* its own next-phase item. Because the stack is + LIFO, those dep items are processed (resolved → built → mirrored) before any + package reaches its install step. + + The serial barrier enforces this guarantee across packages: only one package + at a time advances past this point. By the time a package's + WAIT_BUILD_SYSTEM_DEPS item runs, every dep item that was pushed above it + during the preceding parallel PREPARE_SOURCE batch has already been fully + processed. 
PREPARE_BUILD can therefore safely call build_env.install() + without racing against a dep that is still being built. + + This phase does no work of its own — it exists solely to hold the serial slot. + """ + item.phase = BootstrapPhase.PREPARE_BUILD + return [item] + + def _phase_wait_build_deps(self, item: WorkItem) -> list[WorkItem]: + """WAIT_BUILD_DEPS phase: serial barrier after build-backend/sdist dep discovery. + + WHY THIS PHASE EXISTS + --------------------- + GET_BUILD_DEPS runs in parallel: multiple packages simultaneously ask their + build backends for build-backend and build-sdist requirements. Each package + that discovers new deps pushes RESOLVE work items for those deps onto the + LIFO stack *above* its own next-phase item, so the deps are built first. + + The serial barrier enforces this guarantee across packages: only one package + at a time advances past this point. By the time a package's WAIT_BUILD_DEPS + item runs, every dep item pushed above it during the preceding parallel + GET_BUILD_DEPS batch has already been fully processed. INSTALL_BUILD_DEPS + can therefore safely call build_env.install() without racing against a dep + that is still being built. + + This phase does no work of its own — it exists solely to hold the serial slot. + """ + item.phase = BootstrapPhase.INSTALL_BUILD_DEPS + return [item] + def _phase_install_build_deps(self, item: WorkItem) -> list[WorkItem]: """INSTALL_BUILD_DEPS phase: install build backend/sdist deps (serial). @@ -1852,6 +1917,49 @@ def _phase_complete(self, item: WorkItem) -> list[WorkItem]: ) return [] + def _write_stack_snapshot(self, stack: list[WorkItem]) -> None: + """Write the current work stack to a JSON file for debugging. + + Overwrites ``bootstrap-stack.json`` in the work directory at the start + of every loop iteration so the file always reflects the stack state at + the moment of the last batch, making it easy to diagnose ordering bugs + or install failures. 
+ + Items are listed top-of-stack first (i.e. what will be processed next + appears first in the array). + """ + snapshot = [ + { + "phase": item.phase.value, + "req": str(item.req), + "req_type": str(item.req_type), + "resolved_version": ( + str(item.resolved_version) if item.resolved_version else None + ), + "why": [ + { + "req_type": str(req_type), + "req": str(req), + "version": str(version), + } + for req_type, req, version in item.why_snapshot + ], + "build_system_deps": [ + str(r) for r in self._sort_requirements(item.build_system_deps) + ], + "build_backend_deps": [ + str(r) for r in self._sort_requirements(item.build_backend_deps) + ], + "build_sdist_deps": [ + str(r) for r in self._sort_requirements(item.build_sdist_deps) + ], + } + for item in reversed(stack) + ] + stack_file = self.ctx.work_dir / "bootstrap-stack.json" + with open(stack_file, "w") as f: + json.dump(snapshot, f, indent=2) + def _dispatch_phase(self, item: WorkItem) -> list[WorkItem]: """Route a work item to the appropriate phase handler.""" match item.phase: @@ -1861,10 +1969,14 @@ def _dispatch_phase(self, item: WorkItem) -> list[WorkItem]: return self._phase_start(item) case BootstrapPhase.PREPARE_SOURCE: return self._phase_prepare_source(item) + case BootstrapPhase.WAIT_BUILD_SYSTEM_DEPS: + return self._phase_wait_build_system_deps(item) case BootstrapPhase.PREPARE_BUILD: return self._phase_prepare_build(item) case BootstrapPhase.GET_BUILD_DEPS: return self._phase_get_build_deps(item) + case BootstrapPhase.WAIT_BUILD_DEPS: + return self._phase_wait_build_deps(item) case BootstrapPhase.INSTALL_BUILD_DEPS: return self._phase_install_build_deps(item) case BootstrapPhase.BUILD: @@ -1901,8 +2013,10 @@ def _handle_phase_error( item.phase in ( BootstrapPhase.PREPARE_SOURCE, + BootstrapPhase.WAIT_BUILD_SYSTEM_DEPS, BootstrapPhase.PREPARE_BUILD, BootstrapPhase.GET_BUILD_DEPS, + BootstrapPhase.WAIT_BUILD_DEPS, BootstrapPhase.INSTALL_BUILD_DEPS, BootstrapPhase.BUILD, ) diff --git 
a/tests/test_bootstrapper.py b/tests/test_bootstrapper.py index d3c8f63e..1d7de1a1 100644 --- a/tests/test_bootstrapper.py +++ b/tests/test_bootstrapper.py @@ -592,7 +592,12 @@ def test_cache_lookup_no_cache_url_returns_none(tmp_context: WorkContext) -> Non def test_phase_can_parallelize(tmp_context: WorkContext) -> None: - """PREPARE_BUILD and UPDATE_BUILD_SEQUENCE are serial; all other phases parallelize.""" + """RESOLVE, START, PREPARE_SOURCE, GET_BUILD_DEPS, BUILD, PROCESS_INSTALL_DEPS, COMPLETE parallelize. + + Serial phases: WAIT_BUILD_SYSTEM_DEPS, PREPARE_BUILD, WAIT_BUILD_DEPS, + INSTALL_BUILD_DEPS, UPDATE_BUILD_SEQUENCE. The two WAIT phases are serial + barriers that ensure dep builds complete before the subsequent install step runs. + """ parallelizable = ( bootstrapper.BootstrapPhase.RESOLVE, bootstrapper.BootstrapPhase.START,