import logging
import re
import pandas as pd
import os
import csv
from datetime import datetime
from pyomo.environ import sqrt
from .common.utilities import safe_pyomo_value, check_file_exists, compare_lists, concatenate_dataframes, get_dict_string_void_list_from_keys_in_list, get_complete_path
from .constants import (
INPUT_CSV_NAMES,
MW_TO_KW,
VALID_HYDRO_FORMULATIONS_TO_BUDGET_MAP,
VALID_IMPORTS_EXPORTS_FORMULATIONS_TO_DESCRIPTION_MAP,
VALID_NETWORK_FORMULATIONS_TO_DESCRIPTION_MAP,
DEFAULT_NETWORK_FORMULATION,
DEFAULT_AREA_ID,
AREA_TAG_DELIMITER,
COPPER_PLATE_NETWORK,
AREA_TRANSPORTATION_MODEL_NETWORK,
AREA_TRANSPORTATION_MODEL_NETWORK_REQUIRED_INPUTS,
RUN_OF_RIVER_FORMULATION,
IMPORTS_EXPORTS_CAPACITY_PRICE_NET_LOAD,
)
# Compiled once: matches "<entity>@<area_id>@" with no extra '@' characters.
_AREA_TAG_RE = re.compile(
rf"^(?P<entity>[^{re.escape(AREA_TAG_DELIMITER)}]*)"
rf"{re.escape(AREA_TAG_DELIMITER)}"
rf"(?P<area>[^{re.escape(AREA_TAG_DELIMITER)}]+)"
rf"{re.escape(AREA_TAG_DELIMITER)}$"
)
# Sentinel used to distinguish 'no default supplied' from 'default is None'.
_GET_FORMULATION_NO_DEFAULT = object()
# ---------------------------------------------------------------------------------
# Per-area (zonal) parsing helpers
# ---------------------------------------------------------------------------------
def _parse_area_tagged_header(header):
"""Parse a wide-CSV column header for an ``@area_id@`` tag.
The hybrid encoding for zonal data tags wide-CSV column names with
``<entity><AREA_TAG_DELIMITER><area_id><AREA_TAG_DELIMITER>``. Legacy
column names without the delimiter are accepted and reported with
``area_id == None`` so the caller can decide whether to assign
``DEFAULT_AREA_ID`` (legacy file) or raise (mixed legacy + tagged).
Parameters
----------
header : str
Column header to parse.
Returns
-------
tuple[str, str | None]
``(entity_name, area_id)`` where ``area_id`` is ``None`` when the
header contains no delimiter.
Raises
------
ValueError
If the delimiter appears in the header but the overall shape is not
``<entity>@<area_id>@`` (stray ``@``, double tag, lone delimiter).
"""
header = str(header)
if AREA_TAG_DELIMITER not in header:
return header, None
match = _AREA_TAG_RE.match(header)
if match is None:
raise ValueError(
f"Invalid area tag in column header '{header}'. Expected format "
f"'<entity>{AREA_TAG_DELIMITER}<area_id>{AREA_TAG_DELIMITER}'."
)
return match.group("entity"), match.group("area")
def _split_wide_by_area(df, *, file_label):
"""Split a wide-format DataFrame into per-area DataFrames using header tags.
The first column is treated as the time / property key and is preserved
in every per-area slice. All non-key columns must be either fully
untagged (legacy file → single ``DEFAULT_AREA_ID`` slice) or fully tagged
with ``@area_id@`` (zonal file → one slice per observed area).
Parameters
----------
df : pandas.DataFrame or None
Source DataFrame. ``None`` or empty returns ``({}, set())``.
file_label : str
Human-readable file identifier used in error messages.
Returns
-------
tuple[dict[str, pandas.DataFrame], set[str]]
``(per_area, observed_areas)``. ``per_area`` maps ``area_id`` to a
DataFrame containing the key column and the columns belonging to
that area, with the ``@area_id@`` tag stripped from headers.
``observed_areas`` is the set of explicitly tagged area ids (empty
for fully untagged legacy files).
Raises
------
ValueError
If the file mixes untagged and tagged non-key columns, or any tagged
column header has an invalid shape.
"""
if df is None or df.empty:
return {}, set()
key_col = df.columns[0]
parsed = []
untagged = 0
tagged = 0
for col in df.columns[1:]:
entity, area = _parse_area_tagged_header(col)
if area is None:
untagged += 1
else:
tagged += 1
parsed.append((col, entity, area))
if untagged and tagged:
raise ValueError(
f"{file_label} mixes legacy (untagged) and {AREA_TAG_DELIMITER}area_id"
f"{AREA_TAG_DELIMITER}-tagged columns. All non-key columns must be "
f"either all untagged or all tagged."
)
if tagged == 0:
return {DEFAULT_AREA_ID: df.copy()}, set()
# Group source columns by area, then build per-area DataFrames in one
# shot (avoids pandas PerformanceWarning on fragmented frames).
grouped: dict[str, list[tuple[str, str]]] = {}
observed: set[str] = set()
for orig, entity, area in parsed:
observed.add(area)
grouped.setdefault(area, []).append((orig, entity))
per_area: dict[str, pd.DataFrame] = {}
for area, items in grouped.items():
sources = [orig for orig, _ in items]
renames = {orig: entity for orig, entity in items}
per_area[area] = df[[key_col, *sources]].rename(columns=renames).copy()
return per_area, observed
def _split_row_by_area(df, *, id_col, file_label):
"""Split a row-oriented DataFrame by an optional ``area_id`` column.
Files such as ``CapSolar.csv`` / ``CapWind.csv`` use Encoding A: an
optional ``area_id`` column tags every row. Plant identifiers (``id_col``)
must be globally unique across areas. Legacy files (no ``area_id`` column)
are returned as a single ``DEFAULT_AREA_ID`` slice.
Parameters
----------
df : pandas.DataFrame or None
Source DataFrame. ``None`` or empty returns ``({}, set())``.
id_col : str
Name of the globally unique row identifier (e.g. ``"sc_gid"``).
file_label : str
Human-readable file identifier used in error messages.
Returns
-------
tuple[dict[str, pandas.DataFrame], set[str]]
``(per_area, observed_areas)``.
Raises
------
ValueError
If duplicate ``id_col`` values appear across all areas.
"""
if df is None or df.empty:
return {}, set()
if "area_id" not in df.columns:
return {DEFAULT_AREA_ID: df.copy()}, set()
if id_col in df.columns and df[id_col].duplicated().any():
dups = (
df.loc[df[id_col].duplicated(keep=False), id_col]
.astype(str)
.unique()
.tolist()
)
raise ValueError(
f"{file_label}: '{id_col}' values must be globally unique across "
f"areas; duplicates found: {dups}."
)
per_area: dict[str, pd.DataFrame] = {}
observed: set[str] = set()
for area, sub in df.groupby("area_id", sort=False):
area_str = str(area)
observed.add(area_str)
per_area[area_str] = sub.copy()
return per_area, observed
def _split_cf_by_plant_area(cf_df, cap_per_area, *, id_col):
"""Group capacity-factor columns by area via the cap-table plant→area map.
``CFSolar.csv`` / ``CFWind.csv`` keep a single column per plant
identifier (already globally unique). The plant→area mapping is recovered
from ``CapSolar.csv`` / ``CapWind.csv`` (already split per area).
Parameters
----------
cf_df : pandas.DataFrame or None
Wide capacity-factor DataFrame; first column is the time key and
every other column header is a plant id.
cap_per_area : dict[str, pandas.DataFrame]
Per-area capacity tables keyed by ``area_id``.
id_col : str
Plant-id column name in the cap tables (e.g. ``"sc_gid"``).
Returns
-------
dict[str, pandas.DataFrame]
Per-area capacity-factor DataFrames; columns are the time key plus
the plant ids assigned to that area.
"""
if cf_df is None or cf_df.empty:
return {}
key_col = cf_df.columns[0]
plant_to_area: dict[str, str] = {}
for area, sub in cap_per_area.items():
if id_col not in sub.columns:
continue
for plant_id in sub[id_col].astype(str):
plant_to_area[plant_id] = area
# Group columns by area first, then assemble each per-area DataFrame in
# one go (avoids pandas PerformanceWarning about fragmented frames).
cols_by_area: dict[str, list[str]] = {}
for col in cf_df.columns[1:]:
area = plant_to_area.get(str(col), DEFAULT_AREA_ID)
cols_by_area.setdefault(area, []).append(col)
per_area: dict[str, pd.DataFrame] = {}
for area, cols in cols_by_area.items():
per_area[area] = cf_df[[key_col, *cols]].copy()
return per_area
def _combine_per_area_imp_exp(cap_per_area, price_per_area):
"""Merge per-area capacity and price DataFrames (imports or exports).
Parameters
----------
cap_per_area : dict[str, pandas.DataFrame]
Per-area capacity tables (first column is the time key).
price_per_area : dict[str, pandas.DataFrame]
Per-area price tables (first column is the time key).
Returns
-------
dict[str, pandas.DataFrame]
Per-area DataFrames containing the time key plus ``cap`` / ``price``
columns merged on the time key.
"""
per_area: dict[str, pd.DataFrame] = {}
for area in set(cap_per_area) | set(price_per_area):
cap = cap_per_area.get(area)
price = price_per_area.get(area)
if cap is not None and price is not None:
key_col = cap.columns[0]
per_area[area] = cap.merge(price, on=key_col, how="outer")
else:
per_area[area] = (cap if cap is not None else price).copy()
return per_area
def _load_areas(input_data_dir):
"""Load ``areas.csv`` if present, otherwise synthesize the default area.
Parameters
----------
input_data_dir : str
Path to the SDOM input data folder.
Returns
-------
tuple[list[dict], bool]
``(areas, present_on_disk)``. ``areas`` is a list of
``{"area_id": str, "description": str}`` dicts (always at least one
entry). ``present_on_disk`` is ``True`` when ``areas.csv`` was found.
Raises
------
ValueError
If ``areas.csv`` is present but missing the required ``area_id``
column.
"""
path = get_complete_path(input_data_dir, INPUT_CSV_NAMES["areas"])
if path:
df = pd.read_csv(path)
if "area_id" not in df.columns:
raise ValueError("areas.csv must include an 'area_id' column.")
if "description" not in df.columns:
df["description"] = ""
df = df.copy()
df["area_id"] = df["area_id"].astype(str)
df["description"] = df["description"].fillna("").astype(str)
return df[["area_id", "description"]].to_dict(orient="records"), True
return (
[{"area_id": DEFAULT_AREA_ID, "description": "Default area"}],
False,
)
def _validate_observed_areas(observed, *, areas, areas_csv_present, source_label):
"""Validate that observed area_ids reference declared areas.
When ``areas.csv`` is present, every observed area must appear in
``areas`` (ERROR otherwise). When ``areas.csv`` is absent, any observed
area is accepted (the caller will synthesize the area set).
Parameters
----------
observed : set[str]
Area ids harvested from the per-device parser.
areas : list[dict]
Currently-known area list.
areas_csv_present : bool
Whether ``areas.csv`` was loaded from disk.
source_label : str
Human-readable identifier of the file that produced ``observed``,
used in error messages.
Raises
------
ValueError
If ``areas.csv`` is present and ``observed`` references an unknown
``area_id``.
"""
if not observed or not areas_csv_present:
return
declared = {a["area_id"] for a in areas}
unknown = sorted(observed - declared)
if unknown:
raise ValueError(
f"{source_label}: references unknown area_id(s) {unknown} not "
f"declared in areas.csv (declared: {sorted(declared)})."
)
def _augment_with_per_area_views(data, *, input_data_dir):
"""Populate ``per_area_*`` views and validate the area encoding.
Post-processing step run after the legacy global keys have been loaded
into ``data``. Legacy single-area folders end up with all per-area dicts
keyed by ``DEFAULT_AREA_ID`` and the existing global keys untouched.
Parameters
----------
data : dict
The data dictionary returned by ``load_data`` so far.
input_data_dir : str
Path to the SDOM input data folder (used to locate ``areas.csv``).
Returns
-------
dict
The same ``data`` dict, mutated in place, with the new keys
documented in PRD §4.4 (``areas``, ``per_area_demand``,
``per_area_pv_plants``, etc.).
Raises
------
ValueError
On any of the validation rules in PRD §4.5 (mixed columns, stray
``@``, unknown area references, duplicate plant ids, …).
"""
areas, areas_present = _load_areas(input_data_dir)
storage_df = data.get("storage_data")
wide_specs = [
("load_data", "Load_hourly.csv"),
("nuclear_data", "Nucl_hourly.csv"),
("large_hydro_data", "lahy_hourly.csv"),
("large_hydro_max", "lahy_max_hourly.csv"),
("large_hydro_min", "lahy_min_hourly.csv"),
("other_renewables_data", "otre_hourly.csv"),
("cap_imports", "Import_Cap.csv"),
("price_imports", "Import_Prices.csv"),
("cap_exports", "Export_Cap.csv"),
("price_exports", "Export_Prices.csv"),
]
# Fast path for legacy single-area folders: no areas.csv and no area encoding
# in headers or row tables. This avoids expensive split/validation work while
# preserving the exact per_area_* key surface expected downstream.
has_tagged_wide_headers = any(
df is not None
and not df.empty
and any(
AREA_TAG_DELIMITER in str(col)
for col in df.columns[1:]
)
for key, _ in wide_specs
for df in [data.get(key)]
)
has_tagged_storage_headers = (
storage_df is not None
and not storage_df.empty
and any(AREA_TAG_DELIMITER in str(col) for col in storage_df.columns)
)
has_row_area_column = any(
df is not None and not df.empty and "area_id" in df.columns
for df in [
data.get("thermal_data"),
data.get("cap_solar"),
data.get("cap_wind"),
]
)
if (
not areas_present
and not has_tagged_wide_headers
and not has_tagged_storage_headers
and not has_row_area_column
):
default_area = DEFAULT_AREA_ID
if not areas:
areas = [
{
"area_id": default_area,
"description": f"Area {default_area}",
}
]
def _single_area_view(df):
if df is None or df.empty:
return {}
return {default_area: df.copy()}
per_area_hydro = {}
for label, key in (
("LargeHydro", "large_hydro_data"),
("LargeHydro_Max", "large_hydro_max"),
("LargeHydro_Min", "large_hydro_min"),
):
sub = data.get(key)
if sub is None or sub.empty:
continue
key_col = sub.columns[0]
value_cols = [c for c in sub.columns if c != key_col]
renamed = sub.rename(
columns={
c: (label if len(value_cols) == 1 else f"{label}__{c}")
for c in value_cols
}
)
if default_area not in per_area_hydro:
per_area_hydro[default_area] = renamed
else:
per_area_hydro[default_area] = per_area_hydro[default_area].merge(
renamed, on=key_col, how="outer"
)
data["areas"] = areas
data["per_area_demand"] = _single_area_view(data.get("load_data"))
data["per_area_pv_plants"] = _single_area_view(data.get("cap_solar"))
data["per_area_wind_plants"] = _single_area_view(data.get("cap_wind"))
data["per_area_balancing_units"] = _single_area_view(data.get("thermal_data"))
data["per_area_storage"] = _single_area_view(storage_df)
data["per_area_hydro"] = per_area_hydro
data["per_area_nuclear"] = _single_area_view(data.get("nuclear_data"))
data["per_area_other_renewables"] = _single_area_view(data.get("other_renewables_data"))
data["per_area_imports"] = _combine_per_area_imp_exp(
_single_area_view(data.get("cap_imports")),
_single_area_view(data.get("price_imports")),
)
data["per_area_exports"] = _combine_per_area_imp_exp(
_single_area_view(data.get("cap_exports")),
_single_area_view(data.get("price_exports")),
)
data["per_area_capacity_factors_pv"] = _single_area_view(data.get("cf_solar"))
data["per_area_capacity_factors_wind"] = _single_area_view(data.get("cf_wind"))
return data
splits: dict[str, dict[str, pd.DataFrame]] = {}
observed_total: set[str] = set()
for key, label in wide_specs:
per_area, observed = _split_wide_by_area(data.get(key), file_label=label)
_validate_observed_areas(
observed,
areas=areas,
areas_csv_present=areas_present,
source_label=label,
)
splits[key] = per_area
observed_total |= observed
# StorageData.csv is read with index_col=0; rebuild the wide form for parsing.
if storage_df is not None and not storage_df.empty:
index_label = storage_df.index.name or "Property"
storage_wide = storage_df.reset_index().rename(
columns={storage_df.index.name or "index": index_label}
)
per_area_storage_wide, observed_storage = _split_wide_by_area(
storage_wide, file_label="StorageData.csv"
)
_validate_observed_areas(
observed_storage,
areas=areas,
areas_csv_present=areas_present,
source_label="StorageData.csv",
)
per_area_storage = {
area: sub.set_index(sub.columns[0])
for area, sub in per_area_storage_wide.items()
}
else:
per_area_storage, observed_storage = {}, set()
observed_total |= observed_storage
# Data_BalancingUnits.csv is currently row-oriented (Plant_id, MaxCapacity, ...).
# Treat it as Encoding A (optional area_id column) for backward compatibility
# with the existing fixtures; the PRD's wide-form example is aspirational and
# will be revisited when the long-format migration lands.
per_area_balancing_units, observed_bu = _split_row_by_area(
data.get("thermal_data"), id_col="Plant_id", file_label="Data_BalancingUnits.csv"
)
_validate_observed_areas(
observed_bu,
areas=areas,
areas_csv_present=areas_present,
source_label="Data_BalancingUnits.csv",
)
observed_total |= observed_bu
per_area_pv_plants, observed_pv = _split_row_by_area(
data.get("cap_solar"), id_col="sc_gid", file_label="CapSolar.csv"
)
_validate_observed_areas(
observed_pv,
areas=areas,
areas_csv_present=areas_present,
source_label="CapSolar.csv",
)
observed_total |= observed_pv
per_area_wind_plants, observed_wind = _split_row_by_area(
data.get("cap_wind"), id_col="sc_gid", file_label="CapWind.csv"
)
_validate_observed_areas(
observed_wind,
areas=areas,
areas_csv_present=areas_present,
source_label="CapWind.csv",
)
observed_total |= observed_wind
per_area_cf_pv = _split_cf_by_plant_area(
data.get("cf_solar"), per_area_pv_plants, id_col="sc_gid"
)
per_area_cf_wind = _split_cf_by_plant_area(
data.get("cf_wind"), per_area_wind_plants, id_col="sc_gid"
)
per_area_imports = _combine_per_area_imp_exp(
splits["cap_imports"], splits["price_imports"]
)
per_area_exports = _combine_per_area_imp_exp(
splits["cap_exports"], splits["price_exports"]
)
# Hydro composite per area: merge run-of-river / max / min on the time key.
hydro_components = {
"LargeHydro": splits["large_hydro_data"],
"LargeHydro_Max": splits.get("large_hydro_max", {}),
"LargeHydro_Min": splits.get("large_hydro_min", {}),
}
per_area_hydro: dict[str, pd.DataFrame] = {}
for label, src in hydro_components.items():
for area, sub in src.items():
if sub is None or sub.empty:
continue
key_col = sub.columns[0]
value_cols = [c for c in sub.columns if c != key_col]
renamed = sub.rename(
columns={
c: (label if len(value_cols) == 1 else f"{label}__{c}")
for c in value_cols
}
)
if area not in per_area_hydro:
per_area_hydro[area] = renamed
else:
per_area_hydro[area] = per_area_hydro[area].merge(
renamed, on=key_col, how="outer"
)
# Synthesize areas from observed tags when areas.csv was absent.
if not areas_present and observed_total:
areas = [
{"area_id": area, "description": f"Area {area}"}
for area in sorted(observed_total)
]
data["areas"] = areas
data["per_area_demand"] = splits["load_data"]
data["per_area_pv_plants"] = per_area_pv_plants
data["per_area_wind_plants"] = per_area_wind_plants
data["per_area_balancing_units"] = per_area_balancing_units
data["per_area_storage"] = per_area_storage
data["per_area_hydro"] = per_area_hydro
data["per_area_nuclear"] = splits["nuclear_data"]
data["per_area_other_renewables"] = splits["other_renewables_data"]
data["per_area_imports"] = per_area_imports
data["per_area_exports"] = per_area_exports
data["per_area_capacity_factors_pv"] = per_area_cf_pv
data["per_area_capacity_factors_wind"] = per_area_cf_wind
return data
# ---------------------------------------------------------------------------------
# Zonal topology (commit #5): interconnections + per-line hourly capacities
# ---------------------------------------------------------------------------------
def _load_interconnections(input_data_dir, *, areas):
"""Load the inter-area transmission topology from ``interconnections.csv``.
Reads the line definitions used by the
``AreaTransportationModelNetwork`` formulation. The file is optional;
when absent an empty list is returned so legacy and copper-plate
fixtures continue to load. When present, it is fully validated.
Parameters
----------
input_data_dir : str
Path to the SDOM input data folder.
areas : list of dict
Declared areas as returned by ``_load_areas``. Used as the
foreign-key target for ``from_area`` / ``to_area``.
Returns
-------
list of dict
One ``{"line_id": str, "from_area": str, "to_area": str}`` per row,
preserving the file order. Returns ``[]`` when the file is absent
or empty.
Raises
------
ValueError
If required columns are missing, ``line_id`` or
``(from_area, to_area)`` pairs are duplicated, a self-loop is
declared, or an area reference does not exist in ``areas``.
"""
path = get_complete_path(input_data_dir, INPUT_CSV_NAMES["interconnections"])
if not path:
return []
df = pd.read_csv(path)
required = {"line_id", "from_area", "to_area"}
missing = required - set(df.columns)
if missing:
raise ValueError(
f"interconnections.csv is missing required column(s): "
f"{sorted(missing)}."
)
if df.empty:
return []
df = df.copy()
for col in ("line_id", "from_area", "to_area"):
df[col] = df[col].astype(str)
# Duplicate line_id ----------------------------------------------------
dup_ids = df.loc[df["line_id"].duplicated(keep=False), "line_id"].unique().tolist()
if dup_ids:
raise ValueError(
f"interconnections.csv: duplicate line_id value(s) {sorted(dup_ids)}."
)
# Duplicate (from_area, to_area) pairs ---------------------------------
pair_dup_mask = df.duplicated(subset=["from_area", "to_area"], keep=False)
if pair_dup_mask.any():
dup_pairs = (
df.loc[pair_dup_mask, ["from_area", "to_area"]]
.drop_duplicates()
.apply(lambda r: (r["from_area"], r["to_area"]), axis=1)
.tolist()
)
raise ValueError(
f"interconnections.csv: duplicate (from_area, to_area) pair(s) "
f"{dup_pairs}."
)
# Self-loops -----------------------------------------------------------
self_loops = df.loc[df["from_area"] == df["to_area"], "line_id"].tolist()
if self_loops:
raise ValueError(
f"interconnections.csv: self-loops are not allowed; offending "
f"line_id(s): {self_loops}."
)
# Foreign-key check on areas ------------------------------------------
declared = {a["area_id"] for a in areas}
referenced = set(df["from_area"]).union(df["to_area"])
unknown = sorted(referenced - declared)
if unknown:
raise ValueError(
f"interconnections.csv: references unknown area_id(s) {unknown} "
f"not declared in areas.csv (declared: {sorted(declared)})."
)
return df[["line_id", "from_area", "to_area"]].to_dict(orient="records")
def _load_one_line_cap(input_data_dir, filename, *, lines, n_hours, direction):
"""Load and validate one of ``LineCap_FT.csv`` / ``LineCap_TF.csv``.
Returns an empty DataFrame when the file is absent (the caller decides
whether the absence is acceptable for the active Network formulation).
Parameters
----------
input_data_dir : str
Path to the SDOM input data folder.
filename : str
``"LineCap_FT.csv"`` or ``"LineCap_TF.csv"``.
lines : list of dict
Lines returned by ``_load_interconnections``. Used to validate the
column set of the capacity file.
n_hours : int
Expected number of rows (defaults to 8760 in the public wrapper).
direction : str
Human label (``"FT"`` or ``"TF"``) used in error messages.
Returns
-------
pandas.DataFrame
Indexed by hour, with one column per ``line_id``. Empty if the
file is absent or ``lines`` is empty.
Raises
------
ValueError
If the column set differs from ``{l["line_id"] for l in lines}``,
the row count differs from ``n_hours``, or any value is negative.
"""
path = get_complete_path(input_data_dir, filename)
if not path:
return pd.DataFrame()
df = pd.read_csv(path)
if df.shape[1] < 2:
raise ValueError(
f"{filename}: expected an hour-index column followed by one "
f"column per line_id; found {df.shape[1]} column(s)."
)
key_col = df.columns[0]
df = df.copy()
df[key_col] = pd.to_numeric(df[key_col], errors="coerce")
df = df.sort_values(key_col).reset_index(drop=True)
cap = df.set_index(key_col)
cap.columns = cap.columns.astype(str)
# Without lines we still return what we read so callers can introspect;
# but the column / non-negativity checks below are skipped because they
# reference the `lines` set.
if not lines:
return cap
expected_cols = {l["line_id"] for l in lines}
actual_cols = set(cap.columns)
if actual_cols != expected_cols:
missing = sorted(expected_cols - actual_cols)
extra = sorted(actual_cols - expected_cols)
raise ValueError(
f"{filename}: column set must match interconnections.csv line_ids. "
f"Missing: {missing}; unexpected: {extra}."
)
if len(cap) != n_hours:
raise ValueError(
f"{filename}: expected {n_hours} hourly rows, found {len(cap)}."
)
neg_mask = (cap < 0)
if neg_mask.values.any():
# Identify the first offending (row, column) pair for the message.
rows, cols = neg_mask.values.nonzero()
first_row = int(rows[0])
first_col = cap.columns[int(cols[0])]
bad_value = cap.iat[first_row, int(cols[0])]
raise ValueError(
f"{filename}: line capacities must be non-negative; first "
f"violation at row index {first_row} (hour={cap.index[first_row]}), "
f"line_id='{first_col}', value={bad_value}."
)
# Reorder columns to match the lines listing for stable downstream use.
cap = cap[[l["line_id"] for l in lines]]
return cap
def _load_line_capacities(input_data_dir, *, lines, n_hours=8760):
"""Load both directional line-capacity files (``FT`` and ``TF``).
Parameters
----------
input_data_dir : str
Path to the SDOM input data folder.
lines : list of dict
Lines returned by ``_load_interconnections``.
n_hours : int, optional
Expected number of rows in each capacity file. Default ``8760``.
Returns
-------
tuple of pandas.DataFrame
``(line_cap_ft, line_cap_tf)``. Empty DataFrames are returned when
``lines`` is empty or the corresponding file is absent. Columns are
ordered to match ``lines``.
Raises
------
ValueError
Propagated from :func:`_load_one_line_cap` (column set mismatch,
wrong row count, negative values).
"""
if not lines:
return pd.DataFrame(), pd.DataFrame()
line_cap_ft = _load_one_line_cap(
input_data_dir, INPUT_CSV_NAMES["line_cap_ft"],
lines=lines, n_hours=n_hours, direction="FT",
)
line_cap_tf = _load_one_line_cap(
input_data_dir, INPUT_CSV_NAMES["line_cap_tf"],
lines=lines, n_hours=n_hours, direction="TF",
)
return line_cap_ft, line_cap_tf
# ---------------------------------------------------------------------------------
# Aggregation fallback (commit #6): zonal data + CopperPlateNetwork
# ---------------------------------------------------------------------------------
def _sum_per_area_wide(per_area, *, single_col_name=None):
"""Aggregate a wide per-area dict into a single-column wide DataFrame.
All per-area frames must share the same first ("key") column. Non-key
columns are summed across areas. When ``single_col_name`` is provided,
the resulting non-key column is renamed accordingly; otherwise the
original (tag-stripped) column name from the first area is reused.
Parameters
----------
per_area : dict[str, pandas.DataFrame]
Per-area wide DataFrames as produced by :func:`_split_wide_by_area`.
single_col_name : str, optional
Name to assign to the aggregated non-key column.
Returns
-------
pandas.DataFrame or None
The aggregated wide DataFrame with the original key column followed
by a single value column. ``None`` when ``per_area`` is empty.
"""
if not per_area:
return None
first = next(iter(per_area.values()))
key_col = first.columns[0]
accumulator = None
for sub in per_area.values():
# Sum non-key columns within each area first (handles the rare case
# of multiple tagged columns sharing the same area / entity name).
value_cols = [c for c in sub.columns if c != key_col]
if not value_cols:
continue
per_hour = sub[value_cols].sum(axis=1).to_frame(name="_value")
per_hour[key_col] = sub[key_col].values
if accumulator is None:
accumulator = per_hour
else:
accumulator = accumulator.merge(
per_hour, on=key_col, how="outer", suffixes=("", "_b")
)
accumulator["_value"] = (
accumulator["_value"].fillna(0.0)
+ accumulator["_value_b"].fillna(0.0)
)
accumulator = accumulator.drop(columns="_value_b")
if accumulator is None:
return None
accumulator = accumulator[[key_col, "_value"]]
name = single_col_name
if name is None:
# Reuse the first non-key column name from the first area as a
# sensible default (e.g. "Load" from the "Load@A1@" header).
first_value_cols = [c for c in first.columns if c != key_col]
name = first_value_cols[0] if first_value_cols else "value"
return accumulator.rename(columns={"_value": name})
def _capacity_weighted_average_prices(cap_per_area, price_per_area, *, label):
"""Aggregate per-area prices using per-area capacities as weights.
Hour-by-hour, the aggregated price is
.. math::
\\bar{c}_h = \\frac{\\sum_a c_{a,h}\\,\\overline{cap}_{a,h}}
{\\sum_a \\overline{cap}_{a,h}}.
When the total capacity at hour ``h`` is zero the simple unweighted mean
of the per-area prices is used (and a single ``WARNING`` log is emitted
once per file). Returns ``None`` when no per-area data is available.
Parameters
----------
cap_per_area : dict[str, pandas.DataFrame]
Per-area capacity DataFrames; first column is the time key, second
column is the capacity series.
price_per_area : dict[str, pandas.DataFrame]
Per-area price DataFrames; first column is the time key, second
column is the price series.
label : str
``"imports"`` or ``"exports"`` — used only for log messages.
Returns
-------
pandas.DataFrame or None
Wide DataFrame with the time-key column followed by a single price
column whose name is taken from the first per-area price frame.
"""
if not price_per_area:
return None
first_price = next(iter(price_per_area.values()))
key_col = first_price.columns[0]
price_name = next(c for c in first_price.columns if c != key_col)
areas = list(price_per_area.keys())
weighted_num = None
weight_den = None
fallback_mean = None
for area in areas:
price_df = price_per_area[area]
price_col = next(c for c in price_df.columns if c != key_col)
price_series = price_df.set_index(key_col)[price_col]
cap_df = cap_per_area.get(area)
if cap_df is not None and not cap_df.empty:
cap_col = next(c for c in cap_df.columns if c != key_col)
cap_series = cap_df.set_index(key_col)[cap_col]
else:
cap_series = pd.Series(0.0, index=price_series.index)
# Align indices defensively.
cap_series, price_series = cap_series.align(price_series, fill_value=0.0)
contrib = cap_series * price_series
if weighted_num is None:
weighted_num = contrib
weight_den = cap_series
fallback_mean = price_series.copy()
fallback_count = pd.Series(1, index=price_series.index)
else:
weighted_num = weighted_num.add(contrib, fill_value=0.0)
weight_den = weight_den.add(cap_series, fill_value=0.0)
fallback_mean = fallback_mean.add(price_series, fill_value=0.0)
fallback_count = fallback_count.add(
pd.Series(1, index=price_series.index), fill_value=0
)
fallback_mean = fallback_mean / fallback_count
zero_mask = weight_den.fillna(0.0) == 0
aggregated = weighted_num / weight_den.replace({0.0: pd.NA})
aggregated = aggregated.where(~zero_mask, fallback_mean)
if zero_mask.any():
logging.warning(
"Aggregation fallback (%s prices): %d hour(s) had zero total "
"capacity across areas; falling back to unweighted mean for "
"those hours.",
label, int(zero_mask.sum()),
)
out = aggregated.to_frame(name=price_name).reset_index()
return out
def _strip_area_tags_from_storage(storage_df):
"""Return a copy of ``storage_data`` with ``@area_id@`` tags stripped.
Parameters
----------
storage_df : pandas.DataFrame
Storage data DataFrame indexed by property (e.g. ``P_Capex``);
columns are storage tech identifiers, possibly tagged with
``@area_id@``.
Returns
-------
pandas.DataFrame
Same DataFrame with tags removed from column headers.
Raises
------
ValueError
If two columns collapse to the same (untagged) tech identifier.
"""
if storage_df is None or storage_df.empty:
return storage_df
new_cols = []
for col in storage_df.columns:
entity, _ = _parse_area_tagged_header(str(col))
new_cols.append(entity)
duplicates = sorted(
{name for name in new_cols if new_cols.count(name) > 1}
)
if duplicates:
raise ValueError(
"StorageData.csv aggregation fallback: storage tech identifiers "
f"collide across areas after stripping {AREA_TAG_DELIMITER}area_id"
f"{AREA_TAG_DELIMITER} tags: {duplicates}. Make storage tech "
"identifiers globally unique (e.g. 'Li-Ion-A1', 'Li-Ion-A2') "
"before aggregating to CopperPlateNetwork."
)
out = storage_df.copy()
out.columns = new_cols
return out
def _aggregate_to_single_area(data):
"""Collapse a multi-area ``data`` dict into a synthetic single area.
Implements the PRD §4.6 aggregation rules used when the input folder
contains zonal data (``|areas| > 1``) but the active formulation is
``CopperPlateNetwork``. Hourly profiles are summed across areas;
import/export prices are aggregated as a capacity-weighted average;
row-oriented per-device tables shed their ``area_id`` column; the
storage table sheds its ``@area_id@`` header tags. Capacity-factor
tables are kept as-is (plant ids are already globally unique).
Mutates ``data`` in place: legacy global keys are replaced with their
aggregated single-column / single-area counterparts, ``data["areas"]``
collapses to ``[{"area_id": DEFAULT_AREA_ID}]``, and every
``per_area_*`` view is rebuilt to contain the single ``DEFAULT_AREA_ID``
key.
Parameters
----------
data : dict
Data dictionary already populated by :func:`_augment_with_per_area_views`.
Returns
-------
dict
The same ``data`` dict, mutated in place.
Raises
------
ValueError
Propagated from :func:`_strip_area_tags_from_storage` when storage
tech identifiers collide across areas.
"""
n_areas_in = len(data.get("areas", []))
logging.warning(
"Aggregating %d areas into a single 'default' area for "
"CopperPlateNetwork.",
n_areas_in,
)
# ---- Hourly profiles: sum across areas ------------------------------
sum_specs = [
("load_data", "per_area_demand", "Load"),
("nuclear_data", "per_area_nuclear", "Nuclear"),
("other_renewables_data", "per_area_other_renewables", "OtherRenewables"),
# Hydro hourly bounds / runs of river — keep the original column name
# from the first area (e.g. "LargeHydro").
("large_hydro_data", None, None),
("large_hydro_max", None, None),
("large_hydro_min", None, None),
("cap_imports", None, None),
("cap_exports", None, None),
]
for global_key, per_area_key, single_name in sum_specs:
if global_key not in data or data.get(global_key) is None:
continue
if per_area_key is not None:
per_area = data.get(per_area_key, {})
else:
# Re-derive per-area split from the current global wide DataFrame.
per_area, _ = _split_wide_by_area(
data[global_key], file_label=global_key
)
aggregated = _sum_per_area_wide(per_area, single_col_name=single_name)
if aggregated is not None:
data[global_key] = aggregated
# ---- Import / Export prices: capacity-weighted average --------------
for label, cap_key, price_key in (
("imports", "cap_imports", "price_imports"),
("exports", "cap_exports", "price_exports"),
):
if data.get(price_key) is None:
continue
cap_per_area, _ = _split_wide_by_area(
data.get(cap_key), file_label=cap_key
) if data.get(cap_key) is not None else ({}, set())
price_per_area, _ = _split_wide_by_area(
data[price_key], file_label=price_key
)
aggregated = _capacity_weighted_average_prices(
cap_per_area, price_per_area, label=label
)
if aggregated is not None:
data[price_key] = aggregated
# ---- Row-oriented per-device tables: drop area_id column ------------
for global_key in ("cap_solar", "cap_wind", "thermal_data"):
df = data.get(global_key)
if df is not None and "area_id" in df.columns:
data[global_key] = df.drop(columns=["area_id"]).copy()
# ---- Storage table: strip @area_id@ tags from column headers --------
storage_df = data.get("storage_data")
if storage_df is not None:
data["storage_data"] = _strip_area_tags_from_storage(storage_df)
# Recompute derived storage tech lists from the newly tag-stripped frame.
data["STORAGE_SET_J_TECHS"] = (
data["storage_data"].columns.astype(str).tolist()
)
if "Coupled" in data["storage_data"].index:
data["STORAGE_SET_B_TECHS"] = (
data["storage_data"]
.columns[data["storage_data"].loc["Coupled"] == 1]
.astype(str)
.tolist()
)
# ---- CFSolar / CFWind unchanged (plant ids globally unique) ---------
# ---- Collapse areas + rebuild per_area_* dicts to single key --------
data["areas"] = [
{"area_id": DEFAULT_AREA_ID, "description": "Aggregated default area"}
]
data["per_area_demand"] = {DEFAULT_AREA_ID: data.get("load_data")}
data["per_area_nuclear"] = {DEFAULT_AREA_ID: data.get("nuclear_data")}
data["per_area_other_renewables"] = {
DEFAULT_AREA_ID: data.get("other_renewables_data")
}
# Hydro composite — merge run-of-river / max / min on the time key.
hydro_components = [
("LargeHydro", data.get("large_hydro_data")),
("LargeHydro_Max", data.get("large_hydro_max")),
("LargeHydro_Min", data.get("large_hydro_min")),
]
hydro_merged = None
for label, sub in hydro_components:
if sub is None or sub.empty:
continue
key_col = sub.columns[0]
value_cols = [c for c in sub.columns if c != key_col]
renamed = sub.rename(columns={c: label for c in value_cols})
hydro_merged = (
renamed
if hydro_merged is None
else hydro_merged.merge(renamed, on=key_col, how="outer")
)
data["per_area_hydro"] = (
{DEFAULT_AREA_ID: hydro_merged} if hydro_merged is not None else {}
)
# Imports / exports composite — merge cap + price on the time key.
def _merge_cap_price(cap_df, price_df):
if cap_df is None and price_df is None:
return None
if cap_df is None:
return price_df.copy()
if price_df is None:
return cap_df.copy()
return cap_df.merge(price_df, on=cap_df.columns[0], how="outer")
imp_merged = _merge_cap_price(data.get("cap_imports"), data.get("price_imports"))
exp_merged = _merge_cap_price(data.get("cap_exports"), data.get("price_exports"))
data["per_area_imports"] = (
{DEFAULT_AREA_ID: imp_merged} if imp_merged is not None else {}
)
data["per_area_exports"] = (
{DEFAULT_AREA_ID: exp_merged} if exp_merged is not None else {}
)
# Per-device tables.
if data.get("cap_solar") is not None:
data["per_area_pv_plants"] = {DEFAULT_AREA_ID: data["cap_solar"]}
if data.get("cap_wind") is not None:
data["per_area_wind_plants"] = {DEFAULT_AREA_ID: data["cap_wind"]}
if data.get("thermal_data") is not None:
data["per_area_balancing_units"] = {DEFAULT_AREA_ID: data["thermal_data"]}
if data.get("storage_data") is not None:
data["per_area_storage"] = {DEFAULT_AREA_ID: data["storage_data"]}
# Capacity factors: a single area now holds every plant column.
if data.get("cf_solar") is not None:
data["per_area_capacity_factors_pv"] = {DEFAULT_AREA_ID: data["cf_solar"]}
if data.get("cf_wind") is not None:
data["per_area_capacity_factors_wind"] = {DEFAULT_AREA_ID: data["cf_wind"]}
return data
[docs]
def load_data( input_data_dir:str = '.\\Data\\' ):
"""Load all required SDOM input datasets from CSV files in the specified directory.
Reads and validates all input CSV files needed for SDOM optimization including
VRE data, fixed generation profiles, storage characteristics, thermal units,
scalars, and formulation specifications. Performs data consistency checks and
filters datasets based on completeness.
Args:
input_data_dir (str, optional): Path to directory containing input CSV files.
Defaults to '.\\Data\\'. Should contain all required files defined in
constants.INPUT_CSV_NAMES.
Returns:
dict: Dictionary containing loaded and processed data with keys:
- 'formulations' (pd.DataFrame): Component formulation specifications
- 'solar_plants', 'wind_plants' (list): Plant IDs for VRE technologies
- 'cf_solar', 'cf_wind' (pd.DataFrame): Hourly capacity factors
- 'cap_solar', 'cap_wind' (pd.DataFrame): Plant CAPEX and capacity data
- 'load_data' (pd.DataFrame): Hourly electricity demand
- 'nuclear_data' (pd.DataFrame): Hourly nuclear generation
- 'large_hydro_data' (pd.DataFrame): Hourly hydropower generation/availability
- 'large_hydro_max', 'large_hydro_min' (pd.DataFrame): Hydro bounds
(if budget formulation)
- 'other_renewables_data' (pd.DataFrame): Hourly other renewable generation
- 'storage_data' (pd.DataFrame): Storage technology characteristics
- 'STORAGE_SET_J_TECHS', 'STORAGE_SET_B_TECHS' (list): Storage tech identifiers
- 'thermal_data' (pd.DataFrame): Thermal balancing unit parameters
- 'scalars' (pd.DataFrame): System-level scalar parameters
- 'import_cap', 'export_cap', 'import_prices', 'export_prices' (pd.DataFrame):
Trade data (if import/export formulation active)
- 'complete_solar_data', 'complete_wind_data' (pd.DataFrame): Filtered VRE data
- 'filtered_cap_solar_dict', 'filtered_cap_wind_dict' (dict): Capacity mappings
Raises:
FileNotFoundError: If any required input file is missing from input_data_dir.
ValueError: If formulation specifications are invalid.
Notes:
- All numeric data rounded to 5 decimal places for consistency
- VRE plant lists filtered to include only plants with complete data
- Conditionally loads hydro bounds and import/export data based on formulations
- Uses flexible filename matching via normalize_string() for CSV files
- Logs detailed progress at debug level for troubleshooting data loading issues
"""
logging.info(
"[%s] Starting data load step.",
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)
logging.info("Loading SDOM input data...")
logging.debug("- Trying to load formulations data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["formulations"], "CSV file to specify the formulations for different components")
if input_file_path != "":
formulations = pd.read_csv( input_file_path )
logging.debug("- Trying to load VRE data...")
# THE SET CSV FILES WERE REMOVED
# input_file_path = os.path.join(input_data_dir, INPUT_CSV_NAMES["solar_plants"])
# if check_file_exists(input_file_path, "solar plants ids"):
# solar_plants = pd.read_csv( input_file_path, header=None )[0].tolist()
# input_file_path = os.path.join(input_data_dir, INPUT_CSV_NAMES["wind_plants"])
# if check_file_exists(input_file_path, "wind plants ids"):
# wind_plants = pd.read_csv( input_file_path, header=None )[0].tolist()
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["cf_solar"], "Capacity factors for pv solar")
if input_file_path != "":
cf_solar = pd.read_csv(input_file_path).round(5)
cf_solar.columns = cf_solar.columns.astype(str)
solar_plants = cf_solar.columns[1:].tolist()
logging.debug( f"-- It were loaded a total of {len( solar_plants )} solar plants profiles." )
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["cf_wind"], "Capacity factors for wind")
if input_file_path != "":
cf_wind = pd.read_csv(input_file_path).round(5)
cf_wind.columns = cf_wind.columns.astype(str)
wind_plants = cf_wind.columns[1:].tolist()
logging.debug( f"-- It were loaded a total of {len( wind_plants )} wind plants profiles." )
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["cap_solar"], "Capex information for solar")
if input_file_path != "":
cap_solar = pd.read_csv(input_file_path).round(5)
cap_solar['sc_gid'] = cap_solar['sc_gid'].astype(str)
solar_plants_capex = cap_solar['sc_gid'].tolist()
compare_lists(solar_plants, solar_plants_capex, text_comp="solar plants", list_names=["CF", "Capex"])
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["cap_wind"], "Capex information for wind")
if input_file_path != "":
cap_wind = pd.read_csv(input_file_path).round(5)
cap_wind['sc_gid'] = cap_wind['sc_gid'].astype(str)
wind_plants_capex = cap_wind['sc_gid'].tolist()
compare_lists(wind_plants, wind_plants_capex, text_comp="wind plants", list_names=["CF", "Capex"])
logging.debug("- Trying to load demand data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["load_data"], "load data")
if input_file_path != "":
load_data = pd.read_csv(input_file_path).round(5)
logging.debug("- Trying to load nuclear data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["nuclear_data"], "nuclear data")
if input_file_path != "":
nuclear_data = pd.read_csv(input_file_path).round(5)
logging.debug("- Trying to load large hydro data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["large_hydro_data"], "large hydro data")
if input_file_path != "":
large_hydro_data = pd.read_csv(input_file_path).round(5)
logging.debug("- Trying to load other renewables data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["other_renewables_data"], "other renewables data")
if input_file_path != "":
other_renewables_data = pd.read_csv(input_file_path).round(5)
logging.debug("- Trying to load storage data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["storage_data"], "Storage data")
if input_file_path != "":
storage_data = pd.read_csv(input_file_path, index_col=0).round(5)
storage_set_j_techs = storage_data.columns[0:].astype(str).tolist()
storage_set_b_techs = storage_data.columns[ storage_data.loc["Coupled"] == 1 ].astype( str ).tolist()
logging.debug("- Trying to load thermal generation data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["thermal_data"], "thermal data")
if input_file_path != "":
thermal_data = pd.read_csv(input_file_path).round(5)
logging.debug("- Trying to load scalars data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["scalars"], "scalars")
if input_file_path != "":
scalars = pd.read_csv( input_file_path, index_col="Parameter" )
#os.chdir('../')
data_dict = {
"formulations": formulations,
"solar_plants": solar_plants,
"wind_plants": wind_plants,
"load_data": load_data,
"nuclear_data": nuclear_data,
"large_hydro_data": large_hydro_data,
"other_renewables_data": other_renewables_data,
"cf_solar": cf_solar,
"cf_wind": cf_wind,
"cap_solar": cap_solar,
"cap_wind": cap_wind,
"storage_data": storage_data,
"STORAGE_SET_J_TECHS": storage_set_j_techs,
"STORAGE_SET_B_TECHS": storage_set_b_techs,
"thermal_data": thermal_data,
"scalars": scalars,
}
# --- Network (zonal) formulation: backward-compatible default ---
network_formulation = get_network_formulation(data_dict)
check_formulation(network_formulation, VALID_NETWORK_FORMULATIONS_TO_DESCRIPTION_MAP.keys())
hydro_formulation = get_formulation(data_dict, component='hydro')
check_formulation( hydro_formulation, VALID_HYDRO_FORMULATIONS_TO_BUDGET_MAP.keys() )
if not (hydro_formulation == RUN_OF_RIVER_FORMULATION):
logging.debug("- Hydro was set to MonthlyBudgetFormulation. Trying to load large hydro max/min data...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["large_hydro_max"], "large hydro Maximum capacity data")
if input_file_path != "":
large_hydro_max = pd.read_csv(input_file_path).round(5)
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["large_hydro_min"], "large hydro Minimum capacity data")
if input_file_path != "":
large_hydro_min = pd.read_csv(input_file_path).round(5)
data_dict["large_hydro_max"] = large_hydro_max
data_dict["large_hydro_min"] = large_hydro_min
logging.debug("- Trying to load imports data...")
imports_formulation = get_formulation(data_dict, component='imports')
check_formulation( imports_formulation, VALID_IMPORTS_EXPORTS_FORMULATIONS_TO_DESCRIPTION_MAP.keys() )
if (imports_formulation == IMPORTS_EXPORTS_CAPACITY_PRICE_NET_LOAD):
logging.debug("- Imports was set to CapacityPriceNetLoadFormulation. Trying to load capacity and price...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["cap_imports"], "Imports hourly upper bound capacity data")
if input_file_path != "":
cap_imports = pd.read_csv(input_file_path).round(5)
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["price_imports"], "Imports hourly price data")
if input_file_path != "":
price_imports = pd.read_csv(input_file_path).round(5)
data_dict["cap_imports"] = cap_imports
data_dict["price_imports"] = price_imports
logging.debug("- Trying to load exports data...")
exports_formulation = get_formulation(data_dict, component='exports')
check_formulation( exports_formulation, VALID_IMPORTS_EXPORTS_FORMULATIONS_TO_DESCRIPTION_MAP.keys() )
if (exports_formulation == IMPORTS_EXPORTS_CAPACITY_PRICE_NET_LOAD):
logging.debug("- Exports was set to CapacityPriceNetLoadFormulation. Trying to load capacity and price...")
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["cap_exports"], "Exports hourly upper bound capacity data")
if input_file_path != "":
cap_exports = pd.read_csv(input_file_path).round(5)
input_file_path = check_file_exists(input_data_dir, INPUT_CSV_NAMES["price_exports"], "Exports hourly price data")
if input_file_path != "":
price_exports = pd.read_csv(input_file_path).round(5)
data_dict["cap_exports"] = cap_exports
data_dict["price_exports"] = price_exports
_augment_with_per_area_views(data_dict, input_data_dir=input_data_dir)
# ------------------------------------------------------------------
# Aggregation fallback (commit #6): zonal data + CopperPlateNetwork.
# When the user ships a multi-area input folder but selects (or
# defaults to) ``CopperPlateNetwork``, collapse every per-area entity
# into a single synthetic ``default`` area following PRD §4.6.
# Transmission CSVs are dropped with a WARNING (no transmission in
# copper-plate).
# ------------------------------------------------------------------
aggregated = False
if (
get_network_formulation(data_dict) == COPPER_PLATE_NETWORK
and len(data_dict.get("areas", [])) > 1
):
_aggregate_to_single_area(data_dict)
aggregated = True
if any(
get_complete_path(input_data_dir, name)
for name in AREA_TRANSPORTATION_MODEL_NETWORK_REQUIRED_INPUTS
):
logging.warning(
"Aggregation fallback: dropping interregional transmission "
"files (%s) because Network=%s has no transmission.",
", ".join(AREA_TRANSPORTATION_MODEL_NETWORK_REQUIRED_INPUTS),
COPPER_PLATE_NETWORK,
)
# ------------------------------------------------------------------
# Zonal topology + line capacities (commit #5).
# Skipped entirely when the aggregation fallback above collapsed the
# input to a single synthetic area. Otherwise parsed when present so
# legacy folders pick up empty defaults without raising; the
# AreaTransportationModelNetwork formulation *requires* all three CSVs.
# ------------------------------------------------------------------
if aggregated:
lines = []
line_cap_ft, line_cap_tf = pd.DataFrame(), pd.DataFrame()
else:
lines = _load_interconnections(input_data_dir, areas=data_dict["areas"])
line_cap_ft, line_cap_tf = _load_line_capacities(
input_data_dir, lines=lines
)
if get_network_formulation(data_dict) == AREA_TRANSPORTATION_MODEL_NETWORK:
missing_files = [
name
for name in AREA_TRANSPORTATION_MODEL_NETWORK_REQUIRED_INPUTS
if not get_complete_path(input_data_dir, name)
]
if missing_files:
raise ValueError(
f"Network={AREA_TRANSPORTATION_MODEL_NETWORK} requires the "
f"following file(s) to be present: {missing_files}."
)
data_dict["lines"] = lines
data_dict["line_cap_ft"] = line_cap_ft
data_dict["line_cap_tf"] = line_cap_tf
return data_dict
# ---------------------------------------------------------------------------------
# Export results to CSV files
# ---------------------------------------------------------------------------------
[docs]
def export_results(results, case: str, output_dir: str = "./results_pyomo/"):
"""Export optimization results to CSV files.
Writes the results from an OptimizationResults object to CSV files in the
specified directory. Creates output directory if it doesn't exist.
Parameters
----------
results : OptimizationResults
The optimization results object from run_solver().
case : str or int
Case identifier used in output filenames to distinguish between
different scenarios or runs.
output_dir : str, optional
Directory path for output files. Defaults to './results_pyomo/'.
Directory will be created if it doesn't exist.
Returns
-------
None
Output Files
------------
OutputGeneration_{case}.csv
Hourly dispatch results containing: Scenario, Hour, Solar PV/Wind
generation and curtailment, Thermal, hydro, nuclear, other renewables
generation, Storage net charge/discharge, imports, exports, Load.
OutputStorage_{case}.csv
Hourly storage operation for each technology: Hour, Technology,
Charging power (MW), Discharging power (MW), State of charge (MWh).
OutputSummary_{case}.csv
Summary metrics including: Total costs, Installed capacities by
technology, Total generation by technology, Demand statistics,
Cost breakdowns (VRE, storage, thermal CAPEX/FOM/VOM).
OutputThermalGeneration_{case}.csv
Disaggregated hourly thermal generation by plant (only if more than
one thermal plant exists).
OutputInterregionalExchanges_{case}.csv
Hourly per-line interregional exchanges (PRD §2.4 schema). Emitted
only for zonal runs (``Network = AreaTransportationModelNetwork``)
when ``results.interregional_exchanges_df`` is non-empty. Columns:
``line_id, from_area, to_area, hour, flow_signed_MW, flow_FT_MW,
flow_TF_MW, cap_FT_MW, cap_TF_MW, utilization_FT, utilization_TF``.
Row count is ``|L| * n_hours``.
Notes
-----
This function accepts either an OptimizationResults dataclass (new API)
or the legacy tuple return from run_solver (deprecated).
"""
# Import here to avoid circular imports
from .results import OptimizationResults
logging.info("Exporting SDOM results...")
os.makedirs(output_dir, exist_ok=True)
# Handle both new OptimizationResults and legacy model input
if isinstance(results, OptimizationResults):
_export_from_results_object(results, case, output_dir)
else:
# Legacy support: assume it's a model object
logging.warning(
"export_results() received a model object instead of OptimizationResults. "
"This usage is deprecated. Please use the OptimizationResults from run_solver()."
)
_export_from_model_legacy(results, case, output_dir)
def _export_from_results_object(results, case: str, output_dir: str):
"""Export results from OptimizationResults object to CSV files.
Parameters
----------
results : OptimizationResults
The optimization results object.
case : str
Case identifier for filenames.
output_dir : str
Output directory path.
"""
logging.info("Exporting csv files containing SDOM results...")
# Save generation results to CSV
logging.debug("-- Saving generation results to CSV...")
gen_df = results.get_generation_dataframe()
if not gen_df.empty:
# Update scenario column with the case name
gen_df["Scenario"] = case
gen_df.to_csv(os.path.join(output_dir, f"OutputGeneration_{case}.csv"), index=False)
# Save storage results to CSV
logging.debug("-- Saving storage results to CSV...")
storage_df = results.get_storage_dataframe()
if not storage_df.empty:
storage_df.to_csv(os.path.join(output_dir, f"OutputStorage_{case}.csv"), index=False)
# Save summary results to CSV
logging.debug("-- Saving summary results to CSV...")
summary_df = results.get_summary_dataframe()
if not summary_df.empty:
summary_df.to_csv(os.path.join(output_dir, f"OutputSummary_{case}.csv"), index=False)
# Save thermal generation results to CSV (if available)
logging.debug("-- Saving disaggregated thermal generation results to CSV...")
thermal_df = results.get_thermal_generation_dataframe()
if not thermal_df.empty:
thermal_df.to_csv(os.path.join(output_dir, f"OutputThermalGeneration_{case}.csv"), index=False)
# Save installed power plants results to CSV
logging.debug("-- Saving installed power plants results to CSV...")
installed_plants_df = results.get_installed_plants_dataframe()
if not installed_plants_df.empty:
installed_plants_df.to_csv(os.path.join(output_dir, f"OutputInstalledPowerPlants_{case}.csv"), index=False)
# Save interregional exchanges results to CSV (zonal / AreaTransportationModelNetwork only)
logging.debug("-- Saving interregional exchanges results to CSV...")
interregional_df = getattr(results, "interregional_exchanges_df", None)
if interregional_df is not None and not interregional_df.empty:
interregional_df.to_csv(
os.path.join(output_dir, f"OutputInterregionalExchanges_{case}.csv"),
index=False,
)
def _export_from_model_legacy(model, case, output_dir="./results_pyomo/"):
"""Legacy export function that works directly with a model object.
This is the original implementation preserved for backward compatibility.
Parameters
----------
model : pyomo.core.base.PyomoModel.ConcreteModel
Solved Pyomo model instance.
case : str
Case identifier for filenames.
output_dir : str
Output directory path.
"""
logging.info("Exporting SDOM results (legacy mode)...")
os.makedirs(output_dir, exist_ok=True)
# Initialize results dictionaries column: [values]
logging.debug("--Initializing results dictionaries...")
gen_results = {
"Scenario": [],
"Hour": [],
"Solar PV Generation (MW)": [],
"Solar PV Curtailment (MW)": [],
"Wind Generation (MW)": [],
"Wind Curtailment (MW)": [],
"All Thermal Generation (MW)": [],
"Hydro Generation (MW)": [],
"Nuclear Generation (MW)": [],
"Other Renewables Generation (MW)": [],
"Imports (MW)": [],
"Storage Charge/Discharge (MW)": [],
"Exports (MW)": [],
"Load (MW)": [],
"Net Load (MW)": [],
}
storage_results = {
"Hour": [],
"Technology": [],
"Charging power (MW)": [],
"Discharging power (MW)": [],
"State of charge (MWh)": [],
}
# Extract generation results
logging.debug("--Extracting generation results...")
for h in model.h:
solar_gen = safe_pyomo_value(model.pv.generation[h])
solar_curt = safe_pyomo_value(model.pv.curtailment[h])
wind_gen = safe_pyomo_value(model.wind.generation[h])
wind_curt = safe_pyomo_value(model.wind.curtailment[h])
gas_cc_gen = sum(safe_pyomo_value(model.thermal.generation[h, bu]) for bu in model.thermal.plants_set)
hydro = safe_pyomo_value(model.hydro.generation[h])
nuclear = safe_pyomo_value(model.nuclear.alpha * model.nuclear.ts_parameter[h]) if hasattr(model.nuclear, "alpha") else 0
other_renewables = safe_pyomo_value(model.other_renewables.alpha * model.other_renewables.ts_parameter[h]) if hasattr(model.other_renewables, "alpha") else 0
imports = safe_pyomo_value(model.imports.variable[h]) if hasattr(model.imports, "variable") else 0
exports = safe_pyomo_value(model.exports.variable[h]) if hasattr(model.exports, "variable") else 0
load = safe_pyomo_value(model.demand.ts_parameter[h]) if hasattr(model.demand, "ts_parameter") else 0
net_load = safe_pyomo_value(model.net_load[h]) if hasattr(model, "net_load") else 0
# Only append results if all values are valid (not None)
if None not in [solar_gen, solar_curt, wind_gen, wind_curt, gas_cc_gen, hydro, imports, exports, load]:
gen_results["Hour"].append(h)
gen_results["Solar PV Generation (MW)"].append(solar_gen)
gen_results["Solar PV Curtailment (MW)"].append(solar_curt)
gen_results["Wind Generation (MW)"].append(wind_gen)
gen_results["Wind Curtailment (MW)"].append(wind_curt)
gen_results["All Thermal Generation (MW)"].append(gas_cc_gen)
gen_results["Hydro Generation (MW)"].append(hydro)
gen_results["Nuclear Generation (MW)"].append(nuclear)
gen_results["Other Renewables Generation (MW)"].append(other_renewables)
gen_results["Imports (MW)"].append(imports)
power_to_storage = sum(safe_pyomo_value(model.storage.PC[h, j]) or 0 for j in model.storage.j) - sum(safe_pyomo_value(model.storage.PD[h, j]) or 0 for j in model.storage.j)
gen_results["Storage Charge/Discharge (MW)"].append(power_to_storage)
gen_results["Exports (MW)"].append(exports)
gen_results["Load (MW)"].append(load)
gen_results["Net Load (MW)"].append(net_load)
gen_results["Scenario"].append(case)
# Extract storage results
logging.debug("--Extracting storage results...")
for h in model.h:
for j in model.storage.j:
charge_power = safe_pyomo_value(model.storage.PC[h, j])
discharge_power = safe_pyomo_value(model.storage.PD[h, j])
soc = safe_pyomo_value(model.storage.SOC[h, j])
if None not in [charge_power, discharge_power, soc]:
storage_results["Hour"].append(h)
storage_results["Technology"].append(j)
storage_results["Charging power (MW)"].append(charge_power)
storage_results["Discharging power (MW)"].append(discharge_power)
storage_results["State of charge (MWh)"].append(soc)
# Summary results (total capacities and costs)
## Total cost
logging.debug("--Extracting summary results...")
total_cost = pd.DataFrame.from_dict(
{"Total cost": [None, 1, safe_pyomo_value(model.Obj()), "$US"]},
orient="index",
columns=["Technology", "Run", "Optimal Value", "Unit"],
)
total_cost = total_cost.reset_index(names="Metric")
summary_results = total_cost
## Total capacity
cap = {}
cap["Thermal"] = sum(safe_pyomo_value(model.thermal.plant_installed_capacity[bu]) for bu in model.thermal.plants_set)
cap["Solar PV"] = safe_pyomo_value(model.pv.total_installed_capacity)
cap["Wind"] = safe_pyomo_value(model.wind.total_installed_capacity)
cap["All"] = cap["Thermal"] + cap["Solar PV"] + cap["Wind"]
summary_results = concatenate_dataframes(summary_results, cap, run=1, unit="MW", metric="Capacity")
## Charge power capacity
storage_tech_list = list(model.storage.j)
charge = {}
sum_all = 0.0
for tech in storage_tech_list:
charge[tech] = safe_pyomo_value(model.storage.Pcha[tech])
sum_all += charge[tech]
charge["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, charge, run=1, unit="MW", metric="Charge power capacity")
## Discharge power capacity
dcharge = {}
sum_all = 0.0
for tech in storage_tech_list:
dcharge[tech] = safe_pyomo_value(model.storage.Pdis[tech])
sum_all += dcharge[tech]
dcharge["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, dcharge, run=1, unit="MW", metric="Discharge power capacity")
## Average power capacity
avgpocap = {}
sum_all = 0.0
for tech in storage_tech_list:
avgpocap[tech] = (charge[tech] + dcharge[tech]) / 2
sum_all += avgpocap[tech]
avgpocap["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, avgpocap, run=1, unit="MW", metric="Average power capacity")
## Energy capacity
encap = {}
sum_all = 0.0
for tech in storage_tech_list:
encap[tech] = safe_pyomo_value(model.storage.Ecap[tech])
sum_all += encap[tech]
encap["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, encap, run=1, unit="MWh", metric="Energy capacity")
## Discharge duration
dis_dur = {}
for tech in storage_tech_list:
dis_dur[tech] = safe_pyomo_value(sqrt(model.storage.data["Eff", tech]) * model.storage.Ecap[tech] / (model.storage.Pdis[tech] + 1e-15))
summary_results = concatenate_dataframes(summary_results, dis_dur, run=1, unit="h", metric="Duration")
## Generation
gen = {}
gen["Thermal"] = safe_pyomo_value(model.thermal.total_generation)
gen["Solar PV"] = safe_pyomo_value(model.pv.total_generation)
gen["Wind"] = safe_pyomo_value(model.wind.total_generation)
gen["Other renewables"] = safe_pyomo_value(sum(model.other_renewables.ts_parameter[h] for h in model.h))
gen["Hydro"] = safe_pyomo_value(sum(model.hydro.generation[h] for h in model.h))
gen["Nuclear"] = safe_pyomo_value(sum(model.nuclear.ts_parameter[h] for h in model.h))
# Storage energy discharging
sum_all = 0.0
storage_tech_list = list(model.storage.j)
for tech in storage_tech_list:
gen[tech] = safe_pyomo_value(sum(model.storage.PD[h, tech] for h in model.h))
sum_all += gen[tech]
gen["All"] = gen["Thermal"] + gen["Solar PV"] + gen["Wind"] + gen["Other renewables"] + gen["Hydro"] + gen["Nuclear"] + sum_all
summary_results = concatenate_dataframes(summary_results, gen, run=1, unit="MWh", metric="Total generation")
imp_exp = {}
imp_exp["Imports"] = safe_pyomo_value(sum(model.imports.variable[h] for h in model.h)) if hasattr(model.imports, "variable") else 0
imp_exp["Exports"] = safe_pyomo_value(sum(model.exports.variable[h] for h in model.h)) if hasattr(model.exports, "variable") else 0
summary_results = concatenate_dataframes(summary_results, imp_exp, run=1, unit="MWh", metric="Total Imports/Exports")
## Storage energy discharging
sum_all = 0.0
stodisch = {}
for tech in storage_tech_list:
stodisch[tech] = safe_pyomo_value(sum(model.storage.PD[h, tech] for h in model.h))
sum_all += stodisch[tech]
stodisch["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, stodisch, run=1, unit="MWh", metric="Storage energy discharging")
## Demand
dem = {}
dem["demand"] = sum(model.demand.ts_parameter[h] for h in model.h)
summary_results = concatenate_dataframes(summary_results, dem, run=1, unit="MWh", metric="Total demand")
## Storage energy charging
sum_all = 0.0
stoch = {}
for tech in storage_tech_list:
stoch[tech] = safe_pyomo_value(sum(model.storage.PC[h, tech] for h in model.h))
sum_all += stoch[tech]
stoch["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, stoch, run=1, unit="MWh", metric="Storage energy charging")
## CAPEX
capex = {}
capex["Solar PV"] = safe_pyomo_value(model.pv.capex_cost_expr)
capex["Wind"] = safe_pyomo_value(model.wind.capex_cost_expr)
capex["Thermal"] = safe_pyomo_value(model.thermal.capex_cost_expr)
capex["All"] = capex["Solar PV"] + capex["Wind"] + capex["Thermal"]
summary_results = concatenate_dataframes(summary_results, capex, run=1, unit="$US", metric="CAPEX")
## Power CAPEX
pcapex = {}
sum_all = 0.0
for tech in storage_tech_list:
pcapex[tech] = safe_pyomo_value(model.storage.power_capex_cost_expr[tech])
sum_all += pcapex[tech]
pcapex["All"] = sum_all
summary_results = concatenate_dataframes(summary_results, pcapex, run=1, unit="$US", metric="Power-CAPEX")
## Energy CAPEX and Total CAPEX
ecapex = {}
tcapex = {}
sum_all = 0.0
sum_all_t = 0.0
for tech in storage_tech_list:
ecapex[tech] = safe_pyomo_value(model.storage.energy_capex_cost_expr[tech])
sum_all += ecapex[tech]
tcapex[tech] = pcapex[tech] + ecapex[tech]
sum_all_t += tcapex[tech]
ecapex["All"] = sum_all
tcapex["All"] = sum_all_t
summary_results = concatenate_dataframes(summary_results, ecapex, run=1, unit="$US", metric="Energy-CAPEX")
summary_results = concatenate_dataframes(summary_results, tcapex, run=1, unit="$US", metric="Total-CAPEX")
## FOM
fom = {}
sum_all = 0.0
fom["Thermal"] = safe_pyomo_value(model.thermal.fixed_om_cost_expr)
fom["Solar PV"] = safe_pyomo_value(model.pv.fixed_om_cost_expr)
fom["Wind"] = safe_pyomo_value(model.wind.fixed_om_cost_expr)
for tech in storage_tech_list:
fom[tech] = safe_pyomo_value(
MW_TO_KW * model.storage.data["CostRatio", tech] * model.storage.data["FOM", tech] * model.storage.Pcha[tech]
+ MW_TO_KW * (1 - model.storage.data["CostRatio", tech]) * model.storage.data["FOM", tech] * model.storage.Pdis[tech]
)
sum_all += fom[tech]
fom["All"] = fom["Thermal"] + fom["Solar PV"] + fom["Wind"] + sum_all
summary_results = concatenate_dataframes(summary_results, fom, run=1, unit="$US", metric="FOM")
## VOM
vom = {}
sum_all = 0.0
vom["Thermal"] = safe_pyomo_value(model.thermal.total_vom_cost_expr)
for tech in storage_tech_list:
vom[tech] = safe_pyomo_value(model.storage.data["VOM", tech] * sum(model.storage.PD[h, tech] for h in model.h))
sum_all += vom[tech]
vom["All"] = vom["Thermal"] + sum_all
summary_results = concatenate_dataframes(summary_results, vom, run=1, unit="$US", metric="VOM")
fuel_cost = {}
fuel_cost["Thermal"] = safe_pyomo_value(model.thermal.total_fuel_cost_expr)
summary_results = concatenate_dataframes(summary_results, fuel_cost, run=1, unit="$US", metric="Fuel-Cost")
## OPEX
opex = {}
sum_all = 0.0
opex["Thermal"] = fom["Thermal"] + vom["Thermal"]
opex["Solar PV"] = fom["Solar PV"]
opex["Wind"] = fom["Wind"]
for tech in storage_tech_list:
opex[tech] = fom[tech] + vom[tech]
sum_all += opex[tech]
opex["All"] = opex["Thermal"] + opex["Solar PV"] + opex["Wind"] + sum_all
summary_results = concatenate_dataframes(summary_results, opex, run=1, unit="$US", metric="OPEX")
# IMPORTS/EXPORTS COSTS
cost_revenue = {}
cost_revenue["Imports Cost"] = safe_pyomo_value(model.imports.total_cost_expr)
summary_results = concatenate_dataframes(summary_results, cost_revenue, run=1, unit="$US", metric="Cost")
cost_revenue = {}
cost_revenue["Exports Revenue"] = safe_pyomo_value(model.exports.total_cost_expr)
summary_results = concatenate_dataframes(summary_results, cost_revenue, run=1, unit="$US", metric="Revenue")
## Equivalent number of cycles
cyc = {}
for tech in storage_tech_list:
cyc[tech] = safe_pyomo_value(gen[tech] / (model.storage.Ecap[tech] + 1e-15))
summary_results = concatenate_dataframes(summary_results, cyc, run=1, unit="-", metric="Equivalent number of cycles")
## VRE Curtailment
pv_curtailment = safe_pyomo_value(model.pv.total_curtailment) if hasattr(model.pv, "total_curtailment") else 0.0
wind_curtailment = safe_pyomo_value(model.wind.total_curtailment) if hasattr(model.wind, "total_curtailment") else 0.0
pv_generation = safe_pyomo_value(model.pv.total_generation) if hasattr(model.pv, "total_generation") else 0.0
wind_generation = safe_pyomo_value(model.wind.total_generation) if hasattr(model.wind, "total_generation") else 0.0
total_vre_curtailment_mwh = pv_curtailment + wind_curtailment
total_vre_availability = pv_generation + wind_generation + pv_curtailment + wind_curtailment
total_vre_curtailment_pct = (total_vre_curtailment_mwh / total_vre_availability * 100) if total_vre_availability > 0 else 0.0
vre_curt_mwh = {"Solar PV": pv_curtailment, "Wind": wind_curtailment, "All": total_vre_curtailment_mwh}
summary_results = concatenate_dataframes(summary_results, vre_curt_mwh, run=1, unit="MWh", metric="Total VRE curtailment")
vre_curt_pct = {"All": total_vre_curtailment_pct}
summary_results = concatenate_dataframes(summary_results, vre_curt_pct, run=1, unit="%", metric="VRE curtailment percentage")
logging.info("Exporting csv files containing SDOM results...")
# Save generation results to CSV
logging.debug("-- Saving generation results to CSV...")
if gen_results["Hour"]:
with open(output_dir + f"OutputGeneration_{case}.csv", mode="w", newline="") as file:
writer = csv.DictWriter(file, fieldnames=gen_results.keys())
writer.writeheader()
writer.writerows([dict(zip(gen_results, t)) for t in zip(*gen_results.values())])
# Save storage results to CSV
logging.debug("-- Saving storage results to CSV...")
if storage_results["Hour"]:
with open(output_dir + f"OutputStorage_{case}.csv", mode="w", newline="") as file:
writer = csv.DictWriter(file, fieldnames=storage_results.keys())
writer.writeheader()
writer.writerows([dict(zip(storage_results, t)) for t in zip(*storage_results.values())])
# Save summary results to CSV
logging.debug("-- Saving summary results to CSV...")
if len(summary_results) > 0:
summary_results.to_csv(output_dir + f"OutputSummary_{case}.csv", index=False)
if len(model.thermal.plants_set) <= 1:
return
thermal_gen_columns = ["Hour"] + [str(plant) for plant in model.thermal.plants_set]
disaggregated_thermal_gen_results = get_dict_string_void_list_from_keys_in_list(thermal_gen_columns)
for h in model.h:
disaggregated_thermal_gen_results["Hour"].append(h)
for plant in model.thermal.plants_set:
disaggregated_thermal_gen_results[plant].append(safe_pyomo_value(model.thermal.generation[h, plant]))
logging.debug("-- Saving disaggregated thermal generation results to CSV...")
if disaggregated_thermal_gen_results["Hour"]:
with open(output_dir + f"OutputThermalGeneration_{case}.csv", mode="w", newline="") as file:
writer = csv.DictWriter(file, fieldnames=disaggregated_thermal_gen_results.keys())
writer.writeheader()
writer.writerows([dict(zip(disaggregated_thermal_gen_results, t)) for t in zip(*disaggregated_thermal_gen_results.values())])