Source code for sdom.results

"""Module for SDOM optimization results data structures and utilities."""

import logging
from dataclasses import dataclass, field
from typing import Any

import numpy as np
import pandas as pd
from pyomo.environ import sqrt, value as pyo_value

from .common.utilities import safe_pyomo_value
from .constants import MW_TO_KW


def _value_or_nan(obj) -> float:
    """Return a float value or NaN when not initialized/available."""
    val = getattr(obj, "value", None)
    if val is None:
        val = safe_pyomo_value(obj)
    return np.nan if val is None else float(val)


def _value_or_zero(obj) -> float:
    """Return a float value or zero when not initialized/available."""
    val = getattr(obj, "value", None)
    if val is None:
        val = safe_pyomo_value(obj)
    return 0.0 if val is None else float(val)


[docs] @dataclass class OptimizationResults: """Data class containing all optimization results from SDOM. This class stores the complete results from an SDOM optimization run, organized into DataFrames for different result categories (generation, storage, summary) and provides convenient accessors for specific metrics. Attributes ---------- termination_condition : str The solver termination condition (e.g., 'optimal', 'infeasible'). solver_status : str The solver status (e.g., 'ok', 'warning'). total_cost : float The total objective value (cost) from the optimization. gen_mix_target : float The generation mix target value used in this run. generation_df : pd.DataFrame Hourly generation dispatch results for all technologies. storage_df : pd.DataFrame Hourly storage operation results (charge, discharge, SOC). thermal_generation_df : pd.DataFrame Disaggregated hourly thermal generation by plant. installed_plants_df : pd.DataFrame Installed capacity for each individual power plant (solar, wind, thermal). summary_df : pd.DataFrame Summary metrics including capacities, costs, and totals. problem_info : dict Solver problem information (constraints, variables, etc.). capacity : dict Installed capacity by technology. storage_capacity : dict Storage capacity details (charge, discharge, energy). generation_totals : dict Total generation by technology. cost_breakdown : dict Detailed cost breakdown (CAPEX, OPEX, FOM, VOM). """ # Solver information termination_condition: str = "" solver_status: str = "" # Main objective total_cost: float = 0.0 gen_mix_target: float = 0.0 # DataFrames for CSV export generation_df: pd.DataFrame = field(default_factory=pd.DataFrame) storage_df: pd.DataFrame = field(default_factory=pd.DataFrame) thermal_generation_df: pd.DataFrame = field(default_factory=pd.DataFrame) installed_plants_df: pd.DataFrame = field(default_factory=pd.DataFrame) summary_df: pd.DataFrame = field(default_factory=pd.DataFrame) # Problem info from solver problem_info: dict = field(default_factory=dict) # Capacity results capacity: dict = field(default_factory=dict) storage_capacity: dict = field(default_factory=dict) # Generation totals generation_totals: dict = field(default_factory=dict) # Cost breakdown cost_breakdown: dict = field(default_factory=dict) # ---------------------------------------------------------------------------------- # Zonal-aware optional fields (PRD §6.1). # # These fields are populated only when the solved model is a zonal # ``AreaTransportationModelNetwork`` model (detected by the presence of # ``model.A``/``model.area``). On the legacy copper-plate path they keep # their default empty values so existing code paths are unaffected. # ---------------------------------------------------------------------------------- is_zonal: bool = False areas: list = field(default_factory=list) lines: list = field(default_factory=list) area_capacity: dict = field(default_factory=dict) area_storage_capacity: dict = field(default_factory=dict) area_generation_totals: dict = field(default_factory=dict) area_cost_breakdown: dict = field(default_factory=dict) area_generation_df: dict = field(default_factory=dict) area_storage_df: dict = field(default_factory=dict) area_thermal_generation_df: dict = field(default_factory=dict) area_installed_plants_df: dict = field(default_factory=dict) area_summary_df: dict = field(default_factory=dict) interregional_exchanges_df: pd.DataFrame = field(default_factory=pd.DataFrame) # ---------------------------------------------------------------------------------- # Convenience properties for backward compatibility and easy access # ---------------------------------------------------------------------------------- @property def is_optimal(self) -> bool: """Check if the solution is optimal.""" return self.termination_condition == "optimal" # Capacity accessors @property def total_cap_thermal(self) -> float: """Total installed thermal capacity (MW).""" return self.capacity.get("Thermal", 0.0) @property def total_cap_pv(self) -> float: """Total installed solar PV capacity (MW).""" return self.capacity.get("Solar PV", 0.0) @property def total_cap_wind(self) -> float: """Total installed wind capacity (MW).""" return self.capacity.get("Wind", 0.0) @property def total_cap_storage_charge(self) -> dict: """Storage charging power capacity by technology (MW).""" return self.storage_capacity.get("charge", {}) @property def total_cap_storage_discharge(self) -> dict: """Storage discharging power capacity by technology (MW).""" return self.storage_capacity.get("discharge", {}) @property def total_cap_storage_energy(self) -> dict: """Storage energy capacity by technology (MWh).""" return self.storage_capacity.get("energy", {}) # Generation accessors @property def total_gen_pv(self) -> float: """Total solar PV generation (MWh).""" return self.generation_totals.get("Solar PV", 0.0) @property def total_gen_wind(self) -> float: """Total wind generation (MWh).""" return self.generation_totals.get("Wind", 0.0) @property def total_gen_thermal(self) -> float: """Total thermal generation (MWh).""" return self.generation_totals.get("Thermal", 0.0) # ---------------------------------------------------------------------------------- # DataFrame accessors # ----------------------------------------------------------------------------------
[docs] def get_generation_dataframe(self) -> pd.DataFrame: """Get the hourly generation dispatch DataFrame. Returns ------- pd.DataFrame DataFrame with columns: Scenario, Hour, Solar PV Generation (MW), Solar PV Curtailment (MW), Wind Generation (MW), Wind Curtailment (MW), All Thermal Generation (MW), Hydro Generation (MW), Nuclear Generation (MW), Other Renewables Generation (MW), Imports (MW), Storage Charge/Discharge (MW), Exports (MW), Load (MW). """ return self.generation_df.copy()
[docs] def get_storage_dataframe(self) -> pd.DataFrame: """Get the hourly storage operation DataFrame. Returns ------- pd.DataFrame DataFrame with columns: Hour, Technology, Charging power (MW), Discharging power (MW), State of charge (MWh). """ return self.storage_df.copy()
[docs] def get_thermal_generation_dataframe(self) -> pd.DataFrame: """Get the disaggregated hourly thermal generation DataFrame. Returns ------- pd.DataFrame DataFrame with columns: Hour, and one column per thermal plant. """ return self.thermal_generation_df.copy()
[docs] def get_summary_dataframe(self) -> pd.DataFrame: """Get the summary metrics DataFrame. Returns ------- pd.DataFrame DataFrame with columns: Metric, Technology, Run, Optimal Value, Unit. """ return self.summary_df.copy()
[docs] def get_installed_plants_dataframe(self) -> pd.DataFrame: """Get the installed power plants capacity DataFrame. Returns ------- pd.DataFrame DataFrame with columns: Plant ID, Technology, Installed Capacity (MW), Max Capacity (MW), Capacity Fraction. """ return self.installed_plants_df.copy()
# ---------------------------------------------------------------------------------- # Problem info accessors # ----------------------------------------------------------------------------------
[docs] def get_problem_info(self) -> dict: """Get solver problem information. Returns ------- dict Dictionary with keys: Number of constraints, Number of variables, Number of binary variables, Number of objectives, Number of nonzeros. """ return self.problem_info.copy()
[docs] def collect_results_from_model(model, solver_result, case_name: str = "run") -> OptimizationResults: """Collect all optimization results from a solved Pyomo model. Dispatches to the legacy single-area collector or the zonal collector based on whether ``model`` exposes a top-level area set ``model.A`` and a per-area ``Block`` ``model.area`` (the convention established by :func:`sdom.optimization_main._initialize_model_zonal`). On the **legacy** path the returned :class:`OptimizationResults` matches today's schema bit-identically (locked by ``tests/test_zonal_legacy_regression.py``). On the **zonal** path the same top-level DataFrames (``generation_df``, ``storage_df``, ``thermal_generation_df``, ``installed_plants_df``) are populated as the concatenation of per-area frames with a leading ``Area`` column, and a battery of new per-area dict fields plus :attr:`OptimizationResults.interregional_exchanges_df` are filled in. See PRD §2.4 / §6.1. Parameters ---------- model : pyomo.core.base.PyomoModel.ConcreteModel The solved Pyomo model instance. solver_result : pyomo.opt.SolverResults The solver results object from ``solver.solve()``. case_name : str, optional Case identifier for the scenario column. Defaults to ``"run"``. Returns ------- OptimizationResults A dataclass containing all optimization results. Notes ----- Top-level ``summary_df`` is **left empty under the zonal path**; per-area summaries are populated in :attr:`OptimizationResults.area_summary_df` instead. A system-level zonal summary is a follow-up. CSV emission of ``interregional_exchanges_df`` is also a follow-up (commit #11). """ is_zonal = hasattr(model, "A") and hasattr(model, "area") if is_zonal: return _collect_results_zonal(model, solver_result, case_name=case_name) return _collect_results_copperplate(model, solver_result, case_name=case_name)
def _collect_results_copperplate(model, solver_result, *, case_name: str = "run") -> OptimizationResults: """Collect results from a solved single-area copper-plate model. This is the historical body of :func:`collect_results_from_model` extracted verbatim into a private helper so the new dispatcher can route here unchanged for the legacy path. **Do not modify this body without updating ``tests/test_zonal_legacy_regression.py``** — it locks the objective and per-technology values produced by this function. Parameters mirror :func:`collect_results_from_model`. """ logging.info("Collecting SDOM results...") results = OptimizationResults() # Extract solver information results.termination_condition = str(solver_result.solver.termination_condition) results.solver_status = str(solver_result.solver.status) # Extract problem info if solver_result.problem: problem = solver_result.problem[0] # Helper to extract value from Pyomo ScalarData objects def get_value(val): if hasattr(val, 'value'): return val.value return val results.problem_info = { "Number of constraints": get_value(problem.get("Number of constraints", 0)), "Number of variables": get_value(problem.get("Number of variables", 0)), "Number of binary variables": get_value(problem.get("Number of binary variables", 0)), "Number of objectives": get_value(problem.get("Number of objectives", 0)), "Number of nonzeros": get_value(problem.get("Number of nonzeros", 0)), } # Total cost results.total_cost = safe_pyomo_value(model.Obj.expr) results.gen_mix_target = float(model.GenMix_Target.value) # ---------------------------------------------------------------------------------- # Collect capacity results # ---------------------------------------------------------------------------------- logging.debug("Collecting capacity results...") # Generation capacities results.capacity = { "Thermal": safe_pyomo_value(model.thermal.total_installed_capacity), "Solar PV": safe_pyomo_value(model.pv.total_installed_capacity), "Wind": safe_pyomo_value(model.wind.total_installed_capacity), } results.capacity["All"] = ( results.capacity["Thermal"] + results.capacity["Solar PV"] + results.capacity["Wind"] ) # Storage capacities storage_tech_list = list(model.storage.j) charge_cap = {} discharge_cap = {} energy_cap = {} for tech in storage_tech_list: charge_cap[tech] = safe_pyomo_value(model.storage.Pcha[tech]) discharge_cap[tech] = safe_pyomo_value(model.storage.Pdis[tech]) energy_cap[tech] = safe_pyomo_value(model.storage.Ecap[tech]) charge_cap["All"] = sum(charge_cap[t] for t in storage_tech_list) discharge_cap["All"] = sum(discharge_cap[t] for t in storage_tech_list) energy_cap["All"] = sum(energy_cap[t] for t in storage_tech_list) results.storage_capacity = { "charge": charge_cap, "discharge": discharge_cap, "energy": energy_cap, } # ---------------------------------------------------------------------------------- # Collect generation totals # ---------------------------------------------------------------------------------- logging.debug("Collecting generation totals...") results.generation_totals = { "Thermal": safe_pyomo_value(model.thermal.total_generation), "Solar PV": safe_pyomo_value(model.pv.total_generation), "Wind": safe_pyomo_value(model.wind.total_generation), "Other renewables": safe_pyomo_value(sum(model.other_renewables.ts_parameter[h] for h in model.h)) * safe_pyomo_value(model.other_renewables.alpha), "Hydro": safe_pyomo_value(sum(model.hydro.generation[h] for h in model.h)) * safe_pyomo_value(model.hydro.alpha), "Nuclear": safe_pyomo_value(sum(model.nuclear.ts_parameter[h] for h in model.h)) * safe_pyomo_value(model.nuclear.alpha), } # Storage discharge totals storage_discharge_total = 0.0 for tech in storage_tech_list: tech_discharge = safe_pyomo_value(sum(model.storage.PD[h, tech] for h in model.h)) results.generation_totals[tech] = tech_discharge storage_discharge_total += tech_discharge results.generation_totals["All"] = ( results.generation_totals["Thermal"] + results.generation_totals["Solar PV"] + results.generation_totals["Wind"] + results.generation_totals["Other renewables"] + results.generation_totals["Hydro"] + results.generation_totals["Nuclear"] + storage_discharge_total ) # ---------------------------------------------------------------------------------- # Collect cost breakdown # ---------------------------------------------------------------------------------- logging.debug("Collecting cost breakdown...") # CAPEX capex = { "Solar PV": safe_pyomo_value(model.pv.capex_cost_expr), "Wind": safe_pyomo_value(model.wind.capex_cost_expr), "Thermal": safe_pyomo_value(model.thermal.capex_cost_expr), } capex["All"] = capex["Solar PV"] + capex["Wind"] + capex["Thermal"] # Storage CAPEX power_capex = {} energy_capex = {} for tech in storage_tech_list: power_capex[tech] = safe_pyomo_value(model.storage.power_capex_cost_expr[tech]) energy_capex[tech] = safe_pyomo_value(model.storage.energy_capex_cost_expr[tech]) power_capex["All"] = sum(power_capex[t] for t in storage_tech_list) energy_capex["All"] = sum(energy_capex[t] for t in storage_tech_list) # FOM fom = { "Thermal": safe_pyomo_value(model.thermal.fixed_om_cost_expr), "Solar PV": safe_pyomo_value(model.pv.fixed_om_cost_expr), "Wind": safe_pyomo_value(model.wind.fixed_om_cost_expr), } fom_storage_total = 0.0 for tech in storage_tech_list: fom[tech] = safe_pyomo_value( MW_TO_KW * model.storage.data["CostRatio", tech] * model.storage.data["FOM", tech] * model.storage.Pcha[tech] + MW_TO_KW * (1 - model.storage.data["CostRatio", tech]) * model.storage.data["FOM", tech] * model.storage.Pdis[tech] ) fom_storage_total += fom[tech] fom["All"] = fom["Thermal"] + fom["Solar PV"] + fom["Wind"] + fom_storage_total # VOM vom = { "Thermal": safe_pyomo_value(model.thermal.total_vom_cost_expr), } vom_storage_total = 0.0 for tech in storage_tech_list: vom[tech] = safe_pyomo_value(model.storage.data["VOM", tech] * sum(model.storage.PD[h, tech] for h in model.h)) vom_storage_total += vom[tech] vom["All"] = vom["Thermal"] + vom_storage_total # Fuel cost fuel_cost = { "Thermal": safe_pyomo_value(model.thermal.total_fuel_cost_expr), } # Imports/Exports costs imports_cost = safe_pyomo_value(model.imports.total_cost_expr) exports_revenue = safe_pyomo_value(model.exports.total_cost_expr) results.cost_breakdown = { "capex": capex, "power_capex": power_capex, "energy_capex": energy_capex, "fom": fom, "vom": vom, "fuel_cost": fuel_cost, "imports_cost": imports_cost, "exports_revenue": exports_revenue, } # ---------------------------------------------------------------------------------- # Build generation DataFrame # ---------------------------------------------------------------------------------- logging.debug("Building generation DataFrame...") gen_data = { "Scenario": [], "Hour": [], "Solar PV Generation (MW)": [], "Solar PV Curtailment (MW)": [], "Wind Generation (MW)": [], "Wind Curtailment (MW)": [], "All Thermal Generation (MW)": [], "Hydro Generation (MW)": [], "Nuclear Generation (MW)": [], "Other Renewables Generation (MW)": [], "Imports (MW)": [], "Storage Charge/Discharge (MW)": [], "Exports (MW)": [], "Load (MW)": [], "Net Load (MW)": [], } for h in model.h: solar_gen = safe_pyomo_value(model.pv.generation[h]) solar_curt = safe_pyomo_value(model.pv.curtailment[h]) wind_gen = safe_pyomo_value(model.wind.generation[h]) wind_curt = safe_pyomo_value(model.wind.curtailment[h]) thermal_gen = sum(safe_pyomo_value(model.thermal.generation[h, bu]) for bu in model.thermal.plants_set) hydro = safe_pyomo_value(model.hydro.generation[h]) nuclear = safe_pyomo_value(model.nuclear.alpha * model.nuclear.ts_parameter[h]) if hasattr(model.nuclear, "alpha") else 0 other_renewables = safe_pyomo_value(model.other_renewables.alpha * model.other_renewables.ts_parameter[h]) if hasattr(model.other_renewables, "alpha") else 0 imports = safe_pyomo_value(model.imports.variable[h]) if hasattr(model.imports, "variable") else 0 exports = safe_pyomo_value(model.exports.variable[h]) if hasattr(model.exports, "variable") else 0 load = safe_pyomo_value(model.demand.ts_parameter[h]) if hasattr(model.demand, "ts_parameter") else 0 net_load = safe_pyomo_value(model.net_load[h]) if hasattr(model, "net_load") else 0 power_to_storage = sum(safe_pyomo_value(model.storage.PC[h, j]) or 0 for j in model.storage.j) - sum(safe_pyomo_value(model.storage.PD[h, j]) or 0 for j in model.storage.j) if None not in [solar_gen, solar_curt, wind_gen, wind_curt, thermal_gen, hydro, imports, exports, load]: gen_data["Scenario"].append(case_name) gen_data["Hour"].append(h) gen_data["Solar PV Generation (MW)"].append(solar_gen) gen_data["Solar PV Curtailment (MW)"].append(solar_curt) gen_data["Wind Generation (MW)"].append(wind_gen) gen_data["Wind Curtailment (MW)"].append(wind_curt) gen_data["All Thermal Generation (MW)"].append(thermal_gen) gen_data["Hydro Generation (MW)"].append(hydro) gen_data["Nuclear Generation (MW)"].append(nuclear) gen_data["Other Renewables Generation (MW)"].append(other_renewables) gen_data["Imports (MW)"].append(imports) gen_data["Storage Charge/Discharge (MW)"].append(power_to_storage) gen_data["Exports (MW)"].append(exports) gen_data["Load (MW)"].append(load) gen_data["Net Load (MW)"].append(net_load) results.generation_df = pd.DataFrame(gen_data) # ---------------------------------------------------------------------------------- # Build storage DataFrame # ---------------------------------------------------------------------------------- logging.debug("Building storage DataFrame...") storage_data = { "Hour": [], "Technology": [], "Charging power (MW)": [], "Discharging power (MW)": [], "State of charge (MWh)": [], } for h in model.h: for j in model.storage.j: charge_power = safe_pyomo_value(model.storage.PC[h, j]) discharge_power = safe_pyomo_value(model.storage.PD[h, j]) soc = safe_pyomo_value(model.storage.SOC[h, j]) if None not in [charge_power, discharge_power, soc]: storage_data["Hour"].append(h) storage_data["Technology"].append(j) storage_data["Charging power (MW)"].append(charge_power) storage_data["Discharging power (MW)"].append(discharge_power) storage_data["State of charge (MWh)"].append(soc) results.storage_df = pd.DataFrame(storage_data) # ---------------------------------------------------------------------------------- # Build thermal generation DataFrame (disaggregated) # ---------------------------------------------------------------------------------- logging.debug("Building thermal generation DataFrame...") if len(model.thermal.plants_set) > 1: thermal_data = {"Hour": []} for plant in model.thermal.plants_set: thermal_data[str(plant)] = [] for h in model.h: thermal_data["Hour"].append(h) for plant in model.thermal.plants_set: thermal_data[str(plant)].append(safe_pyomo_value(model.thermal.generation[h, plant])) results.thermal_generation_df = pd.DataFrame(thermal_data) # ---------------------------------------------------------------------------------- # Build installed power plants DataFrame # ---------------------------------------------------------------------------------- logging.debug("Building installed power plants DataFrame...") installed_plants_data = { "Plant ID": [], "Technology": [], "Installed Capacity (MW)": [], "Max Capacity (MW)": [], "Capacity Fraction": [], } # Solar PV plants for plant in model.pv.plants_set: installed_cap = safe_pyomo_value(model.pv.plant_installed_capacity[plant]) max_cap = safe_pyomo_value(model.pv.max_capacity[plant]) cap_fraction = safe_pyomo_value(model.pv.capacity_fraction[plant]) installed_plants_data["Plant ID"].append(str(plant)) installed_plants_data["Technology"].append("Solar PV") installed_plants_data["Installed Capacity (MW)"].append(installed_cap) installed_plants_data["Max Capacity (MW)"].append(max_cap) installed_plants_data["Capacity Fraction"].append(cap_fraction) # Wind plants for plant in model.wind.plants_set: installed_cap = safe_pyomo_value(model.wind.plant_installed_capacity[plant]) max_cap = safe_pyomo_value(model.wind.max_capacity[plant]) cap_fraction = safe_pyomo_value(model.wind.capacity_fraction[plant]) installed_plants_data["Plant ID"].append(str(plant)) installed_plants_data["Technology"].append("Wind") installed_plants_data["Installed Capacity (MW)"].append(installed_cap) installed_plants_data["Max Capacity (MW)"].append(max_cap) installed_plants_data["Capacity Fraction"].append(cap_fraction) # Thermal plants for plant in model.thermal.plants_set: installed_cap = safe_pyomo_value(model.thermal.plant_installed_capacity[plant]) max_cap = safe_pyomo_value(model.thermal.data["MaxCapacity", plant]) # For thermal, capacity fraction is installed/max (there's no explicit fraction variable) cap_fraction = installed_cap / max_cap if max_cap > 0 else 0.0 installed_plants_data["Plant ID"].append(str(plant)) installed_plants_data["Technology"].append("Thermal") installed_plants_data["Installed Capacity (MW)"].append(installed_cap) installed_plants_data["Max Capacity (MW)"].append(max_cap) installed_plants_data["Capacity Fraction"].append(cap_fraction) results.installed_plants_df = pd.DataFrame(installed_plants_data) # ---------------------------------------------------------------------------------- # Build summary DataFrame # ---------------------------------------------------------------------------------- logging.debug("Building summary DataFrame...") results.summary_df = _build_summary_dataframe(model, results, storage_tech_list) return results def _build_summary_dataframe(model, results: OptimizationResults, storage_tech_list: list) -> pd.DataFrame: """Build the summary DataFrame from results. Parameters ---------- model : pyomo.core.base.PyomoModel.ConcreteModel The solved Pyomo model instance. results : OptimizationResults The results object with collected data. storage_tech_list : list List of storage technology identifiers. Returns ------- pd.DataFrame Summary DataFrame with metrics. """ from .common.utilities import concatenate_dataframes # Total cost total_cost = pd.DataFrame.from_dict( {"Total cost": [None, 1, results.total_cost, "$US"]}, orient="index", columns=["Technology", "Run", "Optimal Value", "Unit"], ) total_cost = total_cost.reset_index(names="Metric") summary_results = total_cost # Capacity summary_results = concatenate_dataframes(summary_results, results.capacity, run=1, unit="MW", metric="Capacity") # Storage capacities summary_results = concatenate_dataframes( summary_results, results.storage_capacity["charge"], run=1, unit="MW", metric="Charge power capacity" ) summary_results = concatenate_dataframes( summary_results, results.storage_capacity["discharge"], run=1, unit="MW", metric="Discharge power capacity" ) # Average power capacity avgpocap = {} for tech in storage_tech_list: avgpocap[tech] = (results.storage_capacity["charge"][tech] + results.storage_capacity["discharge"][tech]) / 2 avgpocap["All"] = sum(avgpocap[t] for t in storage_tech_list) summary_results = concatenate_dataframes(summary_results, avgpocap, run=1, unit="MW", metric="Average power capacity") # Energy capacity summary_results = concatenate_dataframes( summary_results, results.storage_capacity["energy"], run=1, unit="MWh", metric="Energy capacity" ) # Duration dis_dur = {} for tech in storage_tech_list: dis_dur[tech] = safe_pyomo_value( sqrt(model.storage.data["Eff", tech]) * model.storage.Ecap[tech] / (model.storage.Pdis[tech] + 1e-15) ) summary_results = concatenate_dataframes(summary_results, dis_dur, run=1, unit="h", metric="Duration") # Generation summary_results = concatenate_dataframes( summary_results, results.generation_totals, run=1, unit="MWh", metric="Total generation" ) # Imports/Exports totals imp_exp = {} imp_exp["Imports"] = safe_pyomo_value(sum(model.imports.variable[h] for h in model.h)) if hasattr(model.imports, "variable") else 0 imp_exp["Exports"] = safe_pyomo_value(sum(model.exports.variable[h] for h in model.h)) if hasattr(model.exports, "variable") else 0 summary_results = concatenate_dataframes(summary_results, imp_exp, run=1, unit="MWh", metric="Total Imports/Exports") # Storage discharge stodisch = {tech: results.generation_totals.get(tech, 0.0) for tech in storage_tech_list} stodisch["All"] = sum(stodisch[t] for t in storage_tech_list) summary_results = concatenate_dataframes(summary_results, stodisch, run=1, unit="MWh", metric="Storage energy discharging") # Demand dem = {"demand": sum(model.demand.ts_parameter[h] for h in model.h)} summary_results = concatenate_dataframes(summary_results, dem, run=1, unit="MWh", metric="Total demand") # Storage charging stoch = {} for tech in storage_tech_list: stoch[tech] = safe_pyomo_value(sum(model.storage.PC[h, tech] for h in model.h)) stoch["All"] = sum(stoch[t] for t in storage_tech_list) summary_results = concatenate_dataframes(summary_results, stoch, run=1, unit="MWh", metric="Storage energy charging") # CAPEX summary_results = concatenate_dataframes( summary_results, results.cost_breakdown["capex"], run=1, unit="$US", metric="CAPEX" ) # Power CAPEX summary_results = concatenate_dataframes( summary_results, results.cost_breakdown["power_capex"], run=1, unit="$US", metric="Power-CAPEX" ) # Energy CAPEX summary_results = concatenate_dataframes( summary_results, results.cost_breakdown["energy_capex"], run=1, unit="$US", metric="Energy-CAPEX" ) # Total CAPEX (storage) tcapex = {} for tech in storage_tech_list: tcapex[tech] = results.cost_breakdown["power_capex"][tech] + results.cost_breakdown["energy_capex"][tech] tcapex["All"] = sum(tcapex[t] for t in storage_tech_list) summary_results = concatenate_dataframes(summary_results, tcapex, run=1, unit="$US", metric="Total-CAPEX") # FOM summary_results = concatenate_dataframes(summary_results, results.cost_breakdown["fom"], run=1, unit="$US", metric="FOM") # VOM summary_results = concatenate_dataframes(summary_results, results.cost_breakdown["vom"], run=1, unit="$US", metric="VOM") # Fuel cost summary_results = concatenate_dataframes( summary_results, results.cost_breakdown["fuel_cost"], run=1, unit="$US", metric="Fuel-Cost" ) # OPEX opex = {} opex["Thermal"] = results.cost_breakdown["fom"]["Thermal"] + results.cost_breakdown["vom"]["Thermal"] opex["Solar PV"] = results.cost_breakdown["fom"]["Solar PV"] opex["Wind"] = results.cost_breakdown["fom"]["Wind"] opex_storage_total = 0.0 for tech in storage_tech_list: opex[tech] = results.cost_breakdown["fom"][tech] + results.cost_breakdown["vom"][tech] opex_storage_total += opex[tech] opex["All"] = opex["Thermal"] + opex["Solar PV"] + opex["Wind"] + opex_storage_total summary_results = concatenate_dataframes(summary_results, opex, run=1, unit="$US", metric="OPEX") # Imports/Exports costs cost_revenue = {"Imports Cost": results.cost_breakdown["imports_cost"]} summary_results = concatenate_dataframes(summary_results, cost_revenue, run=1, unit="$US", metric="Cost") cost_revenue = {"Exports Revenue": results.cost_breakdown["exports_revenue"]} summary_results = concatenate_dataframes(summary_results, cost_revenue, run=1, unit="$US", metric="Revenue") # Equivalent number of cycles cyc = {} for tech in storage_tech_list: cyc[tech] = safe_pyomo_value(results.generation_totals.get(tech, 0.0) / (model.storage.Ecap[tech] + 1e-15)) summary_results = concatenate_dataframes(summary_results, cyc, run=1, unit="-", metric="Equivalent number of cycles") # VRE Curtailment pv_curtailment = safe_pyomo_value(model.pv.total_curtailment) if hasattr(model.pv, "total_curtailment") else 0.0 wind_curtailment = safe_pyomo_value(model.wind.total_curtailment) if hasattr(model.wind, "total_curtailment") else 0.0 pv_generation = safe_pyomo_value(model.pv.total_generation) if hasattr(model.pv, "total_generation") else 0.0 wind_generation = safe_pyomo_value(model.wind.total_generation) if hasattr(model.wind, "total_generation") else 0.0 total_vre_curtailment_mwh = pv_curtailment + wind_curtailment total_vre_availability = pv_generation + wind_generation + pv_curtailment + wind_curtailment total_vre_curtailment_pct = (total_vre_curtailment_mwh / total_vre_availability * 100) if total_vre_availability > 0 else 0.0 vre_curt_mwh = {"Solar PV": pv_curtailment, "Wind": wind_curtailment, "All": total_vre_curtailment_mwh} summary_results = concatenate_dataframes(summary_results, vre_curt_mwh, run=1, unit="MWh", metric="Total VRE curtailment") vre_curt_pct = {"All": total_vre_curtailment_pct} summary_results = concatenate_dataframes(summary_results, vre_curt_pct, run=1, unit="%", metric="VRE curtailment percentage") return summary_results # --------------------------------------------------------------------------------- # Zonal (AreaTransportationModelNetwork) results collection # --------------------------------------------------------------------------------- def _collect_host_metrics(host, hours, *, case_name: str = "run") -> dict: """Collect dispatch and capacity metrics from a single host block. Mirrors the legacy collection logic in :func:`_collect_results_copperplate` but operates on an arbitrary ``host`` (either the top-level model or an area sub-block ``model.area[a]``). Imports / exports terms are guarded by ``hasattr`` so this helper is safe to call on a zonal area block where the imports / exports variables and cost expressions are intentionally not added. Parameters ---------- host : pyomo.environ.Block or pyomo.environ.ConcreteModel The block exposing ``pv``, ``wind``, ``thermal``, ``storage``, ``hydro``, ``demand``, ``nuclear``, ``other_renewables`` (and, on legacy hosts, ``imports``/``exports``) sub-blocks. hours : iterable of int Hours to iterate over (e.g. ``list(model.h)`` or ``list(host.h)``). case_name : str, optional Scenario label for the per-row ``"Scenario"`` column of the generation DataFrame. Defaults to ``"run"``. Returns ------- dict Bundle with keys: - ``capacity`` : dict by technology (Thermal, Solar PV, Wind, All). - ``storage_capacity`` : dict with ``charge``/``discharge``/``energy``. - ``storage_tech_list`` : list of storage tech identifiers. - ``generation_totals`` : dict by technology (incl. each storage tech). - ``cost_breakdown`` : dict (``capex``, ``power_capex``, ``energy_capex``, ``fom``, ``vom``, ``fuel_cost``, ``imports_cost``, ``exports_revenue``). - ``generation_df``, ``storage_df``, ``thermal_generation_df``, ``installed_plants_df`` : pandas DataFrames with the same schema as the legacy top-level frames. """ hours = list(hours) n_hours = len(hours) storage_tech_list = list(host.storage.j) storage_tech_arr = np.array(storage_tech_list, dtype=object) thermal_plants = list(host.thermal.plants_set) n_thermal = len(thermal_plants) thermal_plants_arr = np.array(thermal_plants, dtype=object) # One-pass extraction of solved values for hourly outputs. solar_gen = np.fromiter( (_value_or_nan(host.pv.generation[h]) for h in hours), dtype=float, count=n_hours, ) solar_curt = np.fromiter( (_value_or_nan(host.pv.curtailment[h]) for h in hours), dtype=float, count=n_hours, ) wind_gen = np.fromiter( (_value_or_nan(host.wind.generation[h]) for h in hours), dtype=float, count=n_hours, ) wind_curt = np.fromiter( (_value_or_nan(host.wind.curtailment[h]) for h in hours), dtype=float, count=n_hours, ) hydro_gen = np.fromiter( (_value_or_nan(host.hydro.generation[h]) for h in hours), dtype=float, count=n_hours, ) if n_thermal > 0: thermal_flat = np.fromiter( ( _value_or_nan(host.thermal.generation[h, bu]) for h in hours for bu in thermal_plants ), dtype=float, count=n_hours * n_thermal, ) thermal_mat = thermal_flat.reshape(n_hours, n_thermal) else: thermal_mat = np.zeros((n_hours, 0), dtype=float) thermal_total_per_hour = np.nansum(thermal_mat, axis=1) n_storage = len(storage_tech_list) if n_storage > 0: storage_pc_flat = np.fromiter( ( _value_or_nan(host.storage.PC[h, j]) for h in hours for j in storage_tech_list ), dtype=float, count=n_hours * n_storage, ) storage_pd_flat = np.fromiter( ( _value_or_nan(host.storage.PD[h, j]) for h in hours for j in storage_tech_list ), dtype=float, count=n_hours * n_storage, ) storage_soc_flat = np.fromiter( ( _value_or_nan(host.storage.SOC[h, j]) for h in hours for j in storage_tech_list ), dtype=float, count=n_hours * n_storage, ) storage_pc_mat = storage_pc_flat.reshape(n_hours, n_storage) storage_pd_mat = storage_pd_flat.reshape(n_hours, n_storage) storage_soc_mat = storage_soc_flat.reshape(n_hours, n_storage) storage_net_per_hour = np.nansum(storage_pc_mat, axis=1) - np.nansum( storage_pd_mat, axis=1 ) else: storage_pc_mat = np.zeros((n_hours, 0), dtype=float) storage_pd_mat = np.zeros((n_hours, 0), dtype=float) storage_soc_mat = np.zeros((n_hours, 0), dtype=float) storage_net_per_hour = np.zeros(n_hours, dtype=float) has_nuclear_alpha = hasattr(host.nuclear, "alpha") has_other_alpha = hasattr(host.other_renewables, "alpha") nuclear_alpha = _value_or_zero(host.nuclear.alpha) if has_nuclear_alpha else 0.0 other_alpha = ( _value_or_zero(host.other_renewables.alpha) if has_other_alpha else 0.0 ) nuclear_ts = np.fromiter( (_value_or_zero(host.nuclear.ts_parameter[h]) for h in hours), dtype=float, count=n_hours, ) other_ts = np.fromiter( (_value_or_zero(host.other_renewables.ts_parameter[h]) for h in hours), dtype=float, count=n_hours, ) nuclear_gen = nuclear_alpha * nuclear_ts other_gen = other_alpha * other_ts if hasattr(host, "imports") and hasattr(host.imports, "variable"): imports_series = np.fromiter( (_value_or_nan(host.imports.variable[h]) for h in hours), dtype=float, count=n_hours, ) else: imports_series = np.zeros(n_hours, dtype=float) if hasattr(host, "exports") and hasattr(host.exports, "variable"): exports_series = np.fromiter( (_value_or_nan(host.exports.variable[h]) for h in hours), dtype=float, count=n_hours, ) else: exports_series = np.zeros(n_hours, dtype=float) load_series = np.fromiter( (_value_or_nan(host.demand.ts_parameter[h]) for h in hours), dtype=float, count=n_hours, ) if hasattr(host, "net_load"): net_load_series = np.fromiter( (_value_or_nan(host.net_load[h]) for h in hours), dtype=float, count=n_hours, ) else: net_load_series = np.zeros(n_hours, dtype=float) # Capacities ----------------------------------------------------------- capacity = { "Thermal": safe_pyomo_value(host.thermal.total_installed_capacity), "Solar PV": safe_pyomo_value(host.pv.total_installed_capacity), "Wind": safe_pyomo_value(host.wind.total_installed_capacity), } capacity["All"] = capacity["Thermal"] + capacity["Solar PV"] + capacity["Wind"] charge_cap = {t: safe_pyomo_value(host.storage.Pcha[t]) for t in storage_tech_list} discharge_cap = {t: safe_pyomo_value(host.storage.Pdis[t]) for t in storage_tech_list} energy_cap = {t: safe_pyomo_value(host.storage.Ecap[t]) for t in storage_tech_list} charge_cap["All"] = sum(charge_cap[t] for t in storage_tech_list) discharge_cap["All"] = sum(discharge_cap[t] for t in storage_tech_list) energy_cap["All"] = sum(energy_cap[t] for t in storage_tech_list) storage_capacity = { "charge": charge_cap, "discharge": discharge_cap, "energy": energy_cap, } # Generation totals ---------------------------------------------------- generation_totals = { "Thermal": float(np.nansum(thermal_total_per_hour)), "Solar PV": float(np.nansum(solar_gen)), "Wind": float(np.nansum(wind_gen)), "Other renewables": float(np.nansum(other_gen)), "Hydro": float(np.nansum(hydro_gen) * _value_or_zero(host.hydro.alpha)), "Nuclear": float(np.nansum(nuclear_gen)), } storage_discharge_total = 0.0 for idx, tech in enumerate(storage_tech_list): tech_discharge = float(np.nansum(storage_pd_mat[:, idx])) generation_totals[tech] = tech_discharge storage_discharge_total += tech_discharge generation_totals["All"] = ( generation_totals["Thermal"] + generation_totals["Solar PV"] + generation_totals["Wind"] + generation_totals["Other renewables"] + generation_totals["Hydro"] + generation_totals["Nuclear"] + storage_discharge_total ) # Cost breakdown ------------------------------------------------------- capex = { "Solar PV": safe_pyomo_value(host.pv.capex_cost_expr), "Wind": safe_pyomo_value(host.wind.capex_cost_expr), "Thermal": safe_pyomo_value(host.thermal.capex_cost_expr), } capex["All"] = capex["Solar PV"] + capex["Wind"] + capex["Thermal"] power_capex = { t: safe_pyomo_value(host.storage.power_capex_cost_expr[t]) for t in storage_tech_list } energy_capex = { t: safe_pyomo_value(host.storage.energy_capex_cost_expr[t]) for t in storage_tech_list } power_capex["All"] = sum(power_capex[t] for t in storage_tech_list) energy_capex["All"] = sum(energy_capex[t] for t in storage_tech_list) fom = { "Thermal": safe_pyomo_value(host.thermal.fixed_om_cost_expr), "Solar PV": safe_pyomo_value(host.pv.fixed_om_cost_expr), "Wind": safe_pyomo_value(host.wind.fixed_om_cost_expr), } fom_storage_total = 0.0 for tech in storage_tech_list: fom[tech] = safe_pyomo_value( MW_TO_KW * host.storage.data["CostRatio", tech] * host.storage.data["FOM", tech] * host.storage.Pcha[tech] + MW_TO_KW * (1 - host.storage.data["CostRatio", tech]) * host.storage.data["FOM", tech] * host.storage.Pdis[tech] ) fom_storage_total += fom[tech] fom["All"] = fom["Thermal"] + fom["Solar PV"] + fom["Wind"] + fom_storage_total vom = {"Thermal": safe_pyomo_value(host.thermal.total_vom_cost_expr)} vom_storage_total = 0.0 for idx, tech in enumerate(storage_tech_list): vom[tech] = safe_pyomo_value( host.storage.data["VOM", tech] * float(np.nansum(storage_pd_mat[:, idx])) ) vom_storage_total += vom[tech] vom["All"] = vom["Thermal"] + vom_storage_total fuel_cost = {"Thermal": safe_pyomo_value(host.thermal.total_fuel_cost_expr)} imports_cost = ( safe_pyomo_value(host.imports.total_cost_expr) if hasattr(host, "imports") and hasattr(host.imports, "total_cost_expr") else 0.0 ) exports_revenue = ( safe_pyomo_value(host.exports.total_cost_expr) if hasattr(host, "exports") and hasattr(host.exports, "total_cost_expr") else 0.0 ) cost_breakdown = { "capex": capex, "power_capex": power_capex, "energy_capex": energy_capex, "fom": fom, "vom": vom, "fuel_cost": fuel_cost, "imports_cost": imports_cost, "exports_revenue": exports_revenue, } # Generation DataFrame ------------------------------------------------- valid_mask = ( ~np.isnan(solar_gen) & ~np.isnan(solar_curt) & ~np.isnan(wind_gen) & ~np.isnan(wind_curt) & ~np.isnan(thermal_total_per_hour) & ~np.isnan(hydro_gen) & ~np.isnan(imports_series) & ~np.isnan(exports_series) & ~np.isnan(load_series) ) hours_arr = np.asarray(hours) generation_df = pd.DataFrame( { "Scenario": np.full(np.count_nonzero(valid_mask), case_name, dtype=object), "Hour": hours_arr[valid_mask], "Solar PV Generation (MW)": solar_gen[valid_mask], "Solar PV Curtailment (MW)": solar_curt[valid_mask], "Wind Generation (MW)": wind_gen[valid_mask], "Wind Curtailment (MW)": wind_curt[valid_mask], "All Thermal Generation (MW)": thermal_total_per_hour[valid_mask], "Hydro Generation (MW)": hydro_gen[valid_mask], "Nuclear Generation (MW)": nuclear_gen[valid_mask], "Other Renewables Generation (MW)": other_gen[valid_mask], "Imports (MW)": imports_series[valid_mask], "Storage Charge/Discharge (MW)": storage_net_per_hour[valid_mask], "Exports (MW)": exports_series[valid_mask], "Load (MW)": load_series[valid_mask], "Net Load (MW)": net_load_series[valid_mask], } ) # Storage DataFrame ---------------------------------------------------- if n_storage > 0: hour_repeat = np.repeat(hours_arr, n_storage) tech_tile = np.tile(storage_tech_arr, n_hours) charge_flat = storage_pc_mat.reshape(-1) discharge_flat = storage_pd_mat.reshape(-1) soc_flat = storage_soc_mat.reshape(-1) storage_valid = ( ~np.isnan(charge_flat) & ~np.isnan(discharge_flat) & ~np.isnan(soc_flat) ) storage_df = pd.DataFrame( { "Hour": hour_repeat[storage_valid], "Technology": tech_tile[storage_valid], "Charging power (MW)": charge_flat[storage_valid], "Discharging power (MW)": discharge_flat[storage_valid], "State of charge (MWh)": soc_flat[storage_valid], } ) else: storage_df = pd.DataFrame( columns=[ "Hour", "Technology", "Charging power (MW)", "Discharging power (MW)", "State of charge (MWh)", ] ) # Thermal generation DataFrame (disaggregated) ------------------------- thermal_generation_df = pd.DataFrame() if n_thermal > 1: thermal_cols = [str(plant) for plant in thermal_plants_arr] thermal_generation_df = pd.DataFrame(thermal_mat, columns=thermal_cols) thermal_generation_df.insert(0, "Hour", hours_arr) # Installed plants DataFrame ------------------------------------------ installed_plants_data = { "Plant ID": [], "Technology": [], "Installed Capacity (MW)": [], "Max Capacity (MW)": [], "Capacity Fraction": [], } for plant in host.pv.plants_set: installed_cap = safe_pyomo_value(host.pv.plant_installed_capacity[plant]) max_cap = safe_pyomo_value(host.pv.max_capacity[plant]) cap_fraction = safe_pyomo_value(host.pv.capacity_fraction[plant]) installed_plants_data["Plant ID"].append(str(plant)) installed_plants_data["Technology"].append("Solar PV") installed_plants_data["Installed Capacity (MW)"].append(installed_cap) installed_plants_data["Max Capacity (MW)"].append(max_cap) installed_plants_data["Capacity Fraction"].append(cap_fraction) for plant in host.wind.plants_set: installed_cap = safe_pyomo_value(host.wind.plant_installed_capacity[plant]) max_cap = safe_pyomo_value(host.wind.max_capacity[plant]) cap_fraction = safe_pyomo_value(host.wind.capacity_fraction[plant]) installed_plants_data["Plant ID"].append(str(plant)) installed_plants_data["Technology"].append("Wind") installed_plants_data["Installed Capacity (MW)"].append(installed_cap) installed_plants_data["Max Capacity (MW)"].append(max_cap) installed_plants_data["Capacity Fraction"].append(cap_fraction) for plant in host.thermal.plants_set: installed_cap = safe_pyomo_value(host.thermal.plant_installed_capacity[plant]) max_cap = safe_pyomo_value(host.thermal.data["MaxCapacity", plant]) cap_fraction = installed_cap / max_cap if max_cap > 0 else 0.0 installed_plants_data["Plant ID"].append(str(plant)) installed_plants_data["Technology"].append("Thermal") installed_plants_data["Installed Capacity (MW)"].append(installed_cap) installed_plants_data["Max Capacity (MW)"].append(max_cap) installed_plants_data["Capacity Fraction"].append(cap_fraction) installed_plants_df = pd.DataFrame(installed_plants_data) return { "capacity": capacity, "storage_capacity": storage_capacity, "storage_tech_list": storage_tech_list, "generation_totals": generation_totals, "cost_breakdown": cost_breakdown, "generation_df": generation_df, "storage_df": storage_df, "thermal_generation_df": thermal_generation_df, "installed_plants_df": installed_plants_df, } def _merge_dict_sum(target: dict, source: dict) -> None: """Sum scalar values from ``source`` into ``target`` in-place. Numeric values are added; nested dicts are merged recursively. Used to aggregate per-area capacity / cost / generation dicts into the system-level rollups under the zonal path. Parameters ---------- target : dict Destination dict (mutated). source : dict Per-area dict whose values are added to ``target``. """ for key, val in source.items(): if isinstance(val, dict): sub = target.setdefault(key, {}) _merge_dict_sum(sub, val) elif isinstance(val, (int, float)): target[key] = target.get(key, 0.0) + float(val) else: # Non-numeric (e.g. nested list) — overwrite if absent. target.setdefault(key, val) def _build_interregional_exchanges_df(model) -> pd.DataFrame: """Build the per-line / per-hour interregional exchanges DataFrame. Implements PRD §2.4: one row per ``(line_id, hour)`` with signed flow, decomposed FT / TF flows, asymmetric capacities, and NaN-safe utilization ratios. Parameters ---------- model : pyomo.environ.ConcreteModel Solved zonal model with ``model.L``, ``model.h``, ``model.f``, ``model.line_from``, ``model.line_to``, ``model.LineCap_FT``, ``model.LineCap_TF``. Returns ------- pandas.DataFrame Empty DataFrame if ``model.L`` is empty; otherwise a DataFrame with the schema specified in PRD §2.4. """ columns = [ "line_id", "from_area", "to_area", "hour", "flow_signed_MW", "flow_FT_MW", "flow_TF_MW", "cap_FT_MW", "cap_TF_MW", "utilization_FT", "utilization_TF", ] if not hasattr(model, "L") or len(model.L) == 0: return pd.DataFrame(columns=columns) lines = list(model.L) hours = list(model.h) n_lines = len(lines) n_hours = len(hours) n_rows = n_lines * n_hours line_ids = np.repeat(np.asarray(lines, dtype=object), n_hours) hour_ids = np.tile(np.asarray(hours), n_lines) from_areas = np.repeat( np.asarray([pyo_value(model.line_from[l]) for l in lines], dtype=object), n_hours, ) to_areas = np.repeat( np.asarray([pyo_value(model.line_to[l]) for l in lines], dtype=object), n_hours, ) f_signed = np.fromiter( (_value_or_zero(model.f[l, h]) for l in lines for h in hours), dtype=float, count=n_rows, ) cap_ft = np.fromiter( (float(pyo_value(model.LineCap_FT[l, h])) for l in lines for h in hours), dtype=float, count=n_rows, ) cap_tf = np.fromiter( (float(pyo_value(model.LineCap_TF[l, h])) for l in lines for h in hours), dtype=float, count=n_rows, ) flow_ft = np.maximum(f_signed, 0.0) flow_tf = np.maximum(-f_signed, 0.0) df = pd.DataFrame( { "line_id": line_ids, "from_area": from_areas, "to_area": to_areas, "hour": hour_ids, "flow_signed_MW": f_signed, "flow_FT_MW": flow_ft, "flow_TF_MW": flow_tf, "cap_FT_MW": cap_ft, "cap_TF_MW": cap_tf, }, columns=columns[:-2], ) df["utilization_FT"] = np.where( df["cap_FT_MW"].to_numpy() > 0, df["flow_FT_MW"].to_numpy() / np.where( df["cap_FT_MW"].to_numpy() > 0, df["cap_FT_MW"].to_numpy(), 1.0 ), np.nan, ) df["utilization_TF"] = np.where( df["cap_TF_MW"].to_numpy() > 0, df["flow_TF_MW"].to_numpy() / np.where( df["cap_TF_MW"].to_numpy() > 0, df["cap_TF_MW"].to_numpy(), 1.0 ), np.nan, ) return df[columns] def _collect_results_zonal(model, solver_result, *, case_name: str = "run") -> OptimizationResults: """Collect zonal results from a solved ``AreaTransportationModelNetwork`` model. Builds per-area DataFrames and dicts via :func:`_collect_host_metrics` on each ``model.area[a]`` block, aggregates them into top-level frames with a leading ``Area`` column, sums the scalar dicts across areas for system-level rollups, and constructs the :attr:`OptimizationResults.interregional_exchanges_df` per PRD §2.4. Top-level :attr:`OptimizationResults.summary_df` is **left empty**: the legacy :func:`_build_summary_dataframe` is keyed off ``model.storage`` / ``model.demand`` / etc. which under the zonal path live on per-area sub-blocks. Per-area summaries are built and attached to :attr:`OptimizationResults.area_summary_df` instead. A system-level zonal summary is a follow-up. Parameters mirror :func:`collect_results_from_model`. """ logging.info("Collecting SDOM zonal results...") results = OptimizationResults() results.is_zonal = True results.areas = list(model.A) if hasattr(model, "L"): results.lines = [ { "line_id": l, "from_area": pyo_value(model.line_from[l]), "to_area": pyo_value(model.line_to[l]), } for l in model.L ] # Solver information --------------------------------------------------- results.termination_condition = str(solver_result.solver.termination_condition) results.solver_status = str(solver_result.solver.status) if solver_result.problem: problem = solver_result.problem[0] def _gv(val): return val.value if hasattr(val, "value") else val results.problem_info = { "Number of constraints": _gv(problem.get("Number of constraints", 0)), "Number of variables": _gv(problem.get("Number of variables", 0)), "Number of binary variables": _gv(problem.get("Number of binary variables", 0)), "Number of objectives": _gv(problem.get("Number of objectives", 0)), "Number of nonzeros": _gv(problem.get("Number of nonzeros", 0)), } # Top-level scalars ---------------------------------------------------- results.total_cost = safe_pyomo_value(model.Obj.expr) results.gen_mix_target = float(model.GenMix_Target.value) # Hours come from the top-level model (mirrors per-area host.h). hours = list(model.h) # Per-area collection -------------------------------------------------- gen_pieces = [] storage_pieces = [] thermal_pieces = [] plants_pieces = [] for a in results.areas: host = model.area[a] bundle = _collect_host_metrics(host, hours, case_name=case_name) results.area_capacity[a] = bundle["capacity"] results.area_storage_capacity[a] = bundle["storage_capacity"] results.area_generation_totals[a] = bundle["generation_totals"] results.area_cost_breakdown[a] = bundle["cost_breakdown"] gen_df_a = bundle["generation_df"].copy() if not gen_df_a.empty: gen_df_a.insert(0, "Area", a) gen_pieces.append(gen_df_a) results.area_generation_df[a] = bundle["generation_df"] storage_df_a = bundle["storage_df"].copy() if not storage_df_a.empty: insert_pos = list(storage_df_a.columns).index("Hour") + 1 storage_df_a.insert(insert_pos, "Area", a) storage_pieces.append(storage_df_a) results.area_storage_df[a] = bundle["storage_df"] thermal_df_a = bundle["thermal_generation_df"].copy() if not thermal_df_a.empty: insert_pos = list(thermal_df_a.columns).index("Hour") + 1 thermal_df_a.insert(insert_pos, "Area", a) thermal_pieces.append(thermal_df_a) results.area_thermal_generation_df[a] = bundle["thermal_generation_df"] plants_df_a = bundle["installed_plants_df"].copy() if not plants_df_a.empty: insert_pos = list(plants_df_a.columns).index("Plant ID") + 1 plants_df_a.insert(insert_pos, "Area", a) plants_pieces.append(plants_df_a) results.area_installed_plants_df[a] = bundle["installed_plants_df"] # Per-area summary frame: reuse _build_summary_dataframe with the # area block as the host. The summary helper only touches # ``storage`` / ``demand`` / ``imports`` / ``exports`` / ``pv`` / # ``wind`` / ``h`` on the host, all of which exist on the area # block (imports/exports guarded by hasattr). ``total_cost`` for # the per-area summary is the sum of the area's cost components. per_area_results = OptimizationResults( total_cost=( bundle["cost_breakdown"]["capex"]["All"] + bundle["cost_breakdown"]["power_capex"]["All"] + bundle["cost_breakdown"]["energy_capex"]["All"] + bundle["cost_breakdown"]["fom"]["All"] + bundle["cost_breakdown"]["vom"]["All"] + bundle["cost_breakdown"]["fuel_cost"]["Thermal"] + bundle["cost_breakdown"]["imports_cost"] - bundle["cost_breakdown"]["exports_revenue"] ), capacity=bundle["capacity"], storage_capacity=bundle["storage_capacity"], generation_totals=bundle["generation_totals"], cost_breakdown=bundle["cost_breakdown"], ) try: results.area_summary_df[a] = _build_summary_dataframe( host, per_area_results, bundle["storage_tech_list"] ) except Exception as exc: # pragma: no cover - defensive logging.warning( "Could not build per-area summary for area '%s': %s", a, exc ) results.area_summary_df[a] = pd.DataFrame() # Top-level concatenated DataFrames ------------------------------------ if gen_pieces: results.generation_df = pd.concat(gen_pieces, ignore_index=True) if storage_pieces: results.storage_df = pd.concat(storage_pieces, ignore_index=True) if thermal_pieces: results.thermal_generation_df = pd.concat(thermal_pieces, ignore_index=True) if plants_pieces: results.installed_plants_df = pd.concat(plants_pieces, ignore_index=True) # System-level scalar rollups (sum across areas) ----------------------- capacity_sys: dict = {} storage_capacity_sys: dict = {} generation_totals_sys: dict = {} cost_breakdown_sys: dict = {} for a in results.areas: _merge_dict_sum(capacity_sys, results.area_capacity[a]) _merge_dict_sum(storage_capacity_sys, results.area_storage_capacity[a]) _merge_dict_sum(generation_totals_sys, results.area_generation_totals[a]) _merge_dict_sum(cost_breakdown_sys, results.area_cost_breakdown[a]) results.capacity = capacity_sys results.storage_capacity = storage_capacity_sys results.generation_totals = generation_totals_sys results.cost_breakdown = cost_breakdown_sys # Interregional exchanges (PRD §2.4) ----------------------------------- results.interregional_exchanges_df = _build_interregional_exchanges_df(model) # Top-level summary intentionally left empty under the zonal path; see # the function docstring. CSV writers guard with ``if not df.empty``. logging.info( "Zonal results collected: %d areas, %d lines, summary_df left empty " "(see area_summary_df for per-area summaries).", len(results.areas), len(results.lines), ) return results