diff --git a/.gitignore b/.gitignore index 3cd7a21..2a0e6fd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,180 @@ -dist/* +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +/build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +/py/**/lib/** +!/py/**/lib/**.py +!/py/**/lib/**.pyx +lib64/ +parts/ +/py/**/sdist/** +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. 
+# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + bricoler.json .claude + diff --git a/README.md b/README.md index 5fb9004..aca8ce8 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ The basic idea is to simplify common src development tasks by provding a framewo Make sure that python 3 and hatch are installed: ``` -$ pkg install python3 py311-hatch +$ pkg install python3 py311-hatch py311-sqlite3 ``` Run `hatch build` from the root of the repository. diff --git a/pyproject.toml b/pyproject.toml index db676bb..4487698 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,12 +19,14 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Programming Language :: Python :: 3.15", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", ] dependencies = [ "pexpect", - "pysqlite3", ] [project.scripts] @@ -61,3 +63,33 @@ exclude_lines = [ "if __name__ == .__main__.:", "if TYPE_CHECKING:", ] + +[tool.ruff] +target-version = "py310" +line-length = 88 +exclude = [ + "build", + "dist" +] +lint.select = [ + "ANN", + "B", + "D", + "E", + "F", + "I", + "PERF", + "PLW", + "PYI", + "RUF", + "S", + "W" +] +lint.ignore = [ + "D107", # Document __init__ nags. 
+] + +[tool.ruff.lint.per-file-ignores] +"tests/**" = [ + "S" +] diff --git a/src/bricoler/bricoler.py b/src/bricoler/bricoler.py index 276f7c0..48a3f8c 100644 --- a/src/bricoler/bricoler.py +++ b/src/bricoler/bricoler.py @@ -4,41 +4,50 @@ # SPDX-License-Identifier: BSD-2-Clause # +from __future__ import annotations + import functools import glob import json import os -import pysqlite3 import re import shutil +import sqlite3 import sys import textwrap import time from enum import Enum from importlib import resources from pathlib import Path -from typing import Dict, List, Optional, Tuple, Type, Union from .config import Config from .git import GitRepository from .mtree import MtreeFile -from .task import Task, TaskParameter, TaskMeta, TaskSchedule +from .task import Task, TaskMeta, TaskParameter, TaskSchedule from .util import EmailReport, chdir, host_machine, info, run_cmd, warn -from .vm import FreeBSDVM, VMImage, VMHypervisor, BhyveRun, QEMURun, RVVMRun, SSHCommandRunner +from .vm import ( + BhyveRun, + FreeBSDVM, + QEMURun, + RVVMRun, + SSHCommandRunner, + VMHypervisor, + VMImage, +) class KyuaDB: SCHEMA_VERSION = 3 class Result(Enum): - PASSED = 'passed' - FAILED = 'failed' - SKIPPED = 'skipped' - BROKEN = 'broken' + PASSED = "passed" + FAILED = "failed" + SKIPPED = "skipped" + BROKEN = "broken" - def __init__(self, path: Path): + def __init__(self, path: Path) -> None: self.path = path - self.conn = pysqlite3.connect(path) + self.conn = sqlite3.connect(path) res = self.conn.execute("SELECT schema_version from metadata").fetchone() if res is None: @@ -47,55 +56,58 @@ def __init__(self, path: Path): raise ValueError(f"KyuaDB: Unsupported schema_version {res[0]} in {path}") @functools.cache - def _results(self, restype: Result) -> List[str]: + def _results(self, restype: Result) -> list[str]: cursor = self.conn.cursor() - cursor.execute(f""" + cursor.execute( + """ SELECT tp.relative_path, tc.name FROM test_results tr JOIN test_cases tc ON tr.test_case_id = 
tc.test_case_id JOIN test_programs tp ON tc.test_program_id = tp.test_program_id - WHERE tr.result_type = '{restype.value}' - """) + WHERE tr.result_type = ? + """, + (restype.value,), + ) results = cursor.fetchall() return [f"{row[0]}:{row[1]}" for row in results] - def passed(self) -> List[str]: + def passed(self) -> list[str]: return self._results(self.Result.PASSED) - def failed(self) -> List[str]: + def failed(self) -> list[str]: return self._results(self.Result.FAILED) - def skipped(self) -> List[str]: + def skipped(self) -> list[str]: return self._results(self.Result.SKIPPED) - def broken(self) -> List[str]: + def broken(self) -> list[str]: return self._results(self.Result.BROKEN) class FreeBSDSrcRepository(GitRepository): @functools.cache def get___FreeBSD_version(self) -> int: - file = self.path / 'sys' / 'sys' / 'param.h' - with file.open('r') as f: - pattern = re.compile(r'^\s*#define\s+__FreeBSD_version\s+(\d+)') + file = self.path / "sys" / "sys" / "param.h" + with file.open("r") as f: + pattern = re.compile(r"^\s*#define\s+__FreeBSD_version\s+(\d+)") for line in f: match = pattern.match(line) if match: return int(match.group(1)) raise ValueError( - f"Could not obtain __FreeBSD_version from {file}" + f"Could not obtain __FreeBSD_version from {file}", ) - def make(self, args: List[str], **kwargs): - cmd = ['make', '-C', self.path.resolve()] + args + def make(self, args: list[str], **kwargs): + cmd = ["make", "-C", self.path.resolve(), *args] # Don't skip the command if we need to capture output. 
- skip = self._no_cmds and not kwargs.get('capture_output', False) + skip = self._no_cmds and not kwargs.get("capture_output", False) return run_cmd(cmd, skip=skip, **kwargs) @functools.cache - def machine_targets(self) -> List[str]: - pattern = re.compile(r'^\s*\w+/\w+$') - output = self.make(['targets'], capture_output=True).stdout.decode() + def machine_targets(self) -> list[str]: + pattern = re.compile(r"^\s*\w+/\w+$") + output = self.make(["targets"], capture_output=True).stdout.decode() targets = [] for line in output.splitlines(): if pattern.match(line.strip()): @@ -104,132 +116,136 @@ def machine_targets(self) -> List[str]: class GitCheckoutTask(Task): - """ - Clone a git repository, or update an existing clone. + """Clone a git repository, or update an existing clone. Alternately, pass a filesystem path for the "url" parameter instead of a URL or ssh address to use an existing local clone. """ + name = "git-checkout" parameters = { - 'branch': TaskParameter( + "branch": TaskParameter( description="Branch to check out", ), - 'shallow': TaskParameter( + "shallow": TaskParameter( description="Perform a shallow clone and fetch", default=True, ), - 'url': TaskParameter( + "url": TaskParameter( description="URL of the Git repository to clone, or a filesystem path", required=True, ), } outputs = { - 'repo': GitRepository + "repo": GitRepository, } - def run(self, ctx, repotype: Type[GitRepository] = GitRepository): - repo = repotype(self.url, Path("./src"), self.branch, - shallow=self.shallow, no_cmds=self.skip) + def run(self, ctx, repotype: type[GitRepository] = GitRepository): + repo = repotype( + self.url, + Path("./src"), + self.branch, + shallow=self.shallow, + no_cmds=self.skip, + ) repo.update(shallow=self.shallow) - return {'repo': repo} + return {"repo": repo} class FreeBSDSrcGitCheckoutTask(GitCheckoutTask): - """ - Clone the FreeBSD src tree, or update an existing clone. 
- """ + """Clone the FreeBSD src tree, or update an existing clone.""" + name = "freebsd-src-git-checkout" url = "anongit@git.freebsd.org:src.git" branch = "main" outputs = { - 'repo': FreeBSDSrcRepository, - 'FreeBSD_version': int, + "repo": FreeBSDSrcRepository, + "FreeBSD_version": int, } def run(self, ctx): outputs = super().run(ctx, repotype=FreeBSDSrcRepository) - outputs['FreeBSD_version'] = outputs['repo'].get___FreeBSD_version() + outputs["FreeBSD_version"] = outputs["repo"].get___FreeBSD_version() return outputs class FreeBSDSrcBuildTask(Task): - """ - Build a FreeBSD source tree. On its own this does nothing, the invoker + """Build a FreeBSD source tree. On its own this does nothing, the invoker needs to specify some build targets. """ + name = "freebsd-src-build" parameters = { - 'clean': TaskParameter( + "clean": TaskParameter( description="Clean build directories before building", default=False, ), - 'kernel_config': TaskParameter( + "kernel_config": TaskParameter( description="Kernel configuration to build", default="GENERIC", ), - 'machine': TaskParameter( + "machine": TaskParameter( description="Target machine architecture", default=host_machine(), ), - 'make_options': TaskParameter( + "make_options": TaskParameter( description="Additional make(1) options to pass to the build", type=str, # XXX-MJ List[str] ), - 'make_targets': TaskParameter( + "make_targets": TaskParameter( description="Make targets to build", type=str, # XXX-MJ List[str] - default='' + default="", ), - 'objdir': TaskParameter( + "objdir": TaskParameter( description="Object directory path for the build", type=Path, # XXX-MJ default must be computed after some partial eval ), - 'toolchain': TaskParameter( + "toolchain": TaskParameter( description="Toolchain to use for the build", ), } inputs = { - 'src': FreeBSDSrcGitCheckoutTask, + "src": FreeBSDSrcGitCheckoutTask, } outputs = { - 'machine': str, - 'metalog': MtreeFile, - 'repo': FreeBSDSrcRepository, - 'stagedir': Path, + 
"machine": str, + "metalog": MtreeFile, + "repo": FreeBSDSrcRepository, + "stagedir": Path, } def run(self, ctx): # See if the user specified a valid target platform. - if '/' not in self.machine: + if "/" not in self.machine: machine = self.machine - machine_arch = '' + machine_arch = "" else: - (machine, machine_arch) = self.machine.split('/', maxsplit=1) + (machine, machine_arch) = self.machine.split("/", maxsplit=1) targets = self.src.repo.machine_targets() - if machine_arch == '': + if machine_arch == "": matches = [target for target in targets if target.startswith(f"{machine}/")] if len(matches) == 1: - machine_arch = matches[0].split('/', maxsplit=1)[1] + machine_arch = matches[0].split("/", maxsplit=1)[1] else: raise ValueError( - f"Multiple architectures found for machine '{machine}': {' '.join(matches)}'" + f"Multiple architectures found for machine '{machine}': {' '.join(matches)}'", ) if f"{machine}/{machine_arch}" not in targets: raise ValueError( - f"Unknown target platform: {self.machine}" + f"Unknown target platform: {self.machine}", ) # If the kernel config is a path, extract the basename and dirname. 
kernconf = self.kernel_config kernconfdir = None - if '/' in kernconf: + if "/" in kernconf: kernconf = Path(kernconf).name kernconfdir = Path(self.kernel_config).parent @@ -245,13 +261,14 @@ def run(self, ctx): for target in self.make_targets.split(): metalog = stagedir / f"METALOG.{target}.mtree" if not self.skip: - with open(metalog, 'w') as f: + with open(metalog, "w") as f: f.truncate(0) args = [ target, "-ss", - "-j", ctx.max_jobs, + "-j", + ctx.max_jobs, "-DNO_ROOT", f"DESTDIR={stagedir}", f"METALOG={metalog}", @@ -282,10 +299,10 @@ def run(self, ctx): mtree.load(metalog, append=True, contents_root=stagedir) return { - 'machine': f"{machine}/{machine_arch}", - 'metalog': mtree, - 'repo': self.src.repo, - 'stagedir': stagedir, + "machine": f"{machine}/{machine_arch}", + "metalog": mtree, + "repo": self.src.repo, + "stagedir": stagedir, } @@ -298,75 +315,75 @@ class FreeBSDPkgBaseBuildTask(FreeBSDSrcBuildTask): class FreeBSDVMImageFilesystem(Enum): - UFS = 'ufs' - ZFS = 'zfs' + UFS = "ufs" + ZFS = "zfs" class FreeBSDVMImageTask(Task): - """ - Build a FreeBSD VM image, optionally adding an overlay tree, installing + """Build a FreeBSD VM image, optionally adding an overlay tree, installing packages, and applying other customizations. 
""" + name = "freebsd-vm-image" inputs = { - 'src': FreeBSDSrcGitCheckoutTask, - 'build': FreeBSDSrcBuildAndInstallTask, + "src": FreeBSDSrcGitCheckoutTask, + "build": FreeBSDSrcBuildAndInstallTask, } outputs = { - 'image': VMImage, - 'ssh_key': Path, - 'sysroot': Path, + "image": VMImage, + "ssh_key": Path, + "sysroot": Path, } parameters = { - 'filesystem': TaskParameter( + "filesystem": TaskParameter( description="Filesystem type for the VM image", type=FreeBSDVMImageFilesystem, # XXX-MJ validate enum default=FreeBSDVMImageFilesystem.UFS, ), - 'hostname': TaskParameter( + "hostname": TaskParameter( description="Hostname for the VM", - default='freebsd', + default="freebsd", ), - 'image_size': TaskParameter( + "image_size": TaskParameter( description="Size of the filesystem image in GiB", default=10, ), - 'loader_tunables': TaskParameter( + "loader_tunables": TaskParameter( description="Loader tunables for the VM", type=str, # XXX-MJ Dict[str, str] - default='', + default="", ), - 'overlay': TaskParameter( + "overlay": TaskParameter( description="Path to an overlay directory to copy into the image", type=Path, ), - 'packages': TaskParameter( + "packages": TaskParameter( description="A list of packages to install into the image", ), - 'package_repo_file': TaskParameter( + "package_repo_file": TaskParameter( description="Path to a pkg(8) repository configuration file used to fetch packages", type=Path, # XXX-MJ should be a default ), - 'rc_kld_list': TaskParameter( + "rc_kld_list": TaskParameter( description="A list of kernel modules to load at boot time", ), - 'single_user': TaskParameter( + "single_user": TaskParameter( description="Boot into single-user mode", default=False, ), - 'sudo_users': TaskParameter( + "sudo_users": TaskParameter( description="A list of users to grant sudo privileges, useful for tests", type=str, ), - 'swap_size': TaskParameter( + "swap_size": TaskParameter( description="Size of the swap partition", default="2G", ), - 'sysctls': 
TaskParameter( + "sysctls": TaskParameter( description="A list of sysctl(8) settings to apply to the image", type=str, # XXX-MJ Dict[str, str] ), @@ -384,15 +401,18 @@ def run(self, ctx): with chdir(Path("./ssh-keys")): keyfile = Path.cwd() / "id_ed25519_root" if not keyfile.is_file(): - self.run_cmd(["ssh-keygen", "-t", "ed25519", "-f", str(keyfile), "-N", ""]) - metalog.add_file(keyfile.with_suffix('.pub'), - Path("root/.ssh/authorized_keys")) - outputs['ssh_key'] = keyfile + self.run_cmd( + ["ssh-keygen", "-t", "ed25519", "-f", str(keyfile), "-N", ""] + ) + metalog.add_file( + keyfile.with_suffix(".pub"), Path("root/.ssh/authorized_keys") + ) + outputs["ssh_key"] = keyfile def add_overlay(root: Path) -> None: if not root.is_dir(): raise ValueError(f"Overlay path '{root}' is not a directory") - for item in root.rglob('*'): + for item in root.rglob("*"): rel = item.relative_to(root) if item.is_dir(): metalog.add_dir(rel) @@ -402,9 +422,9 @@ def add_overlay(root: Path) -> None: warn(f"Skipping unsupported overlay item: {item}") def add_config_file( - _path: Union[Path, str], + _path: Path | str, *args, - source: Optional[Path] = None, + source: Path | None = None, comment_delimiter: str = "#", ) -> None: if self.skip: @@ -425,58 +445,73 @@ def add_config_file( if self.overlay is not None: add_overlay(self.overlay) - add_config_file("etc/ssh/sshd_config", - "PermitRootLogin without-password", - source=(stagedir / "etc/ssh/sshd_config")) + add_config_file( + "etc/ssh/sshd_config", + "PermitRootLogin without-password", + source=(stagedir / "etc/ssh/sshd_config"), + ) if self.rc_kld_list is not None: kld_list = self.rc_kld_list.split() else: kld_list = [] - add_config_file("etc/rc.conf", - f"hostname={self.hostname}", - "ifconfig_vtnet0=SYNCDHCP", - "ifconfig_em0=SYNCDHCP", - "defaultroute_delay=2", - "sshd_enable=YES", - "sshd_rsa_enable=NO", - *[f"kld_list=\"${{kld_list}} {kld}\"" for kld in kld_list], - f""" + add_config_file( + "etc/rc.conf", + 
f"hostname={self.hostname}", + "ifconfig_vtnet0=SYNCDHCP", + "ifconfig_em0=SYNCDHCP", + "defaultroute_delay=2", + "sshd_enable=YES", + "sshd_rsa_enable=NO", + *[f'kld_list="${{kld_list}} {kld}"' for kld in kld_list], + f""" zfs_enable=YES zpool_reguid={zfs_pool_name} zpool_upgrade={zfs_pool_name} - """ if self.filesystem == FreeBSDVMImageFilesystem.ZFS else "") - - add_config_file("etc/fstab", """ + if self.filesystem == FreeBSDVMImageFilesystem.ZFS + else "", + ) + + add_config_file( + "etc/fstab", + """ /dev/gpt/rootfs / ufs rw 1 1 - """ if self.filesystem == FreeBSDVMImageFilesystem.UFS else "", """ + if self.filesystem == FreeBSDVMImageFilesystem.UFS + else "", + """ none /dev/fd fdescfs rw 0 0 /dev/gpt/swap non swap sw 0 0 - """) - - add_config_file("boot/loader.conf", - "autoboot_delay=1", - "beastie_disable=YES", - "loader_logo=none", - "console=comconsole", - "kernel_options=-s" if self.single_user else "", - "kern.geom.label.disk_ident.enable=0", - "p9fs_load=YES", - "virtio_p9fs_load=YES", - "zfs_load=YES" if self.filesystem == FreeBSDVMImageFilesystem.ZFS else "", - *[tunable for tunable in self.loader_tunables.split()]) + """, + ) + + add_config_file( + "boot/loader.conf", + "autoboot_delay=1", + "beastie_disable=YES", + "loader_logo=none", + "console=comconsole", + "kernel_options=-s" if self.single_user else "", + "kern.geom.label.disk_ident.enable=0", + "p9fs_load=YES", + "virtio_p9fs_load=YES", + "zfs_load=YES" if self.filesystem == FreeBSDVMImageFilesystem.ZFS else "", + *[tunable for tunable in self.loader_tunables.split()], + ) if self.sysctls is not None: - add_config_file("etc/sysctl.conf", - *[sysctl for sysctl in self.sysctls.split()], - source=(stagedir / "etc/sysctl.conf")) + add_config_file( + "etc/sysctl.conf", + *[sysctl for sysctl in self.sysctls.split()], + source=(stagedir / "etc/sysctl.conf"), + ) if self.sudo_users is not None: for user in self.sudo_users.split(): - add_config_file(f"usr/local/etc/sudoers.d/{user}", - f"{user} 
ALL=(ALL) NOPASSWD: ALL") + add_config_file( + f"usr/local/etc/sudoers.d/{user}", f"{user} ALL=(ALL) NOPASSWD: ALL" + ) add_config_file("firstboot") @@ -502,13 +537,20 @@ def add_config_file( def pkg_cmd(*args, **kwargs): cmd = [ "pkg", - "-o", "ASSUME_ALWAYS_YES=true", - "-o", "INSTALL_AS_USER=yes", - "-o", f"ABI={pkgabi}", - "-o", f"PKG_CACHEDIR={cache_dir}", - "-o", f"PKG_DBDIR={db_dir}", - "-o", f"OSVERSION={self.src.FreeBSD_version}", - "-o", f"REPOS_DIR={repos_dir}", + "-o", + "ASSUME_ALWAYS_YES=true", + "-o", + "INSTALL_AS_USER=yes", + "-o", + f"ABI={pkgabi}", + "-o", + f"PKG_CACHEDIR={cache_dir}", + "-o", + f"PKG_DBDIR={db_dir}", + "-o", + f"OSVERSION={self.src.FreeBSD_version}", + "-o", + f"REPOS_DIR={repos_dir}", ] cmd += list(args) return self.run_cmd(cmd, **kwargs) @@ -518,7 +560,9 @@ def pkg_cmd(*args, **kwargs): pkg_dir = stage_dir / pkg_reldir pkg_dir.mkdir(parents=True, exist_ok=True) pkg_cmd("update") - pkg_cmd("fetch", "--dependencies", "-o", pkg_dir, "pkg", *self.packages.split()) + pkg_cmd( + "fetch", "--dependencies", "-o", pkg_dir, "pkg", *self.packages.split() + ) pkg_cmd("-o", f"PKG_CACHEDIR={pkg_dir}", "clean") pkg_cmd("repo", pkg_dir) @@ -531,24 +575,33 @@ def pkg_cmd(*args, **kwargs): # supposed to do it, but it creates broken symlinks at the moment. 
See # https://github.com/freebsd/pkg/pull/2587 if not Path(pkg_dir / "All" / pkg_pkg).is_file(): - matches = glob.glob(str(pkg_dir / "All/Hashed" / f"pkg-{pkg_version}*.pkg")) + matches = glob.glob( + str(pkg_dir / "All/Hashed" / f"pkg-{pkg_version}*.pkg") + ) if len(matches) > 0: - (pkg_dir / "All" / pkg_pkg).symlink_to(Path("Hashed") / Path(matches[0]).name) + (pkg_dir / "All" / pkg_pkg).symlink_to( + Path("Hashed") / Path(matches[0]).name + ) else: - raise ValueError(f"Could not find fetched {pkg_pkg} in {pkg_dir / 'All'}") + raise ValueError( + f"Could not find fetched {pkg_pkg} in {pkg_dir / 'All'}" + ) add_overlay(stage_dir) - add_config_file("etc/pkg/local.conf", - f""" + add_config_file( + "etc/pkg/local.conf", + f""" local: {{ url: "file:///{pkg_reldir}", signature_type: "none", }} - """) + """, + ) - add_config_file("etc/rc.local", - f""" + add_config_file( + "etc/rc.local", + f""" bricoler_add_pkgs() {{ export PATH=${{PATH}}:/usr/local/sbin:/usr/local/bin @@ -565,7 +618,8 @@ def pkg_cmd(*args, **kwargs): if [ -f /firstboot ]; then bricoler_add_pkgs fi - """) + """, + ) metalog_path = Path.cwd() / "METALOG.mtree" metalog.write(metalog_path) @@ -578,20 +632,26 @@ def pkg_cmd(*args, **kwargs): makefs_cmd = ["makefs"] if self.filesystem == FreeBSDVMImageFilesystem.UFS: makefs_cmd += [ - "-t", "ffs", + "-t", + "ffs", "-Z", - "-o", "softupdates=1", - "-o" "version=2" + "-o", + "softupdates=1", + "-oversion=2", ] else: makefs_cmd += [ - "-t", "zfs", - "-o", f"poolname={zfs_pool_name}", - "-o", f"bootfs={zfs_pool_name}" + "-t", + "zfs", + "-o", + f"poolname={zfs_pool_name}", + "-o", + f"bootfs={zfs_pool_name}", ] makefs_cmd += [ "-DD", - "-s", f"{self.image_size}g", + "-s", + f"{self.image_size}g", fs_image_path, metalog_path, ] @@ -599,27 +659,32 @@ def pkg_cmd(*args, **kwargs): with chdir(stagedir): self.run_cmd(makefs_cmd) - has_efi = not (machine.startswith('i386/') or machine.startswith('powerpc/')) + has_efi = not (machine.startswith("i386/") or 
machine.startswith("powerpc/")) if has_efi: efi_loaders = { -            'amd64': "bootx64.efi", -            'arm': "bootarm.efi", -            'arm64': "bootaa64.efi", -            'riscv': "bootriscv64.efi", +            "amd64": "bootx64.efi", +            "arm": "bootarm.efi", +            "arm64": "bootaa64.efi", +            "riscv": "bootriscv64.efi", } esp_dir = Path(image_prefix + "-efi") shutil.rmtree(esp_dir, ignore_errors=True) with chdir(esp_dir / "EFI/BOOT"): -            efi_loader = efi_loaders[machine.split('/')[0]] +            efi_loader = efi_loaders[machine.split("/")[0]] shutil.copyfile(stagedir / "boot/loader.efi", Path(efi_loader)) makefs_cmd = [ "makefs", -            "-t", "msdos", -            "-o", "fat_type=16", -            "-o", "sectors_per_cluster=1", -            "-o", "volume_label=EFI", -            "-s", "4m", +            "-t", +            "msdos", +            "-o", +            "fat_type=16", +            "-o", +            "sectors_per_cluster=1", +            "-o", +            "volume_label=EFI", +            "-s", +            "4m", esp_image_path, esp_dir, ] @@ -628,99 +693,115 @@ def pkg_cmd(*args, **kwargs): bootdir = stagedir / "boot" mkimg_cmd = [ "mkimg", -            "-f", "raw", -            "-S", 512, -            "-o", vm_image_path, +            "-f", +            "raw", +            "-S", +            512, +            "-o", +            vm_image_path, ] -        if machine.startswith('powerpc/'): +        if machine.startswith("powerpc/"): mkimg_cmd += [ -                "-s", "mbr", -                "-a", "1", -                "-p", f"prepboot:={bootdir / 'boot1.elf'}" -                "-p", f"freebsd:={vm_image_path}", +                "-s", +                "mbr", +                "-a", +                "1", +                "-p", +                f"prepboot:={bootdir / 'boot1.elf'}", +                "-p", f"freebsd:={vm_image_path}", ] else: mkimg_cmd += ["-s", "gpt"] -            if machine.startswith('amd64/') or machine.startswith('i386/'): +            if machine.startswith("amd64/") or machine.startswith("i386/"): mkimg_cmd += [ -                    "-b", f"{bootdir / 'pmbr'}", -                    "-p", f"freebsd-boot/bootfs:={bootdir / 'gptboot'}", +                    "-b", +                    f"{bootdir / 'pmbr'}", +                    "-p", +                    f"freebsd-boot/bootfs:={bootdir / 'gptboot'}", ] if has_efi: mkimg_cmd += [ -                    "-p", f"efi:={esp_image_path}", +                    "-p", +                    f"efi:={esp_image_path}", ] mkimg_cmd += [ -                "-p", f"freebsd-swap/swap::{self.swap_size}", -                "-p", f"freebsd-{self.filesystem.value}/rootfs:={fs_image_path}", +                "-p", + 
f"freebsd-swap/swap::{self.swap_size}", + "-p", + f"freebsd-{self.filesystem.value}/rootfs:={fs_image_path}", ] self.run_cmd(mkimg_cmd) - outputs['image'] = VMImage(vm_image_path, machine) - outputs['sysroot'] = stagedir + outputs["image"] = VMImage(vm_image_path, machine) + outputs["sysroot"] = stagedir return outputs class FreeBSDVMBootTask(Task): - """ - Boot a FreeBSD VM image using QEMU or bhyve. + """Boot a FreeBSD VM image using QEMU or bhyve. In interactive mode, the VM console is provided on standard stdin/stdout, and this task does not return until the VM exits. In non-interactive mode, bricoler owns the console and can interact with it """ + name = "freebsd-vm-boot" inputs = { - 'vm_image': FreeBSDVMImageTask, + "vm_image": FreeBSDVMImageTask, } parameters = { - 'disk_list': TaskParameter( + "disk_list": TaskParameter( description="A list of extra files to add as disks", - type=str + type=str, ), - 'hypervisor': TaskParameter( + "hypervisor": TaskParameter( description="Hypervisor to use for running the VM", type=VMHypervisor, # XXX-MJ should somehow default to qemu for non-native images - default=lambda: VMHypervisor.BHYVE if BhyveRun.canrun() else VMHypervisor.QEMU, + default=lambda: ( + VMHypervisor.BHYVE if BhyveRun.canrun() else VMHypervisor.QEMU + ), ), - 'interactive': TaskParameter( + "interactive": TaskParameter( description="Run the VM in interactive mode", default=True, ), - 'memory': TaskParameter( + "memory": TaskParameter( description="Amount of memory to allocate to the VM in MiB", default=2048, ), - 'ncpus': TaskParameter( + "ncpus": TaskParameter( description="Number of CPUs to allocate to the VM", default=2, ), - 'p9_shares': TaskParameter( + "p9_shares": TaskParameter( description="Comma-separated list of shares of the form :", type=str, # XXX-MJ List[Tuple[str, Path]] ), - 'reboot': TaskParameter( + "reboot": TaskParameter( description="Restart the VM when it exits due to a reboot", default=False, ), } outputs = { - 'vm': 
Optional[FreeBSDVM], + "vm": FreeBSDVM | None, } def run(self, ctx): match self.hypervisor: - case VMHypervisor.BHYVE: cls = BhyveRun - case VMHypervisor.QEMU: cls = QEMURun - case VMHypervisor.RVVM: cls = RVVMRun + case VMHypervisor.BHYVE: + cls = BhyveRun + case VMHypervisor.QEMU: + cls = QEMURun + case VMHypervisor.RVVM: + cls = RVVMRun if self.p9_shares: - p9_shares = [tuple(desc.split(':')) for desc in self.p9_shares.split(',')] + p9_shares = [tuple(desc.split(":")) for desc in self.p9_shares.split(",")] else: p9_shares = [] vmrun = cls( @@ -753,149 +834,193 @@ def run(self, ctx): console_log = open("vm-console.log", "wb") vm = FreeBSDVM(vmrun, logfiles=[console_log, sys.stdout.buffer]) - return {'vm': vm} + return {"vm": vm} - def _gdb(self, *args): + def _gdb(self, *args) -> None: if shutil.which("gdb") is None: raise ValueError("gdb is not available") sysroot = Path(os.readlink(Path.cwd() / "sysroot")) - with open(Path.cwd() / "gdb-addr", "r") as f: + with open(Path.cwd() / "gdb-addr") as f: addr = f.read().strip() - (host, portstr) = addr.split(':', maxsplit=1) + (host, portstr) = addr.split(":", maxsplit=1) port = int(portstr) gdb_cmd = [ "gdb", - "-ex", f"set sysroot {Path.cwd() / sysroot}", - "-ex", f"file {sysroot / 'boot/kernel/kernel'}", - "-ex", f"source {sysroot / 'usr/lib/debug/boot/kernel/kernel-gdb.py'}", - "-ex", f"target remote {host}:{port}", + "-ex", + f"set sysroot {Path.cwd() / sysroot}", + "-ex", + f"file {sysroot / 'boot/kernel/kernel'}", + "-ex", + f"source {sysroot / 'usr/lib/debug/boot/kernel/kernel-gdb.py'}", + "-ex", + f"target remote {host}:{port}", ] gdb_cmd += args self.run_cmd(gdb_cmd, process_group=0) - def _ssh(self, *args): - with open(Path.cwd() / "ssh-addr", "r") as f: + def _ssh(self, *args) -> None: + with open(Path.cwd() / "ssh-addr") as f: addr = f.read().strip() - (host, portstr) = addr.split(':', maxsplit=1) + (host, portstr) = addr.split(":", maxsplit=1) ssh = SSHCommandRunner((host, portstr), Path.cwd() / "ssh_key") 
ssh.run_cmd() actions = { - 'gdb': _gdb, - 'ssh': _ssh, + "gdb": _gdb, + "ssh": _ssh, } class FreeBSDRegressionTestSuiteBuildTask(FreeBSDSrcBuildAndInstallTask): - make_options = " ".join([ - # Remove some optional components to speed up the build. - "WITHOUT_CLANG=", "WITHOUT_LLD=", "WITHOUT_LLDB=", "WITHOUT_LIB32=", - # The in-tree ZFS tests take a long time to run and aren't very useful - # outside of ZFS development. - "WITHOUT_ZFS_TESTS=", - ]) + make_options = " ".join( + [ + # Remove some optional components to speed up the build. + "WITHOUT_CLANG=", + "WITHOUT_LLD=", + "WITHOUT_LLDB=", + "WITHOUT_LIB32=", + # The in-tree ZFS tests take a long time to run and aren't very useful + # outside of ZFS development. + "WITHOUT_ZFS_TESTS=", + ] + ) def run(self, ctx): outputs = super().run(ctx) # Manually install the run-kyua helper script. - dest = outputs['stagedir'] / "usr/tests/run-kyua" + dest = outputs["stagedir"] / "usr/tests/run-kyua" with resources.as_file(resources.files("bricoler") / "run-kyua") as src: shutil.copyfile(src, dest) - outputs['metalog'].add_file(dest, Path("usr/tests/run-kyua"), mode=0o755) + outputs["metalog"].add_file(dest, Path("usr/tests/run-kyua"), mode=0o755) return outputs class FreeBSDRegressionTestSuiteVMImageTask(FreeBSDVMImageTask): - loader_tunables = " ".join([ - "net.inet.ip.fw.default_to_accept=1", - "net.inet.ipf.jail_allowed=1", - "net.fibs=8", - ]) - - packages = " ".join([ - "coreutils", - "filesystems/ext2", - "gdb", - "git-lite", - "gtar", - "isc-dhcp44-server", - "jq", - "ksh93", - "llvm", - "ndisc6", - "net/py-dpkt", - "net/tcptestsuite", - "nist-kat", - "nmap", - "openvpn", - "perl5", - "pimd", - "porch", - "python", - "python3", - "devel/py-pytest", - "devel/py-twisted", - "net/scapy", - "security/setaudit", - "sg3_utils", - "sudo", - ]) - - rc_kld_list = " ".join([ - "accf_data", "accf_dns", "accf_http", "accf_tls", - "blake2", - "carp", - "cfiscsi", - "cryptodev", - "ctl", - "dummymbuf", - "dummynet", - "fusefs", - 
"if_bridge", "if_enc", "if_epair", "if_geneve", "if_gre", "if_lagg", "if_ovpn", "if_stf", "if_wg", - "ipdivert", - "ipfw", "ipfw_nat", "ipfw_nptv6", - "ip_mroute", "ip6_mroute", - "ipl", - "ipsec", - "mac_bsdextended", "mac_ipacl", "mac_portacl", "mqueuefs", - "pf", "pflog", "pflow", "pfsync", - "sctp", - "snd_dummy", - "tarfs", - "tcpmd5", - "unionfs", - "zfs", - ]) - - sysctls = " ".join([ - "kern.ipc.tls.enable=1", - "vfs.aio.enable_unsafe=1", - "kern.crypto.allow_soft=1", - "vm.panic_on_oom=1", - "security.mac.bsdextended.enabled=0", - "security.mac.ipacl.ipv4=0", - "security.mac.ipacl.ipv6=0", - "security.mac.portacl.enabled=0", - ]) + loader_tunables = " ".join( + [ + "net.inet.ip.fw.default_to_accept=1", + "net.inet.ipf.jail_allowed=1", + "net.fibs=8", + ] + ) + + packages = " ".join( + [ + "coreutils", + "filesystems/ext2", + "gdb", + "git-lite", + "gtar", + "isc-dhcp44-server", + "jq", + "ksh93", + "llvm", + "ndisc6", + "net/py-dpkt", + "net/tcptestsuite", + "nist-kat", + "nmap", + "openvpn", + "perl5", + "pimd", + "porch", + "python", + "python3", + "devel/py-pytest", + "devel/py-twisted", + "net/scapy", + "security/setaudit", + "sg3_utils", + "sudo", + ] + ) + + rc_kld_list = " ".join( + [ + "accf_data", + "accf_dns", + "accf_http", + "accf_tls", + "blake2", + "carp", + "cfiscsi", + "cryptodev", + "ctl", + "dummymbuf", + "dummynet", + "fusefs", + "if_bridge", + "if_enc", + "if_epair", + "if_geneve", + "if_gre", + "if_lagg", + "if_ovpn", + "if_stf", + "if_wg", + "ipdivert", + "ipfw", + "ipfw_nat", + "ipfw_nptv6", + "ip_mroute", + "ip6_mroute", + "ipl", + "ipsec", + "mac_bsdextended", + "mac_ipacl", + "mac_portacl", + "mqueuefs", + "pf", + "pflog", + "pflow", + "pfsync", + "sctp", + "snd_dummy", + "tarfs", + "tcpmd5", + "unionfs", + "zfs", + ] + ) + + sysctls = " ".join( + [ + "kern.ipc.tls.enable=1", + "vfs.aio.enable_unsafe=1", + "kern.crypto.allow_soft=1", + "vm.panic_on_oom=1", + "security.mac.bsdextended.enabled=0", + "security.mac.ipacl.ipv4=0", + 
"security.mac.ipacl.ipv6=0", + "security.mac.portacl.enabled=0", + ] + ) inputs = { - 'build': FreeBSDRegressionTestSuiteBuildTask, + "build": FreeBSDRegressionTestSuiteBuildTask, } def run(self, ctx): metalog = self.build.metalog - metalog.add_symlink(symlink_dest='/usr/local/bin/clang', path_in_image='usr/bin/cc') - metalog.add_symlink(symlink_dest='/usr/local/bin/ld.lld', path_in_image='usr/bin/ld') - metalog.add_symlink(symlink_dest='/usr/local/bin/clang-cpp', path_in_image='usr/bin/cpp') - metalog.add_symlink(symlink_dest='/usr/local/bin/clang++', path_in_image='usr/bin/c++') + metalog.add_symlink( + symlink_dest="/usr/local/bin/clang", path_in_image="usr/bin/cc" + ) + metalog.add_symlink( + symlink_dest="/usr/local/bin/ld.lld", path_in_image="usr/bin/ld" + ) + metalog.add_symlink( + symlink_dest="/usr/local/bin/clang-cpp", path_in_image="usr/bin/cpp" + ) + metalog.add_symlink( + symlink_dest="/usr/local/bin/clang++", path_in_image="usr/bin/c++" + ) return super().run(ctx) class FreeBSDRegressionTestSuiteTask(FreeBSDVMBootTask): - """ - Boot a virtual machine and run the FreeBSD regression test suite. 
- """ + """Boot a virtual machine and run the FreeBSD regression test suite.""" + name = "freebsd-regression-test-suite" # XXX-MJ kernel_config should be GENERIC-DEBUG on stable branches @@ -904,49 +1029,54 @@ class FreeBSDRegressionTestSuiteTask(FreeBSDVMBootTask): memory = 1024 * (os.cpu_count() // 2) inputs = { - 'vm_image': FreeBSDRegressionTestSuiteVMImageTask, + "vm_image": FreeBSDRegressionTestSuiteVMImageTask, } parameters = { - 'count': TaskParameter( + "count": TaskParameter( description="Number of times to run the tests", default=1, ), - 'parallelism': TaskParameter( + "parallelism": TaskParameter( description="Number of tests to run in parallel", default=os.cpu_count() // 2, # XXX-MJ duplicating the ncpus value ), - 'tests': TaskParameter( + "tests": TaskParameter( description="A space-separated list of test cases or test suites to run, " - "or the empty string to run all tests", + "or the empty string to run all tests", default="", ), } outputs = { - 'report_db_path': Path, - 'report_txt_path': Path, + "report_db_path": Path, + "report_txt_path": Path, } def run(self, ctx): outputs = super().run(ctx) - vm: FreeBSDVM = outputs['vm'] + vm: FreeBSDVM = outputs["vm"] if vm is None: raise ValueError( - "Cannot run tests, VM must be run in non-interactive mode" + "Cannot run tests, VM must be run in non-interactive mode", ) try: vm.boot_to_login() cmd = [ "/usr/tests/run-kyua", - "-c", str(self.count), - "-j", str(self.parallelism), - "-r", "/root/kyua.db", - "-o", "/root/kyua-report.txt", - ] + self.tests.split() + "-c", + str(self.count), + "-j", + str(self.parallelism), + "-r", + "/root/kyua.db", + "-o", + "/root/kyua-report.txt", + *self.tests.split(), + ] vm.sendcmd(cmd) - vm.wait_for_prompt(timeout=10*3600) + vm.wait_for_prompt(timeout=10 * 3600) report_db_path = Path.cwd() / "kyua.db" report_txt_path = Path.cwd() / "kyua-report.txt" @@ -962,23 +1092,22 @@ def run(self, ctx): except FreeBSDVM.PanicException as e: if sys.stdin.isatty(): self._gdb("-ex", 
f"thread {e.cpuid + 1}") - raise e + raise return { - 'report_db_path': report_db_path, - 'report_txt_path': report_txt_path, + "report_db_path": report_db_path, + "report_txt_path": report_txt_path, } - def _report(self, *args): + def _report(self, *args) -> None: self.run_cmd(["less", Path.cwd() / "kyua-report.txt"]) actions = { - 'report': _report, + "report": _report, } class FreeBSDRegressionTestSuiteCITask(FreeBSDRegressionTestSuiteTask): - """ - Run the regression test suite in CI mode: + """Run the regression test suite in CI mode: - Raise warnings about unexpected skipped tests. XXX-MJ - Detect flakiness in tests and report it. - Look for witness warnings after test runs and report them. XXX-MJ @@ -986,31 +1115,34 @@ class FreeBSDRegressionTestSuiteCITask(FreeBSDRegressionTestSuiteTask): - Keep track of results over time and report regressions. XXX-MJ - Generate email reports. """ + name = "freebsd-regression-test-suite-ci" inputs = { - 'src': FreeBSDSrcGitCheckoutTask, + "src": FreeBSDSrcGitCheckoutTask, } outputs = { - 'email': EmailReport, + "email": EmailReport, } def run(self, ctx): outputs = super().run(ctx) - db = KyuaDB(outputs['report_db_path']) + db = KyuaDB(outputs["report_db_path"]) failing_tests = db.failed() + db.broken() flaky = [] if failing_tests: - info(f"Re-running {len(failing_tests)} failed/broken test case(s) to check for flakiness") + info( + f"Re-running {len(failing_tests)} failed/broken test case(s) to check for flakiness" + ) with chdir(Path("./flaky-check")): self.parallelism = 1 self.tests = " ".join(failing_tests) flaky_outputs = super().run(ctx) - flaky_passed = set(KyuaDB(flaky_outputs['report_db_path']).passed()) + flaky_passed = set(KyuaDB(flaky_outputs["report_db_path"]).passed()) flaky = [t for t in failing_tests if t in flaky_passed] confirmed_failures = [t for t in failing_tests if t not in flaky_passed] @@ -1026,7 +1158,7 @@ def run(self, ctx): subject = f"FreeBSD regression test suite results 
({self.src.repo.checked_out_branch()})" - report = f"Branch: {self.src.repo.checked_out_branch()}\n" + report = f"Branch: {self.src.repo.checked_out_branch()}\n" report += f"Commit: {self.src.repo.checked_out_revision()}\n" report += "The test run completed successfully, with " report += f"{len(db.passed())} passed, " @@ -1044,7 +1176,7 @@ def run(self, ctx): report += "\n" return { - 'email': EmailReport( + "email": EmailReport( subject=subject, body=report, ), @@ -1052,43 +1184,48 @@ def run(self, ctx): class FreeBSDDTraceTestSuiteBuildTask(FreeBSDRegressionTestSuiteBuildTask): - make_options = FreeBSDRegressionTestSuiteBuildTask.make_options + " WITH_DTRACE_TESTS=" + make_options = ( + FreeBSDRegressionTestSuiteBuildTask.make_options + " WITH_DTRACE_TESTS=" + ) class FreeBSDDTraceTestSuiteVMImageTask(FreeBSDRegressionTestSuiteVMImageTask): - packages = " ".join([ - "binutils", - "jq", - "libxml2", - "llvm", - "nmap", - "pdksh", - "perl5", - ]) - - rc_kld_list = " ".join([ - "dtraceall", - "dtrace_test", - "kinst", - "sctp" - ]) + packages = " ".join( + [ + "binutils", + "jq", + "libxml2", + "llvm", + "nmap", + "pdksh", + "perl5", + ] + ) + + rc_kld_list = " ".join( + [ + "dtraceall", + "dtrace_test", + "kinst", + "sctp", + ] + ) inputs = { - 'build': FreeBSDDTraceTestSuiteBuildTask, + "build": FreeBSDDTraceTestSuiteBuildTask, } class FreeBSDDTraceTestSuiteTask(FreeBSDRegressionTestSuiteTask): - """ - Boot a virtual machine and run the FreeBSD DTrace regression test suite. 
- """ + """Boot a virtual machine and run the FreeBSD DTrace regression test suite.""" + name = "freebsd-dtrace-test-suite" parallelism = 1 tests = "cddl/usr.sbin/dtrace" inputs = { - 'vm_image': FreeBSDDTraceTestSuiteVMImageTask, + "vm_image": FreeBSDDTraceTestSuiteVMImageTask, } @@ -1107,7 +1244,7 @@ class CheriBSDSrcBuildTask(FreeBSDSrcBuildTask): kernel_config = "GENERIC-MORELLO-PURECAP" inputs = { - 'src': CheriBSDSrcGitCheckoutTask, + "src": CheriBSDSrcGitCheckoutTask, } @@ -1119,8 +1256,8 @@ class CheriBSDVMImageTask(FreeBSDVMImageTask): name = "cheribsd-vm-image" inputs = { - 'src': CheriBSDSrcGitCheckoutTask, - 'build': CheriBSDSrcBuildAndInstallTask, + "src": CheriBSDSrcGitCheckoutTask, + "build": CheriBSDSrcBuildAndInstallTask, } @@ -1128,7 +1265,7 @@ class CheriBSDVMBootTask(FreeBSDVMBootTask): name = "cheribsd-vm-boot" inputs = { - 'vm_image': CheriBSDVMImageTask, + "vm_image": CheriBSDVMImageTask, } @@ -1136,7 +1273,7 @@ class EC2Provider: config: Config ssh_key_dir: Path - def __init__(self, config: Config, region: str): + def __init__(self, config: Config, region: str) -> None: # Lazy import since this takes a bit of time (~130ms on a Zen 4). 
try: import boto3 @@ -1144,8 +1281,8 @@ def __init__(self, config: Config, region: str): raise ImportError("boto3 is required for EC2 tasks") from e self.config = config - self.client = boto3.client('ec2', region) - self.resource = boto3.resource('ec2', region) + self.client = boto3.client("ec2", region) + self.resource = boto3.resource("ec2", region) self.ssh_key_dir = self.config.workdir / "ec2-ssh-keys" def create_ssh_keypair(self, key_name: str, tag_value: str) -> Path: @@ -1154,15 +1291,17 @@ def create_ssh_keypair(self, key_name: str, tag_value: str) -> Path: if not keyfile.is_file(): key_pair = self.resource.create_key_pair( KeyName=key_name, - TagSpecifications=[{ - 'ResourceType': "key-pair", - 'Tags': [ - {'Key': "bricoler", 'Value': tag_value}, - ], - }] + TagSpecifications=[ + { + "ResourceType": "key-pair", + "Tags": [ + {"Key": "bricoler", "Value": tag_value}, + ], + } + ], ) private_key = key_pair.key_material - with keyfile.open('w', encoding='utf-8') as f: + with keyfile.open("w", encoding="utf-8") as f: f.write(private_key) keyfile.chmod(0o400) return keyfile @@ -1176,8 +1315,8 @@ def create_instance( tag_value: str, ): ami = self.ami_by_id(image_id) - bdm = ami.get('BlockDeviceMappings') - bdm[0]['Ebs']['VolumeSize'] = volume_size + bdm = ami.get("BlockDeviceMappings") + bdm[0]["Ebs"]["VolumeSize"] = volume_size instances = self.resource.create_instances( ImageId=image_id, @@ -1186,29 +1325,33 @@ def create_instance( MinCount=1, MaxCount=1, BlockDeviceMappings=bdm, - TagSpecifications=[{ - 'ResourceType': "instance", - 'Tags': [{'Key': "bricoler", 'Value': tag_value}], - }] + TagSpecifications=[ + { + "ResourceType": "instance", + "Tags": [{"Key": "bricoler", "Value": tag_value}], + } + ], ) instance = instances[0] instance.wait_until_running() instance.reload() timeout = 300 - info(f"Waiting up to {timeout} seconds for instance {instance.id} to become ready") + info( + f"Waiting up to {timeout} seconds for instance {instance.id} to become ready" 
+ ) start_time = time.time() ec2_client = instance.meta.client while time.time() - start_time < timeout: response = ec2_client.describe_instance_status( InstanceIds=[instance.id], - IncludeAllInstances=False + IncludeAllInstances=False, ) - if response['InstanceStatuses']: - status = response['InstanceStatuses'][0] - instance_status = status['InstanceStatus']['Status'] - system_status = status['SystemStatus']['Status'] + if response["InstanceStatuses"]: + status = response["InstanceStatuses"][0] + instance_status = status["InstanceStatus"]["Status"] + system_status = status["SystemStatus"]["Status"] if instance_status == "ok" and system_status == "ok": # XXX-MJ also need to wait for ssh @@ -1216,9 +1359,9 @@ def create_instance( time.sleep(5) raise TimeoutError(f"Instance not ready after {timeout} seconds") - def clean(self, tag_value: str = "*"): + def clean(self, tag_value: str = "*") -> None: filters = [ - {'Name': "tag:bricoler", 'Values': [tag_value]}, + {"Name": "tag:bricoler", "Values": [tag_value]}, ] shutil.rmtree(self.ssh_key_dir, ignore_errors=True) @@ -1232,38 +1375,40 @@ def clean(self, tag_value: str = "*"): instance.wait_until_terminated() @functools.cache - def ami_by_id(self, image_id: str) -> Dict[str, str]: + def ami_by_id(self, image_id: str) -> dict[str, str]: response = self.client.describe_images(ImageIds=[image_id]) - images = response['Images'] + images = response["Images"] if len(images) == 0: raise ValueError(f"AMI {image_id} not found") return images[0] @functools.cache - def freebsd_amis(self, owners: Tuple[str] = ("aws-marketplace",)) -> List[Dict[str, str]]: + def freebsd_amis( + self, owners: tuple[str] = ("aws-marketplace",) + ) -> list[dict[str, str]]: response = self.client.describe_images( Filters=[ - {'Name': "name", 'Values': ["FreeBSD*"]}, - {'Name': "state", 'Values': ["available"]}, + {"Name": "name", "Values": ["FreeBSD*"]}, + {"Name": "state", "Values": ["available"]}, ], Owners=list(owners), ) - images = 
response['Images'] - images.sort(key=lambda x: x['CreationDate'], reverse=True) + images = response["Images"] + images.sort(key=lambda x: x["CreationDate"], reverse=True) return images @functools.cache def instance_types(self): response = self.client.describe_instance_types() - instance_types = response['InstanceTypes'] - instance_types.sort(key=lambda x: x['InstanceType']) + instance_types = response["InstanceTypes"] + instance_types.sort(key=lambda x: x["InstanceType"]) return instance_types class EC2MetaTask(Task): parameters = { # XXX-MJ need a mechanism to set a default value for this from the config file - 'aws_region': TaskParameter( + "aws_region": TaskParameter( description="AWS region to use", default="us-east-1", ), @@ -1271,21 +1416,20 @@ class EC2MetaTask(Task): class EC2LaunchTask(EC2MetaTask): - """ - Launch an EC2 instance accessible via ssh. - """ + """Launch an EC2 instance accessible via ssh.""" + name = "ec2-launch-freebsd" parameters = { - 'image_id': TaskParameter( + "image_id": TaskParameter( description="AMI ID of the FreeBSD image to launch", required=True, ), - 'instance_type': TaskParameter( + "instance_type": TaskParameter( description="EC2 instance type to launch", required=True, ), - 'volume_size': TaskParameter( + "volume_size": TaskParameter( description="Size of the root volume in GiB", default=20, ), @@ -1312,17 +1456,17 @@ def run(self, ctx): class EC2CleanTask(EC2MetaTask): - """ - Clean up EC2 resources created by bricoler. + """Clean up EC2 resources created by bricoler. By default it only cleans up resources created in the current workdir; the "all" parameter can be used to clean up all resources created by bricoler using a given IAM account. 
""" + name = "ec2-clean" parameters = { - 'all': TaskParameter( + "all": TaskParameter( description="Clean up all EC2 resources created by bricoler across all workdirs", default=False, ), @@ -1335,13 +1479,12 @@ def run(self, ctx): class EC2ListAMIsTask(EC2MetaTask): - """ - List FreeBSD AMIs available from the specified owner. - """ + """List FreeBSD AMIs available from the specified owner.""" + name = "ec2-list-freebsd-amis" parameters = { - 'owners': TaskParameter( + "owners": TaskParameter( description="Space-separated list of AMI owners to filter by", default="782442783595", # FreeBSD community AMIs ), @@ -1355,17 +1498,16 @@ def run(self, ctx): class EC2ListInstanceTypesTask(EC2MetaTask): - """ - List all EC2 instance types in a given region. - """ + """List all EC2 instance types in a given region.""" + name = "ec2-list-instance-types" parameters = { - 'min_ncpu': TaskParameter( + "min_ncpu": TaskParameter( description="Filter instance types by minimum number of CPUs", default=1, ), - 'min_memory': TaskParameter( + "min_memory": TaskParameter( description="Filter instance types by minimum memory (in MiB)", default=256, ), @@ -1375,52 +1517,50 @@ def run(self, ctx): provider = EC2Provider(self.config, self.aws_region) instance_types = provider.instance_types() for it in instance_types: - if it['VCpuInfo']['DefaultVCpus'] < self.min_ncpu: + if it["VCpuInfo"]["DefaultVCpus"] < self.min_ncpu: continue - if it['MemoryInfo']['SizeInMiB'] < self.min_memory: + if it["MemoryInfo"]["SizeInMiB"] < self.min_memory: continue json.dump(it, sys.stdout, indent=2) return {} class OpenZFSGitCheckoutTask(GitCheckoutTask): - """ - Clone the OpenZFS repository, or update an existing clone. 
- """ + """Clone the OpenZFS repository, or update an existing clone.""" + name = "openzfs-git-checkout" url = "https://github.com/openzfs/zfs" branch = "master" outputs = { - 'repo': GitRepository, + "repo": GitRepository, } class OpenZFSBuildTask(Task): - """ - Build OpenZFS from a Git repository checkout. - """ + """Build OpenZFS from a Git repository checkout.""" + name = "openzfs-build" parameters = { - 'clean': TaskParameter( + "clean": TaskParameter( description="Clean build artifacts before building", - default=False + default=False, ), - 'sysdir': TaskParameter( + "sysdir": TaskParameter( description="Path to the FreeBSD kernel source to compile against", - default=Path("/usr/src/sys") + default=Path("/usr/src/sys"), ), } inputs = { - 'src': OpenZFSGitCheckoutTask, + "src": OpenZFSGitCheckoutTask, } outputs = { - 'user_stagedir': Path, - 'kmod_stagedir': Path, + "user_stagedir": Path, + "kmod_stagedir": Path, } def run(self, ctx): @@ -1434,13 +1574,15 @@ def run(self, ctx): if not Path("./configure").is_file() or self.clean: self.run_cmd(["./autogen.sh"]) if not Path("./Makefile").is_file() or self.clean: - self.run_cmd([ - "./configure", - "MAKE=gmake", - "--with-config=user", - "--enable-invariants", - "--enable-debug", - ]) + self.run_cmd( + [ + "./configure", + "MAKE=gmake", + "--with-config=user", + "--enable-invariants", + "--enable-debug", + ] + ) self.run_cmd(["gmake", "-j", str(ctx.max_jobs)]) self.run_cmd(["gmake", "install", f"DESTDIR={user_stagedir}"]) @@ -1448,43 +1590,54 @@ def run(self, ctx): with chdir(self.src.repo.path / "module"): if self.clean: self.run_cmd(["make", "-f", "Makefile.bsd", "clean"]) - self.run_cmd([ - "make", "-s", - "-j", str(ctx.max_jobs), - "-f", "Makefile.bsd", - "CC=cc", - f"SYSDIR={self.sysdir}", - "WITH_DEBUG=true" - ]) - self.run_cmd([ - "make", "-s", - "-f", "Makefile.bsd", - "install", - f"KMODOWN={os.geteuid()}", - f"KMODGRP={os.getegid()}", - "KMODDIR=", - "DEBUGDIR=", - f"DESTDIR={kmod_stagedir}", - 
"WITHOUT_DEBUG_FILES=", - ]) + self.run_cmd( + [ + "make", + "-s", + "-j", + str(ctx.max_jobs), + "-f", + "Makefile.bsd", + "CC=cc", + f"SYSDIR={self.sysdir}", + "WITH_DEBUG=true", + ] + ) + self.run_cmd( + [ + "make", + "-s", + "-f", + "Makefile.bsd", + "install", + f"KMODOWN={os.geteuid()}", + f"KMODGRP={os.getegid()}", + "KMODDIR=", + "DEBUGDIR=", + f"DESTDIR={kmod_stagedir}", + "WITHOUT_DEBUG_FILES=", + ] + ) return { - 'user_stagedir': user_stagedir, - 'kmod_stagedir': kmod_stagedir, + "user_stagedir": user_stagedir, + "kmod_stagedir": kmod_stagedir, } class OpenZFSTestSuiteFreeBSDSrcBuildTask(FreeBSDSrcBuildAndInstallTask): - make_options = " ".join([ - "WITHOUT_LIB32=", - "WITHOUT_TOOLCHAIN=", - "WITHOUT_ZFS=", - ]) + make_options = " ".join( + [ + "WITHOUT_LIB32=", + "WITHOUT_TOOLCHAIN=", + "WITHOUT_ZFS=", + ] + ) class OpenZFSTestSuiteBuildTask(OpenZFSBuildTask): inputs = { - 'freebsd_build': OpenZFSTestSuiteFreeBSDSrcBuildTask, + "freebsd_build": OpenZFSTestSuiteFreeBSDSrcBuildTask, } outputs = OpenZFSBuildTask.outputs | OpenZFSTestSuiteFreeBSDSrcBuildTask.outputs @@ -1501,39 +1654,41 @@ class OpenZFSTestSuiteVMImageTask(FreeBSDVMImageTask): image_size = 50 - packages = " ".join([ - "bash", - "devel/py-sysctl", - "fio", - "jq", - "ksh93", - "libunwind", - "pamtester", - "python3", - "rsync", - "sudo", - "xxhash", - ]) + packages = " ".join( + [ + "bash", + "devel/py-sysctl", + "fio", + "jq", + "ksh93", + "libunwind", + "pamtester", + "python3", + "rsync", + "sudo", + "xxhash", + ] + ) sudo_users = "tests" inputs = { - 'build': OpenZFSTestSuiteBuildTask, + "build": OpenZFSTestSuiteBuildTask, } def run(self, ctx): mtree = self.build.metalog kmoddir = self.build.kmod_stagedir - mtree.add_file(kmoddir / "openzfs.ko", - Path("boot/kernel/openzfs.ko")) - mtree.add_file(kmoddir / "openzfs.ko.debug", - Path("usr/lib/debug/boot/kernel/openzfs.ko")) + mtree.add_file(kmoddir / "openzfs.ko", Path("boot/kernel/openzfs.ko")) + mtree.add_file( + kmoddir / 
"openzfs.ko.debug", Path("usr/lib/debug/boot/kernel/openzfs.ko") + ) def add_overlay(root: Path) -> None: if not root.is_dir(): raise ValueError(f"Overlay path '{root}' is not a directory") - for item in root.rglob('*'): + for item in root.rglob("*"): rel = item.relative_to(root) if item.is_dir(): mtree.add_dir(rel) @@ -1543,7 +1698,7 @@ def add_overlay(root: Path) -> None: mtree.add_symlink(src_symlink=item, path_in_image=rel) else: raise ValueError( - f"Unsupported file type for overlay: {item}" + f"Unsupported file type for overlay: {item}", ) add_overlay(self.build.user_stagedir) @@ -1552,9 +1707,8 @@ def add_overlay(root: Path) -> None: class OpenZFSTestSuiteTask(FreeBSDVMBootTask): - """ - Boot a virtual machine and run the OpenZFS test suite (ZTS). - """ + """Boot a virtual machine and run the OpenZFS test suite (ZTS).""" + name = "openzfs-test-suite" interactive = False @@ -1562,7 +1716,7 @@ class OpenZFSTestSuiteTask(FreeBSDVMBootTask): memory = 1024 * (os.cpu_count() // 2) inputs = { - 'vm_image': OpenZFSTestSuiteVMImageTask, + "vm_image": OpenZFSTestSuiteVMImageTask, } def run(self, ctx): @@ -1574,31 +1728,30 @@ def run(self, ctx): self.disk_list = disk_list outputs = super().run(ctx) - vm: FreeBSDVM = outputs['vm'] + vm: FreeBSDVM = outputs["vm"] if vm is None: raise ValueError( - "Cannot run tests, VM must be run in non-interactive mode" + "Cannot run tests, VM must be run in non-interactive mode", ) try: vm.boot_to_login() cmd = "/usr/local/share/zfs/zfs-tests.sh -v" - vm.sendline(f"DISKS=\"vtbd1 vtbd2 vtbd3\" su -m tests -c \"{cmd}\"") - vm.wait_for_prompt(timeout=10*3600) + vm.sendline(f'DISKS="vtbd1 vtbd2 vtbd3" su -m tests -c "{cmd}"') + vm.wait_for_prompt(timeout=10 * 3600) ssh = SSHCommandRunner(vm.vmrun.ssh_addr, vm.vmrun.ssh_key) ssh.scp_from("/var/tmp/test_results/current", Path.cwd() / "test_results") except FreeBSDVM.PanicException as e: if sys.stdin.isatty(): self._gdb("-ex", f"thread {e.cpuid + 1}") - raise e + raise return outputs class 
SyzkallerGitCheckoutTask(GitCheckoutTask): - """ - Clone the syzkaller repository, or update an existing clone. - """ + """Clone the syzkaller repository, or update an existing clone.""" + name = "syzkaller-git-checkout" url = "https://github.com/google/syzkaller" @@ -1606,52 +1759,53 @@ class SyzkallerGitCheckoutTask(GitCheckoutTask): shallow = False # Some syzkaller tests require a full clone. outputs = { - 'repo': GitRepository, + "repo": GitRepository, } class SyzkallerBuildTask(Task): - """ - Build syzkaller from a Git repository checkout. - """ + """Build syzkaller from a Git repository checkout.""" + name = "syzkaller-build" parameters = { - 'test': TaskParameter( + "test": TaskParameter( description="Run tests after building", - default=True + default=True, ), } inputs = { - 'src': SyzkallerGitCheckoutTask, + "src": SyzkallerGitCheckoutTask, } outputs = { - 'bindir': Path, - 'repo': GitRepository, + "bindir": Path, + "repo": GitRepository, } def run(self, ctx): with chdir(self.src.repo.path): - env = {'GOMAXPROCS': str(ctx.max_jobs)} + env = {"GOMAXPROCS": str(ctx.max_jobs)} self.run_cmd(["gmake"], env=env) if self.test: self.run_cmd(["gmake", "test"], env=env) return { - 'bindir': self.src.repo.path / "bin", - 'repo': self.src.repo, + "bindir": self.src.repo.path / "bin", + "repo": self.src.repo, } class SyzkallerFuzzFreeBSDBuildTask(FreeBSDSrcBuildAndInstallTask): def run(self, ctx): with open("SYZKALLER", "w") as f: - f.write("# Added by bricoler\n" - f"include {self.kernel_config}\n" - "ident SYZKALLER\n" - "options COVERAGE\n" - "options KCOV\n") + f.write( + "# Added by bricoler\n" + f"include {self.kernel_config}\n" + "ident SYZKALLER\n" + "options COVERAGE\n" + "options KCOV\n" + ) f.flush() self.kernel_config = str(Path.cwd() / "SYZKALLER") return super().run(ctx) @@ -1659,99 +1813,101 @@ def run(self, ctx): class SyzkallerFuzzFreeBSDVMImageTask(FreeBSDVMImageTask): inputs = { - 'build': SyzkallerFuzzFreeBSDBuildTask, + "build": 
SyzkallerFuzzFreeBSDBuildTask, } class SyzkallerFuzzFreeBSDTask(Task): - """ - Run syzkaller against a FreeBSD target - """ + """Run syzkaller against a FreeBSD target""" + name = "syzkaller-fuzz-freebsd" parameters = { - 'dashboard_addr': TaskParameter( + "dashboard_addr": TaskParameter( description="Address of the syzkaller HTTP dashboard", default="0.0.0.0:8080", ), - 'debug': TaskParameter( + "debug": TaskParameter( description="Run syzkaller in debug mode with a single VM and verbose logging", default=False, ), - 'hypervisor': TaskParameter( + "hypervisor": TaskParameter( description="Hypervisor to use for running the VM", type=VMHypervisor, default=VMHypervisor.BHYVE if BhyveRun.canrun() else VMHypervisor.QEMU, ), - 'vm_count': TaskParameter( + "vm_count": TaskParameter( description="Number of VMs to run in parallel (ignored in debug mode)", default=os.cpu_count() // 2, ), - 'vm_ncpu': TaskParameter( + "vm_ncpu": TaskParameter( description="Number of CPUs to allocate to each VM", default=2, ), - 'vm_memory': TaskParameter( + "vm_memory": TaskParameter( description="Amount of memory to allocate to each VM in MiB", default=2048, ), - 'zfs_dataset': TaskParameter( + "zfs_dataset": TaskParameter( description="ZFS dataset to use for storing syzkaller workdir and VM images", type=str, - ) + ), } inputs = { - 'syzkaller': SyzkallerBuildTask, - 'vm_image': SyzkallerFuzzFreeBSDVMImageTask, + "syzkaller": SyzkallerBuildTask, + "vm_image": SyzkallerFuzzFreeBSDVMImageTask, } - def run(self, ctx): + def run(self, ctx) -> None: hypervisor_args = {} image_path = None if self.hypervisor == VMHypervisor.BHYVE: if self.zfs_dataset is None: raise ValueError("zfs_dataset parameter is required when using bhyve") + def zfs_get(dataset, prop): cmd = ["zfs", "get", "-H", "-o", "value", prop, dataset] return self.run_cmd(cmd, capture_output=True).stdout.decode().strip() + mountpoint = zfs_get(self.zfs_dataset, "mountpoint") if mountpoint == "none": raise ValueError(f"ZFS dataset 
{self.zfs_dataset} is not mounted") # bhyve doesn't support transient disk snapshots, so we have to provide # a ZFS dataset to syz-manager that it can clone. - hypervisor_args['dataset'] = self.zfs_dataset - hypervisor_args['bootrom'] = "/usr/local/share/uefi-firmware/BHYVE_UEFI.fd" + hypervisor_args["dataset"] = self.zfs_dataset + hypervisor_args["bootrom"] = "/usr/local/share/uefi-firmware/BHYVE_UEFI.fd" image_path = str(Path(mountpoint) / "syzkaller.img") shutil.copyfile(self.vm_image.image.path, image_path) else: # --enable-kvm is hard-coded in the QEMU parameters for FreeBSD # targets, so we have to do this fragile thing to remove it. - hypervisor_args['qemu_args'] = "" + hypervisor_args["qemu_args"] = "" image_path = self.vm_image.image.path workdir = Path.cwd() / "workdir" workdir.mkdir(exist_ok=True) - machine = self.vm_image.image.machine.split('/', maxsplit=1)[1] + machine = self.vm_image.image.machine.split("/", maxsplit=1)[1] params = { - 'target': f"freebsd/{machine}", - 'workdir': str(workdir), - 'type': f"{self.hypervisor.value.lower()}", - 'syzkaller': str(self.syzkaller.repo.path), - 'image': str(image_path), - 'http': self.dashboard_addr, - 'ssh_user': "root", - 'sshkey': str(self.vm_image.ssh_key), - 'procs': 2, - 'vm': { - 'cpu': self.vm_ncpu, - 'mem': str(self.vm_memory) + "M", - 'count': self.vm_count, - } | hypervisor_args, + "target": f"freebsd/{machine}", + "workdir": str(workdir), + "type": f"{self.hypervisor.value.lower()}", + "syzkaller": str(self.syzkaller.repo.path), + "image": str(image_path), + "http": self.dashboard_addr, + "ssh_user": "root", + "sshkey": str(self.vm_image.ssh_key), + "procs": 2, + "vm": { + "cpu": self.vm_ncpu, + "mem": str(self.vm_memory) + "M", + "count": self.vm_count, + } + | hypervisor_args, } # Write the parameters to a JSON config file. 
@@ -1763,6 +1919,7 @@ def zfs_get(dataset, prop): cmd.append("-debug") self.run_cmd(cmd) + # # Features to add: # - automatic bisection for build and test failures @@ -1791,15 +1948,14 @@ def main() -> int: for alias in config.aliases: print(f" {alias['alias']} (alias for {alias['task']})") return 0 - elif args.list: + if args.list: for task_name in TaskMeta.task_names(): print(task_name) for alias in config.aliases: - print(alias['alias']) + print(alias["alias"]) return 0 - else: - config.usage() - return 1 + config.usage() + return 1 if args.alias: config.add_alias(args.alias) @@ -1812,13 +1968,13 @@ def main() -> int: if sched.target.__class__.__doc__ is not None: print(sched.target.__class__.__doc__.strip() + "\n") else: - print("") + print() if len(sched.target.parameters) > 0: print("Parameters:") width = max(len(name) for name in sched.parameters.keys()) + 2 for name, param in sched.parameters.items(): - print(f"{name+':':<{width}} {param[0].description}") - print(f"{'':{width+1}}{str(param[1])}") + print(f"{name + ':':<{width}} {param[0].description}") + print(f"{'':{width + 1}}{param[1]!s}") elif args.list: for task in sched.tasks.values(): for name in task.__class__.get_parameter_keys(): diff --git a/src/bricoler/config.py b/src/bricoler/config.py index 698ed5b..ea9998e 100644 --- a/src/bricoler/config.py +++ b/src/bricoler/config.py @@ -4,6 +4,8 @@ # SPDX-License-Identifier: BSD-2-Clause # +from __future__ import annotations + import argparse import fcntl import json @@ -11,141 +13,155 @@ import sys import uuid from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple +from typing import Any class Config: - command_line_parameters: List[str] = [] - config_file_object: Dict[str, Any] = {} + command_line_parameters: list[str] = [] + config_file_object: dict[str, Any] = {} CONFIG_FILE_VERSION = 1 config_path: Path files_dir: Path - mail_from: Optional[str] = None - mail_to: Optional[str] = None + mail_from: str | None = None + mail_to: 
str | None = None max_jobs: int = os.cpu_count() parser: argparse.ArgumentParser skip: bool = False - task_params: Dict[str, Dict[str, Any]] = {} + task_params: dict[str, dict[str, Any]] = {} workdir: Path uuid: uuid.UUID - def __init__(self): - self.files_dir = Path(sys.argv[0]).parent.resolve() / 'files' - self.workdir = Path(os.environ.get('BRICOLER_WORKDIR', - Path.home() / 'bricoler')).resolve() + def __init__(self) -> None: + self.files_dir = Path(sys.argv[0]).parent.resolve() / "files" + self.workdir = Path( + os.environ.get("BRICOLER_WORKDIR", Path.home() / "bricoler") + ).resolve() - parser = argparse.ArgumentParser(prog='bricoler') + parser = argparse.ArgumentParser(prog="bricoler") parser.add_argument( - '-a', '--alias', - action='store', - help='define an alias for the current command-line invocation') + "-a", + "--alias", + action="store", + help="define an alias for the current command-line invocation", + ) parser.add_argument( - "-j", "--max-jobs", + "-j", + "--max-jobs", type=int, - metavar='N', + metavar="N", default=self.max_jobs, - help='set the maximum number of concurrent jobs (default: number of CPUs)') + help="set the maximum number of concurrent jobs (default: number of CPUs)", + ) parser.add_argument( - '-l', '--list', - action='store_true', - help=argparse.SUPPRESS) # only really meant for completion handlers + "-l", "--list", action="store_true", help=argparse.SUPPRESS + ) # only really meant for completion handlers parser.add_argument( - '--mail-from', - metavar='ADDR', - help='set the email address to send notifications from') + "--mail-from", + metavar="ADDR", + help="set the email address to send notifications from", + ) parser.add_argument( - '--mail-to', - metavar='ADDR', - help='set the email address to send notifications to') + "--mail-to", + metavar="ADDR", + help="set the email address to send notifications to", + ) parser.add_argument( - '-s', '--show', - action='store_true', - help='show all available tasks or task 
parameters') + "-s", + "--show", + action="store_true", + help="show all available tasks or task parameters", + ) parser.add_argument( - '-S', '--skip', - action='store_true', - help='skip execution of dependent tasks') + "-S", + "--skip", + action="store_true", + help="skip execution of dependent tasks", + ) parser.add_argument( - '-w', '--workdir', - metavar='DIR', + "-w", + "--workdir", + metavar="DIR", default=self.workdir, - help='set the work directory (default: $BRICOLER_WORKDIR or ${HOME}/bricoler)') - parser.add_argument( - 'task', - nargs='?', - help='the task to run') + help="set the work directory (default: $BRICOLER_WORKDIR or ${HOME}/bricoler)", + ) + parser.add_argument("task", nargs="?", help="the task to run") self.parser = parser @property - def aliases(self) -> List[Dict[str, Any]]: - return self.config_file_object['aliases'] + def aliases(self) -> list[dict[str, Any]]: + return self.config_file_object["aliases"] - def add_alias(self, name: str): + def add_alias(self, name: str) -> None: # Remove an existing alias. Perhaps we should rename it instead? 
- self.config_file_object['aliases'] = [ - a for a in self.config_file_object['aliases'] if a['alias'] != name + self.config_file_object["aliases"] = [ + a for a in self.config_file_object["aliases"] if a["alias"] != name ] - self.config_file_object['aliases'].append({ - "alias": name, - "task": self.task.name, - "parameters": self.command_line_parameters, - }) - with self.config_path.open('w') as f: + self.config_file_object["aliases"].append( + { + "alias": name, + "task": self.task.name, + "parameters": self.command_line_parameters, + } + ) + with self.config_path.open("w") as f: json.dump(self.config_file_object, fp=f, indent=4) - def lookup_alias(self, name: str) -> Optional[Dict[str, Any]]: + def lookup_alias(self, name: str) -> dict[str, Any] | None: return next( - (a for a in self.config_file_object['aliases'] if a['alias'] == name), - None + (a for a in self.config_file_object["aliases"] if a["alias"] == name), + None, ) - def load(self, lookup) -> Tuple[argparse.Namespace, Optional[List[str]]]: - """ - Load the configuration file and parse command-line arguments. Return + def load(self, lookup) -> tuple[argparse.Namespace, list[str] | None]: + """Load the configuration file and parse command-line arguments. Return task parameters and any auxilliary action. """ # Parse global arguments and the task name. opts, args = self.parser.parse_known_args() - self.mail_from = opts.mail_from or self.config_file_object.get('mail_from') - self.mail_to = opts.mail_to or self.config_file_object.get('mail_to') + self.mail_from = opts.mail_from or self.config_file_object.get("mail_from") + self.mail_to = opts.mail_to or self.config_file_object.get("mail_to") self.max_jobs = opts.max_jobs self.skip = opts.skip self.workdir.mkdir(parents=True, exist_ok=True) - self.config_path = Path(self.workdir / 'bricoler.json') + self.config_path = Path(self.workdir / "bricoler.json") # Load aliases from the configuration file. 
try: - f = self.config_path.open('r') + f = self.config_path.open("r") except FileNotFoundError: # Populate it with some initial structure. - with self.config_path.open('w') as f: - json.dump({ - "aliases": [], - "mail_from": "", - "mail_to": "", - "uuid": str(uuid.uuid4()), - "version": Config.CONFIG_FILE_VERSION, - }, fp=f, indent=4) + with self.config_path.open("w") as f: + json.dump( + { + "aliases": [], + "mail_from": "", + "mail_to": "", + "uuid": str(uuid.uuid4()), + "version": Config.CONFIG_FILE_VERSION, + }, + fp=f, + indent=4, + ) finally: - with self.config_path.open('r') as f: + with self.config_path.open("r") as f: try: self.config_file_object = json.load(f) except json.JSONDecodeError as e: raise ValueError( - f"Configuration file '{self.config_path}' is not valid JSON: {e}" + f"Configuration file '{self.config_path}' is not valid JSON: {e}", ) from e - version = self.config_file_object.get('version', -1) + version = self.config_file_object.get("version", -1) if version != Config.CONFIG_FILE_VERSION: raise ValueError( - f"Unknown or unsupported configuration file version: {version}" + f"Unknown or unsupported configuration file version: {version}", ) try: - self.uuid = uuid.UUID(self.config_file_object.get('uuid', "")) + self.uuid = uuid.UUID(self.config_file_object.get("uuid", "")) except ValueError as e: raise ValueError( - f"Configuration file '{self.config_path}' has invalid UUID: {e}" + f"Configuration file '{self.config_path}' has invalid UUID: {e}", ) from e if opts.task: @@ -154,12 +170,12 @@ def load(self, lookup) -> Tuple[argparse.Namespace, Optional[List[str]]]: alias = self.lookup_alias(opts.task) if alias is None: raise ValueError(f"Unknown task '{opts.task}'") - task = lookup(alias['task']) + task = lookup(alias["task"]) if task is None: raise ValueError( - f"Unknown task '{alias['task']}' in alias '{opts.task}'" + f"Unknown task '{alias['task']}' in alias '{opts.task}'", ) - args = [f"--{param}" for param in alias['parameters']] + args 
+ args = [f"--{param}" for param in alias["parameters"]] + args self.task = task # Parse task-specific arguments. These are of the form @@ -171,30 +187,30 @@ def load(self, lookup) -> Tuple[argparse.Namespace, Optional[List[str]]]: # e.g., bricoler freebsd-vm-boot ssh to ssh into a running task VM. action = None for arg in args: - if not arg.startswith('--'): + if not arg.startswith("--"): if action is None: - action = args[args.index(arg):] + action = args[args.index(arg) :] break arg = arg[2:] - if '=' not in arg: + if "=" not in arg: raise ValueError( - f"Task parameters must be of the form --/=: {arg}" + f"Task parameters must be of the form --/=: {arg}", ) - key, val = arg.split('=', 1) - if '/' not in key: + key, val = arg.split("=", 1) + if "/" not in key: raise ValueError( - f"Task parameters must be of the form --/=: {arg}" + f"Task parameters must be of the form --/=: {arg}", ) - task_name, param_name = key.split('/', 1) + task_name, param_name = key.split("/", 1) task = lookup(task_name) if task is None: raise ValueError( - f"Unknown task '{task_name}' in parameter '{arg}'" + f"Unknown task '{task_name}' in parameter '{arg}'", ) param = task.get_parameter(param_name) if param is None: raise ValueError( - f"Task '{task_name}' has no parameter named '{param_name}'" + f"Task '{task_name}' has no parameter named '{param_name}'", ) if task_name not in self.task_params: @@ -204,15 +220,15 @@ def load(self, lookup) -> Tuple[argparse.Namespace, Optional[List[str]]]: return (opts, action) - def lock(self): + def lock(self) -> None: # Lock the configuration file to prevent concurrent modifications. 
try: - self._locked_file = self.config_path.open('r+') + self._locked_file = self.config_path.open("r+") fcntl.flock(self._locked_file, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: raise RuntimeError( f"Could not acquire lock on configuration file '{self.config_path}': " - "another instance of bricoler is running" + "another instance of bricoler is running", ) from None def usage(self) -> None: diff --git a/src/bricoler/git.py b/src/bricoler/git.py index 55ece6a..8d741f8 100644 --- a/src/bricoler/git.py +++ b/src/bricoler/git.py @@ -4,8 +4,9 @@ # SPDX-License-Identifier: BSD-2-Clause # +from __future__ import annotations + from pathlib import Path -from typing import Dict, List, Optional from urllib.parse import urlparse from .util import run_cmd @@ -16,52 +17,58 @@ class GitRepository: def is_ssh_url(url: str) -> bool: # Simple check for SSH-style Git URLs. This doesn't match git's exact # behaviour but it seems close enough that it won't matter. - colon = url.find(':') + colon = url.find(":") if colon == -1: return False - slash = url.find('/') + slash = url.find("/") return slash == -1 or colon < slash def __init__( self, url: str, path: Path, - branch: Optional[str] = None, + branch: str | None = None, shallow: bool = True, no_cmds: bool = False, - ): + ) -> None: self.url = url self.branch = branch self._no_cmds = no_cmds parsed = urlparse(url) - self.external = parsed.scheme == '' and not self.is_ssh_url(url) + self.external = parsed.scheme == "" and not self.is_ssh_url(url) if self.external: self.path = Path(url).resolve() else: self.path = path.resolve() self.clone(shallow=shallow) - def git(self, cmd: List[str], *args, **kwargs): + def git(self, cmd: list[str], *args, **kwargs): if not self.path: raise ValueError("Repository has not been cloned yet") - return run_cmd(['git', '-C', self.path] + cmd, *args, **kwargs) + return run_cmd(["git", "-C", self.path, *cmd], *args, **kwargs) def checked_out_branch(self) -> str: - return 
self.git(["rev-parse", "--abbrev-ref", "HEAD"], capture_output=True).stdout.decode().strip() + return ( + self.git(["rev-parse", "--abbrev-ref", "HEAD"], capture_output=True) + .stdout.decode() + .strip() + ) def checked_out_revision(self) -> str: - return self.git(["rev-parse", "HEAD"], capture_output=True).stdout.decode().strip() + return ( + self.git(["rev-parse", "HEAD"], capture_output=True).stdout.decode().strip() + ) def isshallow(self) -> bool: output = self.git(["rev-parse", "--is-shallow-repository"], capture_output=True) return output.stdout.decode().strip() == "true" - def clone(self, shallow=True): + def clone(self, shallow=True) -> None: if not (self.path / ".git").exists(): if self.external: raise ValueError( - f"Repository path '{self.url}' does not exist or is not a repo clone" + f"Repository path '{self.url}' does not exist or is not a repo clone", ) cmd = ["git", "clone"] if shallow: @@ -71,7 +78,7 @@ def clone(self, shallow=True): cmd += [self.url, str(self.path.resolve())] run_cmd(cmd) - def update(self, shallow=True): + def update(self, shallow=True) -> None: assert self.path is not None if self.external: # This repository is externally managed. 
@@ -85,7 +92,7 @@ def update(self, shallow=True): break else: raise ValueError( - f"Clone at '{self.path}' has no remote corresponding to '{self.url}'" + f"Clone at '{self.path}' has no remote corresponding to '{self.url}'", ) if shallow or not self.isshallow(): self.git(["fetch", remote]) @@ -95,7 +102,7 @@ def update(self, shallow=True): self.git(["merge", "--ff-only", remote, f"{self.branch}"]) @property - def remotes(self) -> Dict[str, str]: + def remotes(self) -> dict[str, str]: result = {} output = self.git(["remote", "-v"], capture_output=True) for line in output.stdout.decode().splitlines(): diff --git a/src/bricoler/mtree.py b/src/bricoler/mtree.py index 988d641..4350959 100644 --- a/src/bricoler/mtree.py +++ b/src/bricoler/mtree.py @@ -9,19 +9,22 @@ # SPDX-License-Identifier: BSD-2-Clause # +from __future__ import annotations + import collections.abc import fnmatch import os import shlex import stat from collections import OrderedDict +from collections.abc import Iterator from pathlib import Path, PurePath, PurePosixPath -from typing import Dict, Iterator, List, Optional, Union + from .util import warn class MtreePath(PurePosixPath): - def __str__(self): + def __str__(self) -> str: pathstr = super().__str__() if pathstr != ".": pathstr = "./" + pathstr @@ -29,7 +32,7 @@ def __str__(self): class MtreeEntry: - def __init__(self, path: MtreePath, attributes: Dict[str, str]): + def __init__(self, path: MtreePath, attributes: dict[str, str]) -> None: self.path = path self.attributes = attributes @@ -40,7 +43,7 @@ def is_file(self) -> bool: return self.attributes.get("type") == "file" @classmethod - def parse(cls, line: str, contents_root: Optional[Path] = None) -> "MtreeEntry": + def parse(cls, line: str, contents_root: Path | None = None) -> MtreeEntry: elements = shlex.split(line) tmppath = elements[0] # Ensure that the path is normalized: @@ -58,7 +61,7 @@ def parse(cls, line: str, contents_root: Optional[Path] = None) -> "MtreeEntry": # FIXME: use 
contents= @classmethod - def parse_all_dirs_in_mtree(cls, mtree_file: Path) -> List["MtreeEntry"]: + def parse_all_dirs_in_mtree(cls, mtree_file: Path) -> list[MtreeEntry]: with mtree_file.open("r", encoding="utf-8") as f: result = [] for line in f.readlines(): @@ -86,8 +89,8 @@ def __repr__(self) -> str: class MtreeSubtree(collections.abc.MutableMapping): def __init__(self) -> None: - self.entry: Optional[MtreeEntry] = None - self.children: Dict[str, "MtreeSubtree"] = OrderedDict() + self.entry: MtreeEntry | None = None + self.children: dict[str, MtreeSubtree] = OrderedDict() @staticmethod def _split_key(key): @@ -110,7 +113,7 @@ def __getitem__(self, key): return self.entry return self.children[split[0]][split[1]] - def __setitem__(self, key, value): + def __setitem__(self, key, value) -> None: split = self._split_key(key) if split is None: self.entry = value @@ -119,7 +122,7 @@ def __setitem__(self, key, value): self.children[split[0]] = MtreeSubtree() self.children[split[0]][split[1]] = value - def __delitem__(self, key): + def __delitem__(self, key) -> None: split = self._split_key(key) if split is None: if self.entry is None: @@ -135,13 +138,15 @@ def __iter__(self): for k2 in v: yield MtreePath(k, k2) - def __len__(self): + def __len__(self) -> int: ret = int(self.entry is not None) for c in self.children.values(): ret += len(c) return ret - def _glob(self, patfrags: "list[str]", prefix: MtreePath, *, case_sensitive=False) -> Iterator[MtreePath]: + def _glob( + self, patfrags: list[str], prefix: MtreePath, *, case_sensitive=False + ) -> Iterator[MtreePath]: if len(patfrags) == 0: if self.entry is not None: yield prefix @@ -167,7 +172,7 @@ def glob(self, pattern: str, *, case_sensitive=False) -> Iterator[MtreePath]: patfrags.insert(0, tail) return self._glob(patfrags, MtreePath(), case_sensitive=case_sensitive) - def _walk(self, top, prefix) -> Iterator[tuple[MtreePath, List[str], List[str]]]: + def _walk(self, top, prefix) -> Iterator[tuple[MtreePath, 
list[str], list[str]]]: split = self._split_key(top) if split is not None: if split[0] in self.children: @@ -175,8 +180,8 @@ def _walk(self, top, prefix) -> Iterator[tuple[MtreePath, List[str], List[str]]] return if self.entry is not None and self.entry.attributes["type"] != "dir": return - files: "list[tuple[str, MtreeSubtree]]" = [] - dirs: "list[tuple[str, MtreeSubtree]]" = [] + files: list[tuple[str, MtreeSubtree]] = [] + dirs: list[tuple[str, MtreeSubtree]] = [] for k, v in self.children.items(): if v.entry is not None and v.entry.attributes["type"] != "dir": files.append((k, v)) @@ -186,17 +191,19 @@ def _walk(self, top, prefix) -> Iterator[tuple[MtreePath, List[str], List[str]]] for _, v in dirs: yield from v._walk(MtreePath(), prefix) - def walk(self, top) -> "Iterator[tuple[MtreePath, list[str], list[str]]]": + def walk(self, top) -> Iterator[tuple[MtreePath, list[str], list[str]]]: return self._walk(top, MtreePath()) class MtreeFile: - def __init__(self, file: Optional[Path] = None, contents_root: Optional[Path] = None): + def __init__( + self, file: Path | None = None, contents_root: Path | None = None + ) -> None: self._mtree = MtreeSubtree() if file: self.load(file, contents_root=contents_root, append=False) - def load(self, file: Path, append: bool, contents_root: Optional[Path] = None): + def load(self, file: Path, append: bool, contents_root: Path | None = None) -> None: with file.open("r") as f: if not append: self._mtree.clear() @@ -213,7 +220,7 @@ def load(self, file: Path, append: bool, contents_root: Optional[Path] = None): self._mtree[key] = entry @staticmethod - def _ensure_mtree_mode_fmt(mode: Union[str, int]) -> str: + def _ensure_mtree_mode_fmt(mode: str | int) -> str: if not isinstance(mode, str): mode = "0" + oct(mode)[2:] assert mode.startswith("0") @@ -236,20 +243,24 @@ def infer_mode_string(path: Path, should_be_dir) -> str: # make sure that the .ssh config files are installed with the right permissions if path.name == ".ssh" and 
result != "0700": return "0700" - if path.parent.name == ".ssh" and not path.name.endswith(".pub") and result != "0600": + if ( + path.parent.name == ".ssh" + and not path.name.endswith(".pub") + and result != "0600" + ): return "0600" return result def add_file( self, - file: Optional[Path], + file: Path | None, path_in_image, mode=None, uname="root", gname="wheel", parent_dir_mode=None, - symlink_dest: Optional[str] = None, - ): + symlink_dest: str | None = None, + ) -> None: if isinstance(path_in_image, PurePath): path_in_image = str(path_in_image) assert not path_in_image.startswith("/") @@ -286,24 +297,26 @@ def add_file( gname=gname, reference_dir=reference_dir, ) - attribs = OrderedDict([ - ("type", mtree_type), - ("uname", uname), - ("gname", gname), - ("mode", mode), - last_attrib - ]) + attribs = OrderedDict( + [ + ("type", mtree_type), + ("uname", uname), + ("gname", gname), + ("mode", mode), + last_attrib, + ] + ) entry = MtreeEntry(mtree_path, attribs) self._mtree[mtree_path] = entry def add_symlink( self, *, - src_symlink: Optional[Path] = None, + src_symlink: Path | None = None, symlink_dest=None, path_in_image: str, - **kwargs - ): + **kwargs, + ) -> None: if src_symlink is not None: assert symlink_dest is None self.add_file(src_symlink, path_in_image, **kwargs) @@ -311,7 +324,9 @@ def add_symlink( assert src_symlink is None self.add_file(None, path_in_image, symlink_dest=str(symlink_dest), **kwargs) - def add_dir(self, path, mode=None, uname="root", gname="wheel", reference_dir=None) -> None: + def add_dir( + self, path, mode=None, uname="root", gname="wheel", reference_dir=None + ) -> None: if isinstance(path, PurePath): path = str(path) assert not path.startswith("/") @@ -326,21 +341,29 @@ def add_dir(self, path, mode=None, uname="root", gname="wheel", reference_dir=No mode = self.infer_mode_string(reference_dir, True) mode = self._ensure_mtree_mode_fmt(mode) # Ensure that SSH will work even if the extra-file directory has wrong permissions - if 
(path == "root" or path == "root/.ssh") and mode != "0700" and mode != "0755": + if ( + (path == "root" or path == "root/.ssh") + and mode != "0700" + and mode != "0755" + ): mode = "0755" # recursively add all parent dirs that don't exist yet parent = str(Path(path).parent) if parent != path: # avoid recursion for path == "." if reference_dir is not None: - self.add_dir(parent, None, uname, gname, reference_dir=reference_dir.parent) + self.add_dir( + parent, None, uname, gname, reference_dir=reference_dir.parent + ) else: self.add_dir(parent, mode, uname, gname, reference_dir=None) # now add the actual entry - attribs = OrderedDict([("type", "dir"), ("uname", uname), ("gname", gname), ("mode", mode)]) + attribs = OrderedDict( + [("type", "dir"), ("uname", uname), ("gname", gname), ("mode", mode)] + ) entry = MtreeEntry(mtree_path, attribs) self._mtree[mtree_path] = entry - def add_from_mtree(self, mtree_file: "MtreeFile", path: Union[PurePath, str]): + def add_from_mtree(self, mtree_file: MtreeFile, path: PurePath | str) -> None: if isinstance(path, PurePath): path = str(path) assert not path.startswith("/") @@ -390,13 +413,13 @@ def __repr__(self) -> str: return "" - def write(self, output: Path): + def write(self, output: Path) -> None: with output.open("w", encoding="utf-8") as f: f.write("#mtree 2.0\n") for path in sorted(self._mtree.keys()): f.write(str(self._mtree[path]) + "\n") - def get(self, key) -> Optional[MtreeEntry]: + def get(self, key) -> MtreeEntry | None: return self._mtree.get(key) @property diff --git a/src/bricoler/task.py b/src/bricoler/task.py index bbe219c..fa89a6e 100644 --- a/src/bricoler/task.py +++ b/src/bricoler/task.py @@ -20,6 +20,8 @@ # from parent classes, providing a way to extend and customize tasks. 
# +from __future__ import annotations + import builtins import inspect import subprocess @@ -28,59 +30,59 @@ from enum import Enum from pathlib import Path from types import SimpleNamespace -from typing import Any, Dict, List, Optional, Set, Tuple, Type +from typing import Any from .config import Config from .util import chdir, run_cmd class TaskMeta(ABCMeta): - _registry: Dict[str, Type['Task']] = {} - _reserved_names: Set[str] = { - 'actions', - 'bindings', - 'config', - 'inputs', - 'name', - 'outputs', - 'parameters', - 'run', - 'skip', + _registry: dict[str, type[Task]] = {} + _reserved_names: set[str] = { + "actions", + "bindings", + "config", + "inputs", + "name", + "outputs", + "parameters", + "run", + "skip", } @classmethod - def _validate_common(mcs, cls: Type['Task'], namespace) -> None: + def _validate_common(mcs, cls: type[Task], namespace) -> None: # Task class names must end with 'Task'. - if not cls.__name__.endswith('Task'): + if not cls.__name__.endswith("Task"): raise ValueError( - f"Task class name '{cls.__name__}' must end with 'Task'" + f"Task class name '{cls.__name__}' must end with 'Task'", ) # No bindings should be defined initially, they are added once we # instantiate tasks and bind parameters. - if len(getattr(cls, 'bindings', {})) > 0: + if len(getattr(cls, "bindings", {})) > 0: raise ValueError( - f"Task '{cls.name}' should not define any bindings" + f"Task '{cls.name}' should not define any bindings", ) # Any members must be a parameter value. 
parameters = cls._chained_parameters for name in namespace.keys(): - if name.startswith('_'): + if name.startswith("_"): continue if name in mcs._reserved_names: continue if name not in parameters: raise ValueError( - f"Member '{name}' in task '{cls.name}' is not a defined parameter" + f"Member '{name}' in task '{cls.name}' is not a defined parameter", ) @classmethod - def _validate_named_task(mcs, cls: Type['Task'], name: str, namespace) -> None: + def _validate_named_task(mcs, cls: type[Task], name: str, namespace) -> None: mcs._validate_common(cls, namespace) - parameters = getattr(cls, 'parameters') - inputs = getattr(cls, 'inputs') - outputs = getattr(cls, 'outputs') + parameters = cls.parameters + inputs = cls.inputs + outputs = cls.outputs # Do some validation of the task definition. # @@ -88,21 +90,23 @@ def _validate_named_task(mcs, cls: Type['Task'], name: str, namespace) -> None: overlap = cls._chained_inputs.keys() & cls._chained_parameters.keys() if len(overlap) > 0: raise ValueError( - f"Task '{name}' has overlapping names: {', '.join(overlap)}" + f"Task '{name}' has overlapping names: {', '.join(overlap)}", ) # Make sure that none of the names overlap with reserved names. - overlap = (inputs.keys() & mcs._reserved_names) | \ - (outputs.keys() & mcs._reserved_names) | \ - (parameters.keys() & mcs._reserved_names) + overlap = ( + (inputs.keys() & mcs._reserved_names) + | (outputs.keys() & mcs._reserved_names) + | (parameters.keys() & mcs._reserved_names) + ) if len(overlap) > 0: raise ValueError( - f"Task '{name}' uses reserved names: {', '.join(overlap)}" + f"Task '{name}' uses reserved names: {', '.join(overlap)}", ) # Inputs must be a subclass of Task. for name, input_type in inputs.items(): if not inspect.isclass(input_type) or not issubclass(input_type, Task): raise TypeError( - f"Input '{name}' in task '{cls.name}' must be a subclass of Task" + f"Input '{name}' in task '{cls.name}' must be a subclass of Task", ) # Validate parameter types. 
for name, param in parameters.items(): @@ -110,17 +114,19 @@ def _validate_named_task(mcs, cls: Type['Task'], name: str, namespace) -> None: if val is not None and type(val) is not param.type: raise TypeError( f"Parameter '{name}' in task '{cls.name}' has type " - f"{type(getattr(cls, name))}, expected {param.typename}" + f"{type(getattr(cls, name))}, expected {param.typename}", ) @classmethod - def _validate_anonymous_task(mcs, cls: Type['Task'], namespace) -> None: + def _validate_anonymous_task(mcs, cls: type[Task], namespace) -> None: mcs._validate_common(cls, namespace) - invalid_keys = mcs._reserved_names & (set(namespace.keys()) - {"run", "inputs", "outputs"}) + invalid_keys = mcs._reserved_names & ( + set(namespace.keys()) - {"run", "inputs", "outputs"} + ) if len(invalid_keys) > 0: raise ValueError( - f"Anonymous task '{cls.__name__}' cannot define: {', '.join(invalid_keys)}" + f"Anonymous task '{cls.__name__}' cannot define: {', '.join(invalid_keys)}", ) def __new__(mcs, name, bases, namespace): @@ -128,13 +134,17 @@ def __new__(mcs, name, bases, namespace): for table in ["actions", "inputs", "parameters"]: chained = f"_chained_{table}" - setattr(cls, - chained, - ChainMap(getattr(cls, table), - *[getattr(b, chained) for b in bases if hasattr(b, chained)])) + setattr( + cls, + chained, + ChainMap( + getattr(cls, table), + *[getattr(b, chained) for b in bases if hasattr(b, chained)], + ), + ) if not inspect.isabstract(cls): - task_name = namespace.get('name') + task_name = namespace.get("name") if task_name is not None: mcs._validate_named_task(cls, task_name, namespace) mcs._registry[task_name] = cls @@ -143,16 +153,16 @@ def __new__(mcs, name, bases, namespace): return cls @classmethod - def lookup(mcs, name: str) -> Optional[Type['Task']]: + def lookup(mcs, name: str) -> type[Task] | None: return mcs._registry.get(name) @classmethod - def task_names(mcs) -> List[str]: + def task_names(mcs) -> list[str]: return list(mcs._registry.keys()) class 
TaskParameter: - choices: Optional[List[Any]] = None + choices: list[Any] | None = None default: Any description: str _initialized = False @@ -161,12 +171,12 @@ class TaskParameter: def __init__( self, - description: str = '', + description: str = "", type: type = None, default: Any = None, - choices: Optional[List[Any]] = None, + choices: list[Any] | None = None, required: bool = False, - ): + ) -> None: self.description = description self.type = type self.default = default @@ -178,13 +188,13 @@ def __init__( self.type = builtins.type(self.default) if not callable(self.default) and not isinstance(self.default, self.type): raise TypeError( - f"Default value {type(self.default)} does not match parameter type {self.type}" + f"Default value {type(self.default)} does not match parameter type {self.type}", ) elif self.type is None: self.type = str self._initialized = True - def __setattr__(self, key, value): + def __setattr__(self, key, value) -> None: if self._initialized: # These objects are immutable after instantiation. 
raise AttributeError(f"Cannot modify attribute '{key}' of TaskParameter") @@ -192,15 +202,15 @@ def __setattr__(self, key, value): @property def typename(self) -> str: - if hasattr(self.type, '__name__'): + if hasattr(self.type, "__name__"): return self.type.__name__ return str(self.type) def str2val(self, s: str) -> Any: if self.type is bool: - if s.lower() in ('1', 'true', 'yes', 'on'): + if s.lower() in ("1", "true", "yes", "on"): val = True - elif s.lower() in ('0', 'false', 'no', 'off'): + elif s.lower() in ("0", "false", "no", "off"): val = False else: raise ValueError(f"Value '{s}' is not of type {self.typename}") @@ -214,15 +224,15 @@ def str2val(self, s: str) -> Any: class TaskParameterBinding: value: Any - source: 'TaskParameterBinding.BindingType' - task: Optional[str] + source: TaskParameterBinding.BindingType + task: str | None class BindingType(Enum): - DEFAULT = 1, - COMMAND_LINE = 2, - OVERRIDDEN = 3, + DEFAULT = (1,) + COMMAND_LINE = (2,) + OVERRIDDEN = (3,) - def __init__(self, value, source: BindingType, task=None): + def __init__(self, value, source: BindingType, task=None) -> None: self.value = value self.source = source self.task = task @@ -232,22 +242,22 @@ def __str__(self) -> str: class Task(ABC, metaclass=TaskMeta): - actions: Dict[str, Any] = {} - bindings: Dict[str, TaskParameterBinding] + actions: dict[str, Any] = {} + bindings: dict[str, TaskParameterBinding] config: Config name: str - inputs: Dict[str, Type['Task']] = {} - outputs: Dict[str, Any] = {} - parameters: Dict[str, TaskParameter] = {} + inputs: dict[str, type[Task]] = {} + outputs: dict[str, Any] = {} + parameters: dict[str, TaskParameter] = {} skip: bool = False _chained_actions: ChainMap[str, Any] - _chained_inputs: ChainMap[str, Type['Task']] + _chained_inputs: ChainMap[str, type[Task]] _chained_parameters: ChainMap[str, TaskParameter] - _final_outputs: Optional[Dict[str, Any]] = None + _final_outputs: dict[str, Any] | None = None _finished: bool - def __init__(self, 
config: Config): + def __init__(self, config: Config) -> None: super().__init__() self.bindings = {} self.config = config @@ -256,23 +266,28 @@ def __init__(self, config: Config): for name, param in self._chained_parameters.items(): # Callable defaults are handled when assembling the schedule. if not callable(param.default): - self.bind({name: param.default}, - TaskParameterBinding.BindingType.DEFAULT) + self.bind( + {name: param.default}, TaskParameterBinding.BindingType.DEFAULT + ) for name in dir(self.__class__): if name in self._chained_parameters: - self.bind({name: getattr(self, name)}, - TaskParameterBinding.BindingType.OVERRIDDEN) + self.bind( + {name: getattr(self, name)}, + TaskParameterBinding.BindingType.OVERRIDDEN, + ) - def bind(self, params: Dict[str, Any], source: TaskParameterBinding.BindingType) -> None: + def bind( + self, params: dict[str, Any], source: TaskParameterBinding.BindingType + ) -> None: for name, param in params.items(): if name not in self._chained_parameters: raise ValueError( - f"Task '{self.name}' has no parameter named '{name}'" + f"Task '{self.name}' has no parameter named '{name}'", ) self.bindings[name] = TaskParameterBinding(value=param, source=source) @classmethod - def get_action_names(self) -> List[str]: + def get_action_names(self) -> list[str]: return list(self._chained_actions.keys()) @classmethod @@ -280,10 +295,10 @@ def get_parameter(self, name: str) -> TaskParameter: return self._chained_parameters[name] @classmethod - def get_parameter_keys(self) -> List[str]: + def get_parameter_keys(self) -> list[str]: return list(self._chained_parameters.keys()) - def _run(self, ctx: SimpleNamespace) -> Dict[str, Any]: + def _run(self, ctx: SimpleNamespace) -> dict[str, Any]: if self._final_outputs is not None: # Each task runs only once. 
return self._final_outputs @@ -295,59 +310,58 @@ def _run(self, ctx: SimpleNamespace) -> Dict[str, Any]: outputs = self.run(ctx) if outputs is None: raise ValueError( - f"Task '{self.name}' did not return any outputs, missing return statement?" + f"Task '{self.name}' did not return any outputs, missing return statement?", ) if type(outputs) is not dict: raise TypeError( - f"Task '{self.name}' returned outputs of type {type(outputs)}, expected dict" + f"Task '{self.name}' returned outputs of type {type(outputs)}, expected dict", ) if set(outputs.keys()) != set(self.outputs.keys()): missing = set(self.outputs.keys()) - set(outputs.keys()) if len(missing) > 0: raise ValueError( - f"Task {self.name} did not produce expected outputs: {', '.join(missing)}" - ) - else: - extra = set(outputs.keys()) - set(self.outputs.keys()) - raise ValueError( - f"Task {self.name} produced unexpected outputs: {', '.join(extra)}" + f"Task {self.name} did not produce expected outputs: {', '.join(missing)}", ) + extra = set(outputs.keys()) - set(self.outputs.keys()) + raise ValueError( + f"Task {self.name} produced unexpected outputs: {', '.join(extra)}", + ) for name, val in outputs.items(): # It would be nice if we could validate this statically... expected_type = self.outputs[name] if not isinstance(val, expected_type): raise TypeError( f"Output '{name}' in task '{self.name}' has type " - f"{type(val)}, expected {expected_type}" + f"{type(val)}, expected {expected_type}", ) self._final_outputs = outputs return outputs - def _run_action(self, action_name: str, action_args: List[str]) -> Any: + def _run_action(self, action_name: str, action_args: list[str]) -> Any: with chdir(Path.cwd() / self.name): self._chained_actions[action_name](self, action_args) - def run_cmd(self, cmd: List[Any], *args, **kwargs) -> subprocess.CompletedProcess: + def run_cmd(self, cmd: list[Any], *args, **kwargs) -> subprocess.CompletedProcess: # Ignore self.skip if the caller needs command output. 
- skip = self.skip and not kwargs.get('capture_output', False) + skip = self.skip and not kwargs.get("capture_output", False) return run_cmd(cmd, *args, skip=skip, **kwargs) @abstractmethod - def run(self, ctx) -> Dict[str, Any]: ... + def run(self, ctx) -> dict[str, Any]: ... class TaskSchedule: class TaskScheduleNode: task: Task - children: Dict[str, 'TaskSchedule.TaskScheduleNode'] + children: dict[str, TaskSchedule.TaskScheduleNode] - def __init__(self, task: Type[Task], config: Config): + def __init__(self, task: type[Task], config: Config) -> None: self.task = task(config) self.children = {} for name, input in task._chained_inputs.items(): self.children[name] = TaskSchedule.TaskScheduleNode(input, config) - def _run(self, ctx: SimpleNamespace) -> Dict[str, Any]: + def _run(self, ctx: SimpleNamespace) -> dict[str, Any]: for input, child in self.children.items(): outputs = child._run(ctx) inputs = {} @@ -364,12 +378,12 @@ def __iter__(self): config: Config schedule: TaskScheduleNode - def __init__(self, config: Config): + def __init__(self, config: Config) -> None: self.config = config self.schedule = self.TaskScheduleNode(config.task, config) # Preen the schedule. We only keep one instance of each task. 
- tasks: Dict[str, Task] = {} + tasks: dict[str, Task] = {} for node in self.schedule: if node.task.name in tasks: node.task = tasks[node.task.name] @@ -382,13 +396,13 @@ def __init__(self, config: Config): if node.task.name in params: node.task.bind( params[node.task.name], - TaskParameterBinding.BindingType.COMMAND_LINE + TaskParameterBinding.BindingType.COMMAND_LINE, ) del params[node.task.name] if len(params) > 0: - unknown_tasks = ', '.join(params.keys()) + unknown_tasks = ", ".join(params.keys()) raise ValueError( - f"Unknown tasks in command-line parameters: {unknown_tasks}" + f"Unknown tasks in command-line parameters: {unknown_tasks}", ) # Set default parameters that haven't been overridden by config or the @@ -403,7 +417,7 @@ def __init__(self, config: Config): default = default() node.task.bind( {name: default}, - TaskParameterBinding.BindingType.DEFAULT + TaskParameterBinding.BindingType.DEFAULT, ) # Mark dependent tasks to be skipped. @@ -412,48 +426,53 @@ def __init__(self, config: Config): if node != self.schedule: node.task.skip = True - def run(self) -> Dict[str, Any]: + def run(self) -> dict[str, Any]: self.config.lock() # Do any tasks have unbound required parameters? Raise an error if so. # We check this here rather than in the constructor so that it's possible # to do things like list unbound parameters in a schedule. 
for node in self.schedule: required = { - name for name, param in node.task._chained_parameters.items() if param.required + name + for name, param in node.task._chained_parameters.items() + if param.required } bindings = { - name for name, param in node.task.bindings.items() if param.value is not None + name + for name, param in node.task.bindings.items() + if param.value is not None } missing = required - bindings if len(missing) > 0: raise ValueError( - f"Task '{node.task.name}' is missing required parameters: {', '.join(missing)}" + f"Task '{node.task.name}' is missing required parameters: {', '.join(missing)}", ) ctx = SimpleNamespace(max_jobs=self.config.max_jobs) with chdir(self.config.workdir): return self.schedule._run(ctx) - def run_action(self, action_name: str, action_args: List[str]): + def run_action(self, action_name: str, action_args: list[str]) -> None: task = self.schedule.task if action_name not in task._chained_actions: - available = ', '.join(task._chained_actions.keys()) + available = ", ".join(task._chained_actions.keys()) raise ValueError( - f"'{task.name}' has no action '{action_name}', available actions are: {available}" + f"'{task.name}' has no action '{action_name}', available actions are: {available}", ) with chdir(self.config.workdir): task._run_action(action_name, action_args) @property - def parameters(self) -> Dict[str, Tuple[TaskParameter, Any]]: + def parameters(self) -> dict[str, tuple[TaskParameter, Any]]: """Return a mapping of parameter names to their values in the schedule.""" - result: Dict[str, Any] = {} + result: dict[str, Any] = {} - def _collect(node: TaskSchedule.TaskScheduleNode): + def _collect(node: TaskSchedule.TaskScheduleNode) -> None: for name in node.task._chained_parameters.keys(): val = node.task.bindings.get(name, None) result[f"{node.task.name}/{name}"] = ( - node.task._chained_parameters[name], val + node.task._chained_parameters[name], + val, ) for child in node.children.values(): @@ -463,11 +482,11 @@ def 
_collect(node: TaskSchedule.TaskScheduleNode): return result @property - def tasks(self) -> Dict[str, Task]: + def tasks(self) -> dict[str, Task]: """Return a mapping of task names to task instances in the schedule.""" - result: Dict[str, Task] = {} + result: dict[str, Task] = {} - def _collect(node: TaskSchedule.TaskScheduleNode): + def _collect(node: TaskSchedule.TaskScheduleNode) -> None: result[node.task.name] = node.task for child in node.children.values(): _collect(child) diff --git a/src/bricoler/util.py b/src/bricoler/util.py index 98979b5..675e818 100644 --- a/src/bricoler/util.py +++ b/src/bricoler/util.py @@ -4,6 +4,8 @@ # SPDX-License-Identifier: BSD-2-Clause # +from __future__ import annotations + import functools import os import signal @@ -13,26 +15,23 @@ from contextlib import contextmanager from enum import Enum from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple class EmailReport: - def __init__(self, subject: str, body: str, attachments: List[Path] = []): + def __init__( + self, subject: str, body: str, attachments: list[Path] | None = None + ) -> None: self.subject = subject self.body = body - self.attachments = attachments + self.attachments = attachments or [] - def send(self, mail_to: str, mail_from: str): + def send(self, mail_to: str, mail_from: str) -> None: msg = ( - f"From: {mail_from}\n" - f"To: {mail_to}\n" - f"Subject: {self.subject}\n" - f"\n" - f"{self.body}" + f"From: {mail_from}\nTo: {mail_to}\nSubject: {self.subject}\n\n{self.body}" ) for attachment in self.attachments: - with attachment.open('rb') as f: + with attachment.open("rb") as f: content = f.read() msg += f"\n\nAttachment: {attachment.name}\n{content.decode(errors='replace')}" @@ -44,14 +43,14 @@ def send(self, mail_to: str, mail_from: str): class ANSIColour(Enum): - BLACK = 30, - RED = 31, - GREEN = 32, - YELLOW = 33, - BLUE = 34, - MAGENTA = 35, - CYAN = 36, - WHITE = 37, + BLACK = (30,) + RED = (31,) + GREEN = (32,) + YELLOW = (33,) + BLUE = 
(34,) + MAGENTA = (35,) + CYAN = (36,) + WHITE = (37,) def colour(text: str, colour: ANSIColour) -> str: @@ -59,10 +58,10 @@ def colour(text: str, colour: ANSIColour) -> str: @contextmanager -def chdir(dir: Path, **kwargs): +def chdir(path: Path, **kwargs): old_dir = Path.cwd() - dir.mkdir(parents=True, exist_ok=True, **kwargs) - os.chdir(dir) + path.mkdir(parents=True, exist_ok=True, **kwargs) + os.chdir(path) try: yield finally: @@ -80,16 +79,16 @@ def sysctl(name: str) -> str: def run_cmd( - cmd: List[Any], + cmd: list, *args, - env: Optional[Dict[str, str]] = None, + env: dict[str, str] | None = None, check_result: bool = True, skip: bool = False, - **kwargs + **kwargs, ): cmd = [str(c) for c in cmd] - cmdstr = ' '.join(cmd) - log = not kwargs.get('capture_output', False) + cmdstr = " ".join(cmd) + log = not kwargs.get("capture_output", False) if skip: if log: info(f"EXEC(skipped): '{cmdstr}'") @@ -100,15 +99,15 @@ def run_cmd( tmp = os.environ.copy() tmp.update(env) env = tmp - assert kwargs.get('env') is None - kwargs['env'] = env + assert kwargs.get("env") is None + kwargs["env"] = env - capture_output = kwargs.pop('capture_output', False) + capture_output = kwargs.pop("capture_output", False) if capture_output: - kwargs['stdout'] = subprocess.PIPE - kwargs['stderr'] = subprocess.PIPE + kwargs["stdout"] = subprocess.PIPE + kwargs["stderr"] = subprocess.PIPE - new_pgrp = kwargs.get('process_group') == 0 + new_pgrp = kwargs.get("process_group") == 0 old_pgrp = None old_sigttou = None if new_pgrp and sys.stdin.isatty(): @@ -134,15 +133,15 @@ def run_cmd( return result -def info(message: str): +def info(message: str) -> None: print(colour("INFO", ANSIColour.GREEN) + f": {message}") -def warn(message: str): +def warn(message: str) -> None: print(colour("WARN", ANSIColour.YELLOW) + f": {message}", file=sys.stderr) -def unused_tcp_addr() -> Tuple[str, int]: +def unused_tcp_addr() -> tuple[str, int]: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - 
s.bind(('127.0.0.1', 0)) + s.bind(("127.0.0.1", 0)) return s.getsockname() diff --git a/src/bricoler/vm.py b/src/bricoler/vm.py index c98f1aa..c3fd229 100644 --- a/src/bricoler/vm.py +++ b/src/bricoler/vm.py @@ -4,6 +4,8 @@ # SPDX-License-Identifier: BSD-2-Clause # +from __future__ import annotations + import functools import os import sys @@ -11,38 +13,50 @@ from abc import abstractmethod from enum import Enum from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING + +import pexpect from .util import run_cmd, unused_tcp_addr -import pexpect +if TYPE_CHECKING: + import io class SSHCommandRunner: - def __init__(self, addr: Tuple[str, Union[str, int]], key: Path): + def __init__(self, addr: tuple[str, str | int], key: Path) -> None: self.addr = addr[0] self.port = addr[1] self.key = key - def run_cmd(self, cmd: List[str] = []): + def run_cmd(self, cmd: list[str] = []) -> None: ssh_cmd = [ "ssh", - "-o", "UserKnownHostsFile=/dev/null", - "-o", "StrictHostKeyChecking=no", - "-p", str(self.port), - "-i", str(self.key), + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "StrictHostKeyChecking=no", + "-p", + str(self.port), + "-i", + str(self.key), f"root@{self.addr}", - ] + cmd + *cmd, + ] run_cmd(ssh_cmd, check_result=True) - def scp_from(self, src: Path, dst: Path): + def scp_from(self, src: Path, dst: Path) -> None: scp_cmd = [ "scp", "-r", - "-o", "UserKnownHostsFile=/dev/null", - "-o", "StrictHostKeyChecking=no", - "-P", str(self.port), - "-i", str(self.key), + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "StrictHostKeyChecking=no", + "-P", + str(self.port), + "-i", + str(self.key), f"root@{self.addr}:{src}", str(dst), ] @@ -50,47 +64,47 @@ def scp_from(self, src: Path, dst: Path): class VMImage: - def __init__(self, path: Path, machine: str): + def __init__(self, path: Path, machine: str) -> None: self.path = path self.machine = machine - def select(self, d: Dict[str, str], default=None) -> str: + 
def select(self, d: dict[str, str], default=None) -> str: val = d.get(self.machine) if val is None: - val = d.get(self.machine.split('/', maxsplit=1)[0]) + val = d.get(self.machine.split("/", maxsplit=1)[0]) if val is None: return default return val class VMHypervisor(Enum): - BHYVE = 'bhyve' - QEMU = 'qemu' - RVVM = 'rvvm' + BHYVE = "bhyve" + QEMU = "qemu" + RVVM = "rvvm" class VMRun: class BlockDriver(Enum): - VIRTIO = 1, - AHCI = 2, - NVME = 3, + VIRTIO = (1,) + AHCI = (2,) + NVME = (3,) class NetworkDriver(Enum): - VIRTIO = 1, - E1000 = 2, - NONE = 3, + VIRTIO = (1,) + E1000 = (2,) + NONE = (3,) def __init__( self, image: VMImage, - extra_disks: List[str] = [], + extra_disks: list[str] = [], memory: int = 2048, ncpus: int = 2, block_driver: BlockDriver = BlockDriver.VIRTIO, nic_driver: NetworkDriver = NetworkDriver.VIRTIO, - p9_shares: List[Tuple[str, Path]] = [], - ssh_key: Optional[Path] = None, - ): + p9_shares: list[tuple[str, Path]] = [], + ssh_key: Path | None = None, + ) -> None: self.image = image self.extra_disks = extra_disks self.memory = memory @@ -103,10 +117,10 @@ def __init__( self.ssh_addr = unused_tcp_addr() @abstractmethod - def setup(self) -> List[Any]: ... + def setup(self) -> list: ... @abstractmethod - def ssh_addr(self) -> Tuple[str, int]: ... + def ssh_addr(self) -> tuple[str, int]: ... def ssh_handle(self): return SSHCommandRunner(self.ssh_addr, self.ssh_key) @@ -114,20 +128,25 @@ def ssh_handle(self): class BhyveRun(VMRun): class PrivModel(Enum): - INVALID = 1, # Cannot run bhyve. - UNPRIV = 2, # Can run bhyve with current privileges. - MDO = 3, # Can run bhyve with mdo(1). - SUDO = 4, # Can run bhyve with sudo. + INVALID = (1,) # Cannot run bhyve. + UNPRIV = (2,) # Can run bhyve with current privileges. + MDO = (3,) # Can run bhyve with mdo(1). + SUDO = (4,) # Can run bhyve with sudo. 
@functools.cache @staticmethod def access(): if os.access("/dev/vmmctl", os.R_OK | os.W_OK): return BhyveRun.PrivModel.UNPRIV - if run_cmd(["mdo", "test", "-w", "/dev/vmmctl"], check_result=False).returncode == 0: + if ( + run_cmd(["mdo", "test", "-w", "/dev/vmmctl"], check_result=False).returncode + == 0 + ): return BhyveRun.PrivModel.MDO - if run_cmd(["sudo", "-ln", "bhyve"], check_result=False).returncode == 0 and \ - run_cmd(["sudo", "-ln", "bhyvectl"], check_result=False).returncode == 0: + if ( + run_cmd(["sudo", "-ln", "bhyve"], check_result=False).returncode == 0 + and run_cmd(["sudo", "-ln", "bhyvectl"], check_result=False).returncode == 0 + ): return BhyveRun.PrivModel.SUDO return BhyveRun.PrivModel.INVALID @@ -139,39 +158,47 @@ def canrun() -> bool: @staticmethod @functools.cache def has_monitor_mode() -> bool: - usage = run_cmd(["bhyve", "--help"], capture_output=True, check_result=False, text=True) + usage = run_cmd( + ["bhyve", "--help"], capture_output=True, check_result=False, text=True + ) return "-M: monitor mode" in usage.stderr - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) - if self.image.machine.split('/')[0] not in ('amd64', 'arm64', 'i386'): + if self.image.machine.split("/")[0] not in ("amd64", "arm64", "i386"): raise ValueError( - f"bhyve does not support machine type '{self.image.machine}'" + f"bhyve does not support machine type '{self.image.machine}'", ) def block_driver_name(self) -> str: driver = self.block_driver if driver == VMRun.BlockDriver.VIRTIO: return "virtio-blk" - elif driver == VMRun.BlockDriver.AHCI: + if driver == VMRun.BlockDriver.AHCI: return "ahci-hd" - elif driver == VMRun.BlockDriver.NVME: + if driver == VMRun.BlockDriver.NVME: return "nvme" def bootrom_path(self) -> Path: - return self.image.select({ - 'amd64': Path('/usr/local/share/uefi-firmware/BHYVE_UEFI.fd'), - 'arm64': Path('/usr/local/share/u-boot/u-boot-bhyve-arm64/u-boot.bin'), - 
'i386': Path('/usr/local/share/uefi-firmware/BHYVE_UEFI_32.fd'), - }) + return self.image.select( + { + "amd64": Path("/usr/local/share/uefi-firmware/BHYVE_UEFI.fd"), + "arm64": Path("/usr/local/share/u-boot/u-boot-bhyve-arm64/u-boot.bin"), + "i386": Path("/usr/local/share/uefi-firmware/BHYVE_UEFI_32.fd"), + } + ) def network_driver_name(self) -> str: match self.nic_driver: - case VMRun.NetworkDriver.VIRTIO: return "virtio-net" - case VMRun.NetworkDriver.E1000: return "e1000" - raise ValueError(f"Unsupported network driver {driver} is not supported by bhyve") + case VMRun.NetworkDriver.VIRTIO: + return "virtio-net" + case VMRun.NetworkDriver.E1000: + return "e1000" + raise ValueError( + f"Unsupported network driver {driver} is not supported by bhyve" + ) - def setup(self) -> List[Any]: + def setup(self) -> list: if BhyveRun.has_monitor_mode(): vmname = f"bricoler-{uuid.uuid4()}" else: @@ -188,32 +215,46 @@ def setup(self) -> List[Any]: devindex = 0 - def add_device(desc): + def add_device(desc) -> None: nonlocal devindex bhyve_cmd.extend(["-s", f"{devindex}:0,{desc}"]) devindex += 1 + add_device("hostbridge") bootrom = self.bootrom_path() - if self.image.machine.startswith('amd64/') or self.image.machine.startswith('i386/'): - bhyve_cmd.extend([ - "-H", - "-l", "com1,stdio", - "-l", f"bootrom,{bootrom}", - "-G", f"{self.gdb_addr[0]}:{self.gdb_addr[1]}", - "-M", - ]) + if self.image.machine.startswith("amd64/") or self.image.machine.startswith( + "i386/" + ): + bhyve_cmd.extend( + [ + "-H", + "-l", + "com1,stdio", + "-l", + f"bootrom,{bootrom}", + "-G", + f"{self.gdb_addr[0]}:{self.gdb_addr[1]}", + "-M", + ] + ) add_device("lpc") else: - bhyve_cmd.extend([ - "-o", "console=stdio", - "-o", f"bootrom,{bootrom}" - ]) + bhyve_cmd.extend( + [ + "-o", + "console=stdio", + "-o", + f"bootrom,{bootrom}", + ] + ) add_device(f"{self.block_driver_name()},{self.image.path}") for disk in self.extra_disks: add_device(f"{self.block_driver_name()},{disk}") if self.nic_driver != 
VMRun.NetworkDriver.NONE: - add_device(f"{self.network_driver_name()},slirp,open,hostfwd=tcp:{self.ssh_addr[0]}:{self.ssh_addr[1]}-:22") + add_device( + f"{self.network_driver_name()},slirp,open,hostfwd=tcp:{self.ssh_addr[0]}:{self.ssh_addr[1]}-:22" + ) for share in self.p9_shares: add_device(f"virtio-9p,{share[0]}={share[1]}") @@ -223,108 +264,140 @@ def add_device(desc): class RVVMRun(VMRun): - def setup(self) -> List[str]: + def setup(self) -> list[str]: rvvm_cmd = [ "rvvm", "/usr/local/share/RVVM/fw_payload.bin", - "-image", f"{self.image.path}", - "-mem", f"{self.memory}M", - "-smp", f"{self.ncpus}", + "-image", + f"{self.image.path}", + "-mem", + f"{self.memory}M", + "-smp", + f"{self.ncpus}", "-nogui", ] match self.nic_driver: case VMRun.NetworkDriver.VIRTIO: - rvvm_cmd.extend(["-portfwd", f"tcp/{self.ssh_addr[0]}:{self.ssh_addr[1]}=22"]) + rvvm_cmd.extend( + ["-portfwd", f"tcp/{self.ssh_addr[0]}:{self.ssh_addr[1]}=22"] + ) case VMRun.NetworkDriver.NONE: rvvm_cmd.extend(["-nonet"]) case _: - raise ValueError(f"Unsupported network driver {self.nic_driver} is not supported by RVVM") + raise ValueError( + f"Unsupported network driver {self.nic_driver} is not supported by RVVM" + ) return rvvm_cmd + class QEMURun(VMRun): - def bios_path(self) -> Optional[Path]: - return self.image.select({ - # XXX-MJ add some dict type which automatically checks for the path - # and suggests some recourse if it's not available - 'amd64': Path("/usr/local/share/edk2-qemu/QEMU_UEFI-x86_64.fd"), - 'arm': Path("/usr/local/share/u-boot/u-boot-qemu-arm/u-boot.bin"), - 'arm64': Path("/usr/local/share/qemu/edk2-aarch64-code.fd"), - 'riscv': Path("/usr/local/share/opensbi/lp64/generic/firmware/fw_jump.elf"), - }) + def bios_path(self) -> Path | None: + return self.image.select( + { + # XXX-MJ add some dict type which automatically checks for the path + # and suggests some recourse if it's not available + "amd64": Path("/usr/local/share/edk2-qemu/QEMU_UEFI-x86_64.fd"), + "arm": 
Path("/usr/local/share/u-boot/u-boot-qemu-arm/u-boot.bin"), + "arm64": Path("/usr/local/share/qemu/edk2-aarch64-code.fd"), + "riscv": Path( + "/usr/local/share/opensbi/lp64/generic/firmware/fw_jump.elf" + ), + } + ) - def kernel_path(self) -> Optional[Path]: + def kernel_path(self) -> Path | None: kernels = { # XXX-MJ add some dict type which automatically checks for the path # and suggests some recourse if it's not available - 'riscv': Path("/usr/local/share/u-boot/u-boot-qemu-riscv64/u-boot.bin"), + "riscv": Path("/usr/local/share/u-boot/u-boot-qemu-riscv64/u-boot.bin"), } - return kernels.get(self.image.machine.split('/', maxsplit=1)[0], None) + return kernels.get(self.image.machine.split("/", maxsplit=1)[0]) def block_driver_name(self) -> str: driver = self.block_driver if driver == VMRun.BlockDriver.VIRTIO: return "virtio-blk-pci" - else: - raise ValueError( - f"Unsupported block driver {driver} is not supported by QEMU" - ) + raise ValueError( + f"Unsupported block driver {driver} is not supported by QEMU", + ) def nic_driver_name(self) -> str: driver = self.nic_driver if driver == VMRun.NetworkDriver.VIRTIO: return "virtio-net-pci" - else: - raise ValueError( - f"Unsupported network driver {driver} is not supported by QEMU" - ) + raise ValueError( + f"Unsupported network driver {driver} is not supported by QEMU", + ) def machine_type(self) -> str: - return self.image.select({ - 'amd64': "q35", - 'arm64': "virt,gic-version=3", - }, "virt") - - def setup(self) -> List[Any]: - exe = self.image.select({ - 'amd64': 'qemu-system-x86_64', - 'i386': 'qemu-system-i386', - 'arm': 'qemu-system-arm', - 'arm64': 'qemu-system-aarch64', - 'arm64/aarch64c': 'qemu-system-morello', - 'riscv': 'qemu-system-riscv64', - }) + return self.image.select( + { + "amd64": "q35", + "arm64": "virt,gic-version=3", + }, + "virt", + ) + + def setup(self) -> list: + exe = self.image.select( + { + "amd64": "qemu-system-x86_64", + "i386": "qemu-system-i386", + "arm": "qemu-system-arm", + 
"arm64": "qemu-system-aarch64", + "arm64/aarch64c": "qemu-system-morello", + "riscv": "qemu-system-riscv64", + } + ) if exe is None: raise ValueError( - f"qemu does not support machine type '{self.image.machine}'" + f"qemu does not support machine type '{self.image.machine}'", ) - cpu = self.image.select({ - 'arm64/aarch64c': "morello" - }, "max") + cpu = self.image.select( + { + "arm64/aarch64c": "morello", + }, + "max", + ) qemu_cmd = [ exe, "-nographic", "-no-reboot", - "-cpu", cpu, - "-m", f"{self.memory}M", - "-smp", self.ncpus, - "-device", "virtio-rng-pci", - "-device", f"{self.block_driver_name()},drive=image", - "-drive", f"file={self.image.path},if=none,id=image,format=raw", - "-gdb", f"tcp:{self.gdb_addr[0]}:{self.gdb_addr[1]}", + "-cpu", + cpu, + "-m", + f"{self.memory}M", + "-smp", + self.ncpus, + "-device", + "virtio-rng-pci", + "-device", + f"{self.block_driver_name()},drive=image", + "-drive", + f"file={self.image.path},if=none,id=image,format=raw", + "-gdb", + f"tcp:{self.gdb_addr[0]}:{self.gdb_addr[1]}", ] for disk in self.extra_disks: - qemu_cmd.extend([ - "-device", f"{self.block_driver_name()},drive=extra{disk}", - "-drive", f"file={disk},if=none,id=extra{disk},format=raw", - ]) + qemu_cmd.extend( + [ + "-device", + f"{self.block_driver_name()},drive=extra{disk}", + "-drive", + f"file={disk},if=none,id=extra{disk},format=raw", + ] + ) for share in self.p9_shares: - qemu_cmd.extend([ - "-virtfs", f"local,path={share[1]},mount_tag={share[0]},security_model=none", - ]) + qemu_cmd.extend( + [ + "-virtfs", + f"local,path={share[1]},mount_tag={share[0]},security_model=none", + ] + ) machine_type = self.machine_type() if machine_type is not None: qemu_cmd.extend(["-M", machine_type]) @@ -335,10 +408,14 @@ def setup(self) -> List[Any]: if kernel_path is not None: qemu_cmd.extend(["-kernel", kernel_path]) if self.nic_driver != VMRun.NetworkDriver.NONE: - qemu_cmd.extend([ - "-device", f"{self.nic_driver_name()},netdev=net0", - "-netdev", 
f"user,id=net0,hostfwd=tcp:{self.ssh_addr[0]}:{self.ssh_addr[1]}-:22", - ]) + qemu_cmd.extend( + [ + "-device", + f"{self.nic_driver_name()},netdev=net0", + "-netdev", + f"user,id=net0,hostfwd=tcp:{self.ssh_addr[0]}:{self.ssh_addr[1]}-:22", + ] + ) return [str(a) for a in qemu_cmd] @@ -350,29 +427,30 @@ def __init__( panicstr: str, cpuid: int, backtrace: str, - message: Optional[str] = "VM panicked", - ): + message: str | None = "VM panicked", + ) -> None: self.panicstr = panicstr self.cpuid = cpuid self.backtrace = backtrace super().__init__(f"{message}: {panicstr}") class _Tee: - def __init__(self, *files): + def __init__(self, *files) -> None: self.files = files - def write(self, data): + def write(self, data) -> None: for f in self.files: f.write(data) f.flush() - def flush(self): + def flush(self) -> None: for f in self.files: f.flush() - def __init__(self, vmrun: VMRun, logfiles=[sys.stdout.buffer]): + def __init__(self, vmrun: VMRun, logfiles: list[io.BytesIO] | None = None) -> None: self.vmrun = vmrun self.cmd = vmrun.setup() + logfiles = logfiles or [sys.stdout.buffer] self.logfile = FreeBSDVM._Tee(*logfiles) def expect(self, prompt: str, **kwargs) -> int: @@ -386,20 +464,21 @@ def expect(self, prompt: str, **kwargs) -> int: # Collect the panic string, CPU ID and backtrace. 
panicstr = self.proc.match.group("panic").strip().decode() self.proc.expect(r"cpuid\s*=\s*(?P\d+)\s*\r?\n") - cpuid = int(self.proc.match.group('cpuid')) + cpuid = int(self.proc.match.group("cpuid")) self.proc.expect(r"KDB:\s+stack backtrace:\s*\r?\n") backtrace_lines = [] while True: - idx = self.proc.expect([ - r"KDB:\s+enter:\s+panic\s\r?\n", - r"([a-zA-Z0-9_]+\(\) at [^\r\n]*)\s*\r?\n", - ]) + idx = self.proc.expect( + [ + r"KDB:\s+enter:\s+panic\s\r?\n", + r"([a-zA-Z0-9_]+\(\) at [^\r\n]*)\s*\r?\n", + ] + ) if idx == 0: break - else: - line = self.proc.match.group(1).decode() - backtrace_lines.append(line) + line = self.proc.match.group(1).decode() + backtrace_lines.append(line) # We could potentially symbolize the backtrace using addr2line, # or attach a debugger and get full output. @@ -407,16 +486,16 @@ def expect(self, prompt: str, **kwargs) -> int: raise self.PanicException(panicstr, cpuid, backtrace) return pattern - def sendline(self, line: str): + def sendline(self, line: str) -> None: self.proc.sendline(line) - def sendcmd(self, args: List[str]): + def sendcmd(self, args: list[str]) -> None: """Split a long command into multiple lines to avoid hitting buffer limits.""" for arg in args[:-1]: self.proc.send(arg + " \\\n") self.proc.sendline(args[-1]) - def boot_to_login(self): + def boot_to_login(self) -> None: self.proc = pexpect.spawn( self.cmd[0], self.cmd[1:], @@ -430,9 +509,9 @@ def boot_to_login(self): e.args = (f"VM panicked during boot: {e.panicstr}",) raise e - def wait_for_prompt(self, **kwargs): + def wait_for_prompt(self, **kwargs) -> None: self.expect("root@.*#", **kwargs) - def poweroff(self): + def poweroff(self) -> None: self.sendline("poweroff") self.proc.expect(pexpect.EOF, timeout=120)