Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ requests = ">=2.20"
pandas = ">=1.0.0"
pyam-iamc = ">=2.0.0"
nomenclature-iamc = ">=0.29"
eurostat = ">=1.1.1"
pypsa-validation-processing = { path = ".", editable = true }

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"pypsa",
"pyam-iamc",
"nomenclature-iamc",
"eurostat",
]

[project.urls]
Expand Down
132 changes: 123 additions & 9 deletions pypsa_validation_processing/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
"""Static information and general utility functions for pypsa_validation_processing."""

EU27_COUNTRY_CODES: dict[str, str] = {
from __future__ import annotations

import re
import warnings
from functools import lru_cache

try:
import eurostat
except Exception as error: # pragma: no cover
EUROSTAT_IMPORT_ERROR = error
eurostat = None
else:
EUROSTAT_IMPORT_ERROR = None

try:
from requests import RequestException
except Exception: # pragma: no cover
RequestException = RuntimeError

EUROSTAT_MAPPING_ERRORS = (
RuntimeError,
TypeError,
ValueError,
KeyError,
RequestException,
)


FALLBACK_COUNTRY_CODES: dict[str, str] = {
"AL": "Albania",
"AT": "Austria",
"BA": "Bosnia and Herzegovina",
Expand All @@ -16,8 +44,6 @@
"FI": "Finland",
"FR": "France",
"GB": "United Kingdom",
"GB0": "United Kingdom",
"GB1": "United Kingdom-Northern Ireland",
"GR": "Greece",
"HR": "Croatia",
"HU": "Hungary",
Expand All @@ -42,7 +68,10 @@
"EU": "European Union",
}


COUNTRIES_SPECIAL_CASES: dict[str, str] = {
"GB0": "United Kingdom",
"GB1": "United Kingdom-Northern Ireland",
"DE1": "Germany South-West",
"DE2": "Germany South-East",
"DE3": "Germany West",
Expand All @@ -60,7 +89,7 @@
}


NUTS_2_REGIONS: dict[str, str] = {
FALLBACK_NUTS_2_REGIONS: dict[str, str] = {
"AT11": "Burgenland",
"AT12": "Lower Austria",
"AT13": "Vienna",
Expand All @@ -69,11 +98,12 @@
"AT31": "Upper Austria",
"AT32": "Salzburg",
"AT33": "Tyrol",
# Keep historical model key for compatibility; fallback NUTS3 keeps the same key.
"AT333": "East Tyrol",
"AT34": "Vorarlberg",
}

NUTS_3_REGIONS: dict[str, str] = {
FALLBACK_NUTS_3_REGIONS: dict[str, str] = {
"AT111": "Mittelburgenland",
"AT112": "Nordburgenland",
"AT113": "Südburgenland",
Expand Down Expand Up @@ -112,10 +142,94 @@
"ATXXX": "Not regionalised/Unknown NUTS 3",
"ATZZZ": "Extra-Regio NUTS 3",
}
# NUTS 3 overwrites NUTS2, relevant because of language change of East Tyrol -> Osttirol
REGION_MAPPING = (
EU27_COUNTRY_CODES | COUNTRIES_SPECIAL_CASES | NUTS_2_REGIONS | NUTS_3_REGIONS
)


def _warn_and_return_fallback(
name: str, fallback: dict[str, str], error: Exception | None = None
) -> dict[str, str]:
if error is not None:
warnings.warn(
f"Eurostat mapping fallback for {name}: {error}",
RuntimeWarning,
stacklevel=2,
)
return dict(sorted(fallback.items()))


def _get_eurostat_geo_dic(dataset: str) -> dict[str, str]:
    """Fetch the English ``geo`` dimension dictionary for *dataset* from Eurostat.

    Raises:
        RuntimeError: when the ``eurostat`` package could not be imported
            (chained from the original import error).
        TypeError: when the Eurostat API returns anything other than a dict.
    """
    if eurostat is None:
        raise RuntimeError(
            "eurostat package is not available (install with: pixi add eurostat)"
        ) from EUROSTAT_IMPORT_ERROR
    payload = eurostat.get_dic(dataset, par="geo", frmt="dict", lang="en")
    if isinstance(payload, dict):
        return payload
    raise TypeError(f"Unexpected Eurostat payload for {dataset}: {type(payload)!r}")


def _filter_geo_codes(
geo_dic: dict[str, str], pattern: str, country_prefix: str | None = None
) -> dict[str, str]:
return dict(
sorted(
{
code: name
for code, name in geo_dic.items()
if isinstance(code, str)
and isinstance(name, str)
and re.match(pattern, code)
and (country_prefix is None or code.startswith(country_prefix))
}.items()
)
)


@lru_cache(maxsize=1)
def get_country_mapping() -> dict[str, str]:
    """Return ISO-2 country code -> English name from Eurostat's ``nama_10_gdp``.

    On any error in ``EUROSTAT_MAPPING_ERRORS``, or when Eurostat yields no
    two-letter codes at all, the bundled ``FALLBACK_COUNTRY_CODES`` dict is
    returned instead (with a ``RuntimeWarning`` in the error case). The
    result is cached for the lifetime of the process.
    """
    try:
        geo_dic = _get_eurostat_geo_dic("nama_10_gdp")
        mapping = _filter_geo_codes(geo_dic, pattern=r"^[A-Z]{2}$")
    except EUROSTAT_MAPPING_ERRORS as error:  # pragma: no cover - tests cover this path
        return _warn_and_return_fallback("countries", FALLBACK_COUNTRY_CODES, error)
    if mapping:
        return mapping
    return _warn_and_return_fallback("countries", FALLBACK_COUNTRY_CODES)


@lru_cache(maxsize=4)
def get_nuts_mapping(level: int, country_prefix: str | None = None) -> dict[str, str]:
    """Return NUTS code -> English name for the given NUTS *level* (2 or 3).

    Data comes from Eurostat's regional GDP datasets. On an error in
    ``EUROSTAT_MAPPING_ERRORS`` or an empty payload, the bundled
    ``FALLBACK_NUTS_*`` dictionary is returned instead (with a
    ``RuntimeWarning`` in the error case). *country_prefix* optionally
    restricts the result to codes starting with that prefix. Results are
    cached per (level, prefix) pair.

    Raises:
        ValueError: for any *level* other than 2 or 3.
    """
    if level == 2:
        dataset = "nama_10r_2gdp"
        pattern = r"^[A-Z]{2}[A-Z0-9]{2}$"
        fallback = FALLBACK_NUTS_2_REGIONS
    elif level == 3:
        dataset = "nama_10r_3gdp"
        pattern = r"^[A-Z]{2}[A-Z0-9]{3}$"
        fallback = FALLBACK_NUTS_3_REGIONS
    else:
        raise ValueError("NUTS mapping level must be 2 or 3")

    try:
        geo_dic = _get_eurostat_geo_dic(dataset)
        mapping = _filter_geo_codes(
            geo_dic, pattern=pattern, country_prefix=country_prefix
        )
    except EUROSTAT_MAPPING_ERRORS as error:  # pragma: no cover - tests cover this path
        return _warn_and_return_fallback(f"nuts{level}", fallback, error)
    if mapping:
        return mapping
    return _warn_and_return_fallback(f"nuts{level}", fallback)


def create_region_mapping() -> dict[str, str]:
    """Build the combined region-code -> name mapping.

    Later sources win on duplicate keys: NUTS-3 entries override NUTS-2 and
    country entries, and ``COUNTRIES_SPECIAL_CASES`` overrides everything.
    """
    combined: dict[str, str] = {}
    for source in (
        get_country_mapping(),
        get_nuts_mapping(level=2),
        get_nuts_mapping(level=3),
        COUNTRIES_SPECIAL_CASES,
    ):
        combined.update(source)
    return combined


# Materialize the mappings once at import time. Values come from Eurostat
# when it is reachable, otherwise from the FALLBACK_* constants (a
# RuntimeWarning is emitted when the fallback is caused by an error).
# NOTE(review): these names are kept for backward compatibility with callers
# that import them directly; EU27_COUNTRY_CODES may contain more than the
# 27 member states when Eurostat returns extra codes — confirm downstream use.
EU27_COUNTRY_CODES = get_country_mapping()
NUTS_2_REGIONS = get_nuts_mapping(level=2)
NUTS_3_REGIONS = get_nuts_mapping(level=3)
REGION_MAPPING = create_region_mapping()

UNITS_MAPPING = {
"MWh_el": "MWh",
Expand Down
20 changes: 20 additions & 0 deletions tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,26 @@ def test_preserves_locations_when_all_countries_region(self, tmp_path: Path):
iam_df = processor.structure_pyam_from_pandas(df)
assert set(iam_df.region) == {"AT1", "DE1"}

def test_maps_region_codes_when_enabled_in_region_mode(self, tmp_path: Path):
    """Region-mode processing maps NUTS codes to names when the flag is set.

    With aggregation_level="region" and map_country_codes_to_names=True, the
    'location' index values must come out of structure_pyam_from_pandas as
    the mapped region names, not the raw codes.
    """
    processor = _make_processor(
        tmp_path,
        aggregation_level="region",
        country="all",
        map_country_codes_to_names=True,
    )
    # No common data-structure definition: skip DSD-based handling.
    processor.common_dsd = None
    idx = pd.MultiIndex.from_tuples(
        [("Test|Var", "AT11", "MWh"), ("Test|Var", "DE11", "MWh")],
        names=["variable", "location", "unit"],
    )
    df = pd.DataFrame({2020: [100.0, 200.0]}, index=idx)
    # Patch the module-level mapping so the test is independent of live
    # Eurostat data.
    with patch(
        "pypsa_validation_processing.class_definitions.REGION_MAPPING",
        {"AT11": "Burgenland", "DE11": "Stuttgart"},
    ):
        iam_df = processor.structure_pyam_from_pandas(df)
    assert set(iam_df.region) == {"Burgenland", "Stuttgart"}


# ---------------------------------------------------------------------------
# Tests for backward compatibility
Expand Down
56 changes: 56 additions & 0 deletions tests/test_network_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,62 @@ def test_structure_pyam_preserves_original_model_and_scenario_metadata(
assert mock_iam.call_args.kwargs["model"] == " Model Name "
assert mock_iam.call_args.kwargs["scenario"] == " Scenario\tName "

def test_structure_pyam_maps_location_when_enabled(self, tmp_path: Path):
    """'location' values are passed through REGION_MAPPING when mapping is on."""
    processor = self._setup_processor(tmp_path)
    processor.aggregation_level = "region"
    processor.map_country_codes_to_names = True
    processor.common_dsd = None

    df = pd.DataFrame(
        {pd.Timestamp("2020-01-01"): [1000.0]},
        index=pd.MultiIndex.from_tuples(
            [("Final Energy|Electricity", "AT11", "MWh_el")],
            names=["variable", "location", "unit"],
        ),
    )

    # Patch both the mapping (test isolation from Eurostat) and the
    # IamDataFrame constructor (to capture the data it is handed).
    with patch(
        "pypsa_validation_processing.class_definitions.REGION_MAPPING",
        {"AT11": "Burgenland"},
    ):
        with patch(
            "pypsa_validation_processing.class_definitions.pyam.IamDataFrame"
        ) as mock_iam:
            mock_iam.return_value = MagicMock()
            processor.structure_pyam_from_pandas(df)

    mapped_df = mock_iam.call_args.kwargs["data"]
    assert mapped_df["location"].tolist() == ["Burgenland"]

def test_structure_pyam_keeps_location_code_when_mapping_disabled(
    self, tmp_path: Path
):
    """'location' codes pass through unmapped when the flag is off."""
    processor = self._setup_processor(tmp_path)
    processor.aggregation_level = "region"
    processor.map_country_codes_to_names = False
    processor.common_dsd = None

    df = pd.DataFrame(
        {pd.Timestamp("2020-01-01"): [1000.0]},
        index=pd.MultiIndex.from_tuples(
            [("Final Energy|Electricity", "AT11", "MWh_el")],
            names=["variable", "location", "unit"],
        ),
    )

    # REGION_MAPPING is patched with a real entry to prove the mapping is
    # NOT consulted when map_country_codes_to_names is False.
    with patch(
        "pypsa_validation_processing.class_definitions.REGION_MAPPING",
        {"AT11": "Burgenland"},
    ):
        with patch(
            "pypsa_validation_processing.class_definitions.pyam.IamDataFrame"
        ) as mock_iam:
            mock_iam.return_value = MagicMock()
            processor.structure_pyam_from_pandas(df)

    mapped_df = mock_iam.call_args.kwargs["data"]
    assert mapped_df["location"].tolist() == ["AT11"]


# ---------------------------------------------------------------------------
# Tests for file I/O and network loading
Expand Down
98 changes: 84 additions & 14 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,99 @@
"""Tests for pypsa_validation_processing.utils."""

from __future__ import annotations

from unittest.mock import patch

import pytest

from pypsa_validation_processing.utils import REGION_MAPPING
import pypsa_validation_processing.utils as utils


@pytest.fixture(autouse=True)
def _clear_mapping_caches():
    """Clear the lru_cache on the mapping loaders before and after each test.

    Keeps tests independent: a cached Eurostat (or fallback) result from one
    test must not leak into another.
    """
    utils.get_country_mapping.cache_clear()
    utils.get_nuts_mapping.cache_clear()
    yield
    utils.get_country_mapping.cache_clear()
    utils.get_nuts_mapping.cache_clear()


class TestRegionsCodes:
def test_is_dict(self):
assert isinstance(REGION_MAPPING, dict)
def test_region_mapping_is_dict_of_strings(self):
assert isinstance(utils.REGION_MAPPING, dict)
assert all(isinstance(key, str) for key in utils.REGION_MAPPING)
assert all(isinstance(value, str) for value in utils.REGION_MAPPING.values())

def test_has_all_27_member_states(self):
# All 27 EU member state ISO codes must be present
def test_region_mapping_has_required_eu_member_state_codes(self):
expected_codes = {
"AT", "BE", "BG", "CY", "CZ", "DE", "DK", "EE", "ES", "FI",
"FR", "GR", "HR", "HU", "IE", "IT", "LT", "LU", "LV", "MT",
"NL", "PL", "PT", "RO", "SE", "SI", "SK",
}
assert expected_codes.issubset(REGION_MAPPING.keys())
assert expected_codes.issubset(utils.REGION_MAPPING)

def test_region_mapping_contains_nuts2_and_nuts3_codes(self):
assert "AT11" in utils.REGION_MAPPING
assert "AT111" in utils.REGION_MAPPING


class TestEurostatMappingLoaders:
def test_create_region_mapping_success_path_uses_eurostat_data(self):
    """create_region_mapping merges the three datasets and applies special cases."""

    def _mock_eurostat_geo_dic(dataset: str) -> dict[str, str]:
        # One payload per dataset; codes of the wrong NUTS level are
        # labelled "invalid" and must be filtered out by the pattern match.
        if dataset == "nama_10_gdp":
            return {"AT": "Austria", "DE": "Germany", "DE1": "invalid"}
        if dataset == "nama_10r_2gdp":
            return {"AT11": "Burgenland", "ATX": "invalid"}
        if dataset == "nama_10r_3gdp":
            return {"AT111": "Mittelburgenland", "AT11": "invalid"}
        raise AssertionError(dataset)

    with patch.object(
        utils, "_get_eurostat_geo_dic", side_effect=_mock_eurostat_geo_dic
    ):
        region_mapping = utils.create_region_mapping()

    assert region_mapping["AT"] == "Austria"
    assert region_mapping["AT11"] == "Burgenland"
    assert region_mapping["AT111"] == "Mittelburgenland"
    assert "ATX" not in region_mapping
    assert "DE1" in region_mapping  # from special cases
    assert region_mapping["DE1"] == utils.COUNTRIES_SPECIAL_CASES["DE1"]

def test_create_region_mapping_falls_back_when_eurostat_fails(self):
    """On a Eurostat error, the bundled fallback dictionaries are used."""
    with patch.object(utils, "_get_eurostat_geo_dic", side_effect=RuntimeError("boom")):
        region_mapping = utils.create_region_mapping()

    # Values below come from the FALLBACK_* constants, not Eurostat.
    assert region_mapping["AT"] == "Austria"
    assert region_mapping["AT11"] == "Burgenland"
    assert region_mapping["AT111"] == "Mittelburgenland"

def test_create_region_mapping_partial_data_uses_fallback_for_missing_dataset(self):
    """An empty dataset falls back per-dataset without discarding good data."""

    def _partial_geo(dataset: str) -> dict[str, str]:
        # NUTS-3 payload is empty; the other two datasets succeed.
        if dataset == "nama_10_gdp":
            return {"AT": "Austria", "DE": "Germany"}
        if dataset == "nama_10r_2gdp":
            return {"AT11": "Burgenland"}
        if dataset == "nama_10r_3gdp":
            return {}
        raise AssertionError(dataset)

    with patch.object(utils, "_get_eurostat_geo_dic", side_effect=_partial_geo):
        region_mapping = utils.create_region_mapping()

    # NUTS-2 came from the mocked Eurostat data; NUTS-3 from the fallback.
    assert region_mapping["AT11"] == "Burgenland"
    assert region_mapping["AT111"] == utils.FALLBACK_NUTS_3_REGIONS["AT111"]

def test_special_cases_override_other_mapping_sources(self):
def _mock_nuts_mapping(
level: int, country_prefix: str | None = None
) -> dict[str, str]:
assert country_prefix is None
assert level in (2, 3)
return {"DE1": f"NUTS{level} DE1"}

def test_sample_mappings(self):
assert REGION_MAPPING["AT"] == "Austria"
assert REGION_MAPPING["DE"] == "Germany"
assert REGION_MAPPING["FR"] == "France"
with patch.object(utils, "get_country_mapping", return_value={"DE1": "Country DE1"}):
with patch.object(utils, "get_nuts_mapping", side_effect=_mock_nuts_mapping):
region_mapping = utils.create_region_mapping()

def test_values_are_strings(self):
for key, value in REGION_MAPPING.items():
assert isinstance(key, str), f"Key {key!r} is not a string"
assert isinstance(value, str), f"Value {value!r} for key {key!r} is not a string"
assert region_mapping["DE1"] == utils.COUNTRIES_SPECIAL_CASES["DE1"]