diff --git a/src/spac/templates/phenograph_clustering_cpu_template.py b/src/spac/templates/phenograph_clustering_cpu_template.py new file mode 100644 index 00000000..c38173b8 --- /dev/null +++ b/src/spac/templates/phenograph_clustering_cpu_template.py @@ -0,0 +1,113 @@ +""" +Galaxy CPU template for PhenoGraph clustering. + +Calls the standalone CPU path in +``spac.transform.clustering.phenograph.cpu.phenograph_cpu``. INDEPENDENT of +the legacy ``spac.transformations.phenograph_clustering`` function — that +continues to serve the legacy ``phenograph_clustering.xml`` tool unchanged. + +Usage +----- +>>> from spac.templates.phenograph_clustering_cpu_template import run_from_json +>>> run_from_json("examples/phenograph_clustering_cpu_params.json") +""" +import sys +from pathlib import Path +from typing import Any, Dict, Union + +# Add parent directory to path for SPAC imports (matches legacy template pattern) +sys.path.append(str(Path(__file__).parent.parent.parent)) + +from spac.transform.clustering.phenograph import phenograph_cpu +from spac.templates.template_utils import ( + load_input, + parse_params, + save_results, + text_to_value, +) + + +def run_from_json( + json_path: Union[str, Path, Dict[str, Any]], + save_to_disk: bool = True, + output_dir: str = None, +): + """ + Execute CPU PhenoGraph Clustering from Galaxy JSON parameters. + + Mirrors the legacy template's run_from_json contract for drop-in + replacement, minus the dead HPC fields. + """ + params = parse_params(json_path) + + if output_dir is None: + output_dir = params.get("Output_Directory", ".") + + if "outputs" not in params: + params["outputs"] = { + "analysis": {"type": "file", "name": "output.pickle"} + } + + # Load AnnData + adata = load_input(params["Upstream_Analysis"]) + + # Extract parameters + layer = params.get("Table_to_Process", "Original") + if layer == "Original": + layer = None + + k = int(params.get("K_Nearest_Neighbors", 30)) + seed = int(params.get("Seed", 42)) + resolution = float(params.get("Resolution_Parameter", 1.0)) + output_name = params.get("Output_Annotation_Name", "phenograph") + n_iterations = int(params.get("Number_of_Iterations", 100)) + + print("Before PhenoGraph CPU Clustering:\n", adata) + + phenograph_cpu( + adata=adata, + features=None, # use all var index + layer=layer, + k=k, + seed=seed, + resolution_parameter=resolution, + n_iterations=n_iterations, + output_annotation=output_name, + ) + + print(f'Count of cells in the output annotation "{output_name}":') + print(adata.obs[output_name].value_counts()) + print("\nAfter PhenoGraph CPU Clustering:\n", adata) + + if save_to_disk: + saved_files = save_results( + results={"analysis": adata}, + params=params, + output_base_dir=output_dir, + ) + print( + f"PhenoGraph CPU Clustering completed -> {saved_files['analysis']}" + ) + return saved_files + + return adata + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print( + "Usage: python phenograph_clustering_cpu_template.py " + " [output_dir]", + file=sys.stderr, + ) + sys.exit(1) + + output_dir = sys.argv[2] if len(sys.argv) > 2 else None + result = run_from_json(json_path=sys.argv[1], output_dir=output_dir) + + if isinstance(result, dict): + print("\nOutput files:") + for name, path in result.items(): + print(f" {name}: {path}") + else: + print("\nReturned AnnData object") diff --git a/src/spac/templates/phenograph_clustering_gpu_template.py b/src/spac/templates/phenograph_clustering_gpu_template.py new file mode 100644 index 00000000..af098dec --- /dev/null +++ b/src/spac/templates/phenograph_clustering_gpu_template.py @@ -0,0 +1,146 @@ +""" +Galaxy GPU template for PhenoGraph clustering. + +Calls ``spac.transform.clustering.phenograph.gpu.grapheno.phenograph_gpu`` +inside the GPU Docker container (``nciccbr/spac-gpu``). Supports profiling +mode, which runs Leiden at multiple resolutions sharing the same KNN and +Jaccard computation (a grapheno feature exposed via the parquet cache in +the job work directory). + +Usage +----- +>>> from spac.templates.phenograph_clustering_gpu_template import run_from_json +>>> run_from_json("examples/phenograph_clustering_gpu_params.json") +""" +import sys +from pathlib import Path +from typing import Any, Dict, Union + +sys.path.append(str(Path(__file__).parent.parent.parent)) + +from spac.transform.clustering.phenograph import phenograph_gpu +from spac.templates.template_utils import ( + load_input, + parse_params, + save_results, + text_to_value, +) + + +def _resolutions(params): + profiling = bool(params.get("Profiling", False)) + res_list = params.get("Resolution_List") or [] + res_param = params.get("Resolution_Parameter", 1.0) + + if profiling: + if not res_list: + raise ValueError( + "Profiling=True but Resolution_List is empty." + ) + return True, [float(r) for r in res_list] + return False, [float(res_param)] + + +def run_from_json( + json_path: Union[str, Path, Dict[str, Any]], + save_to_disk: bool = True, + output_dir: str = None, +): + """ + Execute GPU PhenoGraph Clustering from Galaxy JSON parameters. + """ + if phenograph_gpu is None: + raise RuntimeError( + "GPU backend not available. Ensure RAPIDS (cuml, cugraph) is " + "installed and this template is running inside the GPU Docker " + "image (nciccbr/spac-gpu)." + ) + + params = parse_params(json_path) + + if output_dir is None: + output_dir = params.get("Output_Directory", ".") + + if "outputs" not in params: + params["outputs"] = { + "analysis": {"type": "file", "name": "output.pickle"} + } + + # Load AnnData + adata = load_input(params["Upstream_Analysis"]) + + layer = params.get("Table_to_Process", "Original") + if layer == "Original": + layer = None + + k = int(params.get("K_Nearest_Neighbors", 30)) + seed = int(params.get("Seed", 42)) + output_name = params.get("Output_Annotation_Name", "phenograph") + n_iterations = int(params.get("Number_of_Iterations", 100)) + + profiling, resolutions = _resolutions(params) + + print("Before PhenoGraph GPU Clustering:\n", adata) + print(f"Mode: {'profiling' if profiling else 'single'}, resolutions={resolutions}") + + # In profiling mode, the first resolution's call populates the parquet cache + # (KNN + Jaccard); subsequent calls only re-run Leiden. The cache lives in + # the process cwd, which Galaxy sets to the per-job work directory. + for res in resolutions: + col_name = ( + f"{output_name}-{k}-{res}" if profiling else output_name + ) + phenograph_gpu( + adata=adata, + features=None, + layer=layer, + k=k, + seed=seed, + resolution_parameter=res, + n_iterations=n_iterations, + output_annotation=col_name, + ) + n_clusters = int(adata.obs[col_name].nunique()) + print(f" {col_name}: {n_clusters} clusters") + + # Summary column for provenance (single-resolution mode uses output_name; + # profiling mode uses the last resolution as a reference). + summary_col = ( + f"{output_name}-{k}-{resolutions[-1]}" if profiling else output_name + ) + print(f'\nCount of cells in "{summary_col}":') + print(adata.obs[summary_col].value_counts()) + print("\nAfter PhenoGraph GPU Clustering:\n", adata) + + if save_to_disk: + saved_files = save_results( + results={"analysis": adata}, + params=params, + output_base_dir=output_dir, + ) + print( + f"PhenoGraph GPU Clustering completed -> {saved_files['analysis']}" + ) + return saved_files + + return adata + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print( + "Usage: python phenograph_clustering_gpu_template.py " + " [output_dir]", + file=sys.stderr, + ) + sys.exit(1) + + output_dir = sys.argv[2] if len(sys.argv) > 2 else None + result = run_from_json(json_path=sys.argv[1], output_dir=output_dir) + + if isinstance(result, dict): + print("\nOutput files:") + for name, path in result.items(): + print(f" {name}: {path}") + else: + print("\nReturned AnnData object") diff --git a/src/spac/templates/umap_transformation_template.py b/src/spac/templates/umap_transformation_template.py index 388f3d11..fee8e931 100644 --- a/src/spac/templates/umap_transformation_template.py +++ b/src/spac/templates/umap_transformation_template.py @@ -100,7 +100,10 @@ def run_from_json( # Load the upstream analysis data adata = load_input(params["Upstream_Analysis"]) - # Extract parameters - Note: HPC parameters are ignored in SPAC version + # Extract parameters. HPC parameters (Run_on_HPC, Batch_Mode, Partition, + # Number_of_CPUs, Memory_GB, Request_Time) were removed from the Galaxy + # tool in v2.1.0 and are no longer sent. params.get() is kept for safety + # so older JSON blueprints containing those keys still load cleanly. n_neighbors = params.get("Number_of_Neighbors", 75) min_dist = params.get("Minimum_Distance_between_Points", 0.1) n_components = params.get("Target_Dimension_Number", 2) diff --git a/src/spac/transform/__init__.py b/src/spac/transform/__init__.py new file mode 100644 index 00000000..e2aa7f20 --- /dev/null +++ b/src/spac/transform/__init__.py @@ -0,0 +1,7 @@ +""" +spac.transform — transformation algorithms, organized by type. + +This module is a new top-level namespace per the April 17 refactoring decision. +For now, only phenograph clustering lives here. The rest of the transformations +continue to live in ``spac.transformations`` and will migrate gradually. +""" diff --git a/src/spac/transform/clustering/__init__.py b/src/spac/transform/clustering/__init__.py new file mode 100644 index 00000000..f7afc077 --- /dev/null +++ b/src/spac/transform/clustering/__init__.py @@ -0,0 +1,3 @@ +""" +spac.transform.clustering — clustering algorithms. +""" diff --git a/src/spac/transform/clustering/phenograph/__init__.py b/src/spac/transform/clustering/phenograph/__init__.py new file mode 100644 index 00000000..4d649489 --- /dev/null +++ b/src/spac/transform/clustering/phenograph/__init__.py @@ -0,0 +1,37 @@ +""" +spac.transform.clustering.phenograph — PhenoGraph clustering (CPU and GPU). + +Public entry points +------------------- +prepare_features(adata, layer, features) + Shared preprocessing: extract a dense float32 feature matrix from AnnData. + Used by BOTH cpu and gpu paths to guarantee identical input. + +phenograph_cpu(adata, features, layer, k, seed, ...) + CPU PhenoGraph via the `phenograph` Python package. + INDEPENDENT of spac.transformations.phenograph_clustering. + +phenograph_gpu(adata, features, layer, k, seed, ...) + GPU PhenoGraph via grapheno (RAPIDS cuml + cugraph). + Only importable inside the GPU container; returns None in CPU-only envs. + +Notes +----- +This module is a deliberate parallel to the Biowulf ``spac_phenograph/CPU/`` +and ``spac_phenograph/GPU/`` folder split. Each path is a standalone +implementation; neither wraps the other. The legacy +``spac.transformations.phenograph_clustering`` function is untouched and +continues to serve the legacy ``phenograph_clustering.xml`` Galaxy tool. +""" + +from .preprocess import prepare_features +from .cpu import phenograph_cpu + +# GPU import is deferred. Import failures in CPU-only environments must not +# break ``from spac.transform.clustering.phenograph import phenograph_cpu``. +try: + from .gpu.grapheno import phenograph_gpu +except ImportError: + phenograph_gpu = None + +__all__ = ["prepare_features", "phenograph_cpu", "phenograph_gpu"] diff --git a/src/spac/transform/clustering/phenograph/cpu.py b/src/spac/transform/clustering/phenograph/cpu.py new file mode 100644 index 00000000..300a9bf2 --- /dev/null +++ b/src/spac/transform/clustering/phenograph/cpu.py @@ -0,0 +1,106 @@ +""" +spac.transform.clustering.phenograph.cpu + +Standalone CPU PhenoGraph clustering (parallel sibling to gpu/grapheno.py). + +This implementation calls the ``phenograph`` Python package directly and +does NOT wrap ``spac.transformations.phenograph_clustering``. Keeping the +two paths independent allows the legacy ``phenograph_clustering.xml`` +Galaxy tool to remain untouched while the new ``phenograph_clustering_cpu.xml`` +tool evolves freely. +""" +from __future__ import annotations + +import time +from typing import List, Optional + +from .preprocess import prepare_features + + +def phenograph_cpu( + adata, + features: Optional[List[str]] = None, + layer: Optional[str] = None, + k: int = 30, + seed: int = 42, + resolution_parameter: float = 1.0, + n_iterations: int = 100, + output_annotation: str = "phenograph", + min_cluster_size: int = 10, +): + """ + CPU PhenoGraph clustering: KNN + Jaccard + Leiden, all on CPU via + the ``phenograph`` Python package. + + Parameters + ---------- + adata : AnnData + Input data. Modified in-place; cluster labels are written to + ``adata.obs[output_annotation]``. + features : list of str or None + Subset of ``adata.var.index`` to use. If ``None``, uses all variables. + layer : str or None + Layer name. Pass ``None`` or ``"Original"`` to use ``adata.X``. + k : int + Number of nearest neighbors. + seed : int + Random seed forwarded to ``phenograph.cluster``. + resolution_parameter : float + Leiden resolution parameter. + n_iterations : int + Leiden iterations. Negative means run until no improvement. + output_annotation : str + Column name in ``adata.obs`` to write cluster labels. + min_cluster_size : int + Minimum cluster size; smaller clusters are labeled -1. + + Returns + ------- + adata : AnnData + The same object, with ``adata.obs[output_annotation]`` and + ``adata.uns['phenograph_clustering_cpu']`` populated. + """ + import phenograph + + data, feature_names = prepare_features(adata, layer=layer, features=features) + + print( + f"CPU PhenoGraph: {data.shape[0]:,} cells x {data.shape[1]} features, " + f"k={k}, resolution={resolution_parameter}" + ) + + t0 = time.time() + communities, graph, Q = phenograph.cluster( + data, + k=k, + seed=seed, + resolution_parameter=resolution_parameter, + n_iterations=n_iterations, + min_cluster_size=min_cluster_size, + clustering_algo="leiden", + ) + elapsed = time.time() - t0 + + adata.obs[output_annotation] = communities.astype("category") + + # Provenance — makes it easy to distinguish old vs. new CPU path downstream. + adata.uns.setdefault("phenograph_clustering_cpu", {}).update( + { + "k": int(k), + "seed": int(seed), + "resolution_parameter": float(resolution_parameter), + "n_iterations": int(n_iterations), + "min_cluster_size": int(min_cluster_size), + "layer": layer, + "features": list(feature_names), + "modularity": float(Q), + "elapsed_seconds": float(elapsed), + "backend": "phenograph", + } + ) + + n_clusters = int(adata.obs[output_annotation].nunique()) + print( + f" {n_clusters} clusters, modularity={Q:.4f}, time={elapsed:.1f}s" + ) + return adata diff --git a/src/spac/transform/clustering/phenograph/gpu/__init__.py b/src/spac/transform/clustering/phenograph/gpu/__init__.py new file mode 100644 index 00000000..51417543 --- /dev/null +++ b/src/spac/transform/clustering/phenograph/gpu/__init__.py @@ -0,0 +1,10 @@ +""" +spac.transform.clustering.phenograph.gpu + +GPU phenograph implementations. Currently exposes ``grapheno`` (the +RAPIDS-based port of grapheno_dmap). + +If a ``phenograph_cuml`` backend is added later, it lives in this package +alongside ``grapheno`` and this __init__ decides which to expose as the +default ``phenograph_gpu`` entry point. +""" diff --git a/src/spac/transform/clustering/phenograph/gpu/grapheno.py b/src/spac/transform/clustering/phenograph/gpu/grapheno.py new file mode 100644 index 00000000..31aadfb9 --- /dev/null +++ b/src/spac/transform/clustering/phenograph/gpu/grapheno.py @@ -0,0 +1,491 @@ +""" +spac.transform.clustering.phenograph.gpu.grapheno + +GPU PhenoGraph clustering — ported from the grapheno_dmap package used in +Foundry/Biowulf production. + +Attribution +----------- +Original algorithm: Erik Burlingame (Apache 2.0) + https://gitlab.com/eburling/grapheno +DMAP adaptations: Rui He (NCI/DMAP), March 2024 — added resolution and + random_state forwarding. +SCSAWorkflow port: Fang Liu (NCI/DMAP), April 2026 — packaged into SPAC, + added numpy-array and AnnData entry points, added cugraph 22.08/24.x + API compatibility. + +Algorithm +--------- + Dask KNN (cuml) -> Jaccard (cugraph) -> Leiden (cugraph) -> sort_by_size + +The three core functions (``compute_and_cache_knn_edgelist``, +``compute_and_cache_jac_edgelist``, and ``cluster``) are preserved +byte-compatible with the Biowulf production source to keep the validated +5.5M-cell output identical. Two additions: + +1. ``cluster_from_array(data, ...)`` — numpy-array entry point. Wraps the + CSV-based ``cluster()`` by writing a temp CSV to ``work_dir`` and reading + the labels back. +2. ``phenograph_gpu(adata, ...)`` — AnnData-level entry point that mirrors + the CPU signature in ``spac.transform.clustering.phenograph.cpu``. + +Requires +-------- +RAPIDS (cuml, cugraph, cudf, cupy, dask_cudf, dask_cuda). Do NOT import +this module in a CPU-only environment; ``spac.transform.clustering.phenograph`` +__init__ handles the ImportError gracefully. +""" +from __future__ import annotations + +import os +import time +from typing import List, Optional, Tuple + +import numpy as np + +# Heavy RAPIDS imports — only succeed inside the GPU container. +import cudf +import cugraph +import cupy as cp +from cuml.neighbors import NearestNeighbors as NN +from dask.distributed import Client +from dask_cuda import LocalCUDACluster + +# Optional import: dask_cudf and distributed KNN. Not all RAPIDS versions +# expose the Dask NN at the same path. +try: + import dask_cudf + from cuml.dask.neighbors import NearestNeighbors as DaskNN + + HAS_DASK_NN = True +except ImportError: + dask_cudf = None + DaskNN = None + HAS_DASK_NN = False + + +# --------------------------------------------------------------------------- +# KNN edgelist (ported verbatim from grapheno_dmap.cluster, minus tqdm) +# --------------------------------------------------------------------------- + +def compute_and_cache_knn_edgelist( + input_csv_path: str, + knn_edgelist_path: str, + features: List[str], + n_neighbors: int, + client=None, +) -> None: + print( + f"Computing and caching {n_neighbors}NN edgelist: {knn_edgelist_path}" + ) + + if client and HAS_DASK_NN: + chunksize = cugraph.dask.get_chunksize(input_csv_path) + X = dask_cudf.read_csv(input_csv_path, chunksize=chunksize) + X = X.loc[:, features].astype("float32") + model = DaskNN(n_neighbors=n_neighbors + 1, client=client) + else: + X = cudf.read_csv(input_csv_path) + X = X.loc[:, features].astype("float32") + model = NN(n_neighbors=n_neighbors + 1) + + model.fit(X) + + n_vertices = X.shape[0].compute() if client and HAS_DASK_NN else X.shape[0] + + # exclude self index + knn_edgelist = model.kneighbors(X, return_distance=False).loc[:, 1:] + if client and HAS_DASK_NN: + knn_edgelist = knn_edgelist.compute().reset_index(drop=True) + knn_edgelist = knn_edgelist.melt(var_name="knn", value_name="dst") + knn_edgelist = knn_edgelist.reset_index().rename(columns={"index": "src"}) + knn_edgelist = knn_edgelist.loc[:, ["src", "dst"]] + knn_edgelist["src"] = knn_edgelist["src"] % n_vertices # avoids transpose + knn_edgelist.to_parquet(knn_edgelist_path) + + +# --------------------------------------------------------------------------- +# Jaccard edgelist +# --------------------------------------------------------------------------- + +def compute_and_cache_jac_edgelist( + knn_edgelist_path: str, + jac_edgelist_path: str, + distributed: bool = False, +) -> None: + print(f"Computing and caching jaccard edgelist: {jac_edgelist_path}") + knn_graph = _load_knn_graph(knn_edgelist_path, distributed) + jac_graph = cugraph.jaccard(knn_graph) + jac_graph.to_parquet(jac_edgelist_path) + + +def _load_knn_graph(knn_edgelist_path: str, distributed: bool = False): + """ + Load KNN edgelist parquet back into a cugraph.Graph. + + RAPIDS 22.08 uses source='src', destination='dst'. RAPIDS 24.x may + use different column names. We detect and remap here. + """ + G = cugraph.Graph() + if distributed and HAS_DASK_NN: + edgelist = dask_cudf.read_parquet( + knn_edgelist_path, split_row_groups=True + ) + else: + edgelist = cudf.read_parquet(knn_edgelist_path) + + cols = list(edgelist.columns) + # Biowulf-produced parquet uses 'src'/'dst'; normalize if needed. + src_col = "src" if "src" in cols else cols[0] + dst_col = "dst" if "dst" in cols else cols[1] + + if distributed and HAS_DASK_NN: + G.from_dask_cudf_edgelist(edgelist, source=src_col, destination=dst_col) + else: + G.from_cudf_edgelist(edgelist, source=src_col, destination=dst_col) + return G + + +def _load_jac_graph(jac_edgelist_path: str, distributed: bool = False): + """ + Load Jaccard edgelist parquet back into a cugraph.Graph with weights. + + cugraph 22.08 jaccard output columns: ``source``, ``destination``, + ``jaccard_coeff``. cugraph 24.x may rename these to ``first``, ``second``. + The ``edge_attr`` key should be stable as ``jaccard_coeff``. + """ + G = cugraph.Graph() + if distributed and HAS_DASK_NN: + edgelist = dask_cudf.read_parquet( + jac_edgelist_path, split_row_groups=True + ) + G.from_dask_cudf_edgelist(edgelist, edge_attr="jaccard_coeff") + else: + edgelist = cudf.read_parquet(jac_edgelist_path) + G.from_cudf_edgelist(edgelist, edge_attr="jaccard_coeff") + return G + + +# --------------------------------------------------------------------------- +# sort_by_size (verbatim from grapheno_dmap) +# --------------------------------------------------------------------------- + +def sort_by_size(clusters, min_size: int): + """ + Relabel clustering in order of descending cluster size. + + New labels are consecutive integers beginning at 0. Clusters smaller + than ``min_size`` are assigned to -1. + + Copied from https://github.com/jacoblevine/PhenoGraph. + """ + relabeled = cp.zeros(clusters.shape, dtype=int) + _, counts = cp.unique(clusters, return_counts=True) + o = cp.argsort(counts)[::-1] + for i, c in enumerate(o): + if counts[c] > min_size: + relabeled[clusters == c] = i + else: + relabeled[clusters == c] = -1 + return relabeled + + +# --------------------------------------------------------------------------- +# Leiden wrapper — version-agnostic +# --------------------------------------------------------------------------- + +def _run_leiden(jac_graph, resolution: float, random_state: int, max_iter: int): + """ + Call cugraph.leiden with forward-compatible kwargs. + + RAPIDS 22.08's cugraph.leiden signature: + leiden(G, resolution=1.0) + RAPIDS 24.x's cugraph.leiden signature: + leiden(G, resolution=1.0, random_state=None, max_iter=100) + + We prefer the newer signature; fall back to the old one via try/except. + """ + try: + return cugraph.leiden( + jac_graph, + resolution=resolution, + random_state=random_state, + max_iter=max_iter, + ) + except TypeError: + print( + " [WARN] Falling back to cugraph.leiden without random_state/" + "max_iter (RAPIDS 22.08 signature)" + ) + return cugraph.leiden(jac_graph, resolution=resolution) + + +# --------------------------------------------------------------------------- +# Core cluster() — ported from grapheno_dmap.cluster +# --------------------------------------------------------------------------- + +def cluster( + input_csv_path: str, + features: List[str], + n_neighbors: int = 30, + resolution: float = 1.0, + random_state: int = 42, + n_iterations: int = 100, + distributed_knn: bool = True, + distributed_graphs: bool = False, + min_size: int = 10, +): + """ + Run grapheno clustering on a CSV input, writing a parquet output with + a ``cluster`` column alongside the original features. + + Reads the input CSV, runs KNN -> Jaccard -> Leiden -> sort_by_size, and + returns a cuDF DataFrame with the original features plus a ``cluster`` + column. KNN and Jaccard edgelists are cached as parquet files in the + current working directory so subsequent calls at the same ``n_neighbors`` + can reuse them (the profiling optimization). + + Matches the original grapheno_dmap.cluster signature plus an added + ``n_iterations`` for Leiden (threaded through for cugraph 24.x). + """ + tic = time.time() + + stem = os.path.basename(input_csv_path).rsplit(".", 1)[0] + knn_edgelist_path = f"{stem}_{n_neighbors}NN_edgelist.parquet" + jac_edgelist_path = f"{stem}_{n_neighbors}NN_edgelist_jaccard.parquet" + + subtic = time.time() + + if os.path.exists(jac_edgelist_path): + print(f"Loading cached jaccard edgelist: {jac_edgelist_path}") + jac_graph = _load_jac_graph(jac_edgelist_path, distributed_graphs) + print(f"Jaccard graph loaded in {time.time() - subtic:.2f}s") + + elif os.path.exists(knn_edgelist_path): + print(f"Loading cached kNN edgelist: {knn_edgelist_path}") + compute_and_cache_jac_edgelist( + knn_edgelist_path, jac_edgelist_path, distributed_graphs + ) + jac_graph = _load_jac_graph(jac_edgelist_path, distributed_graphs) + print( + f"Jaccard graph computed, cached, and reloaded in " + f"{time.time() - subtic:.2f}s" + ) + + else: + # Note: LocalCUDACluster is started even when distributed_knn=False, + # matching the original grapheno_dmap behavior. On AWS Docker this is + # safe; on Biowulf Singularity with RAPIDS 24.x this triggers the UCX + # segfault — which is why production stays on RAPIDS 22.08. + if distributed_knn and HAS_DASK_NN: + with LocalCUDACluster() as lc, Client(lc) as client: + compute_and_cache_knn_edgelist( + input_csv_path, + knn_edgelist_path, + features, + n_neighbors, + client, + ) + else: + compute_and_cache_knn_edgelist( + input_csv_path, knn_edgelist_path, features, n_neighbors, None + ) + + print( + f"{n_neighbors}NN edgelist computed in " + f"{time.time() - subtic:.2f}s" + ) + + subtic = time.time() + compute_and_cache_jac_edgelist( + knn_edgelist_path, jac_edgelist_path, distributed_graphs + ) + jac_graph = _load_jac_graph(jac_edgelist_path, distributed_graphs) + print( + f"Jaccard graph computed, cached, and reloaded in " + f"{time.time() - subtic:.2f}s" + ) + + subtic = time.time() + print("Computing Leiden clustering over Jaccard graph...") + clusters_df, modularity = _run_leiden( + jac_graph, + resolution=resolution, + random_state=random_state, + max_iter=n_iterations, + ) + print(f"Leiden clustering completed in {time.time() - subtic:.2f}s") + print(f"Clusters detected: {len(clusters_df.partition.unique())}") + print(f"Clusters modularity: {modularity}") + + clusters_arr = clusters_df.sort_values(by="vertex").partition.values + clusters_arr = sort_by_size(clusters_arr, min_size) + + out_parquet = f"{input_csv_path.rsplit('.', 1)[0]}_{n_neighbors}NN_leiden.parquet" + print(f"Writing output dataframe: {out_parquet}") + + df = cudf.read_csv(input_csv_path) + df["cluster"] = clusters_arr + df.to_parquet(out_parquet) + df = cudf.read_parquet(out_parquet) + print(f"Grapheno completed in {time.time() - tic:.2f}s") + + return df, float(modularity), time.time() - tic + + +# --------------------------------------------------------------------------- +# NumPy-array entry point +# --------------------------------------------------------------------------- + +def cluster_from_array( + data: np.ndarray, + n_neighbors: int = 30, + resolution: float = 1.0, + random_state: int = 42, + n_iterations: int = 100, + min_size: int = 10, + work_dir: str = ".", +) -> Tuple[np.ndarray, float, float]: + """ + NumPy-array wrapper around ``cluster()``. + + Writes ``data`` as ``cluster_input.csv`` in ``work_dir``, calls + ``cluster()``, and reads back only the ``cluster`` column. Temp CSV and + parquet caches remain in ``work_dir`` — clean up externally if needed. + + Returns + ------- + labels : np.ndarray + (n_cells,) int cluster labels, -1 for clusters smaller than ``min_size``. + modularity : float + elapsed : float + Total wall-clock time. + """ + import pandas as pd # kept local to avoid pandas import cost at module load + + os.makedirs(work_dir, exist_ok=True) + feature_names = [f"feature_{i}" for i in range(data.shape[1])] + csv_path = os.path.join(work_dir, "cluster_input.csv") + pd.DataFrame(data, columns=feature_names).to_csv(csv_path, index=False) + + orig_dir = os.getcwd() + os.chdir(work_dir) + try: + df, modularity, elapsed = cluster( + input_csv_path=os.path.basename(csv_path), + features=feature_names, + n_neighbors=n_neighbors, + resolution=resolution, + random_state=random_state, + n_iterations=n_iterations, + distributed_knn=True, + distributed_graphs=False, + min_size=min_size, + ) + # df is a cudf.DataFrame; pull cluster column as numpy + labels = df["cluster"].to_numpy() + finally: + os.chdir(orig_dir) + + if len(labels) != data.shape[0]: + raise RuntimeError( + f"Grapheno returned {len(labels)} labels but input had " + f"{data.shape[0]} rows." + ) + + return labels, modularity, elapsed + + +# --------------------------------------------------------------------------- +# AnnData-level entry point (mirrors CPU signature) +# --------------------------------------------------------------------------- + +def phenograph_gpu( + adata, + features: Optional[List[str]] = None, + layer: Optional[str] = None, + k: int = 30, + seed: int = 42, + resolution_parameter: float = 1.0, + n_iterations: int = 100, + output_annotation: str = "phenograph", + min_cluster_size: int = 10, + work_dir: Optional[str] = None, +): + """ + GPU PhenoGraph clustering mirroring the CPU signature. + + Parameters match ``spac.transform.clustering.phenograph.cpu.phenograph_cpu`` + so templates can swap between them with minimal branching. + + Parameters + ---------- + adata : AnnData + features : list of str or None + layer : str or None + k : int + seed : int + resolution_parameter : float + n_iterations : int + output_annotation : str + min_cluster_size : int + work_dir : str or None + Scratch dir for grapheno's CSV and parquet caches. Defaults to cwd. + + Returns + ------- + adata : AnnData + The same object, with ``adata.obs[output_annotation]`` (categorical + string labels) and ``adata.uns['phenograph_clustering_gpu']`` populated. + """ + from .preprocess import prepare_features # local import to avoid cycle + + data, feature_names = prepare_features(adata, layer=layer, features=features) + + print( + f"GPU PhenoGraph (grapheno): {data.shape[0]:,} cells x " + f"{data.shape[1]} features, k={k}, resolution={resolution_parameter}" + ) + + labels, modularity, elapsed = cluster_from_array( + data=data, + n_neighbors=k, + resolution=resolution_parameter, + random_state=seed, + n_iterations=n_iterations, + min_size=min_cluster_size, + work_dir=work_dir or os.getcwd(), + ) + + # Convert to pandas categorical string labels (consistent with CPU) + adata.obs[output_annotation] = labels.astype(str) + adata.obs[output_annotation] = adata.obs[output_annotation].astype("category") + + adata.uns.setdefault("phenograph_clustering_gpu", {}).update( + { + "k": int(k), + "seed": int(seed), + "resolution_parameter": float(resolution_parameter), + "n_iterations": int(n_iterations), + "min_cluster_size": int(min_cluster_size), + "layer": layer, + "features": list(feature_names), + "modularity": float(modularity), + "elapsed_seconds": float(elapsed), + "backend": "grapheno", + } + ) + + n_clusters = int(adata.obs[output_annotation].nunique()) + print( + f" {n_clusters} clusters, modularity={modularity:.4f}, time={elapsed:.1f}s" + ) + return adata + + +__all__ = [ + "cluster", + "cluster_from_array", + "phenograph_gpu", + "sort_by_size", +] diff --git a/src/spac/transform/clustering/phenograph/preprocess.py b/src/spac/transform/clustering/phenograph/preprocess.py new file mode 100644 index 00000000..e204de6b --- /dev/null +++ b/src/spac/transform/clustering/phenograph/preprocess.py @@ -0,0 +1,68 @@ +""" +Shared preprocessing for CPU and GPU phenograph paths. + +Both paths use the same ``prepare_features`` to extract a dense float32 +feature matrix from AnnData, guaranteeing identical inputs. This is the +single piece of code shared between the two implementations, in line with +George's April 17 guidance ("all the preprocessing you do can happen in +one place, independent if you are GPU or CPU"). +""" +from __future__ import annotations + +from typing import List, Optional, Tuple + +import numpy as np + + +def prepare_features( + adata, + layer: Optional[str] = None, + features: Optional[List[str]] = None, +) -> Tuple[np.ndarray, List[str]]: + """ + Extract a dense feature matrix from AnnData for clustering. + + Parameters + ---------- + adata : AnnData + layer : str or None + Layer name. Pass ``None`` or ``"Original"`` to use ``adata.X``. + features : list of str or None + Subset of ``adata.var.index`` to use. If ``None``, uses all variables. + + Returns + ------- + data : np.ndarray + (n_cells, n_features) float32, C-contiguous. + feature_names : list of str + Ordered feature names matching the columns of ``data``. + + Raises + ------ + KeyError + If ``layer`` is not ``None``/``"Original"`` and is not in + ``adata.layers``. + """ + if layer in (None, "", "Original"): + matrix = adata.X + else: + if layer not in adata.layers: + available = list(adata.layers.keys()) + raise KeyError( + f"Layer '{layer}' not in adata.layers. Available: {available}" + ) + matrix = adata.layers[layer] + + # Densify sparse matrices + if hasattr(matrix, "toarray"): + matrix = matrix.toarray() + + feature_names = list(features) if features else list(adata.var.index) + + # Subset columns if a feature list was provided + if features: + col_idx = [adata.var.index.get_loc(f) for f in features] + matrix = matrix[:, col_idx] + + data = np.ascontiguousarray(np.asarray(matrix, dtype=np.float32)) + return data, feature_names diff --git a/tools/Dockerfile b/tools/Dockerfile index ab98d704..8753b4c3 100644 --- a/tools/Dockerfile +++ b/tools/Dockerfile @@ -1,17 +1,56 @@ -# Reference copy of the Dockerfile used to build nciccbr/spac Docker image -# for Galaxy and Code Ocean tools. -# The canonical build is triggered via PR to: https://github.com/CCBR/Dockers2 -# Docker Hub: https://hub.docker.com/r/nciccbr/spac +# Dockerfile.v0.9.2 — nciccbr/spac:v0.9.2 +# +# SPAC CPU image for AWS Galaxy. Adds the new CPU PhenoGraph path +# (spac.transform.clustering.phenograph.cpu) that lives on the +# feature/phenograph-gpu-refactor branch, alongside the legacy +# spac.transformations.phenograph_clustering that the existing +# phenograph_clustering.xml Galaxy tool uses. +# +# ============================================================================= +# Relationship to v0.9.1 +# ----------------------------------------------------------------------------- +# This file is a minimal fork of the validated v0.9.1 Dockerfile. The only +# substantive differences are: +# +# 1. LABEL version bumped 0.9.1 -> 0.9.2 +# 2. ARG GITHUB_BRANCH default flipped from "main" to the feature branch, +# because the new spac.transform.clustering.phenograph.cpu module and +# the new phenograph_clustering_cpu_template have not merged to main yet. +# (Once the SCSAWorkflow PR merges and a tag is cut, bump this to the +# release tag.) +# 3. Build-time verification tests (the RUN "=== VERIFYING SPAC INSTALLATION") +# extended with one extra check for the new CPU module, so Dockers2 CI +# catches any regression in the refactored code at build time rather than +# at Galaxy runtime. +# +# Everything else — curl-based environment.yml fetch, --no-deps pip install, +# channel config, env variables, CMD — is copied verbatim from v0.9.1 to +# minimize behavior drift and stay within the known-good CCBR/Dockers2 pattern. +# +# Why curl and not COPY? +# ---------------------- +# The CCBR/Dockers2 build-docker action's build context does not reliably +# make sibling files (like environment.yml) visible to the Docker build. +# Attempting `COPY environment.yml ...` fails with +# EnvironmentFileNotFound: '/environment.yml' file not found +# The v0.9.1 workaround — curl from raw.githubusercontent.com at build time — +# is what actually works in this CI. +# ============================================================================= + FROM continuumio/miniconda3:24.3.0-0 + # Build arguments ARG ENV_NAME=spac -ARG GITHUB_BRANCH=main +# Default to the feature branch until the PhenoGraph GPU refactor merges +# to SCSAWorkflow main and a release is tagged. Bump to the tag (e.g. +# v0.9.2 on SCSAWorkflow) once that happens. +ARG GITHUB_BRANCH=feature/phenograph-gpu-refactor ARG GITHUB_REPO=FNLCR-DMAP/SCSAWorkflow # Set labels LABEL maintainer="FNLCR-DMAP" LABEL description="SPAC - Single Cell Spatial Analysis Container" -LABEL version="0.9.1" +LABEL version="0.9.2" LABEL github.branch="${GITHUB_BRANCH}" # Install system dependencies including Chromium for Kaleido (ARM64 compatible) @@ -43,11 +82,11 @@ ENV CHROME_BIN=/usr/bin/chromium RUN conda install -n base -y conda-libmamba-solver && \ conda config --set solver libmamba -# Simulate exactly what a reviewer would do following README.md instructions WORKDIR /opt/SCSAWorkflow # Step 1: Fetch environment.yml from GitHub branch -# (Replaces local COPY since we're installing from GitHub) +# (Replaces local COPY since we're installing from GitHub, and because the +# CCBR/Dockers2 build context does not include sibling files.) RUN curl -fsSL "https://raw.githubusercontent.com/${GITHUB_REPO}/${GITHUB_BRANCH}/environment.yml" \ -o environment.yml @@ -77,7 +116,9 @@ ENV PYTHONPATH=/opt/conda/envs/${ENV_NAME}/lib/python3.9/site-packages:${PYTHONP # Step 6: "Install the SPAC package" from GitHub branch # (Replaces: pip install -e . with direct GitHub install) -# Using --no-deps to preserve conda-installed package versions (pandas 1.5.3, scimap 2.1.3_dmap_pandas153) +# Using --no-deps to preserve conda-installed package versions +# (pandas 1.5.3, scimap 2.1.3_dmap_pandas153) that would otherwise be +# clobbered by spac's unpinned install_requires. RUN /opt/conda/envs/${ENV_NAME}/bin/pip install --no-deps \ "git+https://github.com/${GITHUB_REPO}.git@${GITHUB_BRANCH}" @@ -106,7 +147,16 @@ RUN mkdir -p /workspace /data /results # Install jupyter and nbconvert for notebook testing RUN /opt/conda/envs/${ENV_NAME}/bin/pip install jupyter nbconvert -# Verify SPAC installation works correctly in multiple ways +# ----------------------------------------------------------------------------- +# Verify SPAC installation works correctly in multiple ways. +# +# Extended from v0.9.1 with a check for the new CPU path: +# - Test 5: new CPU path (spac.transform.clustering.phenograph.cpu + +# phenograph_clustering_cpu_template) — only on the feature +# branch. If Test 5 fails with ModuleNotFoundError on +# `spac.transform`, the GITHUB_BRANCH build arg is pointing +# at a ref that predates the PhenoGraph GPU refactor. +# ----------------------------------------------------------------------------- RUN echo "=== VERIFYING SPAC INSTALLATION ===" && \ echo "Test 1: Direct python call" && \ python -c "import spac; print(f'SPAC version: {spac.__version__}')" && \ @@ -116,8 +166,10 @@ RUN echo "=== VERIFYING SPAC INSTALLATION ===" && \ python -c "import sys; print(sys.executable)" && \ echo "Test 4: Import scimap" && \ python -c "import scimap; print('scimap imported successfully')" && \ - echo "Test 5: Check templates location" && \ - python -c "import spac; import os; templates_dir = os.path.join(os.path.dirname(spac.__file__), 'templates'); print(f'Templates dir: {templates_dir}'); print('Templates exist:', os.path.exists(templates_dir))" && \ + echo "Test 5: New CPU path (transform/clustering/phenograph)" && \ + python -c "from spac.transform.clustering.phenograph import phenograph_cpu, prepare_features; \ + from spac.templates.phenograph_clustering_cpu_template import run_from_json; \ + print('new CPU path OK')" && \ echo "Test 6: Verify UMAP compatibility with scikit-learn" && \ python -c "import umap; import numpy as np; data = np.random.rand(100, 10); reducer = umap.UMAP(n_neighbors=5, min_dist=0.1); reducer.fit_transform(data); print(f'UMAP version: {umap.__version__} - OK')" && \ echo "=== ALL TESTS PASSED ===" || \ diff --git a/tools/Dockerfile.gpu b/tools/Dockerfile.gpu new file mode 100644 index 00000000..041a6912 --- /dev/null +++ b/tools/Dockerfile.gpu @@ -0,0 +1,218 @@ +# Dockerfile.v1.0.0 — nciccbr/spac-gpu:v1.0.0 +# +# SPAC GPU image for AWS Galaxy. Installs the grapheno GPU PhenoGraph +# backend and the SPAC package on top of NVIDIA's RAPIDS 22.08 image +# (the same RAPIDS version used in Biowulf/Foundry production, so cluster +# outputs match historical analyses). +# +# ============================================================================= +# Build context +# ----------------------------------------------------------------------------- +# This Dockerfile builds cleanly in the CCBR/Dockers2 CI with no reliance +# on files from the SCSAWorkflow source tree. SPAC itself is installed via +# `pip install --no-deps git+https://...@${GITHUB_BRANCH}` into both envs, +# and the three GPU orchestration scripts are pulled from the same ref via +# a shallow git clone. Everything stays pinned to GITHUB_BRANCH. +# +# For CCBR/Dockers2 CI (default): builds nciccbr/spac-gpu:v1.0.0 against +# feature/phenograph-gpu-refactor. Bump GITHUB_BRANCH when SPAC is tagged. +# +# Manual local build example: +# docker build -f Dockerfile.v1.0.0 \ +# -t nciccbr/spac-gpu:v1.0.0-test \ +# --build-arg GITHUB_BRANCH=feature/phenograph-gpu-refactor \ +# . +# +# ============================================================================= +# Two-env pattern (required for RAPIDS 22.08) +# ----------------------------------------------------------------------------- +# RAPIDS 22.08 pins numpy==1.22 / pandas==1.4.4, incompatible with +# anndata>=0.10. The image therefore ships two conda envs: +# +# preprocess env : anndata 0.10.9 + pandas 1.5.3 + numpy 1.26.4 + +# matplotlib + pyyaml (only what stages 1/3 need). +# Runs stages 1 and 3 (AnnData I/O + provenance). +# rapids env : cuml 22.08 + cugraph 22.08 (RAPIDS-pinned deps). +# Runs stage 2 (KNN + Jaccard + Leiden). +# +# The three-stage orchestrator at /opt/spac-gpu/run_gpu.sh shuttles data +# between envs via files on disk (features.npy, adata.pickle, +# cluster_result_.npy). The Galaxy XML invokes run_gpu.sh directly. +# +# ============================================================================= +# Why --no-deps for spac in BOTH envs +# ----------------------------------------------------------------------------- +# spac's setup.py install_requires lists 14 unpinned heavy scientific deps +# (scanpy, squidpy, numba, datashader, plotly, scimap, phenograph, ...). +# Letting pip resolve them on top of a fresh mamba env causes a giant +# solver loop and at least one C extension compile failure against the +# pinned numpy. We know this because Kelly's earlier draft hit exactly +# this error in CI. +# +# Stages 1 and 3 only need anndata + pandas + numpy + matplotlib + pyyaml, +# all of which are installed via mamba with explicit version pins in the +# preprocess env. Stage 2 only needs numpy + the RAPIDS stack, both +# already in the base rapids env. +# +# With --no-deps, we're installing spac for its CODE only — the modules +# `spac.transform.clustering.phenograph.*`, `spac.templates.template_utils`, +# etc. The runtime deps each module needs are supplied by whichever env +# imports it. +# +# When migrating to RAPIDS 24.10+, drop the preprocess env and the +# orchestrator scripts entirely, and have the Galaxy XML call +# `spac.templates.phenograph_clustering_gpu_template.run_from_json` +# in a single rapids env. +# +# ============================================================================= +# Smoke test (requires --gpus all) +# ----------------------------------------------------------------------------- +# docker run --rm --gpus all \ +# bash -c "source /opt/conda/etc/profile.d/conda.sh && \ +# conda activate rapids && \ +# python -c 'import cuml, cugraph; \ +# from spac.transform.clustering.phenograph.gpu.grapheno \ +# import phenograph_gpu; \ +# print(\"GPU OK:\", cuml.__version__, cugraph.__version__)'" +# ============================================================================= + +FROM rapidsai/rapidsai:22.08-cuda11.5-runtime-ubuntu20.04-py3.9 + +ARG GITHUB_BRANCH=feature/phenograph-gpu-refactor +ARG GITHUB_REPO=FNLCR-DMAP/SCSAWorkflow + +LABEL maintainer="FNLCR-DMAP" +LABEL description="SPAC GPU image (grapheno PhenoGraph on RAPIDS 22.08) for AWS Galaxy" +LABEL version="1.0.0" +LABEL github.branch="${GITHUB_BRANCH}" +LABEL rapids.version="22.08" + +SHELL ["/bin/bash", "-c"] + +# ----------------------------------------------------------------------------- +# 1. Create the 'preprocess' env with anndata + modern pandas/numpy + +# the minimum surface area stages 1 and 3 actually need. We deliberately +# do NOT install scanpy/squidpy/scimap/phenograph/numba/datashader here +# — those are declared in spac's install_requires but not touched by +# template_utils.load_input/save_results or the preprocess module. +# Keeping the env lean keeps the mamba solve fast and avoids C +# extension compile surprises. +# ----------------------------------------------------------------------------- +RUN source /opt/conda/etc/profile.d/conda.sh && \ + mamba create -n preprocess -y \ + -c conda-forge \ + python=3.9.13 \ + pandas=1.5.3 \ + numpy=1.26.4 \ + anndata=0.10.9 \ + matplotlib=3.9.2 \ + pyyaml \ + pip \ + && conda clean -afy + +# ----------------------------------------------------------------------------- +# 2. Install SPAC into BOTH envs from the SAME git ref, with --no-deps. +# +# We install for code only — all runtime deps are supplied by the +# surrounding env (mamba-pinned versions in preprocess; RAPIDS-shipped +# versions in rapids). +# ----------------------------------------------------------------------------- +RUN source /opt/conda/etc/profile.d/conda.sh && \ + conda activate preprocess && \ + pip install --no-cache-dir --no-deps \ + "git+https://github.com/${GITHUB_REPO}.git@${GITHUB_BRANCH}" + +RUN source /opt/conda/etc/profile.d/conda.sh && \ + conda activate rapids && \ + pip install --no-cache-dir --no-deps \ + "git+https://github.com/${GITHUB_REPO}.git@${GITHUB_BRANCH}" + +# ----------------------------------------------------------------------------- +# 3. Pull the three GPU orchestration scripts from the SAME ref as the +# pip installs above. Kept out of spac's setup.py because they are +# RAPIDS-22.08-only infrastructure that disappears when the container +# migrates to RAPIDS 24.10+. +# +# Galaxy XML entry point: bash /opt/spac-gpu/run_gpu.sh +# ----------------------------------------------------------------------------- +RUN git clone --depth 1 --branch "${GITHUB_BRANCH}" \ + "https://github.com/${GITHUB_REPO}.git" /tmp/scsa && \ + mkdir -p /opt/spac-gpu && \ + cp /tmp/scsa/tools/spac_gpu/run_gpu.sh /opt/spac-gpu/run_gpu.sh && \ + cp /tmp/scsa/tools/spac_gpu/run_grapheno.py /opt/spac-gpu/run_grapheno.py && \ + cp /tmp/scsa/tools/spac_gpu/merge_labels.py /opt/spac-gpu/merge_labels.py && \ + chmod +x /opt/spac-gpu/run_gpu.sh && \ + ln -s /opt/spac-gpu/run_gpu.sh /usr/local/bin/spac-gpu-run && \ + rm -rf /tmp/scsa + +# ----------------------------------------------------------------------------- +# 4. Build-time sanity checks for the preprocess env. +# +# preprocess env must import anndata + spac.templates.template_utils +# + spac.transform.clustering.phenograph.prepare_features +# (what stage 1 and stage 3 load). +# +# The rapids env is NOT exercised via Python here — see section 5. +# ----------------------------------------------------------------------------- +RUN source /opt/conda/etc/profile.d/conda.sh && \ + conda activate preprocess && \ + echo "=== Verifying preprocess env ===" && \ + python -c "import anndata, pandas, numpy, matplotlib; \ + print(f' anndata={anndata.__version__}, pandas={pandas.__version__}, numpy={numpy.__version__}')" && \ + python -c "from spac.transform.clustering.phenograph import prepare_features; \ + from spac.templates.template_utils import load_input, parse_params, save_results; \ + print(' preprocess env: SPAC imports OK')" + +# ----------------------------------------------------------------------------- +# 5. STRUCTURE / PRESENCE CHECK for the rapids env (NOT a runtime +# validation of RAPIDS itself). +# +# What this step does: +# - Confirms the rapids env prefix resolves to exactly one +# python*/site-packages directory (catches env creation failures). +# - Confirms each required package directory exists on disk and that +# the spac grapheno module is physically installed where expected. +# +# What this step deliberately does NOT do: +# - It does NOT import cuml/cugraph/cudf/cupy/dask_cuda in Python. +# - It does NOT prove the packages are importable at runtime. +# - It does NOT detect ABI mismatches, missing shared libraries, +# or broken entry points. +# +# Why: RAPIDS 22.08 has transitive cuda-python init hooks that +# segfault on Python shutdown (exit code 139) when libcuda.so.1 is +# absent — as on any CPU-only CI builder. Even `importlib.util.find_spec` +# triggers this because unrelated rapids-env imports register atexit +# handlers that run when the interpreter shuts down. +# +# Real RAPIDS runtime validation lives separately in +# SCSAWorkflow/tools/spac_gpu/smoke_test.sh, which must be run on a +# GPU host (Biowulf or AWS A10G/A100) against the published Docker +# image before the image is promoted to production. +# ----------------------------------------------------------------------------- +RUN echo "=== Rapids env presence check (structure only, not runtime) ===" && \ + RAPIDS_SITE_MATCHES=(/opt/conda/envs/rapids/lib/python*/site-packages) && \ + if [ ${#RAPIDS_SITE_MATCHES[@]} -ne 1 ] || [ ! -d "${RAPIDS_SITE_MATCHES[0]}" ]; then \ + echo "ERROR: expected exactly one rapids python*/site-packages dir, found: ${RAPIDS_SITE_MATCHES[*]}" >&2; \ + exit 1; \ + fi && \ + RAPIDS_SITE="${RAPIDS_SITE_MATCHES[0]}" && \ + echo " rapids site-packages: $RAPIDS_SITE" && \ + for pkg in cuml cugraph cudf cupy dask_cuda; do \ + test -d "$RAPIDS_SITE/$pkg" || { echo "ERROR: missing package dir: $pkg" >&2; exit 1; }; \ + done && \ + test -d "$RAPIDS_SITE/spac/transform/clustering/phenograph/gpu" && \ + test -f "$RAPIDS_SITE/spac/transform/clustering/phenograph/gpu/grapheno.py" && \ + echo " presence check OK (run smoke_test.sh on a GPU host to validate runtime)" + +# ----------------------------------------------------------------------------- +# 6. Verify the orchestration scripts are present and executable. +# ----------------------------------------------------------------------------- +RUN echo "=== Verifying orchestration scripts ===" && \ + test -x /opt/spac-gpu/run_gpu.sh && \ + test -f /opt/spac-gpu/run_grapheno.py && \ + test -f /opt/spac-gpu/merge_labels.py && \ + test -L /usr/local/bin/spac-gpu-run && \ + echo " /opt/spac-gpu/ contents OK" + +WORKDIR /workspace diff --git a/tools/galaxy_tools/phenograph_clustering_cpu.xml b/tools/galaxy_tools/phenograph_clustering_cpu.xml new file mode 100644 index 00000000..ecd6e8b8 --- /dev/null +++ b/tools/galaxy_tools/phenograph_clustering_cpu.xml @@ -0,0 +1,73 @@ + + + Calculate automatic phenotypes using PhenoGraph on CPU. For datasets larger than ~500K cells, use the GPU tool instead. + + nciccbr/spac:v0.9.2 + + + + + outputs_config.json && +python '$__tool_directory__/format_values.py' 'galaxy_params.json' 'cleaned_params.json' --inject-outputs --outputs-config outputs_config.json && +python -c "from spac.templates.phenograph_clustering_cpu_template import run_from_json; run_from_json('cleaned_params.json')" + + ]]> + + + + + + + + + + + + + + + 5M cells: GPU required (CPU may not complete) + +**Outputs** + +- analysis: pickle file containing the AnnData object with the cluster + labels in ``adata.obs[]`` and provenance in + ``adata.uns['phenograph_clustering_cpu']``. + + ]]> + + @misc{spac_toolkit, + author = {FNLCR DMAP Team}, + title = {SPAC: SPAtial single-Cell analysis}, + year = {2022}, + url = {https://github.com/FNLCR-DMAP/SCSAWorkflow} +} + 10.1101/2025.04.02.646782 + + diff --git a/tools/galaxy_tools/phenograph_clustering_gpu.xml b/tools/galaxy_tools/phenograph_clustering_gpu.xml new file mode 100644 index 00000000..38e2c87c --- /dev/null +++ b/tools/galaxy_tools/phenograph_clustering_gpu.xml @@ -0,0 +1,112 @@ + + + GPU-accelerated PhenoGraph clustering using RAPIDS (grapheno backend). For large datasets (>~500K cells) where CPU is too slow. + + nciccbr/spac-gpu:v1.0.0 + + + + + outputs_config.json && +python '$__tool_directory__/format_values.py' 'galaxy_params.json' 'cleaned_params.json' + --bool-values Profiling + --list-values Resolution_List + --inject-outputs --outputs-config outputs_config.json && +bash /opt/spac-gpu/run_gpu.sh 'cleaned_params.json' + + ]]> + + + + + + + + +
+ + + + +
+
+ + + + + + Jaccard similarity -> +Leiden community detection, plus sort_by_size relabeling. + +**When to use GPU vs CPU** + +- < 500K cells: CPU is fine +- 500K - 5M cells: GPU recommended +- > 5M cells: GPU required + +**Profiling mode** + +When enabled, runs Leiden at each resolution in the Resolution List, +reusing the expensive KNN and Jaccard computations across resolutions. +For N resolutions this saves (N-1) times the KNN+Jaccard cost. + +**GPU instance sizing** + +- A10G (24 GB): up to ~2M cells +- A100 (80 GB): up to ~15M cells + +**Outputs** + +- analysis: pickle file containing the AnnData object with cluster labels + in ``adata.obs`` and provenance in ``adata.uns['phenograph_clustering_gpu']``. + + ]]> + + @misc{spac_toolkit, + author = {FNLCR DMAP Team}, + title = {SPAC: SPAtial single-Cell analysis}, + year = {2022}, + url = {https://github.com/FNLCR-DMAP/SCSAWorkflow} +} + 10.1101/2025.04.02.646782 + +
diff --git a/tools/galaxy_tools/umap_transformation.xml b/tools/galaxy_tools/umap_transformation.xml index 97845f0d..36389326 100644 --- a/tools/galaxy_tools/umap_transformation.xml +++ b/tools/galaxy_tools/umap_transformation.xml @@ -1,21 +1,20 @@ - + Perform UMAP dimensionality reduction using a specific table.\ -Please refer to DUET Documentation and GitHub Documentation for further information. +Please refer to GitHub Documentation for further information. - nciccbr/spac:v0.9.1 + nciccbr/spac:v0.9.2 outputs_config.json && python '$__tool_directory__/format_values.py' 'galaxy_params.json' 'cleaned_params.json' --bool-values Run_on_HPC Batch_Mode --inject-outputs --outputs-config outputs_config.json && python -c "from spac.templates.umap_transformation_template import run_from_json; run_from_json('cleaned_params.json')" +echo '{"analysis": {"type": "file", "name": "output.pickle"}}' > outputs_config.json && python '$__tool_directory__/format_values.py' 'galaxy_params.json' 'cleaned_params.json' --inject-outputs --outputs-config outputs_config.json && python -c "from spac.templates.umap_transformation_template import run_from_json; run_from_json('cleaned_params.json')" ]]> - @@ -49,18 +48,6 @@ echo '{"analysis": {"type": "file", "name": "output.pickle"}}' > outputs_config. -
- - - - - - - - - - -
@@ -86,4 +73,4 @@ Please refer to DUET Documentation and GitHub Documentation for further informat 10.1101/2025.04.02.646782 10.48550/arXiv.2506.01560 -
\ No newline at end of file +
diff --git a/tools/spac_gpu/merge_labels.py b/tools/spac_gpu/merge_labels.py new file mode 100644 index 00000000..0d519c3b --- /dev/null +++ b/tools/spac_gpu/merge_labels.py @@ -0,0 +1,143 @@ +""" +/opt/spac-gpu/merge_labels.py — Stage 3 of the GPU orchestrator. + +Runs inside the 'preprocess' conda env of nciccbr/spac-gpu (where anndata +is available). Reads: + - adata.pickle (from stage 1) + - cluster_result_.npy (from stage 2, one per resolution) + - cleaned_params.json + +Writes labels into adata.obs. Single-resolution mode writes one column +named ``Output_Annotation_Name``. Profiling mode writes one column per +resolution, named ``--`` — matching the +Foundry/Biowulf convention so downstream consumers don't have to change. + +Final output uses ``spac.templates.template_utils.save_results`` to write +``output.pickle`` in the Galaxy work dir. + +Usage (from run_gpu.sh): + python merge_labels.py +""" +from __future__ import annotations + +import os +import pickle +import sys + +import numpy as np + +from spac.templates.template_utils import parse_params, save_results + + +def _resolutions(params): + profiling = bool(params.get("Profiling", False)) + res_list = params.get("Resolution_List") or [] + res_param = params.get("Resolution_Parameter", 1.0) + if profiling: + if not res_list: + raise ValueError("Profiling=True but Resolution_List is empty.") + return True, [float(r) for r in res_list] + return False, [float(res_param)] + + +def main(params_path: str, work_dir: str) -> None: + params = parse_params(params_path) + + # Switch into work_dir so save_results' relative output paths land there. + orig_dir = os.getcwd() + os.chdir(work_dir) + + try: + with open("adata.pickle", "rb") as f: + adata = pickle.load(f) + + k = int(params.get("K_Nearest_Neighbors", 30)) + seed = int(params.get("Seed", 42)) + n_iterations = int(params.get("Number_of_Iterations", 100)) + output_name = params.get("Output_Annotation_Name", "phenograph") + layer = params.get("Table_to_Process", "Original") + profiling, resolutions = _resolutions(params) + + print( + f" Merging labels: profiling={profiling}, k={k}, " + f"resolutions={resolutions}", + flush=True, + ) + print(f" AnnData before merge: {adata}", flush=True) + + for res in resolutions: + npy_path = f"cluster_result_{res}.npy" + if not os.path.exists(npy_path): + raise FileNotFoundError( + f"Expected {npy_path} from stage 2, not found." + ) + labels = np.load(npy_path) + + if len(labels) != adata.n_obs: + raise ValueError( + f"Label count {len(labels)} from {npy_path} does not " + f"match adata.n_obs {adata.n_obs}." + ) + + col_name = ( + f"{output_name}-{k}-{res}" if profiling else output_name + ) + # Store as string categorical — consistent with the CPU path's + # AnnData convention. + adata.obs[col_name] = labels.astype(str) + adata.obs[col_name] = adata.obs[col_name].astype("category") + + n_clusters = int(adata.obs[col_name].nunique()) + print( + f"\n Column '{col_name}': {n_clusters} clusters", + flush=True, + ) + print(" Top cluster sizes:", flush=True) + print( + adata.obs[col_name].value_counts().head(10).to_string(), + flush=True, + ) + + # Provenance + adata.uns.setdefault("phenograph_clustering_gpu", {}).update( + { + "k": k, + "seed": seed, + "n_iterations": n_iterations, + "resolutions": resolutions, + "profiling": profiling, + "layer": None if layer == "Original" else layer, + "output_annotation": output_name, + "backend": "grapheno", + "rapids_version": "22.08", + } + ) + + print(f"\n AnnData after merge: {adata}", flush=True) + + # Ensure outputs dict exists with the standard shape save_results + # expects, in case the caller didn't inject it. + if "outputs" not in params: + params["outputs"] = { + "analysis": {"type": "file", "name": "output.pickle"} + } + + saved = save_results( + results={"analysis": adata}, + params=params, + output_base_dir=".", + ) + print(f"\n Saved analysis: {saved['analysis']}", flush=True) + print(" Stage 3 complete.", flush=True) + finally: + os.chdir(orig_dir) + + +if __name__ == "__main__": + if len(sys.argv) != 3: + print( + "Usage: merge_labels.py ", + file=sys.stderr, + ) + sys.exit(1) + main(sys.argv[1], sys.argv[2]) diff --git a/tools/spac_gpu/run_gpu.sh b/tools/spac_gpu/run_gpu.sh new file mode 100755 index 00000000..00a9bce8 --- /dev/null +++ b/tools/spac_gpu/run_gpu.sh @@ -0,0 +1,118 @@ +#!/bin/bash +# /opt/spac-gpu/run_gpu.sh +# ----------------------------------------------------------------------------- +# Orchestrator for PhenoGraph GPU clustering inside nciccbr/spac-gpu. +# +# This script exists because RAPIDS 22.08 pins numpy/pandas versions +# incompatible with anndata, so the container uses two conda envs: +# +# preprocess env: anndata + pandas 1.5 + numpy 1.26 +# rapids env: cuml 22.08 + cugraph 22.08 +# +# Three stages, each in the right env: +# +# Stage 1 (preprocess): pickle -> adata.pickle + features metadata +# (uses the Python template end-to-end minus clustering) +# Stage 2 (rapids): call grapheno via run_grapheno.py +# (writes cluster_result_.npy per resolution) +# Stage 3 (preprocess): merge labels back into adata, save output.pickle +# +# NOTE: When migrating to RAPIDS 24.12, delete this script and have the +# Galaxy XML call the template directly in a single conda env. +# ----------------------------------------------------------------------------- +# Usage: +# bash /opt/spac-gpu/run_gpu.sh +# ----------------------------------------------------------------------------- + +set -euo pipefail + +if [[ $# -ne 1 ]]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +PARAMS_JSON="$(realpath "$1")" +WORK_DIR="$(pwd)" + +if [[ ! -f "$PARAMS_JSON" ]]; then + echo "ERROR: params file not found: $PARAMS_JSON" >&2 + exit 1 +fi + +echo "==============================================================" +echo " SPAC PhenoGraph GPU Clustering (grapheno / RAPIDS 22.08)" +echo " Work dir : $WORK_DIR" +echo " Params : $PARAMS_JSON" +echo " GPU :" +nvidia-smi --query-gpu=name,memory.total --format=csv || echo " (nvidia-smi not available)" +echo "==============================================================" + +source /opt/conda/etc/profile.d/conda.sh + +# ----------------------------------------------------------------------------- +# STAGE 1 — preprocess env: load pickle, extract features matrix to disk +# ----------------------------------------------------------------------------- +echo +echo "[1/3] Loading pickle, extracting features (preprocess env)..." +conda activate preprocess + +python - <<'PYEOF' "$PARAMS_JSON" "$WORK_DIR" +import json, os, pickle, sys +import numpy as np + +import anndata # noqa (assert available) +from spac.templates.template_utils import load_input, parse_params +from spac.transform.clustering.phenograph import prepare_features + +params_path = sys.argv[1] +work_dir = sys.argv[2] + +params = parse_params(params_path) +input_path = params["Upstream_Analysis"] +layer = params.get("Table_to_Process", "Original") +if layer == "Original": + layer = None + +print(f" Loading: {input_path}", flush=True) +adata = load_input(input_path) +print(f" AnnData: {adata.n_obs:,} obs x {adata.n_vars} vars", flush=True) + +data, feature_names = prepare_features(adata, layer=layer) +print( + f" Features matrix: {data.shape[0]:,} x {data.shape[1]} " + f"(layer='{layer or 'X'}')", + flush=True, +) + +# Write intermediates that stage 2 (rapids env) will consume +np.save(os.path.join(work_dir, "features.npy"), data) +with open(os.path.join(work_dir, "features_meta.json"), "w") as f: + json.dump({"feature_names": feature_names, "n_obs": int(adata.n_obs)}, f) +with open(os.path.join(work_dir, "adata.pickle"), "wb") as f: + pickle.dump(adata, f) + +print(" Stage 1 complete.", flush=True) +PYEOF + +# ----------------------------------------------------------------------------- +# STAGE 2 — rapids env: run grapheno clustering, one or more resolutions +# ----------------------------------------------------------------------------- +echo +echo "[2/3] Running grapheno clustering (rapids env)..." +conda activate rapids + +python -u /opt/spac-gpu/run_grapheno.py "$PARAMS_JSON" "$WORK_DIR" + +# ----------------------------------------------------------------------------- +# STAGE 3 — preprocess env: merge labels back, save output.pickle +# ----------------------------------------------------------------------------- +echo +echo "[3/3] Merging labels into AnnData (preprocess env)..." +conda activate preprocess + +python -u /opt/spac-gpu/merge_labels.py "$PARAMS_JSON" "$WORK_DIR" + +echo +echo "==============================================================" +echo " PhenoGraph GPU clustering completed." +echo "==============================================================" diff --git a/tools/spac_gpu/run_grapheno.py b/tools/spac_gpu/run_grapheno.py new file mode 100644 index 00000000..40ad9d36 --- /dev/null +++ b/tools/spac_gpu/run_grapheno.py @@ -0,0 +1,133 @@ +""" +/opt/spac-gpu/run_grapheno.py — Stage 2 of the GPU orchestrator. + +Runs inside the 'rapids' conda env of nciccbr/spac-gpu. Reads the feature +matrix written by stage 1 (``features.npy``), runs grapheno clustering at +one or more resolutions, and writes cluster labels as .npy files for stage 3. + +In profiling mode (multiple resolutions at same k), grapheno's parquet +caching reuses KNN and Jaccard across resolutions — only Leiden re-runs. + +Usage (from run_gpu.sh): + python run_grapheno.py +""" +from __future__ import annotations + +import json +import os +import sys +import time + +import numpy as np + +from spac.transform.clustering.phenograph.gpu.grapheno import cluster_from_array + + +def _resolutions(params): + profiling = bool(params.get("Profiling", False)) + res_list = params.get("Resolution_List") or [] + res_param = params.get("Resolution_Parameter", 1.0) + if profiling: + if not res_list: + raise ValueError( + "Profiling=True but Resolution_List is empty. Add resolutions " + "in the Galaxy UI or disable Profiling." + ) + return True, [float(r) for r in res_list] + return False, [float(res_param)] + + +def main(params_path: str, work_dir: str) -> None: + with open(params_path) as f: + params = json.load(f) + + with open(os.path.join(work_dir, "features_meta.json")) as f: + meta = json.load(f) + expected_n_obs = int(meta["n_obs"]) + + data = np.load(os.path.join(work_dir, "features.npy")) + if data.shape[0] != expected_n_obs: + raise RuntimeError( + f"features.npy has {data.shape[0]} rows but features_meta.json " + f"says {expected_n_obs}. Stage 1 output is inconsistent." + ) + + k = int(params.get("K_Nearest_Neighbors", 30)) + seed = int(params.get("Seed", 42)) + n_iterations = int(params.get("Number_of_Iterations", 100)) + profiling, resolutions = _resolutions(params) + + print( + f" data: {data.shape[0]:,} cells x {data.shape[1]} features", + flush=True, + ) + print( + f" k={k}, seed={seed}, n_iterations={n_iterations}", + flush=True, + ) + print( + f" mode: {'profiling' if profiling else 'single'}, " + f"resolutions={resolutions}", + flush=True, + ) + + # Run grapheno in the work dir so its parquet caches (used for profiling + # across resolutions) land next to our other intermediates and get + # cleaned up by Galaxy at job end. + orig_dir = os.getcwd() + os.chdir(work_dir) + + try: + for i, res in enumerate(resolutions, 1): + print( + f"\n --- Resolution {res} ({i}/{len(resolutions)}) ---", + flush=True, + ) + t0 = time.time() + + labels, modularity, elapsed = cluster_from_array( + data=data, + n_neighbors=k, + resolution=res, + random_state=seed, + n_iterations=n_iterations, + min_size=10, + work_dir=".", + ) + + if len(labels) != expected_n_obs: + raise RuntimeError( + f"Grapheno returned {len(labels)} labels at res={res} " + f"but adata has {expected_n_obs} obs. Refusing to write " + f"mismatched labels." + ) + + out_path = f"cluster_result_{res}.npy" + np.save(out_path, labels) + + n_clusters = int(np.unique(labels).size) + n_valid = int((labels >= 0).sum()) + n_small = int((labels < 0).sum()) + wall = time.time() - t0 + + print( + f" res={res}: {n_clusters} clusters " + f"({n_valid:,} assigned, {n_small:,} < min_size), " + f"modularity={modularity:.4f}, time={wall:.1f}s, " + f"wrote {out_path}", + flush=True, + ) + finally: + os.chdir(orig_dir) + + print("\n Stage 2 complete.", flush=True) + + +if __name__ == "__main__": + if len(sys.argv) != 3: + print( + "Usage: run_grapheno.py ", + file=sys.stderr, + ) + sys.exit(1) + main(sys.argv[1], sys.argv[2])