From cb4127484bea1c9a6b5a07bbcd6503c78a7244cd Mon Sep 17 00:00:00 2001 From: shifa-khan Date: Thu, 7 May 2026 13:49:21 -0400 Subject: [PATCH] feat(list-versions): show cooldown status and upload timestamps package list-versions now shows upload_time, age in days, and cooldown status for each version. --format controls the output: versions (default) and requirements print a filtered list excluding blocked entries; table, csv, and json include the full detail columns. This replaces the previous --format-as-requirements flag. --ignore-per-package-overrides shows what the global --min-release-age would block without per-package exemptions. Co-authored-by: Cursor --- src/fromager/commands/list_versions.py | 3 +- src/fromager/commands/package.py | 265 ++++++++++++- tests/test_list_versions.py | 494 +++++++++++++++++++++++++ 3 files changed, 748 insertions(+), 14 deletions(-) create mode 100644 tests/test_list_versions.py diff --git a/src/fromager/commands/list_versions.py b/src/fromager/commands/list_versions.py index 5ec82ba3..bf66c4ca 100644 --- a/src/fromager/commands/list_versions.py +++ b/src/fromager/commands/list_versions.py @@ -38,11 +38,12 @@ def list_versions( ) -> None: """List all available versions for a package requirement specifier.""" click.secho("use 'fromager package list-versions'", bold=True) + output_format = "requirements" if format_as_requirements else "versions" ctx.invoke( package.list_versions, requirement_spec=requirement_spec, distribution_type=distribution_type, sdist_server_url=sdist_server_url, ignore_no_versions=ignore_no_versions, - format_as_requirements=format_as_requirements, + output_format=output_format, ) diff --git a/src/fromager/commands/package.py b/src/fromager/commands/package.py index 96899d6b..1364d5ec 100644 --- a/src/fromager/commands/package.py +++ b/src/fromager/commands/package.py @@ -1,17 +1,31 @@ +import csv import datetime import enum +import json import logging +import pathlib import sys import typing +from collections import defaultdict import click import pypi_simple +import rich from packaging.requirements import Requirement from packaging.version import Version from resolvelib.resolvers import ResolverException - -from .. import context, log, overrides, packagesettings, request_session, resolver -from ..candidate import Candidate +from rich.table import Table + +from .. import ( + clickext, + context, + log, + overrides, + packagesettings, + request_session, + resolver, +) +from ..candidate import Candidate, Cooldown logger = logging.getLogger(__name__) @@ -94,9 +108,29 @@ def package() -> None: help="Do not treat missing versions as an error", ) @click.option( - "--format-as-requirements/--no-format-as-requirements", + "--format", + "output_format", + type=click.Choice( + ["versions", "requirements", "table", "csv", "json"], + case_sensitive=False, + ), + default="versions", + help="Output format (default: versions)", +) +@click.option( + "-o", + "--output", + type=clickext.ClickPath(), + help="Output file (default: stdout)", +) +@click.option( + "--ignore-per-package-overrides", + is_flag=True, default=False, - help="Format output as requirement specifiers (name==version) instead of just version numbers", + help=( + "Ignore per-package min_release_age overrides when computing cooldown " + "status; uses only the global --min-release-age value." + ), ) @click.argument("requirement_spec", required=True) @click.pass_obj @@ -106,7 +140,9 @@ def list_versions( distribution_type: str, sdist_server_url: str, ignore_no_versions: bool, - format_as_requirements: bool, + output_format: str, + output: pathlib.Path | None, + ignore_per_package_overrides: bool, ) -> None: """List all available versions for a package requirement specifier. @@ -123,7 +159,18 @@ def list_versions( - "sdist": Only include source distributions - "wheel": Only include wheels - "both": Include both source distributions and wheels + + Output formats: + - "versions": one version per line (default) + - "requirements": name==version per line (pip-installable pins) + - "table": Rich table with upload timestamps, age, and cooldown status + - "csv": CSV with the same detail columns + - "json": JSON array with the same detail columns + + Use --ignore-per-package-overrides to see what the global cooldown + policy would block without per-package exemptions. """ + try: req = Requirement(requirement_spec) except Exception as e: @@ -154,7 +201,8 @@ def list_versions( sdist_server_url=override_sdist_server_url, ) - # Get all available candidates from the provider + # Get all available candidates from the provider (cooldown is NOT set on + # the provider so we receive every version that matches the specifier). candidates = list( provider.find_matches( identifier=req.name, @@ -170,14 +218,205 @@ def list_versions( else: raise click.ClickException(f"No versions found for {req.name}") - versions: list[Version] = sorted(set(candidate.version for candidate in candidates)) - logger.info(f"Found {len(versions)} version(s)") + logger.info(f"Found {len(set(c.version for c in candidates))} version(s)") + + cooldown = _resolve_list_versions_cooldown(wkctx, req, ignore_per_package_overrides) + version_rows = _compute_version_details( + req.name, + candidates, + cooldown, + provider.supports_upload_time, + ) + + match output_format: + case "versions": + _export_versions_plain(version_rows, req.name, cooldown) + case "requirements": + _export_versions_plain( + version_rows, req.name, cooldown, as_requirements=True + ) + case "table": + _export_versions_table(version_rows, req.name, cooldown, output) + case "csv": + _export_versions_csv(version_rows, output) + case "json": + _export_versions_json(version_rows, output) + case _: + raise ValueError(f"Invalid output format: {output_format}") + + +def _resolve_list_versions_cooldown( + wkctx: context.WorkContext, + req: Requirement, + ignore_per_package_overrides: bool, +) -> Cooldown | None: + """Determine the effective cooldown for the list-versions detail view. + + When *ignore_per_package_overrides* is ``True``, only the global + ``--min-release-age`` value is used so the caller can audit what the + policy would block without per-package exemptions. + """ + if ignore_per_package_overrides: + return wkctx.cooldown + return resolver.resolve_package_cooldown(wkctx, req) + + +def _compute_version_details( + package_name: str, + candidates: list[Candidate], + cooldown: Cooldown | None, + supports_upload_time: bool, +) -> list[dict[str, str]]: + """Group candidates by version and compute cooldown status. + + Returns one row per version, sorted ascending. Each row is a dict with + keys: ``package``, ``version``, ``upload_time``, ``age_days``, + ``cooldown``. + """ + by_version: dict[Version, list[Candidate]] = defaultdict(list) + for c in candidates: + by_version[c.version].append(c) + + reference_time = ( + cooldown.bootstrap_time + if cooldown is not None + else datetime.datetime.now(datetime.UTC) + ) + + rows: list[dict[str, str]] = [] + for version in sorted(by_version): + version_candidates = by_version[version] + + upload_times = [ + c.upload_time for c in version_candidates if c.upload_time is not None + ] + upload_time = max(upload_times) if upload_times else None - for version in versions: - if format_as_requirements: - print(f"{req.name}=={version}") + if upload_time is not None: + age_days = (reference_time - upload_time).days else: - print(version) + age_days = None + + status = _cooldown_status(upload_time, cooldown, supports_upload_time) + + rows.append( + { + "package": package_name, + "version": str(version), + "upload_time": upload_time.strftime("%Y-%m-%d %H:%M") + if upload_time + else "", + "age_days": str(age_days) if age_days is not None else "", + "cooldown": status, + } + ) + return rows + + +def _cooldown_status( + upload_time: datetime.datetime | None, + cooldown: Cooldown | None, + supports_upload_time: bool, +) -> str: + """Classify cooldown status for a single version. + + Returns one of ``"blocked"``, ``"available"``, ``"skipped"``, or ``""`` + (no cooldown configured). + """ + if cooldown is None: + return "" + if upload_time is None: + if not supports_upload_time: + return "skipped" + return "blocked" + cutoff = cooldown.bootstrap_time - cooldown.min_age + if upload_time > cutoff: + return "blocked" + return "available" + + +# -- export helpers for list-versions ------------------------------------------- + + +def _export_versions_plain( + data: list[dict[str, str]], + package_name: str, + cooldown: Cooldown | None, + *, + as_requirements: bool = False, +) -> None: + """Export versions as a plain list, filtering out cooldown-blocked entries.""" + for row in data: + if cooldown is not None and row["cooldown"] == "blocked": + continue + if as_requirements: + print(f"{package_name}=={row['version']}") + else: + print(row["version"]) + + +def _export_versions_json( + data: list[dict[str, str]], output: pathlib.Path | None +) -> None: + """Export version details as JSON.""" + if output: + with open(output, "w") as outfile: + json.dump(data, outfile, indent=2) + else: + json.dump(data, sys.stdout, indent=2) + + +_VERSIONS_CSV_FIELDS = ["package", "version", "upload_time", "age_days", "cooldown"] + + +def _export_versions_csv( + data: list[dict[str, str]], output: pathlib.Path | None +) -> None: + """Export version details as CSV.""" + if output: + with open(output, "w", newline="") as outfile: + writer = csv.DictWriter( + outfile, + fieldnames=_VERSIONS_CSV_FIELDS, + quoting=csv.QUOTE_NONNUMERIC, + ) + writer.writeheader() + writer.writerows(data) + else: + writer = csv.DictWriter( + sys.stdout, + fieldnames=_VERSIONS_CSV_FIELDS, + quoting=csv.QUOTE_NONNUMERIC, + ) + writer.writeheader() + writer.writerows(data) + + +def _export_versions_table( + data: list[dict[str, str]], + package_name: str, + cooldown: Cooldown | None, + output: pathlib.Path | None = None, +) -> None: + """Export version details as a Rich table.""" + table = Table(title=f"Versions for {package_name}") + table.add_column("Version", justify="left", no_wrap=True) + table.add_column("Upload Time", justify="left", no_wrap=True) + table.add_column("Age (days)", justify="right", no_wrap=True) + if cooldown is not None: + table.add_column("Cooldown", justify="left", no_wrap=True) + + for row in data: + cells = [row["version"], row["upload_time"], row["age_days"]] + if cooldown is not None: + cells.append(row["cooldown"]) + table.add_row(*cells) + + if output: + with open(output, "w") as fh: + rich.console.Console(file=fh, width=120).print(table) + else: + rich.get_console().print(table) def _versions_string(versions: typing.Iterable[Version]) -> str: diff --git a/tests/test_list_versions.py b/tests/test_list_versions.py new file mode 100644 index 00000000..ba7ecfa3 --- /dev/null +++ b/tests/test_list_versions.py @@ -0,0 +1,494 @@ +"""Tests for the ``list-versions`` command cooldown enhancements (issue #1078). + +Verifies that ``package list-versions`` shows upload timestamps, age in days, +and cooldown status via ``--format`` choices, and that +``--ignore-per-package-overrides`` correctly bypasses per-package overrides. +""" + +import datetime +import json +import pathlib +import re +import typing + +import pytest +import requests_mock +from click.testing import CliRunner + +from fromager import candidate, resolver +from fromager.__main__ import main as fromager + +_BOOTSTRAP_TIME = datetime.datetime(2026, 3, 26, 0, 0, 0, tzinfo=datetime.UTC) +_COOLDOWN_7_DAYS = 7 +_PYPI_SIMPLE_JSON_CONTENT_TYPE = "application/vnd.pypi.simple.v1+json" + +# Three versions at known ages relative to _BOOTSTRAP_TIME: +# 2.0.0 uploaded 2026-03-24 → 2 days old (within 7-day cooldown) +# 1.3.2 uploaded 2026-03-15 → 11 days old (outside cooldown) +# 1.2.2 uploaded 2026-01-01 → 84 days old (outside cooldown) +_PYPI_JSON_RESPONSE = { + "meta": {"api-version": "1.1"}, + "name": "test-pkg", + "files": [ + { + "filename": "test_pkg-2.0.0.tar.gz", + "url": "https://files.pythonhosted.org/packages/test_pkg-2.0.0.tar.gz", + "hashes": {"sha256": "aaa"}, + "upload-time": "2026-03-24T00:00:00+00:00", + }, + { + "filename": "test_pkg-1.3.2.tar.gz", + "url": "https://files.pythonhosted.org/packages/test_pkg-1.3.2.tar.gz", + "hashes": {"sha256": "bbb"}, + "upload-time": "2026-03-15T00:00:00+00:00", + }, + { + "filename": "test_pkg-1.2.2.tar.gz", + "url": "https://files.pythonhosted.org/packages/test_pkg-1.2.2.tar.gz", + "hashes": {"sha256": "ccc"}, + "upload-time": "2026-01-01T00:00:00+00:00", + }, + ], +} + + +def _extract_json_from_output(output: str) -> str: + """Extract JSON array from CLI output that may contain log messages.""" + json_match = re.search(r"\[\s*\{.*\}\s*\]", output, re.DOTALL) + if json_match: + return json_match.group(0) + return "[]" + + +def _extract_csv_from_output(output: str) -> str: + """Extract CSV content from output that may contain log messages.""" + lines = output.strip().split("\n") + csv_lines = [line for line in lines if '"' in line and "," in line] + return "\n".join(csv_lines) + + +@pytest.fixture(autouse=True) +def clear_resolver_cache() -> typing.Generator[None, None, None]: + """Clear class-level resolver cache so mocked responses are always used.""" + resolver.BaseProvider.clear_cache() + resolver.BaseProvider._cooldown_unsupported_warned.clear() + yield + + +def _invoke_list_versions( + cli_runner: CliRunner, + extra_args: list[str] | None = None, + min_release_age: int = 0, +) -> typing.Any: + """Invoke ``fromager package list-versions`` with mocked PyPI.""" + args: list[str] = [] + if min_release_age: + args.extend(["--min-release-age", str(min_release_age)]) + args.extend(["package", "list-versions"]) + if extra_args: + args.extend(extra_args) + args.append("test-pkg") + + with requests_mock.Mocker() as m: + m.get( + "https://pypi.org/simple/test-pkg/", + json=_PYPI_JSON_RESPONSE, + headers={"Content-Type": _PYPI_SIMPLE_JSON_CONTENT_TYPE}, + ) + return cli_runner.invoke(fromager, args) + + +# --------------------------------------------------------------------------- +# Plain mode (no --details) +# --------------------------------------------------------------------------- + + +def test_list_versions_plain(cli_runner: CliRunner) -> None: + """Without --details the command prints version numbers only.""" + result = _invoke_list_versions(cli_runner) + assert result.exit_code == 0 + lines = [ + line + for line in result.stdout.strip().split("\n") + if not line.startswith("WARNING") + ] + versions = [line.strip() for line in lines if line.strip()] + assert "1.2.2" in versions + assert "1.3.2" in versions + assert "2.0.0" in versions + + +def test_list_versions_plain_with_cooldown_filters_blocked( + cli_runner: CliRunner, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Plain mode filters out cooldown-blocked versions.""" + original_init = candidate.Cooldown.__init__ + + def patched_init( + self: candidate.Cooldown, *args: typing.Any, **kwargs: typing.Any + ) -> None: + original_init(self, *args, **kwargs) + self.bootstrap_time = _BOOTSTRAP_TIME + + monkeypatch.setattr(candidate.Cooldown, "__init__", patched_init) + + result = _invoke_list_versions(cli_runner, min_release_age=_COOLDOWN_7_DAYS) + assert result.exit_code == 0 + lines = [ + line + for line in result.stdout.strip().split("\n") + if not line.startswith("WARNING") + ] + versions = [line.strip() for line in lines if line.strip()] + assert "2.0.0" not in versions + assert "1.3.2" in versions + assert "1.2.2" in versions + + +# --------------------------------------------------------------------------- +# Detailed JSON output +# --------------------------------------------------------------------------- + + +def test_list_versions_json_no_cooldown(cli_runner: CliRunner) -> None: + """--format json without cooldown shows upload times and empty cooldown.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "json"], + ) + assert result.exit_code == 0 + + data = json.loads(_extract_json_from_output(result.stdout)) + assert len(data) == 3 + + # Verify structure + for row in data: + assert "package" in row + assert "version" in row + assert "upload_time" in row + assert "age_days" in row + assert "cooldown" in row + assert row["package"] == "test-pkg" + assert row["cooldown"] == "" # no cooldown configured + + versions = [row["version"] for row in data] + assert versions == ["1.2.2", "1.3.2", "2.0.0"] + + # Upload times should be populated + assert all(row["upload_time"] != "" for row in data) + assert all(row["age_days"] != "" for row in data) + + +def test_list_versions_json_with_cooldown(cli_runner: CliRunner) -> None: + """--format json with cooldown marks blocked/allowed versions correctly.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "json"], + min_release_age=_COOLDOWN_7_DAYS, + ) + assert result.exit_code == 0 + + data = json.loads(_extract_json_from_output(result.stdout)) + assert len(data) == 3 + + status_by_version = {row["version"]: row["cooldown"] for row in data} + # 2.0.0 is within 7-day cooldown (2 days old relative to ~now, but the + # exact status depends on when the test runs). For a reliable assertion, + # check that at least one version has a non-empty cooldown status. + assert any(s in ("blocked", "available") for s in status_by_version.values()) + + +def test_list_versions_json_with_fixed_cooldown( + cli_runner: CliRunner, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """With a fixed bootstrap_time, verify exact cooldown statuses.""" + original_init = candidate.Cooldown.__init__ + + def patched_init( + self: candidate.Cooldown, *args: typing.Any, **kwargs: typing.Any + ) -> None: + original_init(self, *args, **kwargs) + self.bootstrap_time = _BOOTSTRAP_TIME + + monkeypatch.setattr(candidate.Cooldown, "__init__", patched_init) + + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "json"], + min_release_age=_COOLDOWN_7_DAYS, + ) + assert result.exit_code == 0 + + data = json.loads(_extract_json_from_output(result.stdout)) + status_by_version = {row["version"]: row for row in data} + + # 2.0.0 uploaded 2026-03-24, bootstrap 2026-03-26, age=2 days → blocked + assert status_by_version["2.0.0"]["cooldown"] == "blocked" + assert status_by_version["2.0.0"]["age_days"] == "2" + + # 1.3.2 uploaded 2026-03-15, age=11 days → allowed + assert status_by_version["1.3.2"]["cooldown"] == "available" + assert status_by_version["1.3.2"]["age_days"] == "11" + + # 1.2.2 uploaded 2026-01-01, age=84 days → allowed + assert status_by_version["1.2.2"]["cooldown"] == "available" + assert status_by_version["1.2.2"]["age_days"] == "84" + + +# --------------------------------------------------------------------------- +# CSV output +# --------------------------------------------------------------------------- + + +def test_list_versions_csv(cli_runner: CliRunner) -> None: + """--format csv produces valid CSV with expected columns.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "csv"], + ) + assert result.exit_code == 0 + + csv_output = _extract_csv_from_output(result.stdout) + lines = csv_output.strip().split("\n") + assert len(lines) == 4 # header + 3 data rows + + header = lines[0] + assert '"package"' in header + assert '"version"' in header + assert '"upload_time"' in header + assert '"age_days"' in header + assert '"cooldown"' in header + + +# --------------------------------------------------------------------------- +# Table output +# --------------------------------------------------------------------------- + + +def test_list_versions_table(cli_runner: CliRunner) -> None: + """--format table shows a Rich table.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "table"], + ) + assert result.exit_code == 0 + assert "Versions for test-pkg" in result.stdout + assert "Version" in result.stdout + assert "Upload Time" in result.stdout + assert "Age (days)" in result.stdout + assert "2.0.0" in result.stdout + assert "1.3.2" in result.stdout + assert "1.2.2" in result.stdout + + +def test_list_versions_table_with_cooldown(cli_runner: CliRunner) -> None: + """When cooldown is active the table includes a Cooldown column.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "table"], + min_release_age=_COOLDOWN_7_DAYS, + ) + assert result.exit_code == 0 + assert "Cooldown" in result.stdout + + +def test_list_versions_table_no_cooldown_column( + cli_runner: CliRunner, +) -> None: + """Without cooldown the table omits the Cooldown column.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "table"], + ) + assert result.exit_code == 0 + assert "Cooldown" not in result.stdout + + +# --------------------------------------------------------------------------- +# Output file +# --------------------------------------------------------------------------- + + +def test_list_versions_output_file( + cli_runner: CliRunner, tmp_path: pathlib.Path +) -> None: + """--output writes JSON to a file instead of stdout.""" + output_file = tmp_path / "versions.json" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "json", "--output", str(output_file)], + ) + assert result.exit_code == 0 + assert output_file.exists() + + data = json.loads(output_file.read_text()) + assert isinstance(data, list) + assert len(data) == 3 + + +# --------------------------------------------------------------------------- +# --ignore-per-package-overrides +# --------------------------------------------------------------------------- + + +def test_list_versions_ignore_per_package_overrides( + cli_runner: CliRunner, + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """--ignore-per-package-overrides uses global cooldown, ignoring per-package override. + + Set up a per-package override of min_release_age=0 (exempt) and verify + that --ignore-per-package-overrides still shows cooldown status based + on the global --min-release-age. + """ + original_init = candidate.Cooldown.__init__ + + def patched_init( + self: candidate.Cooldown, *args: typing.Any, **kwargs: typing.Any + ) -> None: + original_init(self, *args, **kwargs) + self.bootstrap_time = _BOOTSTRAP_TIME + + monkeypatch.setattr(candidate.Cooldown, "__init__", patched_init) + + # Create per-package override with min_release_age=0 (exempted) + settings_dir = tmp_path / "settings" + settings_dir.mkdir(parents=True) + pkg_settings = settings_dir / "test_pkg.yaml" + pkg_settings.write_text("resolver_dist:\n min_release_age: 0\n") + + # Without --ignore-per-package-overrides: cooldown is disabled (per-package=0) + result_with_override = _invoke_with_settings( + cli_runner, + settings_dir=settings_dir, + extra_args=["--format", "json"], + min_release_age=_COOLDOWN_7_DAYS, + ) + assert result_with_override.exit_code == 0 + data_with = json.loads(_extract_json_from_output(result_with_override.stdout)) + # Per-package override=0 means no cooldown → all empty status + for row in data_with: + assert row["cooldown"] == "" + + # With --ignore-per-package-overrides: global cooldown applies + result_ignore = _invoke_with_settings( + cli_runner, + settings_dir=settings_dir, + extra_args=[ + "--format", + "json", + "--ignore-per-package-overrides", + ], + min_release_age=_COOLDOWN_7_DAYS, + ) + assert result_ignore.exit_code == 0 + data_ignore = json.loads(_extract_json_from_output(result_ignore.stdout)) + status_by_version = {row["version"]: row["cooldown"] for row in data_ignore} + assert status_by_version["2.0.0"] == "blocked" + assert status_by_version["1.3.2"] == "available" + + +def _invoke_with_settings( + cli_runner: CliRunner, + settings_dir: pathlib.Path, + extra_args: list[str] | None = None, + min_release_age: int = 0, +) -> typing.Any: + """Invoke ``fromager package list-versions`` with settings dir and mocked PyPI.""" + args: list[str] = ["--settings-dir", str(settings_dir)] + if min_release_age: + args.extend(["--min-release-age", str(min_release_age)]) + args.extend(["package", "list-versions"]) + if extra_args: + args.extend(extra_args) + args.append("test-pkg") + + with requests_mock.Mocker() as m: + m.get( + "https://pypi.org/simple/test-pkg/", + json=_PYPI_JSON_RESPONSE, + headers={"Content-Type": _PYPI_SIMPLE_JSON_CONTENT_TYPE}, + ) + return cli_runner.invoke(fromager, args) + + +# --------------------------------------------------------------------------- +# Requirements format +# --------------------------------------------------------------------------- + + +def test_list_versions_requirements_format(cli_runner: CliRunner) -> None: + """--format requirements outputs name==version pins.""" + result = _invoke_list_versions( + cli_runner, + extra_args=["--format", "requirements"], + ) + assert result.exit_code == 0 + lines = [ + line + for line in result.stdout.strip().split("\n") + if line.startswith("test-pkg==") + ] + assert len(lines) == 3 + assert "test-pkg==2.0.0" in lines + + +# --------------------------------------------------------------------------- +# Unit tests for helper functions +# --------------------------------------------------------------------------- + + +def test_cooldown_status_no_cooldown() -> None: + """No cooldown configured returns empty string.""" + from fromager.commands.package import _cooldown_status + + assert _cooldown_status(datetime.datetime.now(datetime.UTC), None, True) == "" + + +def test_cooldown_status_blocked() -> None: + """Upload within cooldown window returns 'blocked'.""" + from fromager.commands.package import _cooldown_status + + cooldown = candidate.Cooldown( + min_age=datetime.timedelta(days=7), + bootstrap_time=_BOOTSTRAP_TIME, + ) + recent_upload = datetime.datetime(2026, 3, 24, 0, 0, 0, tzinfo=datetime.UTC) + assert _cooldown_status(recent_upload, cooldown, True) == "blocked" + + +def test_cooldown_status_allowed() -> None: + """Upload outside cooldown window returns 'available'.""" + from fromager.commands.package import _cooldown_status + + cooldown = candidate.Cooldown( + min_age=datetime.timedelta(days=7), + bootstrap_time=_BOOTSTRAP_TIME, + ) + old_upload = datetime.datetime(2026, 3, 15, 0, 0, 0, tzinfo=datetime.UTC) + assert _cooldown_status(old_upload, cooldown, True) == "available" + + +def test_cooldown_status_skipped() -> None: + """Missing upload_time with unsupported provider returns 'skipped'.""" + from fromager.commands.package import _cooldown_status + + cooldown = candidate.Cooldown( + min_age=datetime.timedelta(days=7), + bootstrap_time=_BOOTSTRAP_TIME, + ) + assert _cooldown_status(None, cooldown, False) == "skipped" + + +def test_cooldown_status_fail_closed() -> None: + """Missing upload_time with supported provider returns 'blocked' (fail-closed).""" + from fromager.commands.package import _cooldown_status + + cooldown = candidate.Cooldown( + min_age=datetime.timedelta(days=7), + bootstrap_time=_BOOTSTRAP_TIME, + ) + assert _cooldown_status(None, cooldown, True) == "blocked"