"""Dataclasses describing the fixed-capacity designed system and baseline state.
These containers are populated by :mod:`sdom.resiliency.data_loader` and consumed
by the (future) baseline and outage dispatch builders.
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Value written under the "version" key of ``summary.json``.
_RESULTS_VERSION = "1"
# Directory name, resolved against the current working directory, that
# ``ResiliencyResults.save`` writes to when no explicit path is supplied.
_DEFAULT_RESULTS_DIR = "results_resiliency"
def _summarize_outage_spec(outage_spec) -> dict | None:
"""Return a JSON-safe summary of an :class:`OutageSpec` or ``None``."""
if outage_spec is None:
return None
summary: dict[str, Any] = {}
duration = getattr(outage_spec, "duration_hours", None)
if duration is not None:
summary["duration_hours"] = int(duration)
recovery = getattr(outage_spec, "recovery_hours", None)
if isinstance(recovery, dict):
summary["recovery_hours"] = {str(k): int(v) for k, v in recovery.items()}
elif recovery is not None:
summary["recovery_hours"] = int(recovery)
outaged = getattr(outage_spec, "outaged_assets", None)
if isinstance(outaged, dict):
summary["outaged_assets_components"] = sorted(str(k) for k in outaged.keys())
return summary or None
[docs]
@dataclass
class DesignedSystem:
"""Fixed-capacity designed system loaded from SDOM output snapshots.
Parameters
----------
storage_caps : dict
Mapping ``{tech: {"Cap_Pch", "Cap_Pdis", "Cap_E", "eta_ch",
"eta_dis", "soc_min_frac", "vom"}}`` for each storage technology with
non-zero capacity. Capacities are in MW / MWh.
thermal_caps : dict
Mapping ``{tech: {"capacity_MW", "heat_rate", "fuel_cost",
"vom", "var_cost"}}`` for each thermal technology with non-zero
capacity. ``var_cost = heat_rate * fuel_cost + vom``.
solar_caps : dict
Mapping ``{plant_id: capacity_MW}`` for selected solar plants.
wind_caps : dict
Mapping ``{plant_id: capacity_MW}`` for selected wind plants.
load, nuclear, hydro, other_renewables : pandas.Series
Hourly time-series (length 8760) indexed by hour-of-year (1..8760).
cf_solar, cf_wind : pandas.DataFrame
Hourly capacity factors with columns indexed by plant id.
import_cap, import_price, export_cap, export_price : pandas.Series
Hourly grid-exchange capacity and price series.
phi_fix_t, phi_var_t : pandas.Series
Hourly fixed and variable demand-charge tariffs (USD/MW or USD/MWh).
month_of_hour : pandas.Series
Mapping from hour-of-year (1..8760) to calendar month (1..12) used to
bill demand charges per month.
scenario_id : int
Scenario / Run id resolved from the snapshot CSVs.
year : int
Calendar year of the snapshot.
formulation_map : dict
Mapping ``{component: formulation_name}`` resolved from defaults
plus user-provided overrides.
"""
storage_caps: dict[str, dict[str, float]] = field(default_factory=dict)
thermal_caps: dict[str, dict[str, float]] = field(default_factory=dict)
solar_caps: dict[str, float] = field(default_factory=dict)
wind_caps: dict[str, float] = field(default_factory=dict)
load: pd.Series | None = None
cf_solar: pd.DataFrame | None = None
cf_wind: pd.DataFrame | None = None
nuclear: pd.Series | None = None
hydro: pd.Series | None = None
other_renewables: pd.Series | None = None
import_cap: pd.Series | None = None
import_price: pd.Series | None = None
export_cap: pd.Series | None = None
export_price: pd.Series | None = None
phi_fix_t: pd.Series | None = None
phi_var_t: pd.Series | None = None
month_of_hour: pd.Series | None = None
scenario_id: int = 1
year: int = 2030
formulation_map: dict[str, str] = field(default_factory=dict)
[docs]
@dataclass
class BaselineState:
"""Placeholder container for baseline-dispatch outputs (Phase 2).
Parameters
----------
soc_trajectory : pandas.DataFrame, optional
Hourly state-of-charge per storage technology (hour x tech).
solver_status : str, optional
Solver termination status from the baseline run.
objective_value : float, optional
Baseline objective value (USD).
metadata : dict, optional
Free-form solver / run metadata.
"""
soc_trajectory: pd.DataFrame | None = None
solver_status: str | None = None
objective_value: float | None = None
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class BaselineDispatchResults:
"""Trajectories and metadata produced by :func:`run_baseline_dispatch`.
Parameters
----------
soc_trajectory : pandas.DataFrame
Hourly state-of-charge per storage technology, indexed by hour and
with one column per tech (MWh).
pcha_trajectory, pdis_trajectory : pandas.DataFrame
Hourly charge / discharge per storage tech (MW).
pthermal_trajectory : pandas.DataFrame
Hourly thermal dispatch per balancing-unit Plant_id (MW). Empty
``DataFrame`` when no thermal units survive the snapshot filter.
psolar_trajectory, pwind_trajectory : pandas.DataFrame
Hourly dispatched solar / wind power per plant id (MW).
pimp, pexp : pandas.Series
Hourly imports / exports (MW).
nuclear, hydro, other_renewables, load : pandas.Series
Hourly time-series parameters echoed from the input system (MW).
month_of_hour : pandas.Series
Hour -> month mapping used by the demand-charge billing.
objective_value : float
Operational objective value (USD).
solver_status : str
Solver termination condition (e.g. ``"optimal"``).
metadata : dict, optional
Free-form solver / run metadata.
"""
soc_trajectory: pd.DataFrame | None = None
pcha_trajectory: pd.DataFrame | None = None
pdis_trajectory: pd.DataFrame | None = None
pthermal_trajectory: pd.DataFrame | None = None
psolar_trajectory: pd.DataFrame | None = None
pwind_trajectory: pd.DataFrame | None = None
pimp: pd.Series | None = None
pexp: pd.Series | None = None
nuclear: pd.Series | None = None
hydro: pd.Series | None = None
other_renewables: pd.Series | None = None
load: pd.Series | None = None
month_of_hour: pd.Series | None = None
objective_value: float | None = None
solver_status: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class ResiliencyResults:
"""Per-hour outage outcomes (lightweight, Phase 5).
Aggregate metrics (LOLP, LOLE, percentiles) and plotting are added in
Phase 6.
Parameters
----------
per_hour : pandas.DataFrame
Indexed by ``hour`` (anchor ``start_hour``). Columns include
``["EUE", "USE_hours", "max_unserved_MW", "objective_value",
"solver_status", "solve_time_s", "truncated", "error_message"]``.
metadata : dict
Free-form run metadata. Conventionally includes
``{"n_workers_used", "outage_spec", "n_hours", "solver"}``.
"""
per_hour: pd.DataFrame
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
def to_dataframe(self) -> pd.DataFrame:
"""Return ``per_hour`` with the index promoted to a ``hour`` column.
Returns
-------
pandas.DataFrame
A copy of :attr:`per_hour` with ``hour`` as a regular column,
sorted by ``hour``.
"""
df = self.per_hour.reset_index()
if df.columns[0] != "hour":
df = df.rename(columns={df.columns[0]: "hour"})
return df.sort_values("hour").reset_index(drop=True)
[docs]
def eue_total(self) -> float:
"""Return the sum of per-hour expected unserved energy (MWh).
Returns
-------
float
"""
if "EUE" not in self.per_hour.columns:
return 0.0
return float(self.per_hour["EUE"].fillna(0.0).sum())
# ------------------------------------------------------------------
# Phase 6 - aggregate metrics
# ------------------------------------------------------------------
def _evaluated_frame(self) -> pd.DataFrame:
"""Return per-hour records with errored solves removed."""
df = self.per_hour
if "solver_status" in df.columns:
df = df[df["solver_status"] != "error"]
return df
def _aggregate_metrics(self) -> dict:
"""Compute the aggregate-metrics dict (Phase 6 spec)."""
df = self._evaluated_frame()
n_eval = int(len(df))
if "solver_status" in self.per_hour.columns:
n_err = int((self.per_hour["solver_status"] == "error").sum())
else:
n_err = 0
if n_eval == 0:
return {
"LOLP": float("nan"),
"LOLE": float("nan"),
"mean_EUE": float("nan"),
"max_EUE": float("nan"),
"EUE_p50": float("nan"),
"EUE_p95": float("nan"),
"EUE_p99": float("nan"),
"n_hours_evaluated": 0,
"n_errors": n_err,
}
eue = df["EUE"].astype(float).to_numpy() if "EUE" in df.columns else np.zeros(n_eval)
if "USE_hours" in df.columns:
use_hours = df["USE_hours"].astype(float).to_numpy()
else:
use_hours = np.zeros(n_eval)
return {
"LOLP": float(np.mean(eue > 0.0)),
"LOLE": float(np.mean(use_hours)),
"mean_EUE": float(np.mean(eue)),
"max_EUE": float(np.max(eue)),
"EUE_p50": float(np.percentile(eue, 50, method="linear")),
"EUE_p95": float(np.percentile(eue, 95, method="linear")),
"EUE_p99": float(np.percentile(eue, 99, method="linear")),
"n_hours_evaluated": n_eval,
"n_errors": n_err,
}
[docs]
def metrics(self, *, level: str = "aggregate"):
"""Aggregate or per-hour resiliency metrics.
Parameters
----------
level : {"aggregate", "per_hour"}, optional
``"aggregate"`` (default) returns a ``dict`` of scalar metrics
computed over the evaluated hours (errored hours excluded).
``"per_hour"`` returns a copy of :attr:`per_hour` with ``hour``
promoted to a column.
Returns
-------
dict or pandas.DataFrame
Raises
------
ValueError
If ``level`` is not one of the supported values.
Notes
-----
Aggregate metrics exclude rows with ``solver_status == "error"``;
the count of excluded rows is reported as ``n_errors``.
"""
if level == "aggregate":
return self._aggregate_metrics()
if level == "per_hour":
return self.to_dataframe()
raise ValueError(
f"Invalid level={level!r}. Expected 'aggregate' or 'per_hour'."
)
[docs]
def lolp(self) -> float:
"""Return the loss-of-load probability across evaluated hours.
Returns
-------
float
"""
return float(self._aggregate_metrics()["LOLP"])
[docs]
def lole(self) -> float:
"""Return the loss-of-load expectation (mean USE hours per scenario).
Returns
-------
float
"""
return float(self._aggregate_metrics()["LOLE"])
[docs]
def eue(self, *, p: float | None = None) -> float:
"""Return the mean EUE or an empirical percentile of EUE.
Parameters
----------
p : float, optional
Quantile in ``(0, 1)``. Default ``None`` returns the mean EUE.
Returns
-------
float
Raises
------
ValueError
If ``p`` is provided and not in ``(0, 1)``.
"""
df = self._evaluated_frame()
if "EUE" not in df.columns or len(df) == 0:
return float("nan")
eue = df["EUE"].astype(float).to_numpy()
if p is None:
return float(np.mean(eue))
if not (0.0 < float(p) < 1.0):
raise ValueError(f"Quantile p={p!r} must lie in the open interval (0, 1).")
return float(np.percentile(eue, float(p) * 100.0, method="linear"))
# ------------------------------------------------------------------
# Phase 6 - persistence
# ------------------------------------------------------------------
[docs]
def save(self, path: str | Path | None = None) -> Path:
"""Persist per-hour records and aggregate metrics to disk.
Parameters
----------
path : str or pathlib.Path, optional
Output directory. Default: ``./results_resiliency/`` relative to
the current working directory. The directory is created if it
does not exist.
Returns
-------
pathlib.Path
The directory the artifacts were written to.
Raises
------
ImportError
If no Parquet engine (``pyarrow`` or ``fastparquet``) is
available.
Notes
-----
Writes two files to ``path``:
* ``per_hour.parquet`` - the per-hour DataFrame.
* ``summary.json`` - aggregate metrics + JSON-safe metadata.
"""
out_dir = Path(path) if path is not None else Path.cwd() / _DEFAULT_RESULTS_DIR
out_dir.mkdir(parents=True, exist_ok=True)
logger.info("Saving ResiliencyResults to %s.", out_dir)
parquet_path = out_dir / "per_hour.parquet"
try:
self.per_hour.to_parquet(parquet_path, engine="auto")
except (ImportError, ValueError) as exc:
raise ImportError(
"Saving ResiliencyResults requires a Parquet engine. Install "
"'pyarrow' (recommended) or 'fastparquet'."
) from exc
summary_payload = self._build_summary_payload()
(out_dir / "summary.json").write_text(
json.dumps(summary_payload, indent=2, default=str), encoding="utf-8"
)
logger.debug(
"ResiliencyResults persisted: per_hour.parquet (%d rows) + summary.json.",
len(self.per_hour),
)
return out_dir
def _build_summary_payload(self) -> dict:
"""Return the JSON-safe payload written to ``summary.json``."""
meta_safe: dict[str, Any] = {}
for key in ("n_workers_used", "n_hours", "solver"):
if key in self.metadata:
value = self.metadata[key]
meta_safe[key] = value if _is_json_safe(value) else str(value)
outage_summary = _summarize_outage_spec(self.metadata.get("outage_spec"))
if outage_summary is not None:
meta_safe["outage_spec_summary"] = outage_summary
return {
"version": _RESULTS_VERSION,
"aggregate_metrics": self._aggregate_metrics(),
"metadata": meta_safe,
}
[docs]
@classmethod
def load(cls, path: str | Path) -> "ResiliencyResults":
"""Load a previously-saved :class:`ResiliencyResults` from ``path``.
Parameters
----------
path : str or pathlib.Path
Directory that previously received :meth:`save`.
Returns
-------
ResiliencyResults
Raises
------
FileNotFoundError
If ``per_hour.parquet`` or ``summary.json`` is missing.
"""
in_dir = Path(path)
parquet_path = in_dir / "per_hour.parquet"
summary_path = in_dir / "summary.json"
missing = [str(p) for p in (parquet_path, summary_path) if not p.exists()]
if missing:
raise FileNotFoundError(
f"Expected ResiliencyResults artifacts at {in_dir} "
f"(missing: {missing})."
)
try:
per_hour = pd.read_parquet(parquet_path, engine="auto")
except (ImportError, ValueError) as exc:
raise ImportError(
"Loading ResiliencyResults requires a Parquet engine. Install "
"'pyarrow' (recommended) or 'fastparquet'."
) from exc
summary = json.loads(summary_path.read_text(encoding="utf-8"))
metadata = dict(summary.get("metadata", {}))
return cls(per_hour=per_hour, metadata=metadata)
def _is_json_safe(value) -> bool:
"""Return ``True`` if ``value`` can be JSON-serialised by ``json.dumps``."""
try:
json.dumps(value)
except TypeError:
return False
return True