Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/fromager/commands/list_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ def list_versions(
) -> None:
"""List all available versions for a package requirement specifier."""
click.secho("use 'fromager package list-versions'", bold=True)
output_format = "requirements" if format_as_requirements else "versions"
ctx.invoke(
package.list_versions,
requirement_spec=requirement_spec,
distribution_type=distribution_type,
sdist_server_url=sdist_server_url,
ignore_no_versions=ignore_no_versions,
format_as_requirements=format_as_requirements,
output_format=output_format,
)
265 changes: 252 additions & 13 deletions src/fromager/commands/package.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
import csv
import datetime
import enum
import json
import logging
import pathlib
import sys
import typing
from collections import defaultdict

import click
import pypi_simple
import rich
from packaging.requirements import Requirement
from packaging.version import Version
from resolvelib.resolvers import ResolverException

from .. import context, log, overrides, packagesettings, request_session, resolver
from ..candidate import Candidate
from rich.table import Table

from .. import (
clickext,
context,
log,
overrides,
packagesettings,
request_session,
resolver,
)
from ..candidate import Candidate, Cooldown

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -94,9 +108,29 @@ def package() -> None:
help="Do not treat missing versions as an error",
)
@click.option(
"--format-as-requirements/--no-format-as-requirements",
"--format",
"output_format",
type=click.Choice(
["versions", "requirements", "table", "csv", "json"],
case_sensitive=False,
),
default="versions",
help="Output format (default: versions)",
)
@click.option(
"-o",
"--output",
type=clickext.ClickPath(),
help="Output file (default: stdout)",
)
@click.option(
"--ignore-per-package-overrides",
is_flag=True,
default=False,
help="Format output as requirement specifiers (name==version) instead of just version numbers",
help=(
"Ignore per-package min_release_age overrides when computing cooldown "
"status; uses only the global --min-release-age value."
),
)
@click.argument("requirement_spec", required=True)
@click.pass_obj
Expand All @@ -106,7 +140,9 @@ def list_versions(
distribution_type: str,
sdist_server_url: str,
ignore_no_versions: bool,
format_as_requirements: bool,
output_format: str,
output: pathlib.Path | None,
ignore_per_package_overrides: bool,
) -> None:
"""List all available versions for a package requirement specifier.

Expand All @@ -123,7 +159,18 @@ def list_versions(
- "sdist": Only include source distributions
- "wheel": Only include wheels
- "both": Include both source distributions and wheels

Output formats:
- "versions": one version per line (default)
- "requirements": name==version per line (pip-installable pins)
- "table": Rich table with upload timestamps, age, and cooldown status
- "csv": CSV with the same detail columns
- "json": JSON array with the same detail columns

Use --ignore-per-package-overrides to see what the global cooldown
policy would block without per-package exemptions.
"""
Comment thread
shifa-khan marked this conversation as resolved.

try:
req = Requirement(requirement_spec)
except Exception as e:
Expand Down Expand Up @@ -154,7 +201,8 @@ def list_versions(
sdist_server_url=override_sdist_server_url,
)

