diff --git a/pypsa_validation_processing/configs/mapping.default.yaml b/pypsa_validation_processing/configs/mapping.default.yaml index 12f27c9..1644152 100644 --- a/pypsa_validation_processing/configs/mapping.default.yaml +++ b/pypsa_validation_processing/configs/mapping.default.yaml @@ -6,6 +6,7 @@ # Primary Energy|Coal: primary_energy_coal Final Energy [by Carrier]|Electricity: Final_Energy_by_Carrier__Electricity +Final Energy [by Carrier]|Oil: Final_Energy_by_Carrier__Oil Final Energy [by Sector]|Transportation: Final_Energy_by_Sector__Transportation Final Energy [by Sector]|Industry: Final_Energy_by_Sector__Industry Final Energy [by Sector]|Agriculture: Final_Energy_by_Sector__Agriculture \ No newline at end of file diff --git a/pypsa_validation_processing/statistics_functions.py b/pypsa_validation_processing/statistics_functions.py index b7afb71..8c8c7a0 100644 --- a/pypsa_validation_processing/statistics_functions.py +++ b/pypsa_validation_processing/statistics_functions.py @@ -26,13 +26,14 @@ def (network: pypsa.Network) -> pd.Series: import pandas as pd import numpy as np import pypsa -from pypsa_validation_processing.utils import statistics_kwargs as kwargs from pypsa_validation_processing.utils import ( statistics_kwargs_for_filtering as kwargs_filtering, + statistics_kwargs as kwargs, + UNITS_MAPPING, ) from pypsa_validation_processing.utils import ( - statistics_grouping_index, get_energy_totals_domestic_share, + create_location_index_from_cupperplate, ) @@ -133,6 +134,181 @@ def Final_Energy_by_Carrier__Electricity( return result +def Final_Energy_by_Carrier__Oil( + n: pypsa.Network, + aggregate_per_year: bool = True, +) -> pd.Series | pd.DataFrame: + """Extract fossil final-energy oil demand from a PyPSA Network. + + Returns the final energy from oil carriers after removing an estimated + renewable-oil share. + + Parameters + ---------- + n : pypsa.Network + PyPSA network to process. 
+ aggregate_per_year : bool, optional + If ``True`` (default), aggregate over all snapshots and return a + :class:`pandas.Series`. If ``False``, return a + :class:`pandas.DataFrame` with snapshots as columns. + + Returns + ------- + pd.Series | pd.DataFrame + Fossil oil final energy with MultiIndex including ``location`` and + ``unit``. + Returns data at regional level as provided by the PyPSA network. + Country-level aggregation is handled by + Network_Processor._aggregate_to_country() if configured. + + Notes + ----- + Total oil final energy is built from: + - agriculture machinery oil (Load), + - residential/commercial oil boiler demand (rural + urban decentral), + - land transport oil (Load). + + ``naphtha for industry`` is intentionally excluded because it is treated + as non-energy use and therefore not part of Final Energy variables. + + Regionalization from the copperplate topology is performed by deriving a + region code from demand- or production-bus names and applying + :func:`create_location_index_from_cupperplate` before regrouping to + ``kwargs["groupby"]``. + + The renewable-oil fraction is computed per region as: + + ``renewable oil production in region / total oil demand in region``. + + Renewable production is based on supply from selected renewable-oil + carriers, while total oil demand is based on withdrawals from oil-using + carriers. If the fraction exceeds 1 (i.e., renewable production is larger + than regional oil demand), it is clipped to 1, so the fossil share becomes + zero in that region. Cross-regional export/import effects of renewable oil + are not represented in this statistic. + + ``UNITS_MAPPING`` is applied inside this function to the index of both the + renewable-oil fraction and the demand-side total, so that their unit + labels agree when the two are multiplied. 
+ """ + + # Final Energy|Agricultur|Liquids - agriculture machinery oil + agri = n.statistics.withdrawal( + carrier="agriculture machinery oil", + components="Load", + aggregate_time=aggregate_per_year, + **kwargs, + ) + + # Final Energy|Residential and Commercial|Liquids - urban decentral oil boiler, rural oil boiler + raw_rescom = n.statistics.withdrawal( + bus_carrier="oil", + carrier=["rural oil boiler", "urban decentral oil boiler"], + groupby=kwargs_filtering["groupby"] + ["bus1"], + aggregate_time=aggregate_per_year, + ) + if raw_rescom.empty: + rescom = raw_rescom + else: + raw_rescom = raw_rescom.drop("Store", errors="ignore") + usage_location = [ + bus.split(" ")[0] for bus in list(raw_rescom.index.get_level_values("bus1")) + ] + rescom = ( + create_location_index_from_cupperplate(raw_rescom, usage_location) + .groupby(kwargs["groupby"]) + .sum() + ) + + # Final Energy|Transportation|Liquids + transpo = n.statistics.withdrawal( + carrier="land transport oil", + components="Load", + aggregate_time=aggregate_per_year, + **kwargs, + ) + + series_list = [ + agri, + rescom, + transpo, + ] + series_list = [series for series in series_list if not series.empty] + + total = pd.concat(series_list).groupby(kwargs["groupby"]).sum() + + # non-fossil parts from renewable-oil production per location + # renewable oil production + non_fossil_parts = n.statistics.supply( + bus_carrier="oil", + carrier=[ + "unsustainable bioliquids", + "biomass to liquid", + "biomass to liquid CC", + "electrobiofuels", + "Fischer-Tropsch", + ], + at_port="bus1", + components="Link", + groupby=kwargs_filtering["groupby"] + ["bus0"], + ) + home_location = [ + bus.split(" ")[0] + for bus in list(non_fossil_parts.index.get_level_values("bus0")) + ] + + non_fossil_parts = create_location_index_from_cupperplate( + non_fossil_parts, home_location + ) + non_fossil_parts = non_fossil_parts.groupby(kwargs["groupby"]).sum() + + # all oil use + all_oil = n.statistics.withdrawal( + bus_carrier="oil", + 
carrier=[ + "land transport oil", + "naphtha for industry", + "shipping oil", + "kerosene for aviation", + "agriculture machinery oil", + "urban central oil CHP", + "oil", + "rural oil boiler", + "urban decentral oil boiler", + ], + components="Link", + at_port="bus0", + groupby=["bus1", "carrier", "location", "unit"], + ) + + home_location = [ + bus.split(" ")[0] for bus in list(all_oil.index.get_level_values("bus1")) + ] + + all_oil = create_location_index_from_cupperplate(all_oil, home_location) + all_oil = all_oil.groupby(kwargs["groupby"]).sum() + + non_fossil_fraction = non_fossil_parts.div(all_oil) + zero_oil = all_oil.eq(0).reindex(non_fossil_fraction.index, fill_value=False) + non_fossil_fraction = non_fossil_fraction.mask(zero_oil, 1.0) + + non_fossil_fraction = non_fossil_fraction.clip(upper=1) # TODO: Issue #53 + non_fossil_fraction = non_fossil_fraction.rename(index=UNITS_MAPPING) + non_fossil_fraction = non_fossil_fraction.groupby( + kwargs["groupby"] + ).mean() # avoid double-indexing + total = total.rename(index=UNITS_MAPPING) + total = total.groupby(kwargs["groupby"]).sum() + + # cover edge-case with no oil demand + if all_oil.empty: + non_fossil_fraction = total * 0.0 + 1.0 + else: + non_fossil_fraction = non_fossil_fraction.reindex_like(total).fillna(0.0) + + fossil_oil = total.mul(1 - non_fossil_fraction, axis=0) + return fossil_oil + + def Final_Energy_by_Sector__Transportation( n: pypsa.Network, aggregate_per_year: bool = True, diff --git a/pypsa_validation_processing/utils.py b/pypsa_validation_processing/utils.py index f2ff5ab..7b149d0 100644 --- a/pypsa_validation_processing/utils.py +++ b/pypsa_validation_processing/utils.py @@ -128,6 +128,7 @@ "land transport": "MWh", "t_co2": "t", "": "", + "MWh": "MWh", } ## standards for statistics-functions @@ -172,3 +173,43 @@ def get_energy_totals_domestic_share( domestic = energy_totals[f"total domestic {kind}"] international = energy_totals[f"total international {kind}"] return (domestic / 
(domestic + international)).values[0] + + +def create_location_index_from_cupperplate( + raw_input: pd.Series | pd.DataFrame, usage_location_list: list +): + """ + Replace the ``location`` level values of an indexed object. + + This helper rebuilds the index of ``raw_input`` from its index frame and + overwrites the ``location`` column with values from + ``usage_location_list``. It is mainly used when location information from + a copperplate-carrier result must be mapped back to explicit regional labels. + + Parameters + ---------- + raw_input : pandas.Series or pandas.DataFrame + Input object with a (Multi)Index that includes a ``location`` level. + The function preserves data values and index level order/names. + usage_location_list : list + New location values to assign row-by-row. Must have the same length as + ``raw_input``. + + Returns + ------- + pandas.Series or pandas.DataFrame + A copy of ``raw_input`` with the same data and a rebuilt index where + the ``location`` level has been replaced. + + Raises + ------ + ValueError + If ``usage_location_list`` length does not match the number of rows, or + if the index cannot be reconstructed with the existing index names. 
+ """ + idx_df = raw_input.index.to_frame(index=False) + idx_df["location"] = pd.Index(usage_location_list).to_numpy() + new_index = pd.MultiIndex.from_frame(idx_df, names=raw_input.index.names) + output = raw_input.copy() + output.index = new_index + return output diff --git a/tests/test_statistics_functions.py b/tests/test_statistics_functions.py index a73c3fd..6f992c4 100644 --- a/tests/test_statistics_functions.py +++ b/tests/test_statistics_functions.py @@ -7,6 +7,7 @@ from pypsa_validation_processing.statistics_functions import ( Final_Energy_by_Carrier__Electricity, + Final_Energy_by_Carrier__Oil, Final_Energy_by_Sector__Industry, Final_Energy_by_Sector__Agriculture, Final_Energy_by_Sector__Transportation, @@ -219,6 +220,284 @@ def test_issues_expected_withdrawal_queries(self): assert calls[4]["components"] == "Link" +# --------------------------------------------------------------------------- +# Tests for Final_Energy_by_Carrier__Oil +# --------------------------------------------------------------------------- + + +class TestFinalEnergyByCarrierOil: + """Test suite for Final_Energy_by_Carrier__Oil function.""" + + class _OilStatisticsAccessor: + """Deterministic accessor tailored to oil final-energy tests.""" + + def __init__( + self, + *, + rescom_empty: bool = False, + all_oil_value: float = 200.0, + all_oil_empty: bool = False, + non_fossil_empty: bool = False, + ): + self.rescom_empty = rescom_empty + self.all_oil_value = all_oil_value + self.all_oil_empty = all_oil_empty + self.non_fossil_empty = non_fossil_empty + + def _to_result( + self, + *, + index: pd.MultiIndex, + values: list[float], + aggregate_time: bool, + ) -> pd.Series | pd.DataFrame: + if aggregate_time: + return pd.Series(values, index=index, dtype=float) + timestamps = pd.date_range( + "2019-01-01", periods=4, freq="6h", name="snapshot" + ) + return pd.DataFrame( + {ts: values for ts in timestamps}, index=index, dtype=float + ) + + def withdrawal( + self, + bus_carrier: str | None = 
None, + carrier: list[str] | str | None = None, + components: str | list[str] | None = None, + aggregate_time: bool = True, + groupby: list[str] | None = None, + at_port: str | None = None, + **kwargs: object, + ) -> pd.Series | pd.DataFrame: + if groupby is None: + groupby = ["location", "unit"] + + # Agriculture and land-transport final demand (Load) + if carrier == "agriculture machinery oil" and components == "Load": + idx = pd.MultiIndex.from_tuples( + [("AT1", "MWh_th")], names=["location", "unit"] + ) + return self._to_result( + index=idx, values=[100.0], aggregate_time=aggregate_time + ) + + if carrier == "land transport oil" and components == "Load": + idx = pd.MultiIndex.from_tuples( + [("AT1", "MWh_th")], names=["location", "unit"] + ) + return self._to_result( + index=idx, values=[300.0], aggregate_time=aggregate_time + ) + + # Residential/commercial demand requiring copperplate -> location mapping via bus1 + if ( + bus_carrier == "oil" + and isinstance(carrier, list) + and set(carrier) == {"rural oil boiler", "urban decentral oil boiler"} + ): + idx_names = ["name", "bus", "carrier", "location", "unit", "bus1"] + if self.rescom_empty: + empty_idx = pd.MultiIndex.from_arrays( + [[] for _ in idx_names], names=idx_names + ) + return self._to_result( + index=empty_idx, + values=[], + aggregate_time=aggregate_time, + ) + idx = pd.MultiIndex.from_tuples( + [ + ( + "rural_boiler_load", + "AT1 oil", + "rural oil boiler", + "EU", + "MWh_th", + "AT1 oil", + ), + ( + "urban_boiler_load", + "AT1 oil", + "urban decentral oil boiler", + "EU", + "MWh_th", + "AT1 oil", + ), + ], + names=idx_names, + ) + return self._to_result( + index=idx, + values=[50.0, 50.0], + aggregate_time=aggregate_time, + ) + + # Total oil use denominator for non-fossil share + if ( + bus_carrier == "oil" + and components == "Link" + and at_port == "bus0" + and groupby == ["bus1", "carrier", "location", "unit"] + ): + if self.all_oil_empty: + empty_idx = pd.MultiIndex.from_arrays( + [[], [], 
[], []], + names=["bus1", "carrier", "location", "unit"], + ) + return self._to_result( + index=empty_idx, + values=[], + aggregate_time=aggregate_time, + ) + idx = pd.MultiIndex.from_tuples( + [ + ( + "AT1 oil", + "land transport oil", + "EU", + "MWh_th", + ) + ], + names=["bus1", "carrier", "location", "unit"], + ) + return self._to_result( + index=idx, + values=[self.all_oil_value], + aggregate_time=aggregate_time, + ) + + raise AssertionError( + f"Unexpected withdrawal call: bus_carrier={bus_carrier}, carrier={carrier}, components={components}, groupby={groupby}, at_port={at_port}" + ) + + def supply( + self, + bus_carrier: str | None = None, + carrier: list[str] | str | None = None, + at_port: str | None = None, + components: str | list[str] | None = None, + groupby: list[str] | None = None, + aggregate_time: bool = True, + **kwargs: object, + ) -> pd.Series | pd.DataFrame: + if ( + bus_carrier == "oil" + and components == "Link" + and at_port == "bus1" + and groupby == ["name", "bus", "carrier", "location", "unit", "bus0"] + ): + if self.non_fossil_empty: + empty_idx = pd.MultiIndex.from_arrays( + [[], [], [], [], [], []], + names=["name", "bus", "carrier", "location", "unit", "bus0"], + ) + return self._to_result( + index=empty_idx, + values=[], + aggregate_time=aggregate_time, + ) + idx = pd.MultiIndex.from_tuples( + [ + ( + "renewable_oil_link", + "AT1 oil", + "biomass to liquid", + "EU", + "MWh_th", + "AT1 oil", + ) + ], + names=["name", "bus", "carrier", "location", "unit", "bus0"], + ) + return self._to_result( + index=idx, values=[500.0], aggregate_time=aggregate_time + ) + + raise AssertionError( + f"Unexpected supply call: bus_carrier={bus_carrier}, carrier={carrier}, components={components}, groupby={groupby}, at_port={at_port}" + ) + + class _OilNetwork: + """Minimal network object exposing only the statistics accessor.""" + + def __init__( + self, + *, + rescom_empty: bool = False, + all_oil_value: float = 200.0, + all_oil_empty: bool = False, + 
non_fossil_empty: bool = False, + ): + self.statistics = TestFinalEnergyByCarrierOil._OilStatisticsAccessor( + rescom_empty=rescom_empty, + all_oil_value=all_oil_value, + all_oil_empty=all_oil_empty, + non_fossil_empty=non_fossil_empty, + ) + + def test_clips_non_fossil_share_above_one_to_zero_fossil(self): + """Renewable oil production above total demand should yield zero fossil oil.""" + result = Final_Energy_by_Carrier__Oil(self._OilNetwork()) + + assert isinstance(result, pd.Series) + assert isinstance(result.index, pd.MultiIndex) + assert result.index.names == ["location", "unit"] + assert result.loc[("AT1", "MWh")] == pytest.approx(0.0) + + def test_handles_empty_rescom_without_failing(self): + """Function should work even when residential/commercial oil demand is empty.""" + result = Final_Energy_by_Carrier__Oil(self._OilNetwork(rescom_empty=True)) + + assert isinstance(result, pd.Series) + assert isinstance(result.index, pd.MultiIndex) + assert result.index.names == ["location", "unit"] + # non-fossil fraction is clipped to 1, so fossil share remains zero. 
+ assert result.loc[("AT1", "MWh")] == pytest.approx(0.0) + + def test_handles_zero_total_oil_demand_denominator(self): + """Division by zero in non-fossil share denominator should not crash.""" + result = Final_Energy_by_Carrier__Oil(self._OilNetwork(all_oil_value=0.0)) + + assert isinstance(result, pd.Series) + assert result.loc[("AT1", "MWh")] == pytest.approx(0.0) + + def test_no_renewable_oil_production_fossil_equals_total(self): + """Without renewable oil supply, fossil oil should equal total oil demand.""" + result = Final_Energy_by_Carrier__Oil(self._OilNetwork(non_fossil_empty=True)) + + assert isinstance(result, pd.Series) + assert isinstance(result.index, pd.MultiIndex) + assert result.index.names == ["location", "unit"] + assert not result.isna().any() + # 100 (agri) + 100 (res/com) + 300 (transport) = 500 + assert result.loc[("AT1", "MWh")] == pytest.approx(500.0) + + def test_handles_empty_all_oil_without_failing(self): + """Function should work when total oil-withdrawal denominator is empty.""" + result = Final_Energy_by_Carrier__Oil( + self._OilNetwork(all_oil_empty=True, non_fossil_empty=True) + ) + + assert isinstance(result, pd.Series) + assert isinstance(result.index, pd.MultiIndex) + assert result.index.names == ["location", "unit"] + assert not result.isna().any() + assert (result == 0.0).all() + + def test_returns_dataframe_for_aggregate_per_year_false(self): + """Function should return a timeseries DataFrame for aggregate_per_year=False.""" + result = Final_Energy_by_Carrier__Oil( + self._OilNetwork(), + aggregate_per_year=False, + ) + + assert isinstance(result, pd.DataFrame) + assert isinstance(result.index, pd.MultiIndex) + assert result.index.names == ["location", "unit"] + assert isinstance(result.columns, pd.DatetimeIndex) + + # --------------------------------------------------------------------------- # Tests for Final_Energy_by_Sector__Transportation # ---------------------------------------------------------------------------