import logging
#from pympler import muppy, summary
#from pympler import muppy, summary
from pyomo.opt import SolverFactory, SolverStatus, TerminationCondition, check_available_solvers
from pyomo.util.infeasible import log_infeasible_constraints
from pyomo.environ import ConcreteModel, Objective, Block, minimize
from .initializations import initialize_sets, initialize_params
from .common.utilities import safe_pyomo_value
from .models.formulations_vre import add_vre_variables, add_vre_expressions, add_vre_balance_constraints
from .models.formulations_thermal import add_thermal_variables, add_thermal_expressions, add_thermal_constraints
from .models.formulations_resiliency import add_resiliency_variables, add_resiliency_constraints
from .models.formulations_storage import add_storage_variables, add_storage_expressions, add_storage_constraints
from .models.formulations_system import objective_rule, add_system_expressions, add_system_constraints
from .models.formulations_imports_exports import add_imports_variables, add_exports_variables, add_imports_exports_cost_expressions, add_imports_constraints, add_exports_constraints
from .models.formulations_hydro import add_hydro_variables, add_hydro_run_of_river_constraints, add_hydro_budget_constraints
from .constants import MW_TO_KW
from .io_manager import get_formulation
from .utils_performance_meassure import ModelInitProfiler
from .results import OptimizationResults, collect_results_from_model
# ---------------------------------------------------------------------------------
# Model initialization
# Safe value function for uninitialized variables/parameters
[docs]
def initialize_model(data, n_hours = 8760, with_resilience_constraints=False, model_name="SDOM_Model"):
"""
Initializes and configures a Pyomo optimization model for the SDOM framework.
This function sets up the model structure, including sets, parameters, variables,
objective function, and constraints for power system optimization. It supports
optional resilience constraints and allows customization of the model name and
simulation horizon.
Profiling is always enabled: time and memory usage are measured for each
initialization step and a summary table is printed at the end. The profiler
is attached to the model as `model.profiler` for programmatic access.
Args:
data (dict): Input data required for model initialization, including system
parameters, time series, and technology characteristics.
n_hours (int, optional): Number of hours to simulate (default is 8760,
representing a full year).
with_resilience_constraints (bool, optional): If True, adds resilience-related
constraints to the model (default is False).
model_name (str, optional): Name to assign to the Pyomo model instance
(default is "SDOM_Model").
Returns:
ConcreteModel: A fully initialized Pyomo ConcreteModel object ready for
optimization. The model includes a 'profiler' attribute containing
the ModelInitProfiler instance with detailed timing and memory data.
"""
# Initialize profiler (always enabled for time and memory measurement)
profiler = ModelInitProfiler(track_memory=True, enabled=True)
profiler.start()
logging.info("Instantiating SDOM Pyomo optimization model...")
def create_model_and_blocks():
"""Helper to create model and blocks as a single profiled step."""
m = ConcreteModel(name=model_name)
m.hydro = Block()
m.imports = Block()
m.exports = Block()
m.demand = Block()
m.nuclear = Block()
m.other_renewables = Block()
if with_resilience_constraints:
m.resiliency = Block()
m.storage = Block()
m.thermal = Block()
m.pv = Block()
m.wind = Block()
return m
model = profiler.measure_step("Create model & blocks", create_model_and_blocks)
# Initialize sets
logging.info("Initializing model sets...")
profiler.measure_step("Initialize sets", initialize_sets, model, data, n_hours=n_hours)
# Initialize parameters
logging.info("Initializing model parameters...")
profiler.measure_step("Initialize parameters", initialize_params, model, data)
# ----------------------------------- Variables -----------------------------------
logging.info("Adding variables to the model...")
# VRE variables
logging.debug("-- Adding VRE variables...")
profiler.measure_step("Add VRE variables", add_vre_variables, model)
# VRE expressions
logging.debug("-- Adding VRE expressions...")
profiler.measure_step("Add VRE expressions", add_vre_expressions, model)
# Thermal variables
logging.debug("-- Adding thermal generation variables...")
profiler.measure_step("Add thermal variables", add_thermal_variables, model)
# Thermal expressions
logging.debug("-- Adding thermal generation expressions...")
profiler.measure_step("Add thermal expressions", add_thermal_expressions, model)
# Resilience variables
if with_resilience_constraints:
logging.debug("-- Adding resiliency variables...")
profiler.measure_step("Add resiliency variables", add_resiliency_variables, model)
# Storage variables
logging.debug("--Adding storage variables...")
profiler.measure_step("Add storage variables", add_storage_variables, model)
# Storage expressions
logging.debug("--Adding storage expressions...")
profiler.measure_step("Add storage expressions", add_storage_expressions, model)
# Hydro variables
logging.debug("-- Adding hydropower generation variables...")
profiler.measure_step("Add hydro variables", add_hydro_variables, model)
# Imports variables
if get_formulation(data, component="Imports") != "NotModel":
logging.debug("-- Adding Imports variables...")
profiler.measure_step("Add imports variables", add_imports_variables, model)
# Exports variables
if get_formulation(data, component="Exports") != "NotModel":
logging.debug("-- Adding Exports variables...")
profiler.measure_step("Add exports variables", add_exports_variables, model)
# Imports/Exports cost expressions
profiler.measure_step("Add imports/exports cost expressions",
add_imports_exports_cost_expressions, model, data)
# System expressions
profiler.measure_step("Add system expressions", add_system_expressions, model)
# -------------------------------- Objective function -------------------------------
logging.info("Adding objective function to the model...")
def add_objective():
model.Obj = Objective(rule=objective_rule, sense=minimize)
profiler.measure_step("Add objective function", add_objective)
# ----------------------------------- Constraints -----------------------------------
logging.info("Adding constraints to the model...")
# System constraints
logging.debug("-- Adding system constraints...")
profiler.measure_step("Add system constraints", add_system_constraints, model, data)
# Resiliency constraints
if with_resilience_constraints:
logging.debug("-- Adding resiliency constraints...")
profiler.measure_step("Add resiliency constraints", add_resiliency_constraints, model)
# VRE balance constraints
logging.debug("-- Adding VRE balance constraints...")
profiler.measure_step("Add VRE balance constraints", add_vre_balance_constraints, model)
# Storage constraints
logging.debug("-- Adding storage constraints...")
profiler.measure_step("Add storage constraints", add_storage_constraints, model)
# Thermal constraints
logging.debug("-- Adding thermal generation constraints...")
profiler.measure_step("Add thermal constraints", add_thermal_constraints, model)
# Hydro constraints
logging.debug("-- Adding hydropower generation constraints...")
if get_formulation(data, component="hydro") == "RunOfRiverFormulation":
profiler.measure_step("Add hydro run-of-river constraints",
add_hydro_run_of_river_constraints, model, data)
else:
profiler.measure_step("Add hydro budget constraints",
add_hydro_budget_constraints, model, data)
# Imports constraints
if get_formulation(data, component="Imports") != "NotModel":
logging.debug("-- Adding Imports constraints...")
profiler.measure_step("Add imports constraints", add_imports_constraints, model, data)
# Exports constraints
if get_formulation(data, component="Exports") != "NotModel":
logging.debug("-- Adding Exports constraints...")
profiler.measure_step("Add exports constraints", add_exports_constraints, model, data)
# Finalize profiling
profiler.stop()
profiler.print_summary_table(logging.getLogger())
# Attach profiler to model for programmatic access
model.profiler = profiler
return model
# ---------------------------------------------------------------------------------
# Results collection function - DEPRECATED, use collect_results_from_model from results.py
# Kept for backward compatibility
def collect_results(model):
"""Collect results from a solved model (DEPRECATED).
This function is deprecated. Use `collect_results_from_model` from the results
module instead, which returns an OptimizationResults dataclass.
Parameters
----------
model : pyomo.core.base.PyomoModel.ConcreteModel
The Pyomo model instance containing the optimization results.
Returns
-------
dict
A dictionary containing collected results for backward compatibility.
.. deprecated::
Use :func:`sdom.results.collect_results_from_model` instead.
"""
import warnings
warnings.warn(
"collect_results is deprecated. Use collect_results_from_model from results.py instead.",
DeprecationWarning,
stacklevel=2,
)
logging.info("Collecting SDOM results...")
results = {}
results["Total_Cost"] = safe_pyomo_value(model.Obj.expr)
# Capacity and generation results
logging.debug("Collecting capacity results...")
results["Total_CapCC"] = safe_pyomo_value(model.thermal.total_installed_capacity)
results["Total_CapPV"] = safe_pyomo_value(model.pv.total_installed_capacity)
results["Total_CapWind"] = safe_pyomo_value(model.wind.total_installed_capacity)
results["Total_CapScha"] = {j: safe_pyomo_value(model.storage.Pcha[j]) for j in model.storage.j}
results["Total_CapSdis"] = {j: safe_pyomo_value(model.storage.Pdis[j]) for j in model.storage.j}
results["Total_EcapS"] = {j: safe_pyomo_value(model.storage.Ecap[j]) for j in model.storage.j}
# Generation and dispatch results
logging.debug("Collecting generation dispatch results...")
results["Total_GenPV"] = safe_pyomo_value(model.pv.total_generation)
results["Total_GenWind"] = safe_pyomo_value(model.wind.total_generation)
results["Total_GenS"] = {j: sum(safe_pyomo_value(model.storage.PD[h, j]) for h in model.h) for j in model.storage.j}
results["SolarPVGen"] = {h: safe_pyomo_value(model.pv.generation[h]) for h in model.h}
results["WindGen"] = {h: safe_pyomo_value(model.wind.generation[h]) for h in model.h}
results["AggThermalGen"] = {h: sum(safe_pyomo_value(model.thermal.generation[h, bu]) for bu in model.thermal.plants_set) for h in model.h}
results["SolarCapex"] = safe_pyomo_value(model.pv.capex_cost_expr)
results["WindCapex"] = safe_pyomo_value(model.wind.capex_cost_expr)
results["SolarFOM"] = safe_pyomo_value(model.pv.fixed_om_cost_expr)
results["WindFOM"] = safe_pyomo_value(model.wind.fixed_om_cost_expr)
logging.debug("Collecting storage results...")
storage_tech_list = list(model.storage.j)
for tech in storage_tech_list:
results[f"{tech}PowerCapex"] = model.storage.CRF[tech] * (
MW_TO_KW * model.storage.data["CostRatio", tech] * model.storage.data["P_Capex", tech] * model.storage.Pcha[tech]
+ MW_TO_KW * (1 - model.storage.data["CostRatio", tech]) * model.storage.data["P_Capex", tech] * model.storage.Pdis[tech]
)
results[f"{tech}EnergyCapex"] = model.storage.CRF[tech] * MW_TO_KW * model.storage.data["E_Capex", tech] * model.storage.Ecap[tech]
results[f"{tech}FOM"] = (
MW_TO_KW * model.storage.data["CostRatio", tech] * model.storage.data["FOM", tech] * model.storage.Pcha[tech]
+ MW_TO_KW * (1 - model.storage.data["CostRatio", tech]) * model.storage.data["FOM", tech] * model.storage.Pdis[tech]
)
results[f"{tech}VOM"] = model.storage.data["VOM", tech] * sum(model.storage.PD[h, tech] for h in model.h)
results["TotalThermalCapex"] = sum(
model.thermal.FCR[bu] * MW_TO_KW * model.thermal.CAPEX_M[bu] * model.thermal.plant_installed_capacity[bu]
for bu in model.thermal.plants_set
)
results["ThermalFuel"] = sum(
(model.thermal.fuel_price[bu] * model.thermal.heat_rate[bu]) * sum(model.thermal.generation[h, bu] for h in model.h)
for bu in model.thermal.plants_set
)
results["ThermalFOM"] = safe_pyomo_value(model.thermal.fixed_om_cost_expr)
results["ThermalVOM"] = sum(
model.thermal.VOM_M[bu] * sum(model.thermal.generation[h, bu] for h in model.h) for bu in model.thermal.plants_set
)
return results
[docs]
def get_default_solver_config_dict(solver_name="cbc", executable_path=".\\Solver\\bin\\cbc.exe"):
"""Generate a default solver configuration dictionary with standard SDOM settings.
Creates a pre-configured dictionary for solver initialization with recommended
settings for SDOM optimization problems. Includes solver options and solve
keywords for controlling optimization behavior.
Args:
solver_name (str, optional): Solver to use. Supported values:
- 'cbc': COIN-OR CBC open-source MILP solver (requires executable_path)
- 'highs': HiGHS open-source MILP solver (uses appsi interface)
- 'xpress': FICO Xpress commercial solver (uses direct interface)
Defaults to 'cbc'.
executable_path (str, optional): Path to solver executable file. Required
for CBC solver. Defaults to '.\\Solver\\bin\\cbc.exe'.
Returns:
dict: Configuration dictionary with keys:
- 'solver_name' (str): Solver identifier for SolverFactory
- 'executable_path' (str): Path to executable (CBC only)
- 'options' (dict): Solver options (mip_rel_gap, etc.)
- 'solve_keywords' (dict): Arguments for solver.solve() call (tee,
load_solutions, logfile, timelimit, etc.)
Notes:
Default MIP relative gap is 0.002 (0.2%). Log output is written to
'solver_log.txt'. Solution loading and timing reports are enabled by default.
HiGHS uses 'appsi_highs' interface for better performance.
"""
solver_dict = {
"solver_name": "appsi_" + solver_name,
"options":{
#"loglevel": 3,
"mip_rel_gap": 0.002,
#"keepfiles": True,
#"logfile": "solver_log.txt", # The filename used to store output for shell solvers
},
"solve_keywords":{
"tee": True, #If true solver output is printed both to the standard output as well as saved to the log file.
"load_solutions": True, #If True (the default), then solution values are automically transfered to Var objects on the model
"report_timing": True, #If True (the default), then timing information is reported
"logfile": "solver_log.txt", # The filename used to store output for shell solvers
#"solnfile": "./results_pyomo/solver_soln.txt", # The filename used to store the solution for shell solvers
"timelimit": None, # The number of seconds that a shell solver is run before it is terminated. (default is None)
},
}
if solver_name == "cbc":
solver_dict["solver_name"] = solver_name
solver_dict["executable_path"] = executable_path
elif solver_name == "xpress":
solver_dict["solver_name"] = "xpress_direct"
#solver_dict = {"solver_name": "xpress",}
#solver_dict["executable_path"] = executable_path
return solver_dict
# Run solver function
[docs]
def run_solver(model, solver_config_dict: dict, case_name: str = "run") -> OptimizationResults:
"""Solve the optimization model and return structured results.
Solves the given optimization model using the configured solver and collects
all results into an OptimizationResults dataclass.
Parameters
----------
model : pyomo.core.base.PyomoModel.ConcreteModel
The Pyomo optimization model to be solved. The model must have an
attribute 'GenMix_Target' that can be set.
solver_config_dict : dict
Solver configuration dictionary from get_default_solver_config_dict().
case_name : str, optional
Case identifier for labeling results. Defaults to "run".
Returns
-------
OptimizationResults
A dataclass containing all optimization results including:
- termination_condition: Solver termination status
- total_cost: Objective value
- generation_df: Hourly generation dispatch DataFrame
- storage_df: Hourly storage operation DataFrame
- summary_df: Summary metrics DataFrame
- capacity: Installed capacities by technology
- storage_capacity: Storage capacities (charge, discharge, energy)
- cost_breakdown: Detailed cost breakdown
- problem_info: Solver problem information
Raises
------
RuntimeError
If the solver is not available on the system.
Notes
-----
If the solver does not find an optimal solution, the returned
OptimizationResults will have is_optimal=False and minimal data populated.
"""
logging.info("Starting to solve SDOM model...")
solver = configure_solver(solver_config_dict)
target_value = float(model.GenMix_Target.value)
logging.info(f"Running optimization for GenMix_Target = {target_value:.2f}")
solver_result = solver.solve(
model,
tee=solver_config_dict["solve_keywords"].get("tee", True),
load_solutions=solver_config_dict["solve_keywords"].get("load_solutions", True),
timelimit=solver_config_dict["solve_keywords"].get("timelimit", None),
report_timing=solver_config_dict["solve_keywords"].get("report_timing", True),
keepfiles=solver_config_dict["solve_keywords"].get("keepfiles", True),
)
if (solver_result.solver.status == SolverStatus.ok) and (
solver_result.solver.termination_condition == TerminationCondition.optimal
):
# Collect results using the new structured approach
results = collect_results_from_model(model, solver_result, case_name)
else:
logging.warning(f"Solver did not find an optimal solution for GenMix_Target = {target_value:.2f}.")
logging.warning("Logging infeasible constraints...")
log_infeasible_constraints(model)
# Return minimal results with solver info
results = OptimizationResults(
termination_condition=str(solver_result.solver.termination_condition),
solver_status=str(solver_result.solver.status),
gen_mix_target=target_value,
)
# Still extract problem info if available
if solver_result.problem:
problem = solver_result.problem[0]
# Helper to extract value from Pyomo ScalarData objects
def get_value(val):
if hasattr(val, 'value'):
return val.value
return val
results.problem_info = {
"Number of constraints": get_value(problem.get("Number of constraints", 0)),
"Number of variables": get_value(problem.get("Number of variables", 0)),
"Number of binary variables": get_value(problem.get("Number of binary variables", 0)),
"Number of objectives": get_value(problem.get("Number of objectives", 0)),
"Number of nonzeros": get_value(problem.get("Number of nonzeros", 0)),
}
return results