# Get all available candidates from the provider
# Get all available candidates from the provider (cooldown is NOT set on
# the provider so we receive every version that matches the specifier).
candidates = list(
provider.find_matches(
identifier=req.name,
Expand All @@ -170,14 +218,205 @@ def list_versions(
else:
raise click.ClickException(f"No versions found for {req.name}")

versions: list[Version] = sorted(set(candidate.version for candidate in candidates))
logger.info(f"Found {len(versions)} version(s)")
logger.info(f"Found {len(set(c.version for c in candidates))} version(s)")

cooldown = _resolve_list_versions_cooldown(wkctx, req, ignore_per_package_overrides)
version_rows = _compute_version_details(
req.name,
candidates,
cooldown,
provider.supports_upload_time,
)

match output_format:
case "versions":
_export_versions_plain(version_rows, req.name, cooldown)
case "requirements":
_export_versions_plain(
version_rows, req.name, cooldown, as_requirements=True
)
case "table":
_export_versions_table(version_rows, req.name, cooldown, output)
case "csv":
_export_versions_csv(version_rows, output)
case "json":
_export_versions_json(version_rows, output)
case _:
raise ValueError(f"Invalid output format: {output_format}")


def _resolve_list_versions_cooldown(
wkctx: context.WorkContext,
req: Requirement,
ignore_per_package_overrides: bool,
) -> Cooldown | None:
"""Determine the effective cooldown for the list-versions detail view.

When *ignore_per_package_overrides* is ``True``, only the global
``--min-release-age`` value is used so the caller can audit what the
policy would block without per-package exemptions.
"""
if ignore_per_package_overrides:
return wkctx.cooldown
return resolver.resolve_package_cooldown(wkctx, req)


def _compute_version_details(
package_name: str,
candidates: list[Candidate],
cooldown: Cooldown | None,
supports_upload_time: bool,
) -> list[dict[str, str]]:
"""Group candidates by version and compute cooldown status.

Returns one row per version, sorted ascending. Each row is a dict with
keys: ``package``, ``version``, ``upload_time``, ``age_days``,
``cooldown``.
"""
by_version: dict[Version, list[Candidate]] = defaultdict(list)
for c in candidates:
by_version[c.version].append(c)

reference_time = (
cooldown.bootstrap_time
if cooldown is not None
else datetime.datetime.now(datetime.UTC)
)

rows: list[dict[str, str]] = []
for version in sorted(by_version):
version_candidates = by_version[version]

upload_times = [
c.upload_time for c in version_candidates if c.upload_time is not None
]
upload_time = max(upload_times) if upload_times else None

for version in versions:
if format_as_requirements:
print(f"{req.name}=={version}")
if upload_time is not None:
age_days = (reference_time - upload_time).days
else:
print(version)
age_days = None

status = _cooldown_status(upload_time, cooldown, supports_upload_time)

rows.append(
{
"package": package_name,
"version": str(version),
"upload_time": upload_time.strftime("%Y-%m-%d %H:%M")
if upload_time
else "",
"age_days": str(age_days) if age_days is not None else "",
"cooldown": status,
}
)
return rows


def _cooldown_status(
upload_time: datetime.datetime | None,
cooldown: Cooldown | None,
supports_upload_time: bool,
) -> str:
"""Classify cooldown status for a single version.

Returns one of ``"blocked"``, ``"available"``, ``"skipped"``, or ``""``
(no cooldown configured).
"""
if cooldown is None:
return ""
if upload_time is None:
if not supports_upload_time:
return "skipped"
return "blocked"
cutoff = cooldown.bootstrap_time - cooldown.min_age
if upload_time > cutoff:
return "blocked"
return "available"


# -- export helpers for list-versions -------------------------------------------


def _export_versions_plain(
data: list[dict[str, str]],
package_name: str,
cooldown: Cooldown | None,
*,
as_requirements: bool = False,
) -> None:
"""Export versions as a plain list, filtering out cooldown-blocked entries."""
for row in data:
if cooldown is not None and row["cooldown"] == "blocked":
continue
if as_requirements:
print(f"{package_name}=={row['version']}")
else:
print(row["version"])


def _export_versions_json(
data: list[dict[str, str]], output: pathlib.Path | None
) -> None:
"""Export version details as JSON."""
if output:
with open(output, "w") as outfile:
json.dump(data, outfile, indent=2)
else:
json.dump(data, sys.stdout, indent=2)


_VERSIONS_CSV_FIELDS = ["package", "version", "upload_time", "age_days", "cooldown"]


def _export_versions_csv(
data: list[dict[str, str]], output: pathlib.Path | None
) -> None:
"""Export version details as CSV."""
if output:
with open(output, "w", newline="") as outfile:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates the full DictWriter block across if - else branches.

If we determine the output stream earlier we can avoid the duplication.

outfile = open(output, "w") if output else sys.stdout
try:
    writer = csv.DictWriter(.....)
    writer.writeheader()
    writer.writerows(data)
finally:
    if output:
        outfile.close()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or simpler: use click.File in the argument and pass io.TextIO type around.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smoparth @tiran The initial version matches the pattern in list_overrides._export_csv to stay consistent. If both commands need further cleanup (e.g., click.File or try/finally), I can address that in a follow-up PR. Note that click.File has drawbacks, such as not supporting the newline="" requirement for CSV files.

writer = csv.DictWriter(
outfile,
fieldnames=_VERSIONS_CSV_FIELDS,
quoting=csv.QUOTE_NONNUMERIC,
)
writer.writeheader()
writer.writerows(data)
else:
writer = csv.DictWriter(
sys.stdout,
fieldnames=_VERSIONS_CSV_FIELDS,
quoting=csv.QUOTE_NONNUMERIC,
)
writer.writeheader()
writer.writerows(data)


def _export_versions_table(
data: list[dict[str, str]],
package_name: str,
cooldown: Cooldown | None,
output: pathlib.Path | None = None,
) -> None:
"""Export version details as a Rich table."""
table = Table(title=f"Versions for {package_name}")
table.add_column("Version", justify="left", no_wrap=True)
table.add_column("Upload Time", justify="left", no_wrap=True)
table.add_column("Age (days)", justify="right", no_wrap=True)
if cooldown is not None:
table.add_column("Cooldown", justify="left", no_wrap=True)

for row in data:
cells = [row["version"], row["upload_time"], row["age_days"]]
if cooldown is not None:
cells.append(row["cooldown"])
table.add_row(*cells)

if output:
with open(output, "w") as fh:
rich.console.Console(file=fh, width=120).print(table)
else:
rich.get_console().print(table)


def _versions_string(versions: typing.Iterable[Version]) -> str:
Expand Down
Loading
Loading