"""Provides the O&M Enviroment class; a subclass of simpy.Environment."""
from __future__ import annotations
import io
import csv
import math
import logging
import datetime as dt
from typing import TYPE_CHECKING
from pathlib import Path
from datetime import datetime, timedelta
import numpy as np
import simpy
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.csv # pylint: disable=W0611
from simpy.events import Event
import wombat # pylint: disable=W0611
from wombat.utilities import hours_until_future_hour
from wombat.core.data_classes import parse_date
if TYPE_CHECKING:
from wombat.windfarm import Windfarm
EVENTS_COLUMNS = [
"datetime",
"env_datetime",
"env_time",
"agent",
"action",
"reason",
"additional",
"system_id",
"system_name",
"part_id",
"part_name",
"system_operating_level",
"part_operating_level",
"duration",
"distance_km",
"request_id",
"location",
"materials_cost",
"hourly_labor_cost",
"salary_labor_cost",
"total_labor_cost",
"equipment_cost",
"total_cost",
]
class WombatEnvironment(simpy.Environment):
"""The primary mechanism for powering an O&M simulation. This object has insight
into all other simulation objects, and controls the timing, date/time stamps, and
weather conditions.
Parameters
----------
data_dir : pathlib.Path | str
Directory where the inputs are stored and where to save outputs.
weather_file : str
        Name of the weather file. Should be contained within ``data_dir``/weather/,
        with columns "datetime", "windspeed", and "waveheight". The datetime column
        should use a format such as "MM/DD/YY HH:MM", in 24-hour time.
workday_start : int
        Starting time for the repair crew, in 24 hour local time. This can be
        overridden by a ``ServiceEquipmentData`` object that operates outside of the
        "typical" working hours.
workday_end : int
        Ending time for the repair crew, in 24 hour local time. This can be
        overridden by a ``ServiceEquipmentData`` object that operates outside of the
        "typical" working hours.
simulation_name : str | None, optional
        Name of the simulation; will be used for naming the log files, by default
        None. If ``None``, the name "wombat" is used. Log files are always saved to
        ``data_dir``/results/ and prefixed with a timestamp.
        .. note:: Spaces (" ") will be replaced with underscores ("_"), for example:
            "my example analysis" becomes "my_example_analysis".
start_year : int | None, optional
Custom starting year for the weather profile, by default None. If ``None`` or
less than the first year of the weather profile, this will be ignored.
end_year : int | None, optional
Custom ending year for the weather profile, by default None. If ``None`` or
greater than the last year of the weather profile, this will be ignored.
    port_distance : int | float | None
The simulation-wide daily travel distance for servicing equipment. This
should be used as a base setting when multiple or all servicing equipment
will be operating out of the same base location, but can be individually
modified.
non_operational_start : str | datetime.datetime | None
The starting month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
period of prohibited operations. When defined at the environment level,
an undefined or later starting date will be overridden for all servicing
equipment and any modeled port, by default None.
non_operational_end : str | datetime.datetime | None
The ending month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
period of prohibited operations. When defined at the environment level,
an undefined or earlier ending date will be overridden for all servicing
equipment and any modeled port, by default None.
reduced_speed_start : str | datetime.datetime | None
The starting month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
period of reduced speed operations. When defined at the environment level,
an undefined or later starting date will be overridden for all servicing
equipment and any modeled port, by default None.
reduced_speed_end : str | datetime.datetime | None
The ending month and day, e.g., MM/DD, M/D, MM-DD, etc. for an annualized
period of reduced speed operations. When defined at the environment level,
an undefined or earlier ending date will be overridden for all servicing
equipment and any modeled port, by default None.
reduced_speed : float
The maximum operating speed during the annualized reduced speed operations.
When defined at the environment level, an undefined or faster value will be
overridden for all servicing equipment and any modeled port, by default 0.0.
random_seed : int | None
The random seed to be passed to a universal NumPy ``default_rng`` object to
generate Weibull random generators, by default None.
    random_generator : np.random._generator.Generator | None
An optional numpy random generator that can be provided to seed a simulation
with the same generator each time, in place of the random seed. If a
:py:attr:`random_seed` is also provided, this will override the random seed,
by default None.
Raises
------
FileNotFoundError
Raised if ``data_dir`` cannot be found.
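    Examples
    --------
    A minimal construction sketch; the directory and weather file names below are
    illustrative placeholders, not inputs shipped with the model.
    >>> env = WombatEnvironment(
    ...     data_dir="library/example_project",  # hypothetical project folder
    ...     weather_file="example_weather.csv",  # hypothetical weather input
    ...     workday_start=7,
    ...     workday_end=19,
    ...     simulation_name="example_analysis",
    ... )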
"""
def __init__(
self,
data_dir: Path | str,
weather_file: str,
workday_start: int,
workday_end: int,
simulation_name: str | None = None,
start_year: int | None = None,
end_year: int | None = None,
port_distance: int | float | None = None,
non_operational_start: str | dt.datetime | None = None,
non_operational_end: str | dt.datetime | None = None,
reduced_speed_start: str | dt.datetime | None = None,
reduced_speed_end: str | dt.datetime | None = None,
reduced_speed: float = 0.0,
random_seed: int | None = None,
random_generator: np.random._generator.Generator | None = None,
) -> None:
"""Initialization."""
super().__init__()
self.data_dir = Path(data_dir).resolve()
if not self.data_dir.is_dir():
raise FileNotFoundError(f"{self.data_dir} does not exist")
self.workday_start = int(workday_start)
self.workday_end = int(workday_end)
if not 0 <= self.workday_start <= 24:
raise ValueError("workday_start must be a valid 24hr time before midnight.")
if not 0 <= self.workday_end <= 24:
raise ValueError("workday_end must be a valid 24hr time.")
if self.workday_end <= self.workday_start:
raise ValueError(
"Work shifts must end after they start ({self.workday_start}hrs)."
)
self.port_distance = port_distance
self.weather = self._weather_setup(weather_file, start_year, end_year)
self.weather_dates = pd.DatetimeIndex(
self.weather.get_column("datetime").to_pandas()
).to_pydatetime()
self.max_run_time = self.weather.shape[0]
self.shift_length = self.workday_end - self.workday_start
# Set the environmental consideration parameters
self.non_operational_start = parse_date(non_operational_start)
self.non_operational_end = parse_date(non_operational_end)
self.reduced_speed_start = parse_date(reduced_speed_start)
self.reduced_speed_end = parse_date(reduced_speed_end)
self.reduced_speed = reduced_speed
if random_generator is not None:
self.random_generator = random_generator
self.random_seed = None
elif random_seed is not None:
self.random_seed = random_seed
self.random_generator = np.random.default_rng(seed=random_seed)
else:
self.random_seed = None
self.random_generator = np.random.default_rng()
self.simulation_name = simulation_name
self._logging_setup()
self.process(self._log_actions())
def _register_windfarm(self, windfarm: Windfarm) -> None:
"""Adds the simulation windfarm to the class attributes."""
self.windfarm = windfarm
def run(self, until: int | float | Event | None = None):
"""Extends the ``simpy.Environment.run`` method to change the default behavior
if no argument is passed to ``until``, which will now run a simulation until the
end of the weather profile is reached.
Parameters
----------
        until : int | float | Event | None, optional
When to stop the simulation, by default None. See documentation on
``simpy.Environment.run`` for more details.
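        Examples
        --------
        A usage sketch, assuming ``env`` is an existing ``WombatEnvironment``.
        >>> env.run()  # run through the end of the weather profile
        >>> # or, for hourly data, stop after the first simulated year:
        >>> env.run(until=8760)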
"""
# If running a paused simulation, then reopen the file and append, but only if
# the simulation time is lower than the upper bound
time_check = self.now < self.max_run_time
if self._events_csv.closed and time_check: # type: ignore
self._events_csv = open(self.events_log_fname, "a")
self._events_writer = csv.DictWriter(
self._events_csv, delimiter="|", fieldnames=EVENTS_COLUMNS
)
if hasattr(self, "windfarm") and self._operations_csv.closed and time_check:
self._operations_csv: io.TextIOWrapper = open(
self.operations_log_fname, "a"
)
self.windfarm._setup_logger(initial=False)
if until is None:
until = self.max_run_time
elif until > self.max_run_time:
until = self.max_run_time
try:
super().run(until=until)
except BaseException as e:
            # Flush the logs so the simulation up to the point of failure is logged
self._events_writer.writerows(self._events_buffer)
self._events_buffer.clear()
self._events_csv.close()
self._operations_writer.writerows(self._operations_buffer)
self._operations_buffer.clear()
self._operations_csv.close()
print(
f"Simulation failed at hour {self.now:,.6f},"
f" simulation time: {self.simulation_time}"
)
raise e
# Ensure all logged events make it to their target file
self._events_writer.writerows(self._events_buffer)
self._events_buffer.clear()
self._events_csv.close()
self._operations_writer.writerows(self._operations_buffer)
self._operations_buffer.clear()
self._operations_csv.close()
def _logging_setup(self) -> None:
"""Completes the setup for logging data."""
if self.simulation_name is None:
self.simulation_name = simulation = "wombat"
else:
simulation = self.simulation_name.replace(" ", "_")
dt_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
events_log_fname = f"{dt_stamp}_{simulation}_events.csv"
operations_log_fname = f"{dt_stamp}_{simulation}_operations.csv"
power_potential_fname = f"{dt_stamp}_{simulation}_power_potential.csv"
power_production_fname = f"{dt_stamp}_{simulation}_power_production.csv"
metrics_input_fname = f"{dt_stamp}_{simulation}_metrics_inputs.yaml"
log_path = self.data_dir / "results"
if not log_path.exists():
log_path.mkdir()
self.events_log_fname = log_path / events_log_fname
self.operations_log_fname = log_path / operations_log_fname
self.power_potential_fname = log_path / power_potential_fname
self.power_production_fname = log_path / power_production_fname
self.metrics_input_fname = log_path / metrics_input_fname
self._events_csv = open(self.events_log_fname, "w")
self._operations_csv = open(self.operations_log_fname, "w")
self._events_writer = csv.DictWriter(
self._events_csv, delimiter="|", fieldnames=EVENTS_COLUMNS
)
self._events_writer.writeheader()
self._events_buffer: list[dict] = []
self._operations_buffer: list[dict] = []
def get_random_seconds(self, low: int = 0, high: int = 10) -> float:
"""Generate a random number of seconds to wait, between :py:attr:`low` and
:py:attr:`high`.
Parameters
----------
low : int, optional
Minimum number of seconds to wait, by default 0.
high : int, optional
Maximum number of seconds to wait, by default 10.
Returns
-------
        float
            Number of seconds to wait, converted to fractional hours, the
            simulation's unit of time.
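        Examples
        --------
        A sketch, assuming ``env`` is an existing ``WombatEnvironment``; the draw
        is random, so only the bounds are guaranteed.
        >>> delay = env.get_random_seconds(low=0, high=30)
        >>> 0 <= delay * 3600 < 30  # the result is expressed in fractional hours
        True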
"""
        # Draw an integer number of seconds, then convert to fractional hours,
        # since the simulation's unit of time is hours
        seconds_to_wait, *_ = (
            self.random_generator.integers(low=low, high=high, size=1) / 3600.0
        )
        return seconds_to_wait
@property
def simulation_time(self) -> datetime:
"""Current time within the simulation ("datetime" column within weather)."""
now = self.now
minutes = now % 1 * 60
if now == self.max_run_time:
_dt = self.weather_dates[math.floor(now - 1)]
            _dt += timedelta(hours=1)
else:
_dt = self.weather_dates[math.floor(now)]
minutes, seconds = math.floor(minutes), math.ceil(minutes % 1 * 60)
return _dt + timedelta(minutes=minutes, seconds=seconds)
def is_workshift(self, workday_start: int = -1, workday_end: int = -1) -> bool:
"""Check if the current simulation time is within the windfarm's working hours.
Parameters
----------
workday_start : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_end`` must also be
            provided in order to be used.
        workday_end : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_start`` must also be
            provided in order to be used.
Returns
-------
bool
True if it's valid working hours, False otherwise.
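        Examples
        --------
        A sketch, assuming ``env`` was created with ``workday_start=7`` and
        ``workday_end=19``.
        >>> in_shift = env.is_workshift()  # True from 07:00 through 18:59
        >>> env.is_workshift(workday_start=0, workday_end=24)  # around the clock
        True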
"""
if -1 in (workday_start, workday_end):
# Return True if the shift is around the clock
if self.workday_start == 0 and self.workday_end == 24:
return True
return self.workday_start <= self.simulation_time.hour < self.workday_end
        # Return True if the shift is around the clock
if workday_start == 0 and workday_end == 24:
return True
return workday_start <= self.simulation_time.hour < workday_end
def hour_in_shift(
self, hour: int, workday_start: int = -1, workday_end: int = -1
) -> bool:
"""Checks whether an ``hour`` is within the working hours.
Parameters
----------
hour : int
Hour of the day.
workday_start : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_end`` must also be
            provided in order to be used.
        workday_end : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object. ``workday_start`` must also be
            provided in order to be used.
Returns
-------
bool
True if ``hour`` is during working hours, False otherwise.
"""
if -1 in (workday_start, workday_end):
return self.workday_start <= hour < self.workday_end
return workday_start <= hour < workday_end
def hours_to_next_shift(self, workday_start: int = -1) -> float:
"""Time until the next work shift starts, in hours.
Parameters
----------
workday_start : int
            A valid hour in 24 hour time, by default -1. This should only be provided
            from a ``ServiceEquipmentData`` object.
Returns
-------
float
Hours until the next shift starts.
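        Examples
        --------
        A worked sketch for ``workday_start=7``: at 05:00 the next shift is 2 hours
        away, and at 20:00 it is 11 hours away (4 hours to midnight plus 7).
        >>> hours = env.hours_to_next_shift()  # depends on the simulation time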
"""
current = self.simulation_time
start = self.workday_start if workday_start == -1 else workday_start
if current.hour < start:
# difference between now and workday start
return hours_until_future_hour(current, start)
elif current.hour == start == 0:
# Need to manually move forward one whole day to avoid an infinite loop
return hours_until_future_hour(current, 24)
else:
# time to midnight + hour of workday start
return start + hours_until_future_hour(current, 0)
@property
def current_time(self) -> str:
"""Timestamp for the current time as a datetime.datetime.strftime."""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
def date_ix(self, date: dt.datetime | dt.date) -> int:
"""The first index of a future date. This corresponds to the number of hours
until this dates from the very beginning of the simulation.
Parameters
----------
date : datetime.datetime | datetime.date
A date within the environment's simulation range.
Returns
-------
int
            Index of the weather profile corresponding to the first hour of ``date``.
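        Examples
        --------
        A sketch, assuming an hourly weather profile that begins at
        2002-01-01 00:00 (an illustrative start date).
        >>> env.date_ix(dt.date(2002, 1, 2))  # second day starts 24 hours in
        24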
"""
        if isinstance(date, dt.datetime):
            date = date.date()
        # Unpacking a polars DataFrame yields its columns, so ``ix`` is the "index"
        # column of the single row matching midnight of ``date``
        ix, *_ = self.weather.filter(pl.col("datetime") == date)
        return ix.item()
def _weather_setup(
self,
weather_file: str,
start_year: int | None = None,
end_year: int | None = None,
) -> pl.DataFrame:
"""Reads the weather data from the "<inputs>/weather" directory, and creates the
``start_date`` and ``end_date`` time stamps for the simulation.
This also fills any missing data with zeros and interpolates the values of any
missing datetime entries.
Parameters
----------
weather_file : str
Name of the weather file to be used by the environment. Should be contained
within ``data_dir/weather``.
        start_year : int | None, optional
            Custom starting year for the weather profile, by default None. If ``None``
            or less than the first year of the weather profile, this will be ignored.
        end_year : int | None, optional
            Custom ending year for the weather profile, by default None. If ``None`` or
            greater than the last year of the weather profile, this will be ignored.
Returns
-------
        pl.DataFrame
The wind (and wave) timeseries.
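        Examples
        --------
        An illustrative layout for the weather CSV (values are placeholders)::
            datetime,windspeed,waveheight
            1/1/02 0:00,11.75,1.28
            1/1/02 1:00,10.41,1.25
            1/1/02 2:00,8.38,1.30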
"""
REQUIRED = ["windspeed", "waveheight"]
# PyArrow datetime conversion setup
convert_options = pa.csv.ConvertOptions(
timestamp_parsers=[
"%m/%d/%y %H:%M",
"%m/%d/%y %I:%M",
"%m/%d/%y %H:%M:%S",
"%m/%d/%y %I:%M:%S",
"%m/%d/%Y %H:%M",
"%m/%d/%Y %I:%M",
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %I:%M:%S",
"%m-%d-%y %H:%M",
"%m-%d-%y %I:%M",
"%m-%d-%y %H:%M:%S",
"%m-%d-%y %I:%M:%S",
"%m-%d-%Y %H:%M",
"%m-%d-%Y %I:%M",
"%m-%d-%Y %H:%M:%S",
"%m-%d-%Y %I:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d %I:%M",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %I:%M:%S",
]
)
weather = (
pl.from_pandas(
pa.csv.read_csv(
self.data_dir / "weather" / weather_file,
convert_options=convert_options,
)
.to_pandas()
.fillna(0.0)
.set_index("datetime")
.sort_index()
.resample("h")
.interpolate(limit_direction="both") # , limit=5)
.reset_index(drop=False)
)
.with_row_index()
.with_columns(
[
pl.col("datetime").cast(pl.Datetime).dt.cast_time_unit("ns"),
(pl.col("datetime").dt.hour()).alias("hour"),
]
)
)
missing = set(REQUIRED).difference(weather.columns)
if missing:
raise KeyError(
"The weather data are missing the following required columns:"
f" {missing}"
)
# Create the start and end points
self.start_datetime = weather.get_column("datetime").dt.min()
self.end_datetime = weather.get_column("datetime").dt.max()
self.start_year = self.start_datetime.year
self.end_year = self.end_datetime.year
if start_year is None and end_year is None:
return weather
if start_year is None:
pass
elif start_year > self.end_year:
raise ValueError(
f"'start_year' ({start_year}) occurs after the last available year"
f" in the weather data (range: {self.end_year})"
)
else:
# Filter for the provided, validated starting year and update the attribute
weather = (
weather.filter(pl.col("datetime").dt.year() >= start_year)
.drop("index")
.with_row_index()
)
self.start_datetime = weather.get_column("datetime").dt.min()
start_year = self.start_year = self.start_datetime.year
if end_year is None:
pass
elif start_year is None and end_year < self.start_year:
raise ValueError(
f"The provided 'end_year' ({end_year}) is before the start_year"
f" ({self.start_year})"
)
elif start_year is not None:
if end_year < start_year:
raise ValueError(
f"The provided 'end_year' ({end_year}) is before the start_year"
f" ({start_year})"
)
else:
# Filter for the provided, validated ending year and update
weather = weather.filter(pl.col("datetime").dt.year() <= end_year)
self.end_datetime = weather.get_column("datetime").dt.max()
self.end_year = self.end_datetime.year
else:
# Filter for the provided, validated ending year and update the attribute
weather = weather.filter(pl.col("datetime").dt.year() <= end_year)
self.end_datetime = weather.get_column("datetime").dt.max()
self.end_year = self.end_datetime.year
column_order = weather.columns
column_order.insert(0, column_order.pop(column_order.index("hour")))
column_order.insert(0, column_order.pop(column_order.index("waveheight")))
column_order.insert(0, column_order.pop(column_order.index("windspeed")))
column_order.insert(0, column_order.pop(column_order.index("datetime")))
column_order.insert(0, column_order.pop(column_order.index("index")))
# Ensure the columns are ordered correctly and re-compute pandas-compatible ix
return weather.select(column_order).drop("index").with_row_index()
@property
def weather_now(self) -> pl.DataFrame:
"""The current weather.
Returns
-------
pl.DataFrame
A length 1 slice from the weather profile at the current ``int()`` rounded
hour, in simulation time.
"""
# Rounds down because we won't arrive at the next weather event until that hour
now = int(self.now)
return self.weather.slice(now, 1)
def weather_forecast(
self, hours: int | float
) -> tuple[pl.Series, pl.Series, pl.Series, pl.Series]:
"""Returns the datetime, wind, wave, and hour data for the next ``hours`` hours,
starting from the current hour's weather.
Parameters
----------
        hours : int | float
Number of hours to look ahead, rounds up to the nearest hour.
Returns
-------
        tuple[pl.Series, pl.Series, pl.Series, pl.Series]
            The datetime, hour, windspeed, and waveheight columns from the weather
            profile, in return order.
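        Examples
        --------
        A sketch, assuming ``env`` is a running ``WombatEnvironment``.
        >>> date_times, hours, wind, wave = env.weather_forecast(12)
        >>> peak_wind = wind.max()  # peak windspeed over the next 12 hours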
"""
# If it's not on the hour, ensure we're looking ``hours`` hours into the future
start = math.floor(self.now)
        _, date_time, wind, wave, hour, *_ = self.weather.slice(
            start, math.ceil(hours) + math.ceil(self.now % 1)
        )
        return date_time, hour, wind, wave
def log_action(
self,
*,
agent: str,
action: str,
reason: str,
additional: str = "",
system_id: str = "",
system_name: str = "",
part_id: str = "",
part_name: str = "",
system_ol: float | int = 0,
part_ol: float | int = 0,
duration: float = 0,
distance_km: float = 0,
request_id: str = "na",
location: str = "na",
materials_cost: int | float = 0,
hourly_labor_cost: int | float = 0,
salary_labor_cost: int | float = 0,
equipment_cost: int | float = 0,
) -> None:
"""Formats the logging messages into the expected format for logging.
Parameters
----------
agent : str
Agent performing the action.
action : str
Action that was taken.
reason : str
Reason an action was taken.
additional : str
Any additional information that needs to be logged.
system_id : str
Turbine ID, ``System.id``, by default "".
system_name : str
Turbine name, ``System.name``, by default "".
part_id : str
Subassembly, component, or cable ID, ``_.id``, by default "".
part_name : str
Subassembly, component, or cable name, ``_.name``, by default "".
system_ol : float | int
Turbine operating level, ``System.operating_level``. Use an empty string
for n/a, by default 0.
part_ol : float | int
Subassembly, component, or cable operating level, ``_.operating_level``. Use
an empty string for n/a, by default 0.
request_id : str
The ``RepairManager`` assigned request_id found in
``RepairRequest.request_id``, by default "na".
location : str
            The location of where the event occurred: should be one of site, port,
            enroute, or system, by default "na".
duration : float
Length of time the action lasted, by default 0.
        distance_km : float
            Distance traveled, in km, if applicable, by default 0.
        materials_cost : int | float, optional
            Total cost of materials for action, in USD, by default 0.
        hourly_labor_cost : int | float, optional
            Total cost of hourly labor for action, in USD, by default 0.
        salary_labor_cost : int | float, optional
            Total cost of salaried labor for action, in USD, by default 0.
        equipment_cost : int | float, optional
            Total cost of equipment for action, in USD, by default 0.
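        Examples
        --------
        A hypothetical logging call for a completed repair; every name and cost
        below is illustrative.
        >>> env.log_action(
        ...     agent="Crew Transfer Vessel 1",
        ...     action="repair complete",
        ...     reason="minor repair",
        ...     system_id="WTG001",
        ...     system_name="Turbine 1",
        ...     system_ol=1.0,
        ...     duration=4.5,
        ...     location="system",
        ...     materials_cost=1000,
        ...     hourly_labor_cost=2500,
        ... )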
"""
valid_locations = ("site", "system", "port", "enroute", "na")
if location not in valid_locations:
raise ValueError(
f"Event logging `location` must be one of: {valid_locations}"
)
total_labor_cost = hourly_labor_cost + salary_labor_cost
total_cost = total_labor_cost + equipment_cost + materials_cost
now = self.simulation_time
row = {
"datetime": dt.datetime.now(),
"env_datetime": now,
"env_time": self.now,
"system_id": system_id,
"system_name": system_name,
"part_id": part_id,
"part_name": part_name,
"system_operating_level": system_ol,
"part_operating_level": part_ol,
"agent": agent,
"action": action,
"reason": reason,
"additional": additional,
"duration": duration,
"distance_km": distance_km,
"request_id": request_id,
"location": location,
"materials_cost": materials_cost,
"hourly_labor_cost": hourly_labor_cost,
"salary_labor_cost": salary_labor_cost,
"equipment_cost": equipment_cost,
"total_labor_cost": total_labor_cost,
"total_cost": total_cost,
}
        # Don't log the initiation of a crew transfer that can be forced at the end
        # of an operation but happens after the end of the simulation
if now <= self.end_datetime:
self._events_buffer.append(row)
def _log_actions(self):
"""Writes the action log items every 8000 hours."""
HOURS = 8000
while True:
yield self.timeout(HOURS)
self._events_writer.writerows(self._events_buffer)
self._events_buffer.clear()
def load_events_log_dataframe(self) -> pd.DataFrame:
"""Imports the logging file created in ``run`` and returns it as a formatted
``pandas.DataFrame``.
Returns
-------
pd.DataFrame
The formatted logging data from a simulation.
"""
log_df = (
pd.read_csv(
self.events_log_fname,
delimiter="|",
engine="pyarrow",
dtype={
"agent": "string",
"action": "string",
"reason": "string",
"additional": "string",
"system_id": "string",
"system_name": "string",
"part_id": "string",
"part_name": "string",
"request_id": "string",
"location": "string",
},
)
.set_index("datetime")
.sort_index()
)
return log_df
def _calculate_windfarm_total(
self, op: pd.DataFrame, prod: pd.DataFrame | None = None
) -> pd.DataFrame:
"""Calculates the overall wind farm operational level, accounting for substation
downtime by multiplying the sum of all downstream turbine operational levels by
the substation's operational level.
        Parameters
        ----------
        op : pd.DataFrame
            The turbine and substation operational level DataFrame.
        prod : pd.DataFrame | None, optional
            Unused in this method, by default None.
Notes
-----
This is a crude cap on the operations, and so a smarter way of capping
the availability should be added in the future.
Returns
-------
pd.DataFrame
The aggregate wind farm operational level.
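        Examples
        --------
        A worked sketch: with two turbines weighted 0.5 each, operating at 1.0 and
        0.8, behind a substation operating at 0.5, the farm-level operational
        contribution for that timestep is 0.5 * (0.5 * 1.0 + 0.5 * 0.8) = 0.45.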
"""
t_id = self.windfarm.turbine_id
turbines = self.windfarm.turbine_weights[t_id].values * op[t_id]
total = np.sum(
[
op[[sub]]
* np.array(
[
[math.fsum(row)]
for _, row in turbines[val["turbines"]].iterrows()
]
).reshape(-1, 1)
for sub, val in self.windfarm.substation_turbine_map.items()
],
axis=0,
)
return total
def _calculate_adjusted_production(
self, op: pd.DataFrame, prod: pd.DataFrame
) -> pd.DataFrame:
"""Calculates the overall wind farm power production and adjusts individual
turbine production by accounting for substation downtime. This is done by
multiplying the all downstream turbine operational levels by the substation's
operational level.
Parameters
----------
op : pd.DataFrame
The operational level DataFrame with turbine, substation, and windfarm
columns.
prod : pd.DataFrame
The turbine energy production DataFrame.
Notes
-----
This is a crude cap on the operations, and so a smarter way of capping
the availability should be added in the future.
Returns
-------
        pd.DataFrame
            The wind farm energy production, adjusted for substation downtime.
"""
# Adjust individual turbine production for substation downtime
prod = prod.copy()
for sub, val in self.windfarm.substation_turbine_map.items():
prod[val["turbines"]] *= op[[sub]].values
        prod["windfarm"] = prod[self.windfarm.turbine_id].sum(axis=1)
return prod[["windfarm"]]
def load_operations_log_dataframe(self) -> pd.DataFrame:
"""Imports the logging file created in ``run`` and returns it as a formatted
``pandas.DataFrame``.
Returns
-------
pd.DataFrame
The formatted logging data from a simulation.
"""
log_df = (
pd.read_csv(
self.operations_log_fname,
delimiter="|",
engine="pyarrow",
)
.set_index("datetime")
.sort_values("datetime")
)
log_df["windfarm"] = self._calculate_windfarm_total(log_df)
return log_df
def power_production_potential_to_csv( # type: ignore
self,
windfarm: wombat.windfarm.Windfarm,
operations: pd.DataFrame | None = None,
return_df: bool = True,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Creates the power production ``DataFrame`` and optionally returns it.
Parameters
----------
windfarm : wombat.windfarm.Windfarm
The simulation's windfarm object.
        operations : pd.DataFrame | None, optional
The operations log ``DataFrame`` if readily available, by default None. If
``None``, then it will be created through
``load_operations_log_dataframe()``.
return_df : bool, optional
Indicator to return the power production for further usage, by default True.
Returns
-------
        tuple[pd.DataFrame, pd.DataFrame]
The power potential and production timeseries data.
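        Examples
        --------
        A post-simulation sketch, assuming ``env.run()`` has already completed.
        >>> potential, production = env.power_production_potential_to_csv(
        ...     windfarm=env.windfarm
        ... )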
"""
write_options = pa.csv.WriteOptions(delimiter="|")
if operations is None:
operations = self.load_operations_log_dataframe().sort_values("env_time")
turbines = windfarm.turbine_id
windspeed = self.weather.to_pandas().set_index("datetime").windspeed
windspeed = windspeed.loc[operations.env_datetime].values
potential_df = pd.DataFrame(
[],
index=operations.env_datetime,
columns=["env_time", "env_datetime", "windspeed", "windfarm"]
+ turbines.tolist(),
)
potential_df[turbines] = np.vstack(
[windfarm.system(t_id).power(windspeed) for t_id in turbines]
).T
potential_df = potential_df.assign(
windspeed=windspeed,
windfarm=potential_df[turbines].sum(axis=1),
env_time=operations.env_time.values,
env_datetime=operations.env_datetime.values,
)
pa.csv.write_csv(
pa.Table.from_pandas(potential_df),
self.power_potential_fname,
write_options=write_options,
)
# TODO: The actual windfarm production needs to be clipped at each subgraph to
# the max of the substation's operating capacity and then summed.
production_df = potential_df.copy()
production_df[turbines] *= operations[turbines].values
        production_df["windfarm"] = self._calculate_adjusted_production(
            operations, production_df
        )
pa.csv.write_csv(
pa.Table.from_pandas(production_df),
self.power_production_fname,
write_options=write_options,
)
if return_df:
return potential_df, production_df
def cleanup_log_files(self) -> None:
"""Convenience method to clear the output log files in case a large
batch of simulations is being run and there are space limitations.
        .. warning:: This shuts down the loggers, so no further logging can be
            performed.
"""
        # NOTE: Each file removal is wrapped in a try/except clause to protect
        # against failure when a file has already been deleted by accident, or when
        # the original logs were deleted in the dataframe generation step
logging.shutdown()
if not self._events_csv.closed:
self._events_csv.close()
if not self._operations_csv.closed:
self._operations_csv.close()
try:
self.events_log_fname.unlink()
except FileNotFoundError:
pass
try:
self.operations_log_fname.unlink()
except FileNotFoundError:
pass
try:
self.power_potential_fname.unlink()
except FileNotFoundError:
pass
try:
self.power_production_fname.unlink()
except FileNotFoundError:
pass
try:
self.metrics_input_fname.unlink()
except FileNotFoundError:
pass