"""Load a fixed-capacity designed system from SDOM output CSV snapshots.
Phase 1 of the resiliency module reads the per-scenario capacities from a
prior SDOM design run plus the matching previous-stage input time-series CSVs
and returns a :class:`~sdom.resiliency.system_state.DesignedSystem`.
Notes
-----
- ``OutputGeneration_*.csv`` and ``OutputStorage_*.csv`` are intentionally
ignored at this stage (per the resiliency-module plan).
- Files whose names contain ``Phase1`` are excluded from snapshot discovery.
- The previous-stage inputs directory does NOT contain a
``formulations.csv``; CSVs are therefore read directly with pandas
rather than via :func:`sdom.io_manager.load_data`.
"""
from __future__ import annotations
import glob
import logging
import os
from pathlib import Path
import pandas as pd
from sdom.resiliency.system_state import DesignedSystem
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Defaults
# ---------------------------------------------------------------------------
_DEFAULT_FORMULATION_MAP: dict[str, str] = {
"Imports": "ImportsWithDemandChargesFormulation",
"Exports": "ExportsFormulation",
"Storage": "StorageFormulation",
"Thermal": "ThermalFormulation",
"Solar": "VREFormulation",
"Wind": "VREFormulation",
"Hydro": "HydroFormulation",
"Nuclear": "NuclearFormulation",
"OtherRenewables": "OtherRenewablesFormulation",
"Load": "LoadFormulation",
}
_VRE_AGGREGATE_TECHS = {"Solar PV", "Wind", "All generation", "All technologies"}
# Metric labels in OutputSummary mapped to canonical storage capacity keys.
_STORAGE_METRIC_MAP = {
"charge power capacity": "Cap_Pch",
"discharge power capacity": "Cap_Pdis",
"energy capacity": "Cap_E",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _find_snapshot_file(snapshot_dir: Path, prefix: str, year: int) -> Path:
"""Return the snapshot CSV matching ``{year}_{prefix}_*.csv``.
Parameters
----------
snapshot_dir : pathlib.Path
Directory to search.
prefix : str
File prefix, e.g. ``"OutputSummary"``.
year : int
Year used in the file name.
Returns
-------
pathlib.Path
Raises
------
FileNotFoundError
When no matching file is found.
"""
pattern = str(snapshot_dir / f"{year}_{prefix}_*.csv")
matches = [Path(p) for p in glob.glob(pattern) if "Phase1" not in os.path.basename(p)]
if not matches:
raise FileNotFoundError(
f"No {prefix} snapshot file matching pattern '{pattern}' "
f"(excluding Phase1 files) was found."
)
# Deterministic selection: shortest filename wins (avoids picking ad-hoc copies).
matches.sort(key=lambda p: (len(p.name), p.name))
return matches[0]
def _detect_scenario_column(df: pd.DataFrame) -> str:
"""Return the scenario-filter column name (``Scenario`` or ``Run``)."""
for col in ("Scenario", "Run"):
if col in df.columns:
return col
raise ValueError(
"Snapshot CSV is missing a scenario column; expected one of "
"['Scenario', 'Run']."
)
def _filter_scenario(df: pd.DataFrame, scenario_id: int, file_label: str) -> tuple[pd.DataFrame, int]:
"""Apply hybrid scenario-id resolution to a snapshot DataFrame.
Parameters
----------
df : pandas.DataFrame
Snapshot DataFrame containing a ``Scenario`` or ``Run`` column.
scenario_id : int
User-requested scenario id.
file_label : str
Short label used in error / warning messages.
Returns
-------
tuple of (pandas.DataFrame, int)
Filtered DataFrame plus the resolved scenario id.
Raises
------
ValueError
When multiple scenarios are present and none match ``scenario_id``.
"""
col = _detect_scenario_column(df)
unique_ids = sorted(int(v) for v in df[col].dropna().unique())
if len(unique_ids) == 1:
only = unique_ids[0]
if int(scenario_id) != only:
logger.warning(
"%s: only scenario_id=%s is present; ignoring user-supplied scenario_id=%s.",
file_label,
only,
scenario_id,
)
return df[df[col] == only].copy(), only
if int(scenario_id) not in unique_ids:
raise ValueError(
f"{file_label}: scenario_id={scenario_id} not found. "
f"Available scenario ids: {unique_ids}."
)
return df[df[col] == int(scenario_id)].copy(), int(scenario_id)
def _warn_zero_capacity(tech: str, value: float) -> None:
logger.warning(
"Technology '%s' has capacity=%s; excluding from designed system.",
tech,
value,
)
def _normalize_metric(metric: object) -> str:
return str(metric).strip().lower()
def _load_summary_capacities(summary_path: Path, scenario_id: int):
"""Parse OutputSummary into raw capacity dicts.
Returns
-------
dict
``{"storage_caps_raw", "thermal_caps_raw", "solar_total",
"wind_total", "scenario_id"}``.
"""
df = pd.read_csv(summary_path)
df, resolved_id = _filter_scenario(df, scenario_id, file_label=summary_path.name)
storage_caps_raw: dict[str, dict[str, float]] = {}
thermal_caps_raw: dict[str, float] = {}
solar_total = 0.0
wind_total = 0.0
for _, row in df.iterrows():
metric = _normalize_metric(row["Metric"])
tech = row["Technology"]
try:
value = float(row["Optimal Value"])
except (TypeError, ValueError):
continue
# Storage capacities
if metric in _STORAGE_METRIC_MAP:
if pd.isna(tech) or tech == "All storage":
continue
key = _STORAGE_METRIC_MAP[metric]
storage_caps_raw.setdefault(str(tech), {})[key] = value
continue
# Generation / VRE capacity rows
if metric == "capacity":
if pd.isna(tech):
continue
tech_str = str(tech).strip()
if tech_str == "Solar PV":
solar_total = value
elif tech_str == "Wind":
wind_total = value
elif tech_str in _VRE_AGGREGATE_TECHS:
continue
else:
thermal_caps_raw[tech_str] = value
return {
"storage_caps_raw": storage_caps_raw,
"thermal_caps_raw": thermal_caps_raw,
"solar_total": solar_total,
"wind_total": wind_total,
"scenario_id": resolved_id,
}
def _load_vre_per_plant(snapshot_dir: Path, scenario_id: int, year: int):
"""Read OutputSelectedVRE and return per-plant capacity dicts."""
path = _find_snapshot_file(snapshot_dir, "OutputSelectedVRE", year)
df = pd.read_csv(path)
# Strip stray spaces on header names (the file ships with "Selection ").
df.columns = [c.strip() for c in df.columns]
df, _ = _filter_scenario(df, scenario_id, file_label=path.name)
cap_col = "Capacity (MW)"
id_col = "VRE unit ID"
tech_col = "Technology"
sel_col = "Selection"
solar_caps: dict[str, float] = {}
wind_caps: dict[str, float] = {}
for _, row in df.iterrows():
plant_id = str(row[id_col]).strip()
selection = float(row[sel_col]) if sel_col in df.columns else 1.0
if selection <= 0.0:
continue
capacity = float(row[cap_col]) * selection
tech = str(row[tech_col]).strip()
if tech == "Solar PV":
solar_caps[plant_id] = capacity
elif tech == "Wind":
wind_caps[plant_id] = capacity
return solar_caps, wind_caps
def _read_hourly_csv(path: Path) -> pd.DataFrame:
"""Read a `*Hour`-indexed hourly CSV, normalising the index column."""
df = pd.read_csv(path)
df.columns = [c.lstrip("*").strip() for c in df.columns]
if "Hour" in df.columns:
df = df.set_index("Hour")
return df
def _hourly_series(path: Path, value_col: str | None = None) -> pd.Series:
df = _read_hourly_csv(path)
if value_col is None:
value_col = df.columns[0]
s = df[value_col].astype(float)
s.name = value_col
return s
def _load_input_csvs(inputs_dir: Path, year: int) -> dict[str, pd.DataFrame | pd.Series]:
"""Load all hourly / parameter input CSVs for the given year."""
def p(name: str) -> Path:
return inputs_dir / name
return {
"load": _hourly_series(p(f"Load_hourly_{year}.csv")),
"cf_solar": _read_hourly_csv(p(f"CFSolar_{year}.csv")).astype(float),
"cf_wind": _read_hourly_csv(p(f"CFWind_{year}.csv")).astype(float),
"nuclear": _hourly_series(p(f"Nucl_hourly_{year}.csv")),
"hydro": _hourly_series(p(f"lahy_hourly_{year}.csv")),
"other_renewables": _hourly_series(p(f"otre_hourly_{year}.csv")),
"import_cap": _hourly_series(p(f"Import_Cap_{year}.csv")),
"import_price": _hourly_series(p(f"import_prices_{year}.csv")),
"export_cap": _hourly_series(p(f"Export_Cap_{year}.csv")),
"export_price": _hourly_series(p(f"export_prices_{year}.csv")),
"phi_fix_t": _hourly_series(p("fixed_dem_charges.csv")),
"phi_var_t": _hourly_series(p("var_dem_charges.csv")),
"storage_data": pd.read_csv(p(f"StorageData_{year}.csv"), index_col=0),
"balancing_units": pd.read_csv(p(f"Data_Balancing_units_{year}.csv")),
}
def _compute_month_of_hour(year: int) -> pd.Series:
"""Return a Series mapping hour-of-year (1..8760) to month (1..12)."""
idx = pd.date_range(start=f"{year}-01-01", periods=8760, freq="h")
return pd.Series(idx.month.values, index=range(1, 8761), name="month")
def _build_storage_caps(
storage_caps_raw: dict[str, dict[str, float]],
storage_data: pd.DataFrame,
) -> dict[str, dict[str, float]]:
"""Combine snapshot capacities with StorageData parameters.
``storage_data`` is indexed by parameter name (P_Capex, Eff, FOM, VOM, ...)
with one column per storage technology.
"""
out: dict[str, dict[str, float]] = {}
techs_in_data = list(storage_data.columns)
for tech in techs_in_data:
cap = storage_caps_raw.get(tech, {})
cap_e = float(cap.get("Cap_E", 0.0) or 0.0)
cap_pch = float(cap.get("Cap_Pch", 0.0) or 0.0)
cap_pdis = float(cap.get("Cap_Pdis", 0.0) or 0.0)
if cap_e <= 0.0 and cap_pch <= 0.0 and cap_pdis <= 0.0:
_warn_zero_capacity(tech, cap_e)
continue
# StorageData parameters: ``Eff`` is round-trip-style efficiency in
# this project; we reuse it for both charge and discharge until a more
# detailed split is added.
eff = float(storage_data.at["Eff", tech]) if "Eff" in storage_data.index else 1.0
vom = float(storage_data.at["VOM", tech]) if "VOM" in storage_data.index else 0.0
out[tech] = {
"Cap_Pch": cap_pch,
"Cap_Pdis": cap_pdis,
"Cap_E": cap_e,
"eta_ch": eff,
"eta_dis": eff,
"soc_min_frac": 0.0,
"vom": vom,
}
return out
def _build_thermal_caps(
thermal_caps_raw: dict[str, float],
balancing_units: pd.DataFrame,
) -> dict[str, dict[str, float]]:
"""Aggregate thermal-tech capacity / cost parameters.
The previous-stage ``Data_Balancing_units_{year}.csv`` lists individual
plants without a Technology column. We therefore aggregate by mean of
HeatRate / FuelCost / VOM across all rows (documented behavior).
"""
out: dict[str, dict[str, float]] = {}
if balancing_units.empty:
agg_heat = agg_fuel = agg_vom = 0.0
else:
agg_heat = float(balancing_units["HeatRate"].mean()) if "HeatRate" in balancing_units.columns else 0.0
agg_fuel = float(balancing_units["FuelCost"].mean()) if "FuelCost" in balancing_units.columns else 0.0
agg_vom = float(balancing_units["VOM"].mean()) if "VOM" in balancing_units.columns else 0.0
for tech, capacity in thermal_caps_raw.items():
if not capacity or float(capacity) <= 0.0:
_warn_zero_capacity(tech, capacity)
continue
var_cost = agg_heat * agg_fuel + agg_vom
out[tech] = {
"capacity_MW": float(capacity),
"heat_rate": agg_heat,
"fuel_cost": agg_fuel,
"vom": agg_vom,
"var_cost": var_cost,
}
return out
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def load_designed_system(
snapshot_dir,
*,
inputs_dir,
year=2030,
scenario_id=1,
formulation_overrides=None,
):
"""Load a fixed-capacity designed system from SDOM output snapshots.
Parameters
----------
snapshot_dir : str or pathlib.Path
Directory containing ``OutputSummary_*`` and ``OutputSelectedVRE_*``
CSVs from a prior SDOM design run. ``OutputGeneration_*`` and
``OutputStorage_*`` files are ignored at this stage; files containing
``Phase1`` in their names are also excluded.
inputs_dir : str or pathlib.Path
Directory holding the previous-stage time-series and parameter CSVs
(``Load_hourly_*``, ``CFSolar_*``, ``CFWind_*``,
``Data_Balancing_units_*``, ``StorageData_*``, ``Import_Cap_*``,
``import_prices_*``, ``Export_Cap_*``, ``export_prices_*``,
``fixed_dem_charges.csv``, ``var_dem_charges.csv``).
year : int, optional
Calendar year used in CSV filenames. Default ``2030``.
scenario_id : int, optional
Scenario / Run id to extract from snapshot CSVs. Default ``1``.
If a snapshot file contains a single unique scenario, that one is
used and a warning is emitted when ``scenario_id`` differs.
If multiple scenarios are present, ``scenario_id`` must match one
of them; otherwise :class:`ValueError` is raised listing the
available ids.
formulation_overrides : dict, optional
Mapping ``{component: formulation_name}`` overlaid on the default
formulation map.
Returns
-------
DesignedSystem
Raises
------
FileNotFoundError
When the ``OutputSummary_*`` snapshot is missing.
ValueError
When ``scenario_id`` cannot be resolved against the snapshot.
Examples
--------
>>> ds = load_designed_system(
... "Data/resiliency_eval/3MW_critical_load_24hrs_outage_24hrs_recovery",
... inputs_dir="Data/resiliency_eval/inputs_previous_stage/Paper_PGnE/Paper",
... year=2030,
... scenario_id=1,
... )
>>> ds.scenario_id
1
"""
snapshot_dir = Path(snapshot_dir)
inputs_dir = Path(inputs_dir)
logger.info(
"Loading designed system: snapshot_dir=%s, inputs_dir=%s, year=%s, scenario_id=%s.",
snapshot_dir,
inputs_dir,
year,
scenario_id,
)
logger.debug("Locating OutputSummary snapshot for year=%s.", year)
summary_path = _find_snapshot_file(snapshot_dir, "OutputSummary", year)
logger.debug("Reading summary capacities from %s.", summary_path.name)
summary_info = _load_summary_capacities(summary_path, scenario_id)
resolved_id = summary_info["scenario_id"]
logger.debug("Resolved scenario_id=%s.", resolved_id)
logger.debug("Loading per-plant VRE selections from snapshot.")
solar_caps, wind_caps = _load_vre_per_plant(snapshot_dir, resolved_id, year)
logger.debug(
"Per-plant VRE counts: solar=%d, wind=%d.", len(solar_caps), len(wind_caps)
)
logger.debug("Loading previous-stage input CSVs from %s.", inputs_dir)
inputs = _load_input_csvs(inputs_dir, year)
logger.debug("Combining snapshot capacities with StorageData parameters.")
storage_caps = _build_storage_caps(
summary_info["storage_caps_raw"], inputs["storage_data"]
)
logger.debug("Aggregating thermal-tech capacity / cost parameters.")
thermal_caps = _build_thermal_caps(
summary_info["thermal_caps_raw"], inputs["balancing_units"]
)
formulation_map = dict(_DEFAULT_FORMULATION_MAP)
if formulation_overrides:
logger.debug("Applying formulation overrides: %s.", formulation_overrides)
formulation_map.update(formulation_overrides)
logger.info(
"Designed system loaded: storage=%d, thermal=%d, solar plants=%d, wind plants=%d.",
len(storage_caps),
len(thermal_caps),
len(solar_caps),
len(wind_caps),
)
return DesignedSystem(
storage_caps=storage_caps,
thermal_caps=thermal_caps,
solar_caps=solar_caps,
wind_caps=wind_caps,
load=inputs["load"],
cf_solar=inputs["cf_solar"],
cf_wind=inputs["cf_wind"],
nuclear=inputs["nuclear"],
hydro=inputs["hydro"],
other_renewables=inputs["other_renewables"],
import_cap=inputs["import_cap"],
import_price=inputs["import_price"],
export_cap=inputs["export_cap"],
export_price=inputs["export_price"],
phi_fix_t=inputs["phi_fix_t"],
phi_var_t=inputs["phi_var_t"],
month_of_hour=_compute_month_of_hour(year),
scenario_id=resolved_id,
year=year,
formulation_map=formulation_map,
)