"""Parallel orchestrator for the SDOM Resiliency Evaluation module (Phase 5).
This module exposes :func:`run_resiliency_evaluation`, which fans out one
short-horizon outage LP per anchor hour, solves each independently with HiGHS,
and aggregates the per-hour outcomes into a :class:`ResiliencyResults`
container.
Each per-hour problem is built by :func:`sdom.resiliency.build_outage_dispatch`
(Phase 4) and solved in its own (possibly remote) Python process. Workers do
not exchange Pyomo objects: the orchestrator pickles the lightweight
``DesignedSystem`` / ``BaselineDispatchResults`` / ``OutageSpec`` payload and
each worker rebuilds, solves and discards its model independently.
Math reference: ``dev_guidelines/resiliency evaluation/math_model.md``
section 7 (per-hour metrics: EUE, USE_hours, max unserved MW).
"""
from __future__ import annotations
import logging
import os
import time
import traceback
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Iterable
import pandas as pd
import pyomo.environ as pyo
from sdom.resiliency.outage_dispatch import build_outage_dispatch
from sdom.resiliency.outage_scenarios import OutageSpec
from sdom.resiliency.system_state import (
BaselineDispatchResults,
DesignedSystem,
ResiliencyResults,
)
logger = logging.getLogger(__name__)
__all__ = ["run_resiliency_evaluation"]
# Module-level handle so tests can monkeypatch the underlying builder to
# simulate a worker failure on a specific hour. Workers reach this through
# ``_solve_one_hour`` which dereferences the symbol at call time.
_build_outage_dispatch = build_outage_dispatch
# Slack tolerance used when counting hours with unserved energy.
_USE_EPS = 1e-6
# Per-hour record schema (kept in one place for column ordering).
_PER_HOUR_COLUMNS = [
"EUE",
"USE_hours",
"max_unserved_MW",
"objective_value",
"solver_status",
"solve_time_s",
"truncated",
"error_message",
]
def _resolve_solver(solver: str):
"""Return a Pyomo solver factory, preferring ``appsi_highs`` for ``highs``.
Parameters
----------
solver : str
Solver name. ``"highs"`` first tries ``appsi_highs`` and falls back
to ``highs``; any other value is forwarded as-is to
:func:`pyomo.environ.SolverFactory`.
Returns
-------
pyomo.opt.solver.SolverBase
"""
if solver == "highs":
for name in ("appsi_highs", "highs"):
try:
s = pyo.SolverFactory(name)
if s is not None and s.available(exception_flag=False):
return s
except Exception: # pragma: no cover - solver discovery
continue
return pyo.SolverFactory("highs")
return pyo.SolverFactory(solver)
def _compute_truncation(
*,
start_hour: int,
duration_hours: int,
max_recovery: int,
n_hours: int,
) -> bool:
intended_end = start_hour + duration_hours + max_recovery - 1
clipped_end = min(intended_end, n_hours)
return clipped_end < intended_end
def _solve_one_hour(payload: dict[str, Any]) -> dict[str, Any]:
"""Build, solve and summarise a single-hour outage LP.
Module-level (picklable) so it can be dispatched by a
:class:`concurrent.futures.ProcessPoolExecutor` on Windows ``spawn``
start method.
Parameters
----------
payload : dict
Flat dictionary with keys
``baseline_results``, ``outage_spec``, ``designed_system``,
``start_hour``, ``slack_penalty``, ``curtailment_penalty``,
``min_soc_per_tech``, ``n_hours``, ``solver``, ``solver_options``.
Returns
-------
dict
Per-hour record with keys matching :data:`_PER_HOUR_COLUMNS` plus
``"start_hour"``. On a worker failure all numeric metrics are set
to 0/NaN, ``solver_status`` is ``"error"`` and ``error_message``
carries the formatted exception.
"""
start_hour = int(payload["start_hour"])
n_hours = int(payload["n_hours"])
outage_spec: OutageSpec = payload["outage_spec"]
designed_system: DesignedSystem = payload["designed_system"]
duration_hours = int(outage_spec.duration_hours)
recovery_per_tech = outage_spec.resolve_recovery_hours(designed_system)
max_recovery = max(recovery_per_tech.values()) if recovery_per_tech else 0
truncated = _compute_truncation(
start_hour=start_hour,
duration_hours=duration_hours,
max_recovery=max_recovery,
n_hours=n_hours,
)
record: dict[str, Any] = {
"start_hour": start_hour,
"EUE": 0.0,
"USE_hours": 0,
"max_unserved_MW": 0.0,
"objective_value": float("nan"),
"solver_status": "error",
"solve_time_s": 0.0,
"truncated": bool(truncated),
"error_message": "",
}
t0 = time.perf_counter()
try:
model = _build_outage_dispatch(
payload["baseline_results"],
start_hour=start_hour,
outage_spec=outage_spec,
designed_system=designed_system,
slack_penalty=float(payload["slack_penalty"]),
curtailment_penalty=float(payload["curtailment_penalty"]),
min_soc_per_tech=payload.get("min_soc_per_tech"),
n_hours=n_hours,
profile=bool(payload.get("profile", False)),
)
solver = _resolve_solver(str(payload["solver"]))
solver_options = payload.get("solver_options") or {}
res = solver.solve(model, options=solver_options)
status = str(res.solver.termination_condition)
u_values = [float(pyo.value(model.u[t])) for t in model.h]
eue = float(sum(u_values))
use_hours = int(sum(1 for v in u_values if v > _USE_EPS))
max_unserved = float(max(u_values)) if u_values else 0.0
obj = float(pyo.value(model.objective))
record.update(
EUE=eue,
USE_hours=use_hours,
max_unserved_MW=max_unserved,
objective_value=obj,
solver_status=status,
error_message="",
)
except Exception as exc: # noqa: BLE001 - failure isolation by design
record["solver_status"] = "error"
record["error_message"] = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
finally:
record["solve_time_s"] = float(time.perf_counter() - t0)
return record
def _resolve_designed_system(
baseline_results: BaselineDispatchResults,
designed_system: DesignedSystem | None,
) -> DesignedSystem:
if designed_system is not None:
if not isinstance(designed_system, DesignedSystem):
raise TypeError("designed_system must be a DesignedSystem instance.")
return designed_system
md = baseline_results.metadata or {}
ds = md.get("designed_system")
if ds is None:
raise ValueError(
"designed_system was not provided and baseline_results.metadata "
"does not contain a 'designed_system' entry. Pass the "
"DesignedSystem explicitly via the designed_system kwarg."
)
if not isinstance(ds, DesignedSystem):
raise TypeError(
"baseline_results.metadata['designed_system'] must be a "
"DesignedSystem instance."
)
return ds
def _resolve_n_workers(
n_workers: int | None,
n_payloads: int,
) -> int:
if n_workers is None:
cpu = os.cpu_count() or 1
resolved = max(1, cpu - 1)
else:
resolved = int(n_workers)
if resolved < 1:
raise ValueError("n_workers must be >= 1.")
# Never spawn more workers than we have hours to evaluate.
return max(1, min(resolved, max(1, n_payloads)))
[docs]
def run_resiliency_evaluation(
baseline_results,
*,
outage_spec,
designed_system=None,
hours=None,
slack_penalty=10_000.0,
curtailment_penalty=0.0,
min_soc_per_tech=None,
n_hours=8760,
n_workers=None,
solver="highs",
solver_options=None,
profile_outages=False,
):
"""Run the per-hour outage evaluation in parallel and aggregate metrics.
For every anchor hour ``h`` in ``hours`` the runner builds the
short-horizon outage LP via :func:`build_outage_dispatch`, solves it
with HiGHS, and records ``EUE``, ``USE_hours``, ``max_unserved_MW``
and the objective value. Per-hour problems are independent and are
solved in parallel via :class:`concurrent.futures.ProcessPoolExecutor`
when ``n_workers > 1``; ``n_workers == 1`` (or a single-hour run)
falls back to an in-process serial loop.
Parameters
----------
baseline_results : BaselineDispatchResults
Output of :func:`run_baseline_dispatch`. Must carry an SOC
trajectory; if its ``metadata`` does not contain a
``"designed_system"`` entry, ``designed_system`` must be supplied.
outage_spec : OutageSpec
Outage / de-rating specification, broadcast to every anchor hour.
designed_system : DesignedSystem, optional
Source of truth for capacities and time series. Takes precedence
over ``baseline_results.metadata['designed_system']`` when both
are provided.
hours : iterable of int, optional
Anchor hours to evaluate. ``None`` (default) evaluates every hour
``1..n_hours``. Duplicates are removed; the result is sorted.
slack_penalty : float, optional
Penalty (USD/MWh) applied to slack ``u[t]``. Default ``10_000``.
curtailment_penalty : float, optional
Penalty applied to curtailed VRE energy (USD/MWh). Default ``0``.
min_soc_per_tech : dict, optional
Operational SOC floor per storage tech (fraction of ``Cap_E``);
forwarded to the outage builder.
n_hours : int, optional
Length of the baseline horizon used for end-of-year clipping.
Default ``8760``.
n_workers : int, optional
Worker pool size. ``None`` -> ``max(1, os.cpu_count() - 1)``;
``1`` forces serial mode. Always clamped to ``len(hours)``.
solver : str, optional
Pyomo solver name. ``"highs"`` first tries ``appsi_highs``.
Default ``"highs"``.
solver_options : dict, optional
Solver options forwarded to ``solver.solve(..., options=...)``.
profile_outages : bool, optional
When ``True`` and ``n_workers == 1``, profile every per-hour
outage build via
:class:`~sdom.utils_performance_meassure.ModelInitProfiler`.
Ignored (with a warning) when ``n_workers > 1`` because each
worker would emit its own summary on a separate process.
Default ``False``.
Returns
-------
ResiliencyResults
Per-hour records (sorted by ``start_hour``) plus run metadata
including ``n_workers_used`` and a reference to ``outage_spec``.
Raises
------
ValueError
If ``designed_system`` cannot be resolved from arguments or
metadata, or if ``n_workers < 1``.
TypeError
If ``baseline_results`` is not a :class:`BaselineDispatchResults`
instance, or ``outage_spec`` is not an :class:`OutageSpec`.
Notes
-----
Failure isolation: if any worker raises an exception, that hour's
record is marked ``solver_status="error"`` and its ``error_message``
field carries the formatted traceback. Other hours continue normally.
"""
if not isinstance(baseline_results, BaselineDispatchResults):
raise TypeError(
"baseline_results must be a BaselineDispatchResults instance."
)
if not isinstance(outage_spec, OutageSpec):
raise TypeError("outage_spec must be an OutageSpec instance.")
ds = _resolve_designed_system(baseline_results, designed_system)
n_hours = int(n_hours)
if n_hours <= 0:
raise ValueError("n_hours must be a positive integer.")
if hours is None:
hour_list = list(range(1, n_hours + 1))
else:
hour_list = sorted({int(h) for h in hours})
for h in hour_list:
if not (1 <= h <= n_hours):
raise ValueError(
f"hours contains {h}, which is outside [1, {n_hours}]."
)
n_workers_used = _resolve_n_workers(n_workers, len(hour_list))
profile_outages_effective = bool(profile_outages) and n_workers_used == 1
if profile_outages and not profile_outages_effective:
logger.warning(
"profile_outages=True is ignored when n_workers > 1 "
"(would emit one summary per worker process)."
)
logger.info(
"Running resiliency evaluation: %d anchor hour(s), n_workers=%d, solver=%r, "
"slack_penalty=%g.",
len(hour_list),
n_workers_used,
solver,
slack_penalty,
)
payloads = [
{
"baseline_results": baseline_results,
"outage_spec": outage_spec,
"designed_system": ds,
"start_hour": h,
"slack_penalty": float(slack_penalty),
"curtailment_penalty": float(curtailment_penalty),
"min_soc_per_tech": min_soc_per_tech,
"n_hours": n_hours,
"solver": solver,
"solver_options": dict(solver_options) if solver_options else {},
"profile": profile_outages_effective,
}
for h in hour_list
]
if not payloads:
records: list[dict[str, Any]] = []
elif n_workers_used == 1:
logger.debug("Solving %d outage problem(s) serially.", len(payloads))
records = [_solve_one_hour(p) for p in payloads]
else:
logger.debug(
"Dispatching %d outage problem(s) to ProcessPoolExecutor with %d worker(s).",
len(payloads),
n_workers_used,
)
with ProcessPoolExecutor(max_workers=n_workers_used) as pool:
# ``map`` preserves the order of ``payloads`` regardless of
# worker completion order.
records = list(pool.map(_solve_one_hour, payloads))
records.sort(key=lambda r: int(r["start_hour"]))
if records:
df = pd.DataFrame(records)
df = df.set_index("start_hour")
df.index.name = "hour"
# Ensure consistent column ordering even if a column is missing.
for col in _PER_HOUR_COLUMNS:
if col not in df.columns:
df[col] = pd.NA
df = df[_PER_HOUR_COLUMNS]
else:
df = pd.DataFrame(columns=_PER_HOUR_COLUMNS)
df.index.name = "hour"
metadata = {
"n_workers_used": int(n_workers_used),
"outage_spec": outage_spec,
"n_hours": n_hours,
"solver": solver,
"n_hours_evaluated": len(hour_list),
}
n_errors = (
int((df["solver_status"] == "error").sum())
if "solver_status" in df.columns and not df.empty
else 0
)
logger.info(
"Resiliency evaluation complete: %d hour(s) processed, %d worker error(s).",
len(hour_list),
n_errors,
)
return ResiliencyResults(per_hour=df, metadata=metadata)