# Source code for sdom.parametric.study

"""ParametricStudy orchestrator for SDOM sensitivity analysis."""

import itertools
import logging
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Optional

import pandas as pd

from ..io_manager import export_results
from ..results import OptimizationResults
from .sweeps import ScalarSweep, StorageFactorSweep, TsSweep
from .worker import _run_single_case

logger = logging.getLogger(__name__)


class ParametricStudy:
    """Run a multi-dimensional parametric sensitivity study in parallel.

    Accepts scalar, storage-factor, and time-series sweep definitions,
    constructs the full Cartesian product of all sweep dimensions, and
    dispatches each combination to a separate worker process via
    :class:`concurrent.futures.ProcessPoolExecutor`.

    Parameters
    ----------
    base_data : dict
        SDOM data dictionary returned by :func:`sdom.load_data`. This object
        is **never** modified; each worker process receives its own deep copy
        before applying mutations.
    solver_config : dict
        Solver configuration dict from
        :func:`sdom.get_default_solver_config_dict`.
    n_hours : int, optional
        Number of simulation hours. Defaults to ``8760``.
    output_dir : str or None, optional
        Directory where per-case sub-directories and the summary CSV will be
        written. Pass ``None`` to skip all disk output.
    n_cores : int or None, optional
        Number of worker processes. Capped internally at
        ``max(1, os.cpu_count() - 1)``. Pass ``None`` to use the maximum
        safe count.

    Examples
    --------
    >>> study = ParametricStudy(base_data=data, solver_config=solver_cfg)
    >>> study.add_scalar_sweep("scalars", "GenMix_Target", [0.8, 0.9, 1.0])
    >>> study.add_ts_sweep("load_data", [0.95, 1.05])
    >>> results = study.run()  # 3 × 2 = 6 cases
    """

    def __init__(
        self,
        base_data: dict,
        solver_config: dict,
        n_hours: int = 8760,
        output_dir: Optional[str] = None,
        n_cores: Optional[int] = None,
    ) -> None:
        self._base_data = base_data
        self._solver_config = solver_config
        self._n_hours = n_hours
        self._output_dir = output_dir
        self._n_cores = self._resolve_n_cores(n_cores)

        self._scalar_sweeps: List[ScalarSweep] = []
        self._storage_factor_sweeps: List[StorageFactorSweep] = []
        self._ts_sweeps: List[TsSweep] = []

        # Populated by run(); consumed by analytic_tools
        self._case_metadata: List[dict] = []

    # ------------------------------------------------------------------
    # Read-only properties
    # ------------------------------------------------------------------
    @property
    def output_dir(self) -> Optional[str]:
        """The root output directory passed to the constructor (may be ``None``)."""
        return self._output_dir

    @property
    def case_metadata(self) -> List[dict]:
        """Per-case metadata populated after :meth:`run` is called.

        Returns a list of dicts, one per case in Cartesian-product order
        (matching the order of the list returned by :meth:`run`). Each dict
        contains:

        - ``"case_name"`` — filesystem-safe case identifier
        - ``"case_index"`` — zero-based position in the Cartesian product
        - One additional key per registered sweep dimension, using the
          *param_name* (scalar / storage-factor sweeps) or *ts_key*
          (time-series sweeps) as the key, and the swept value as the value.

        Returns an empty list before :meth:`run` is called.
        """
        # Return a shallow copy so callers cannot mutate internal state.
        return list(self._case_metadata)

    # ------------------------------------------------------------------
    # Public sweep registration methods
    # ------------------------------------------------------------------
    def add_scalar_sweep(self, data_key: str, param_name: str, values: list) -> None:
        """Register a scalar parameter sweep.

        Each value in *values* replaces
        ``data[data_key].loc[param_name, "Value"]`` for one case dimension.

        Parameters
        ----------
        data_key : str
            Key in the SDOM data dict (e.g. ``"scalars"``).
        param_name : str
            Row label of the parameter (e.g. ``"GenMix_Target"``).
        values : list of float
            Discrete values to sweep over.
        """
        self._scalar_sweeps.append(ScalarSweep(data_key, param_name, list(values)))
        logger.debug(
            "Registered ScalarSweep: data['%s']['%s'] over %d values",
            data_key,
            param_name,
            len(values),
        )

    def add_storage_factor_sweep(self, param_name: str, factors: list) -> None:
        """Register a multiplicative storage-parameter sweep.

        Each factor scales the entire ``data["storage_data"].loc[param_name]``
        row (all storage technologies) uniformly.

        Parameters
        ----------
        param_name : str
            Row label in ``data["storage_data"]`` (e.g. ``"P_Capex"``).
        factors : list of float
            Multiplicative factors to apply.
        """
        self._storage_factor_sweeps.append(StorageFactorSweep(param_name, list(factors)))
        logger.debug(
            "Registered StorageFactorSweep: storage_data['%s'] over %d factors",
            param_name,
            len(factors),
        )

    def add_ts_sweep(self, ts_key: str, factors: list) -> None:
        """Register a time-series multiplicative sweep.

        Each factor scales the numeric column of ``data[ts_key]``. The column
        name is resolved automatically from
        :data:`sdom.parametric.mutations.TS_KEY_TO_COLUMN`.

        Parameters
        ----------
        ts_key : str
            Key in the SDOM data dict (e.g. ``"load_data"``).
        factors : list of float
            Multiplicative scaling factors.
        """
        self._ts_sweeps.append(TsSweep(ts_key, list(factors)))
        logger.debug(
            "Registered TsSweep: data['%s'] over %d factors",
            ts_key,
            len(factors),
        )

    # ------------------------------------------------------------------
    # Main execution
    # ------------------------------------------------------------------
    def run(self) -> List[OptimizationResults]:
        """Execute all parametric combinations in parallel.

        Constructs the Cartesian product of all registered sweeps, submits
        every case to a :class:`~concurrent.futures.ProcessPoolExecutor`,
        reports progress as jobs complete, exports per-case CSVs (if
        *output_dir* was specified), and writes a summary CSV.

        Returns
        -------
        list of OptimizationResults
            One entry per combination, in Cartesian-product order (matching
            the order cases were submitted). Cases that failed have
            ``is_optimal == False`` and a descriptive
            ``termination_condition``.
        """
        case_dicts = self._build_case_dicts()

        # Persist lightweight metadata (no data/solver payloads) for analytic_tools
        self._case_metadata = [
            {
                "case_name": cd["case_name"],
                "case_index": cd["case_index"],
                **{param: val for _, param, val in cd.get("scalar_mutations", [])},
                **{param: factor for param, factor in cd.get("storage_factor_mutations", [])},
                **{ts_key: factor for ts_key, factor in cd.get("ts_mutations", [])},
            }
            for cd in case_dicts
        ]

        n_total = len(case_dicts)
        if n_total == 0:
            logger.warning("ParametricStudy.run(): no sweeps registered — nothing to run.")
            return []

        logger.info(
            "ParametricStudy: starting %d cases on %d worker(s).",
            n_total,
            self._n_cores,
        )

        # Pre-create output root if needed
        if self._output_dir:
            os.makedirs(self._output_dir, exist_ok=True)

        # Map future → case_dict so we can report and export on completion.
        # Results are slotted into position by case_index, not completion order.
        ordered_results: List[Optional[OptimizationResults]] = [None] * n_total
        with ProcessPoolExecutor(max_workers=self._n_cores) as executor:
            future_to_case = {
                executor.submit(_run_single_case, cd): cd for cd in case_dicts
            }
            completed = 0
            for future in as_completed(future_to_case):
                cd = future_to_case[future]
                case_name: str = cd["case_name"]
                completed += 1
                try:
                    result: OptimizationResults = future.result()
                except Exception as exc:  # noqa: BLE001
                    logger.error(
                        "[%d/%d] Case '%s' raised an unhandled exception: %s",
                        completed,
                        n_total,
                        case_name,
                        exc,
                        exc_info=True,
                    )
                    # Synthesize a failed-result placeholder so ordering and
                    # the summary CSV stay complete.
                    result = OptimizationResults(
                        termination_condition="exception",
                        solver_status="error",
                        gen_mix_target=float("nan"),
                    )

                status = "OK" if result.is_optimal else "FAILED"
                logger.info(
                    "[%d/%d] %s — case '%s'", completed, n_total, status, case_name
                )

                # Export per-case CSVs immediately (main process, not worker)
                if self._output_dir and result.is_optimal:
                    case_output_dir = os.path.join(self._output_dir, case_name)
                    export_results(result, case=case_name, output_dir=case_output_dir)

                ordered_results[cd["case_index"]] = result

        # Write summary CSV
        if self._output_dir:
            self._write_summary_csv(case_dicts, ordered_results)

        return ordered_results  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _resolve_n_cores(self, requested: Optional[int]) -> int:
        """Return a safe worker count, capped at ``cpu_count - 1``."""
        # os.cpu_count() may return None; compute the guarded value once and
        # reuse it below (the warning previously re-called cpu_count() and
        # could hand None to a %d placeholder).
        cpu_total = os.cpu_count() or 1
        max_safe = max(1, cpu_total - 1)
        if requested is None:
            return max_safe
        capped = max(1, min(requested, max_safe))
        if capped < requested:
            logger.warning(
                "ParametricStudy: requested %d cores but only %d are safe to use "
                "(cpu_count=%d). Using %d.",
                requested,
                capped,
                cpu_total,
                capped,
            )
        return capped

    def _build_case_dicts(self) -> List[dict]:
        """Build one case dict per Cartesian-product combination.

        Returns
        -------
        list of dict
            Each dict is the payload passed to :func:`_run_single_case`.
        """
        # Each sweep dimension is a list of (label, mutation-spec) pairs
        dimensions: list = []

        for sweep in self._scalar_sweeps:
            dim = [
                (f"{sweep.param_name}={v}", ("scalar", sweep.data_key, sweep.param_name, v))
                for v in sweep.values
            ]
            dimensions.append(dim)

        for sweep in self._storage_factor_sweeps:
            dim = [
                (f"{sweep.param_name}x{f}", ("storage_factor", sweep.param_name, f))
                for f in sweep.factors
            ]
            dimensions.append(dim)

        for sweep in self._ts_sweeps:
            dim = [
                (f"{sweep.ts_key}x{f}", ("ts", sweep.ts_key, f))
                for f in sweep.factors
            ]
            dimensions.append(dim)

        if not dimensions:
            return []

        case_dicts = []
        for i, combination in enumerate(itertools.product(*dimensions)):
            # combination is a tuple of (label, mutation_spec) per dimension
            labels, mutations = zip(*combination)
            case_name = _make_safe_name("_".join(labels))

            scalar_mutations = []
            storage_factor_mutations = []
            ts_mutations = []
            for mut in mutations:
                if mut[0] == "scalar":
                    _, data_key, param_name, value = mut
                    scalar_mutations.append((data_key, param_name, value))
                elif mut[0] == "storage_factor":
                    _, param_name, factor = mut
                    storage_factor_mutations.append((param_name, factor))
                elif mut[0] == "ts":
                    _, ts_key, factor = mut
                    ts_mutations.append((ts_key, factor))

            case_dicts.append({
                "data": self._base_data,
                "solver_config": self._solver_config,
                "n_hours": self._n_hours,
                "case_name": case_name,
                "case_index": i,
                "scalar_mutations": scalar_mutations,
                "storage_factor_mutations": storage_factor_mutations,
                "ts_mutations": ts_mutations,
            })

        # Detect and disambiguate colliding safe names by appending the index.
        # Two distinct combinations can produce the same safe name because
        # _make_safe_name collapses several characters to '_'.
        name_counts: Dict[str, int] = {}
        for cd in case_dicts:
            name_counts[cd["case_name"]] = name_counts.get(cd["case_name"], 0) + 1
        for cd in case_dicts:
            if name_counts[cd["case_name"]] > 1:
                cd["case_name"] = f"{cd['case_name']}_{cd['case_index']}"

        logger.info(
            "ParametricStudy: %d case(s) generated from %d sweep dimension(s).",
            len(case_dicts),
            len(dimensions),
        )
        return case_dicts

    def _write_summary_csv(
        self,
        case_dicts: List[dict],
        results: List[Optional[OptimizationResults]],
    ) -> None:
        """Write ``parametric_summary.csv`` to *output_dir*.

        Parameters
        ----------
        case_dicts : list of dict
            Case descriptors (same order as *results*).
        results : list of OptimizationResults or None
            Collected results, one per case.
        """
        rows = []
        for cd, res in zip(case_dicts, results):
            row: dict = {"case_name": cd["case_name"]}

            # Add one column per swept parameter value
            for data_key, param_name, value in cd.get("scalar_mutations", []):
                row[f"{data_key}.{param_name}"] = value
            for param_name, factor in cd.get("storage_factor_mutations", []):
                row[f"storage_data.{param_name}_factor"] = factor
            for ts_key, factor in cd.get("ts_mutations", []):
                row[f"{ts_key}_factor"] = factor

            if res is not None:
                row["is_optimal"] = res.is_optimal
                row["total_cost"] = res.total_cost
                row["solver_status"] = res.solver_status
                row["termination_condition"] = res.termination_condition
            else:
                row["is_optimal"] = False
                row["total_cost"] = None
                row["solver_status"] = "unknown"
                row["termination_condition"] = "unknown"

            rows.append(row)

        summary_df = pd.DataFrame(rows)
        summary_path = os.path.join(self._output_dir, "parametric_summary.csv")
        summary_df.to_csv(summary_path, index=False)
        logger.info("ParametricStudy: summary CSV written to '%s'.", summary_path)
# --------------------------------------------------------------------------- # Utilities # --------------------------------------------------------------------------- def _make_safe_name(name: str) -> str: """Return a filesystem-safe version of *name*. Replaces characters that are problematic on Windows/Linux file systems (``/``, ``\\``, ``:``, ``*``, ``?``, ``"``, ``<``, ``>``, ``|``) with underscores and strips leading/trailing whitespace. """ for ch in r'/\\:*?"<>| ': name = name.replace(ch, "_") return name.strip("_")