"""The postprocessing metric computation."""
from __future__ import annotations
import warnings
from copy import deepcopy
from typing import TYPE_CHECKING, Any
from pathlib import Path
from itertools import chain, product
from collections import Counter
import numpy as np
import pandas as pd
from wombat.core import FixedCosts
from wombat.core.library import load_yaml
def _check_frequency(frequency: str, which: str = "all") -> str:
"""Checks the frequency input to ensure it meets the correct criteria according
to the ``which`` flag.
Parameters
----------
frequency : str
The user-provided value.
which : str, optional
Designation for which combinations to check for, by default "all".
- "all": project, annual, monthly, and month-year
Returns
-------
str
The lower-case, input with white spaces removed.
Raises
------
ValueError
Raised if an invalid value was raised
"""
opts: tuple[str, ...]
if which == "all":
opts = ("project", "annual", "monthly", "month-year")
elif which == "monthly":
opts = ("project", "annual", "monthly")
elif which == "annual":
opts = ("project", "annual")
frequency = frequency.lower().strip()
if frequency not in opts:
raise ValueError(f"``frequency`` must be one of {opts}.")
return frequency
def _calculate_time_availability(
availability: pd.DataFrame,
by_turbine: bool = False,
) -> float | np.ndarray:
"""Calculates the availability ratio of the whole timeseries or the whole
timeseries, by turbine.
Parameters
----------
availability : pd.DataFrame
Timeseries array of operating ratios for all turbines.
by_turbine : bool, optional
If True, calculates the availability rate of each column, otherwise across the
whole array, by default False.
Returns
-------
float | np.ndarray
Availability ratio across the whole timeseries, or broken out by column
(turbine).
"""
availability = availability > 0
if by_turbine:
return availability.values.sum(axis=0) / availability.shape[0]
return availability.values.sum() / availability.size
[docs]
class Metrics:
"""The metric computation class for storing logs and compiling results."""
# Names of the cost columns referenced by the methods of this class.
_hourly_cost = "hourly_labor_cost"
_salary_cost = "salary_labor_cost"
_labor_cost = "total_labor_cost"
_equipment_cost = "equipment_cost"
_materials_cost = "materials_cost"
_total_cost = "total_cost"
# Every cost column; ``_apply_inflation_rate`` scales exactly these columns.
_cost_columns = [
_hourly_cost,
_salary_cost,
_labor_cost,
_equipment_cost,
_materials_cost,
_total_cost,
]
def __init__(
    self,
    data_dir: str | Path,
    events: str | pd.DataFrame,
    operations: str | pd.DataFrame,
    potential: str | pd.DataFrame,
    production: str | pd.DataFrame,
    inflation_rate: float,
    project_capacity: float,
    turbine_capacities: list[float],
    substation_id: str | list[str],
    turbine_id: str | list[str],
    substation_turbine_map: dict[str, dict[str, list[str]]],
    service_equipment_names: str | list[str],
    fixed_costs: str | None = None,
) -> None:
    """Initializes the Metrics class.

    Parameters
    ----------
    data_dir : str | Path
        This should be the same as was used for running the analysis. It must be
        an existing directory.
    events : str | pd.DataFrame
        Either a pandas ``DataFrame`` or filename to be used to read the csv log
        data.
    operations : str | pd.DataFrame
        Either a pandas ``DataFrame`` or filename to be used to read the csv log
        data.
    potential : str | pd.DataFrame
        Either a pandas ``DataFrame`` or a filename to be used to read the csv
        potential power production data.
    production : str | pd.DataFrame
        Either a pandas ``DataFrame`` or a filename to be used to read the csv
        power production data.
    inflation_rate : float
        The inflation rate to be applied to all dollar amounts from the analysis
        starting year to ending year. Stored as ``1 + inflation_rate``.
    project_capacity : float
        The project's rated capacity, in MW.
    turbine_capacities : list[float]
        The capacity of each individual turbine corresponding to ``turbine_id``,
        in kW. A single number is also accepted and is wrapped in a list.
    substation_id : str | list[str]
        The substation id(s).
    turbine_id : str | list[str]
        The turbine id(s).
    substation_turbine_map : dict[str, dict[str, list[str]]]
        A copy of ``Windfarm.substation_turbine_map``. This is a dictionary
        mapping of the substation IDs (keys) and a nested dictionary of its
        associated turbine IDs and each turbine's total plant weighting (turbine
        capacity / plant capacity).
    service_equipment_names : str | list[str]
        The names of the servicing equipment, corresponding to
        ``ServiceEquipment.settings.name`` for each ``ServiceEquipment`` in the
        simulation. Stored sorted and de-duplicated.
    fixed_costs : str | None
        The filename of the project's fixed costs. When ``None``, a zero-cost
        ``FixedCosts`` object is created.

    Raises
    ------
    FileNotFoundError
        Raised if ``data_dir`` is not an existing directory.
    """
    self.data_dir = Path(data_dir)
    if not self.data_dir.is_dir():
        raise FileNotFoundError(f"{self.data_dir} does not exist")

    self.inflation_rate = 1 + inflation_rate
    self.project_capacity = project_capacity

    if fixed_costs is not None:
        if TYPE_CHECKING:
            assert isinstance(fixed_costs, str)
        fixed_costs = load_yaml(self.data_dir / "project/config", fixed_costs)
        if TYPE_CHECKING:
            assert isinstance(fixed_costs, dict)
        self.fixed_costs = FixedCosts.from_dict(fixed_costs)
    else:
        # Create a zero-cost FixedCosts object
        self.fixed_costs = FixedCosts.from_dict({"operations": 0})

    # Single IDs are promoted to one-element lists so later code can treat the
    # attributes uniformly.
    self.substation_id = (
        [substation_id] if isinstance(substation_id, str) else substation_id
    )
    self.turbine_id = [turbine_id] if isinstance(turbine_id, str) else turbine_id

    self.substation_turbine_map = substation_turbine_map
    weight_frames = [pd.DataFrame(entry) for entry in substation_turbine_map.values()]
    self.turbine_weights = pd.concat(weight_frames).set_index("turbines").T

    if isinstance(service_equipment_names, str):
        service_equipment_names = [service_equipment_names]
    self.service_equipment_names = sorted(set(service_equipment_names))

    if isinstance(turbine_capacities, (float, int)):
        turbine_capacities = [turbine_capacities]
    self.turbine_capacities = turbine_capacities

    def _load(source: str | pd.DataFrame) -> pd.DataFrame:
        """Read ``source`` from the log directory when it is a filename."""
        return self._read_data(source) if isinstance(source, str) else source

    # Only the events log carries cost columns, so only it is inflation-adjusted.
    self.events = self._apply_inflation_rate(self._tidy_data(_load(events)))
    self.operations = self._tidy_data(_load(operations))
    self.potential = self._tidy_data(_load(potential))
    self.production = self._tidy_data(_load(production))
[docs]
@classmethod
def from_simulation_outputs(cls, fpath: Path | str, fname: str) -> Metrics:
    """Builds a ``Metrics`` object from the saved outputs of a simulation so the
    calculated metrics can be revisited without rerunning the analysis.

    Parameters
    ----------
    fpath : Path | str
        The full path to the file where the data was saved.
    fname : str
        The filename for where the data was saved, which should be a direct
        dictionary mapping for the Metrics initialization.

    Returns
    -------
    Metrics
        The class object.
    """
    # The loaded mapping is forwarded as-is as the constructor's keyword arguments.
    return cls(**load_yaml(fpath, fname))
[docs]
def _tidy_data(self, data: pd.DataFrame) -> pd.DataFrame:
"""Tidies the "raw" csv-converted data to be able to be used among the
``Metrics`` class.
Parameters
----------
data : pd.DataFrame
The csv log data.
Returns
-------
pd.DataFrame
A tidied data frame to be used for all the operations in this class.
"""
# Ignore odd pandas casting error for pandas>=1.5(?)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
data = data = data.convert_dtypes()
if data.index.name != "datetime":
try:
data.datetime = pd.to_datetime(data.datetime)
except AttributeError:
data["datetime"] = pd.to_datetime(data.env_datetime)
data.index = data.datetime
data = data.drop(labels="datetime", axis=1)
data.env_datetime = pd.to_datetime(data.env_datetime)
data = data.assign(
year=data.env_datetime.dt.year,
month=data.env_datetime.dt.month,
day=data.env_datetime.dt.day,
)
return data
[docs]
def _read_data(self, fname: str) -> pd.DataFrame:
    """Reads a pipe-delimited csv log from ``<data_dir>/outputs/logs``.

    Files whose name contains "events" are read with their text columns typed
    as strings and are returned indexed and sorted by ``datetime``; any other
    file is returned exactly as read.

    Parameters
    ----------
    fname : str
        Filename of the csv data.

    Returns
    -------
    pd.DataFrame
        Dataframe of the requested log data.
    """
    log_file = self.data_dir / "outputs" / "logs" / fname
    if "events" not in fname:
        return pd.read_csv(log_file, delimiter="|", engine="pyarrow")

    text_columns = (
        "agent",
        "action",
        "reason",
        "additional",
        "system_id",
        "system_name",
        "part_id",
        "part_name",
        "request_id",
        "location",
    )
    events = pd.read_csv(
        log_file,
        delimiter="|",
        engine="pyarrow",
        dtype=dict.fromkeys(text_columns, "string"),
    )
    return events.set_index("datetime").sort_index()
[docs]
def _apply_inflation_rate(self, events: pd.DataFrame) -> pd.DataFrame:
"""Adjusts the cost data for compounding inflation.
Parameters
----------
inflation_rate : float
The inflation rate to be applied for each year.
events : pd.DataFrame
The events dataframe containing the project cost data.
Returns
-------
pd.DataFrame
The events dataframe with costs adjusted for inflation.
"""
adjusted_inflation = deepcopy(self.inflation_rate)
years = events.year.unique()
years.sort()
for year in years:
row_filter = events.year == year
if year > years[0]:
events.loc[row_filter, self._cost_columns] *= adjusted_inflation
adjusted_inflation *= self.inflation_rate
return events
[docs]
def time_based_availability(self, frequency: str, by: str) -> pd.DataFrame:
    """Calculates the time-based availability over a project's lifetime as a
    single value, annual average, or monthly average for the whole windfarm or by
    turbine.

    .. note:: Each turbine's operating level is multiplied by the operating level
        of the substation it is mapped to in ``substation_turbine_map``.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by : str
        One of "windfarm" or "turbine".

    Returns
    -------
    pd.DataFrame
        The time-based availability at the desired aggregation level.

    Raises
    ------
    ValueError
        Raised if ``frequency`` or ``by`` is not one of the accepted values.
    """
    frequency = _check_frequency(frequency, which="all")
    by = by.lower().strip()
    if by not in ("windfarm", "turbine"):
        raise ValueError('``by`` must be one of "windfarm" or "turbine".')
    by_turbine = by == "turbine"

    # Determine the operational capacity of each turbine with substation downtime
    selected = ["year", "month", "day", "windfarm"] + self.turbine_id
    turbine_operations = self.operations[selected].copy()
    for substation, mapping in self.substation_turbine_map.items():
        turbine_operations[mapping["turbines"]] *= self.operations[
            [substation]
        ].values
    hourly = turbine_operations.loc[:, self.turbine_id]

    if frequency == "project":
        availability = _calculate_time_availability(hourly, by_turbine=by_turbine)
        if by_turbine:
            if TYPE_CHECKING:
                assert isinstance(availability, np.ndarray)
            return pd.DataFrame(availability.reshape(1, -1), columns=self.turbine_id)
        return pd.DataFrame([availability], columns=["windfarm"])

    # The remaining frequencies differ only in the time columns they group on.
    group_by = {
        "annual": "year",
        "monthly": "month",
        "month-year": ["year", "month"],
    }[frequency]
    time_cols = [group_by] if isinstance(group_by, str) else group_by

    # The grouped counts only provide the result's index and column labels.
    counts = turbine_operations.groupby(by=group_by).count()
    counts = counts[self.turbine_id] if by_turbine else counts[["windfarm"]]

    results = []
    for label in counts.index:
        values = label if isinstance(label, tuple) else (label,)
        in_period = turbine_operations[time_cols[0]] == values[0]
        for col, value in zip(time_cols[1:], values[1:]):
            in_period = in_period & (turbine_operations[col] == value)
        results.append(
            _calculate_time_availability(hourly[in_period], by_turbine=by_turbine)
        )
    return pd.DataFrame(results, index=counts.index, columns=counts.columns)
[docs]
def production_based_availability(self, frequency: str, by: str) -> pd.DataFrame:
    """Calculates the production-based availability over a project's lifetime as
    a single value, annual average, or monthly average for the whole windfarm or
    by turbine.

    Availability is the summed production divided by the summed potential
    production for each aggregation period. A period whose total potential is
    zero has that total replaced with 1 so the division is defined.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by : str
        One of "windfarm" or "turbine".

    Returns
    -------
    pd.DataFrame
        The production-based availability at the desired aggregation level.

    Raises
    ------
    ValueError
        Raised if ``frequency`` or ``by`` is not one of the accepted values.
    """
    frequency = _check_frequency(frequency, which="all")
    by = by.lower().strip()
    if by not in ("windfarm", "turbine"):
        raise ValueError('``by`` must be one of "windfarm" or "turbine".')
    by_turbine = by == "turbine"

    # Always work on copies: helper columns are added below and must never leak
    # into (or warn about) the frames stored on the instance.
    value_cols = self.turbine_id if by_turbine else ["windfarm"]
    production = self.production.loc[:, value_cols].copy()
    potential = self.potential.loc[:, value_cols].copy()

    if frequency == "project":
        total_production = production.values.sum(axis=0)
        total_potential = potential.values.sum(axis=0)
        # Guard the division on the aggregated total (as the grouped
        # frequencies do) rather than per timestep, which would add 1 to the
        # denominator for every zero-potential timestep.
        total_potential = np.where(total_potential == 0, 1, total_potential)
        availability = total_production / total_potential
        return pd.DataFrame([availability], columns=value_cols)

    time_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]

    production["year"] = production.index.year.values
    production["month"] = production.index.month.values
    potential["year"] = potential.index.year.values
    potential["month"] = potential.index.month.values

    selected = [*time_cols, *value_cols]
    production = production[selected].groupby(time_cols).sum()
    potential = potential[selected].groupby(time_cols).sum()

    # ``DataFrame.mask`` is the supported element-wise replacement; ``.loc`` with
    # a 2-D boolean array is not a valid row indexer.
    potential = potential.mask(potential == 0, 1)

    if by_turbine:
        return pd.DataFrame(production / potential, columns=self.turbine_id)
    return pd.DataFrame(
        production.sum(axis=1) / potential.sum(axis=1), columns=[by]
    )
[docs]
def capacity_factor(self, which: str, frequency: str, by: str) -> pd.DataFrame:
    """Calculates the capacity factor over a project's lifetime as a single
    value, annual average, or monthly average for the whole windfarm or by
    turbine.

    The capacity factor is the summed energy divided by the energy that would be
    produced at rated capacity over the same number of timesteps. Windfarm-level
    results use ``project_capacity`` (MW, converted to kW), and turbine-level
    results use ``turbine_capacities`` (kW).

    Parameters
    ----------
    which : str
        One of "net" (uses the production data) or "gross" (uses the potential
        production data).
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by : str
        One of "windfarm" or "turbine".

    Returns
    -------
    pd.DataFrame
        The capacity factor at the desired aggregation level.

    Raises
    ------
    ValueError
        Raised if ``which``, ``frequency``, or ``by`` is not one of the accepted
        values.
    """
    which = which.lower().strip()
    if which not in ("net", "gross"):
        raise ValueError('``which`` must be one of "net" or "gross".')
    frequency = _check_frequency(frequency, which="all")
    by = by.lower().strip()
    if by not in ("windfarm", "turbine"):
        raise ValueError('``by`` must be one of "windfarm" or "turbine".')
    by_turbine = by == "turbine"

    source = self.production if which == "net" else self.potential
    # Copy so the helper columns added below never touch the stored data.
    production = source.loc[:, self.turbine_id].copy()

    if frequency == "project":
        n_timesteps = production.shape[0]
        if not by_turbine:
            potential = n_timesteps * self.project_capacity * 1000.0
            return pd.DataFrame(
                [production.values.sum() / potential], columns=["windfarm"]
            )
        potential = n_timesteps * np.array(self.turbine_capacities)
        return pd.DataFrame(production.sum(axis=0) / potential).T

    production["year"] = production.index.year.values
    production["month"] = production.index.month.values
    group_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]

    grouped = production[group_cols + self.turbine_id].groupby(group_cols)
    timestep_counts = grouped.count()
    energy = grouped.sum()

    if by_turbine:
        capacity = np.array(self.turbine_capacities, dtype=float)
        potential = timestep_counts * capacity
        return pd.DataFrame(energy / potential, columns=self.turbine_id)

    # Windfarm level: scale the per-period timestep count by the project
    # capacity (MW -> kW), mirroring the project-level calculation. Previously
    # the capacity was never applied and the per-turbine count frame was divided
    # into a Series, which misaligns and yields NaN.
    windfarm_potential = timestep_counts.iloc[:, 0] * self.project_capacity * 1000.0
    return (energy.sum(axis=1) / windfarm_potential).to_frame(name=by)
[docs]
def task_completion_rate(self, which: str, frequency: str) -> float | pd.DataFrame:
    """Calculates the task completion rate (including tasks that are canceled
    after a replacement event) over a project's lifetime as a single value,
    annual average, or monthly average.

    The rate is the number of "complete"/"canceled" events divided by the number
    of "request" events logged in the same period.

    Parameters
    ----------
    which : str
        One of "scheduled" (maintenance), "unscheduled" (repair), or "both".
    frequency : str
        One of "project", "annual", "monthly", or "month-year".

    Returns
    -------
    float | pd.DataFrame
        The task completion rate at the desired aggregation level.

    Raises
    ------
    ValueError
        Raised if ``which`` or ``frequency`` is not one of the accepted values.
    """
    which = which.lower().strip()
    if which not in ("scheduled", "unscheduled", "both"):
        raise ValueError(
            '``which`` must be one of "scheduled", "unscheduled", or "both".'
        )
    frequency = _check_frequency(frequency, which="all")

    task_filter = {
        "scheduled": ["maintenance"],
        "unscheduled": ["repair"],
        "both": ["maintenance", "repair"],
    }[which]
    request_actions = [f"{el} request" for el in task_filter]
    completion_actions = [
        f"{task} {el}" for task in task_filter for el in ("complete", "canceled")
    ]

    cols = ["env_datetime", "request_id"]
    is_request = self.events.action.isin(request_actions)
    is_completion = self.events.action.isin(completion_actions)
    requests = self.events.loc[is_request, cols].reset_index(drop=True)
    completions = self.events.loc[is_completion, cols].reset_index(drop=True)

    if frequency == "project":
        n_requests = requests.shape[0]
        if n_requests == 0:
            return pd.DataFrame([0.0], columns=["windfarm"])
        return pd.DataFrame(
            [completions.shape[0] / n_requests], columns=["windfarm"]
        )

    for frame in (requests, completions):
        frame["year"] = frame.env_datetime.dt.year.values
        frame["month"] = frame.env_datetime.dt.month.values

    # ``indices`` holds every period present in the operations log so periods
    # without any matching event still appear in the result.
    if frequency == "annual":
        group_filter = ["year"]
        indices = self.operations.year.unique()
    elif frequency == "monthly":
        group_filter = ["month"]
        indices = self.operations.month.unique()
    elif frequency == "month-year":
        group_filter = ["year", "month"]
        indices = (
            self.operations[["year", "month"]]
            .groupby(["year", "month"])
            .value_counts()
            .index.tolist()
        )

    def _pad_missing(counts: pd.DataFrame) -> pd.DataFrame:
        """Append a row of ones for every period absent from ``counts``."""
        absent = [ix for ix in indices if ix not in counts.index]
        filler = pd.DataFrame(
            np.ones(len(absent)), index=absent, columns=counts.columns
        )
        return pd.concat([counts, filler]).sort_index()

    group_cols = group_filter + ["request_id"]
    requests = requests[group_cols].groupby(group_filter).count()
    requests.loc[requests.request_id == 0] = 1
    completions = completions[group_cols].groupby(group_filter).count()

    # NOTE(review): both frames are padded with 1, so a period with requests but
    # no completions evaluates to 1 / requests rather than 0 -- confirm this is
    # the intended convention.
    requests = _pad_missing(requests)
    completions = _pad_missing(completions)

    completion_rate = pd.DataFrame(completions / requests)
    completion_rate.index = completion_rate.index.set_names(group_filter)
    return completion_rate.rename(
        columns={"request_id": "Completion Rate", 0: "Completion Rate"}
    )
[docs]
def equipment_costs(
    self, frequency: str, by_equipment: bool = False
) -> pd.DataFrame:
    """Calculates the equipment costs for the simulation at a project, annual, or
    monthly level with (or without) respect to equipment utilized in the
    simulation. Events whose action is "monthly lease fee" are excluded.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by_equipment : bool, optional
        Indicates whether the values are with respect to the equipment utilized
        (True) or not (False), by default False.

    Returns
    -------
    pd.DataFrame
        Returns pandas ``DataFrame`` with columns:

        - year (if appropriate for frequency)
        - month (if appropriate for frequency)
        - then any equipment names as they appear in the logs

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``by_equipment`` is not one of ``True`` or ``False``.
    """
    frequency = _check_frequency(frequency, which="all")
    if not isinstance(by_equipment, bool):
        raise ValueError("`by_equipment` must be one of `True` or `False`")

    is_project = frequency == "project"
    # Project-level results use no time grouping, hence the empty fallback.
    time_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }.get(frequency, [])
    cost_name = self._equipment_cost
    events = self.events.loc[self.events.action != "monthly lease fee"]

    if not by_equipment:
        if is_project:
            return pd.DataFrame([events[[cost_name]].sum()], columns=[cost_name])
        totals = events[[cost_name] + time_cols].groupby(time_cols).sum()
        return totals.fillna(0)

    # Only rows that actually incurred an equipment cost identify the equipment.
    has_cost = events[cost_name] > 0

    if is_project:
        per_agent = events.loc[has_cost, [cost_name, "agent"]].groupby(["agent"]).sum()
        per_agent = per_agent.fillna(0).reset_index(level=0).fillna(0).T
        # The first row holds the agent names; promote it to the column labels.
        per_agent = per_agent.rename(columns=per_agent.iloc[0])
        return per_agent.drop(index="agent").reset_index(drop=True)

    group_cols = ["agent"] + time_cols
    per_agent = (
        events.loc[has_cost, [cost_name] + group_cols]
        .groupby(group_cols)
        .sum()
        .reset_index(level=0)
    )
    equipment_frames = [
        per_agent[per_agent.agent == eq][[cost_name]].rename(columns={cost_name: eq})
        for eq in per_agent.agent.unique()
    ]
    return pd.concat(equipment_frames, axis=1).fillna(value=0)
[docs]
def service_equipment_utilization(self, frequency: str) -> pd.DataFrame:
    """Calculates the utilization rate for each of the service equipment in the
    simulation as the ratio of the number of days each piece of servicing
    equipment logged an operating action over the number of days it logged any
    event other than a "will return next year" delay.

    .. note:: For tugboats in a tow-to-port scenario, this ratio will be near
        100% because they are considered to be operating on an as-needed basis
        per the port contracting assumptions

    Parameters
    ----------
    frequency : str
        One of "project" or "annual".

    Returns
    -------
    pd.DataFrame
        The utilization rate of each of the simulation ``ServiceEquipment``.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project" or "annual".
    """
    frequency = _check_frequency(frequency, which="annual")
    events = self.events

    operating_actions = [
        "traveling",  # traveling between port/site or on-site
        "repair",
        "maintenance",
        "delay",  # performing work
        "unmooring",
        "mooring_reconnection",
        "towing",  # tugboat classifications
    ]
    operating_filter = events.action.isin(operating_actions)

    # Delays that mark the equipment leaving until the following year.
    returning = events.additional == "will return next year"
    work_complete = (events.reason == "work is complete") & returning
    non_operational = events.reason == "non-operational period"
    return_filter = events.action == "delay"
    return_filter = return_filter & (work_complete | non_operational)
    return_filter = return_filter & returning

    def _days_per_year(row_filter: pd.Series, name: str) -> pd.DataFrame:
        """Count the distinct (year, month, day) combinations per year."""
        daily = events[row_filter].groupby(["year", "month", "day"]).size()
        yearly = daily.reset_index().groupby("year").count()[["day"]]
        return yearly.rename(columns={"day": name})

    operation_days = []
    total_days = []
    for name in self.service_equipment_names:
        equipment_filter = events.agent == name
        operation_days.append(
            _days_per_year(equipment_filter & operating_filter, name)
        )
        total_days.append(_days_per_year(equipment_filter & ~return_filter, name))

    operating_df = pd.DataFrame(operation_days[0])
    total_df = pd.DataFrame(total_days[0])
    if len(self.service_equipment_names) > 1:
        operating_df = operating_df.join(operation_days[1:], how="outer").fillna(0)
        total_df = total_df.join(total_days[1:], how="outer").fillna(1)

    # Years without any matching events get 0 operating days and 1 total day so
    # the ratio is defined (and zero) for them.
    for year in events.year.unique():
        if year not in operating_df.index:
            pad = pd.DataFrame(
                np.zeros((1, operating_df.shape[1])),
                index=[year],
                columns=operating_df.columns,
            )
            operating_df = pd.concat([operating_df, pad], axis=0).sort_index()
        if year not in total_df.index:
            pad = pd.DataFrame(
                np.ones((1, total_df.shape[1])),
                index=[year],
                columns=operating_df.columns,
            )
            total_df = pd.concat([total_df, pad], axis=0).sort_index()

    if frequency != "project":
        return operating_df / total_df

    names = self.service_equipment_names
    operating_total = operating_df.reset_index().sum()[names]
    days_total = total_df.reset_index().sum()[names]
    return pd.DataFrame(operating_total / days_total).T
[docs]
def vessel_crew_hours_at_sea(
    self,
    frequency: str,
    by_equipment: bool = False,
    vessel_crew_assumption: dict[str, float] | None = None,
) -> pd.DataFrame:
    """Calculates the total number of crew hours at sea that occurred during a
    simulation at a project, annual, or monthly level that can be broken out by
    servicing equipment. Events are counted when their location is "enroute",
    "site", or "system" and their agent is one of the servicing equipment.

    .. note:: This metric is intended to be used for offshore wind simulations.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by_equipment : bool, optional
        Indicates whether the values are with respect to each piece of servicing
        equipment (True) or not (False), by default False.
    vessel_crew_assumption : dict[str, float] | None, optional
        Dictionary of vessel names (``ServiceEquipment.settings.name``) and
        number of crew members aboard to transform the results from vessel hours
        at sea to crew hours at sea. ``None`` (the default) or an empty
        dictionary applies no scaling.

    Returns
    -------
    pd.DataFrame
        Returns a pandas ``DataFrame`` with columns:

        - year (if appropriate for frequency)
        - month (if appropriate for frequency)
        - Total Crew Hours at Sea
        - {ServiceEquipment.settings.name} (if broken out)

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``by_equipment`` is not one of ``True`` or ``False``.
    ValueError
        If ``vessel_crew_assumption`` is not a dictionary.
    """
    frequency = _check_frequency(frequency, which="all")
    if not isinstance(by_equipment, bool):
        raise ValueError("``by_equipment`` must be one of ``True`` or ``False``")
    # ``None`` replaces the former mutable ``{}`` default.
    if vessel_crew_assumption is None:
        vessel_crew_assumption = {}
    if not isinstance(vessel_crew_assumption, dict):
        raise ValueError(
            "`vessel_crew_assumption` must be a dictionary of vessel name (keys)"
            " and number of crew (values)"
        )

    # Filter by the at sea indicators and required columns
    at_sea = self.events
    at_sea = at_sea.loc[
        at_sea.location.isin(("enroute", "site", "system"))
        & at_sea.agent.isin(self.service_equipment_names),
        ["agent", "year", "month", "action", "reason", "additional", "duration"],
    ].reset_index(drop=True)

    # Apply the vessel crew assumptions
    vessels = at_sea.agent.unique()
    for name, n_crew in vessel_crew_assumption.items():
        if name not in vessels:
            continue
        at_sea.loc[at_sea.agent == name, "duration"] *= n_crew

    group_cols = ["agent"]
    columns = ["Total Crew Hours at Sea"] + vessels.tolist()
    if not by_equipment:
        group_cols = []
        columns = ["Total Crew Hours at Sea"]
        # Select ``duration`` before aggregating so the text columns are never
        # summed, which is wasteful and depends on the pandas version.
        at_sea = at_sea.groupby(["year", "month"])[["duration"]].sum().reset_index()

    if frequency == "project":
        if not by_equipment:
            return pd.DataFrame(at_sea.sum()[["duration"]]).T.rename(
                columns={"duration": "Total Crew Hours at Sea"}
            )
        total_hours = (
            at_sea[["duration", "agent"]]
            .groupby(["agent"])
            .sum()
            .T.reset_index(drop=True)
        )
        total_hours.loc[:, "Total Crew Hours at Sea"] = total_hours.sum().sum()
        return total_hours[columns]

    additional_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]
    columns = additional_cols + columns
    group_cols = group_cols + additional_cols
    at_sea = at_sea[group_cols + ["duration"]].groupby(group_cols).sum()

    if not by_equipment:
        return at_sea.rename(columns={"duration": "Total Crew Hours at Sea"})

    # Create a zero-filled shell containing every period found in the events
    # log so periods without any time at sea still appear in the result.
    total_hours = (
        self.events[["env_time", "year", "month"]]
        .groupby(["year", "month"])
        .count()
        .reset_index()
        .rename(columns={"env_time": "N"})
    )
    total_hours.N = 0
    total_hours = total_hours.groupby(additional_cols)[["N"]].sum()

    per_vessel = [at_sea.loc[v].rename(columns={"duration": v}) for v in vessels]
    total_hours = total_hours.join(pd.concat(per_vessel, axis=1), how="outer").fillna(0)
    total_hours.N = total_hours.sum(axis=1)
    return (
        total_hours.reset_index()
        .rename(columns={"N": "Total Crew Hours at Sea"})[columns]
        .set_index(additional_cols)
    )
[docs]
def number_of_tows(
self, frequency: str, by_tug: bool = False, by_direction: bool = False
) -> float | pd.DataFrame:
"""Counts the towing events that occurred during a simulation at a project,
annual, monthly, or month-year level, optionally broken out by tugboat and/or by
the direction of the tow.
Parameters
----------
frequency : str
One of "project", "annual", "monthly", or "month-year".
by_tug : bool, optional
Indicates whether the values are with respect to each tugboat (True) or not
(False), by default False.
by_direction : bool, optional
Indicates whether the values are with respect to the direction a turbine is
towed (True) or not (False), by default False.
Returns
-------
float | pd.DataFrame
Every return statement in this method produces a pandas ``DataFrame`` (a 1x1
frame holding 0 under ``total_tows`` when the event log has no towing events,
regardless of the other arguments). Otherwise the time columns listed below
are set as the index for any non-"project" ``frequency``, and the columns are:
- year (if appropriate for frequency)
- month (if appropriate for frequency)
- total_tows
- total_tows_to_port (if broken out)
- total_tows_to_site (if broken out)
- {ServiceEquipment.settings.name}_total_tows (if broken out)
- {ServiceEquipment.settings.name}_to_port (if broken out)
- {ServiceEquipment.settings.name}_to_site (if broken out)
Raises
------
ValueError
If ``frequency`` is not one of "project", "annual", "monthly", or
"month-year".
ValueError
If ``by_tug`` is not one of ``True`` or ``False``.
ValueError
If ``by_direction`` is not one of ``True`` or ``False``.
"""
frequency = _check_frequency(frequency, which="all")
if not isinstance(by_tug, bool):
raise ValueError("``by_tug`` must be one of ``True`` or ``False``")
if not isinstance(by_direction, bool):
raise ValueError("``by_direction`` must be one of ``True`` or ``False``")
# Filter out only the towing events
towing = self.events.loc[self.events.action == "towing"].copy()
if towing.shape[0] == 0:
# If this is accessed in an in-situ only scenario, or no tows were activated
# then return back 0
return pd.DataFrame([[0]], columns=["total_tows"])
# Label every tow as "to_site" first, then relabel the rows whose reason is
# exactly "towing turbine to port"
towing.loc[:, "direction"] = "to_site"
ix_to_port = towing.reason == "towing turbine to port"
towing.loc[ix_to_port, "direction"] = "to_port"
# Get the unique directions and tugboat names
direction_suffix = ("to_port", "to_site")
tugboats = towing.agent.unique().tolist()
# Create the final column names
columns = ["total_tows"]
if by_direction:
# The comprehension is evaluated before ``extend`` runs, so it only sees
# "total_tows" and yields "total_tows_to_port" and "total_tows_to_site"
columns.extend([f"{c}_{s}" for c in columns for s in direction_suffix])
if by_tug:
tug_columns = [f"{t}_total_tows" for t in tugboats]
if by_direction:
_columns = [f"{t}_{s}" for t in tugboats for s in direction_suffix]
tug_columns.extend(_columns)
tug_columns.sort()
columns.extend(tug_columns)
# Count the total number of tows by each possibly category
n_tows = towing.groupby(["agent", "year", "month", "direction"]).count()
n_tows = n_tows.rename(columns={"env_time": "N"})["N"].reset_index()
# Create a shell for the total tows
# The shell is built from ALL events (not just towing), so it holds one zeroed
# row for every (year, month) pair present in the event log
total_tows = (
self.events[["env_time", "year", "month"]]
.groupby(["year", "month"])
.count()["env_time"]
)
total_tows = total_tows.reset_index().rename(columns={"env_time": "N"})
total_tows.N = 0
# Create the correct time frequency for the number of tows and shell total
group_cols = ["agent", "direction"]
if frequency == "project":
time_cols = []
n_tows = n_tows[group_cols + ["N"]].groupby(group_cols).sum()
# If no further work is required, then return the sum as a 1x1 data frame
if not by_tug and not by_direction:
return pd.DataFrame(
[n_tows.reset_index().N.sum()], columns=["total_tows"]
)
total_tows = pd.DataFrame([[0]], columns=["N"])
elif frequency == "annual":
time_cols = ["year"]
columns = time_cols + columns
group_cols.extend(time_cols)
n_tows = n_tows.groupby(group_cols).sum()[["N"]]
total_tows = (
total_tows[["year", "N"]].groupby(time_cols).sum().reset_index()
)
elif frequency == "monthly":
time_cols = ["month"]
columns = time_cols + columns
group_cols.extend(time_cols)
n_tows = n_tows[group_cols + ["N"]].groupby(group_cols).sum()
total_tows = (
total_tows[["month", "N"]].groupby(time_cols).sum().reset_index()
)
elif frequency == "month-year":
# Already have month-year by default, so skip the n_tows refinement
time_cols = ["year", "month"]
columns = time_cols + columns
group_cols.extend(time_cols)
n_tows = n_tows.set_index(group_cols, drop=True)
# Create a list of the columns needed for creating the broken down totals
# For "project" this is the count column itself; otherwise it is whatever time
# columns remain in the shell once "N" is removed
if frequency == "project":
total_cols = ["N"]
else:
total_cols = total_tows.drop(columns=["N"]).columns.tolist()
# Sum the number of tows by tugboat, if needed
if by_tug:
tug_sums = []
for tug in tugboats:
# Selecting on the first index level ("agent") leaves direction (and any
# time levels) as the index; the count column is renamed to the tug name
tug_sum = n_tows.loc[tug]
tug_sums.append(tug_sum.rename(columns={"N": tug}))
tug_sums_by_direction = pd.concat(tug_sums, axis=1).fillna(0)
if frequency == "project":
tug_sums = pd.DataFrame(tug_sums_by_direction.sum()).T
else:
tug_sums = tug_sums_by_direction.reset_index().groupby(total_cols).sum()
if TYPE_CHECKING:
assert isinstance(tug_sums, pd.DataFrame) # mypy checking
tug_sums = tug_sums.rename(
columns={t: f"{t}_total_tows" for t in tug_sums.columns}
)
if TYPE_CHECKING:
assert isinstance(tug_sums, pd.DataFrame) # mypy checking
total = pd.DataFrame(
tug_sums.sum(axis=1), columns=["total_tows"]
).reset_index()
else:
if not by_direction:
# Sum the totals, then merge the results with the shell data frame,
# and cleanup the columns
total = n_tows.reset_index().groupby(total_cols).sum().reset_index()
total_tows = total_tows.merge(total, on=total_cols, how="outer")
total_tows = total_tows.fillna(0).rename(columns={"N_y": "total_tows"})
total_tows = total_tows[columns]
if time_cols:
return total_tows.set_index(time_cols)
return total_tows
else:
total = (
n_tows.groupby(total_cols)
.sum()
.reset_index()
.rename(columns={"N": "total_tows"})
)
# Create the full total tows data
if frequency == "project":
# NOTE(review): an "index" column presumably comes from ``reset_index()`` on
# the unnamed 1-row index in the by-tug branch above -- confirm
if "index" in total.columns:
total_tows = total.drop(columns=["index"])
else:
total_tows = (
total_tows.merge(total, how="outer").drop(columns=["N"]).fillna(0)
)
total_tows = total_tows.set_index(total_cols)
# Get the sums by each direction towed, if needed
if by_direction:
if frequency == "project":
direction_sums = n_tows.reset_index().groupby("direction").sum()
for s in direction_suffix:
# NOTE(review): ``.loc[s, "N"]`` looks like it would raise a KeyError if
# no tow was ever logged in direction ``s`` -- confirm
total_tows.loc[:, f"total_tows_{s}"] = direction_sums.loc[s, "N"]
else:
direction_sums = (
n_tows.reset_index().groupby(["direction"] + total_cols).sum()
)
for s in direction_suffix:
total_tows = total_tows.join(
direction_sums.loc[s].rename(columns={"N": f"total_tows_{s}"})
).fillna(0)
# Add in the tugboat breakdown as needed
if by_tug:
total_tows = total_tows.join(tug_sums, how="outer").fillna(0)
for s in direction_suffix:
if frequency == "project":
_total = pd.DataFrame(
tug_sums_by_direction.loc[s]
).T.reset_index(drop=True)
else:
_total = tug_sums_by_direction.loc[s]
total_tows = total_tows.join(
_total.rename(columns={t: f"{t}_{s}" for t in tugboats}),
how="outer",
).fillna(0)
total_tows = total_tows.reset_index()[columns]
if time_cols:
return total_tows.set_index(time_cols)
else:
return total_tows
# Direction breakdown without a tugboat breakdown: the row-wise sum of the
# current columns is stored in "N" and then renamed to "total_tows"
total_tows = total_tows.assign(N=total_tows.sum(axis=1))
total_tows = total_tows.rename(columns={"N": "total_tows"}).reset_index()[
columns
]
if time_cols:
return total_tows.set_index(time_cols)
return total_tows
# Tugboat breakdown without a direction breakdown
if by_tug:
total_tows = (
total_tows.join(tug_sums, how="outer").fillna(0).reset_index()[columns]
)
if time_cols:
return total_tows.set_index(time_cols)
return total_tows
if time_cols:
return total_tows[columns].set_index(time_cols)
return total_tows[columns]
[docs]
def labor_costs(
    self, frequency: str, by_type: bool = False
) -> float | pd.DataFrame:
    """Sums the labor costs logged in the events at a project, annual, monthly, or
    month-year level, optionally split into hourly and salary labor.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by_type : bool, optional
        If True, the hourly and salary labor cost columns are returned alongside
        the total labor cost column; if False, only the total is returned, by
        default False.

    Returns
    -------
    float | pd.DataFrame
        A pandas ``DataFrame`` (one row for "project", otherwise indexed by the
        time columns of the chosen frequency) with columns:

        - total_labor_cost
        - hourly_labor_cost (if broken out)
        - salary_labor_cost (if broken out)

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``by_type`` is not one of ``True`` or ``False``.
    """
    frequency = _check_frequency(frequency, which="all")
    if not isinstance(by_type, bool):
        raise ValueError("``by_type`` must be one of ``True`` or ``False``")

    total_col = self._labor_cost
    labor_cols = [self._hourly_cost, self._salary_cost, total_col]

    if frequency == "project":
        # A single row holding the column-wise sums over the whole event log
        project_totals = self.events[labor_cols].sum(axis=0).values.reshape(1, -1)
        project_costs = pd.DataFrame(project_totals, columns=labor_cols)
        return project_costs if by_type else project_costs[[total_col]]

    time_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]
    grouped_costs = (
        self.events.loc[:, labor_cols + time_cols]
        .groupby(time_cols)
        .sum()
        .fillna(value=0)
    )
    return grouped_costs if by_type else pd.DataFrame(grouped_costs[total_col])
[docs]
def equipment_labor_cost_breakdowns(
    self,
    frequency: str,
    by_category: bool = False,
    by_equipment: bool = False,
) -> pd.DataFrame:
    """Calculates the productivity cost and time breakdowns for the simulation at a
    project, annual, monthly, or month-year level, optionally including the
    equipment and labor cost components, and optionally split by servicing
    equipment.

    .. note:: Doesn't produce a value if there's no cost associated with a "reason".

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by_category : bool, optional
        Indicates whether to include the equipment and labor categories (True) or
        not (False), by default False.
    by_equipment : bool, optional
        Indicates whether the values are with respect to the equipment utilized
        (True) or not (False), by default False.

    Returns
    -------
    pd.DataFrame
        Returns pandas ``DataFrame`` indexed by the time columns (if any), the
        equipment name (if ``by_equipment``), and the reason, with columns:

        - hourly_labor_cost (if by_category == ``True``)
        - salary_labor_cost (if by_category == ``True``)
        - total_labor_cost (if by_category == ``True``)
        - equipment_cost (if by_category == ``True``)
        - total_cost
        - total_hours

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``by_category`` is not one of ``True`` or ``False``.
    ValueError
        If ``by_equipment`` is not one of ``True`` or ``False``.
    """
    frequency = _check_frequency(frequency, which="all")
    if not isinstance(by_category, bool):
        raise ValueError("``by_category`` must be one of ``True`` or ``False``")
    if not isinstance(by_equipment, bool):
        raise ValueError("``by_equipment`` must be one of ``True`` or ``False``")

    # Grouping keys in order: time columns, the agent (optional), then the event
    # descriptors used to derive the displayed reason
    time_cols = {
        "project": [],
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]
    agent_cols = ["agent"] if by_equipment else []
    group_filter = [*time_cols, *agent_cols, "action", "reason", "additional"]

    action_list = [
        "delay",
        "repair",
        "maintenance",
        "mobilization",
        "transferring crew",
        "traveling",
        "towing",
    ]

    # Only agents that accrued an equipment cost at least once are considered
    events = self.events
    equipment = events[events[self._equipment_cost] > 0].agent.unique()
    keep_rows = (
        events.agent.isin(equipment)
        & events.action.isin(action_list)
        & ~events.additional.isin(["work is complete"])
    )
    costs = (
        events.loc[keep_rows, group_filter + self._cost_columns + ["duration"]]
        .groupby(group_filter)
        .sum()
        .reset_index()
        .rename(columns={"duration": "total_hours"})
    )
    costs["display_reason"] = [""] * costs.shape[0]

    non_shift_hours = (
        "not in working hours",
        "work shift has ended; waiting for next shift to start",
        "no more return visits will be made",
        "will return next year",
        "waiting for next operational period",
        "end of shift; will resume work in the next shift",
    )
    weather_hours = (
        "weather delay",
        "weather unsuitable to transfer crew",
        "insufficient time to complete travel before end of the shift",
        "weather unsuitable for mooring reconnection",
        "weather unsuitable for unmooring",
    )

    # Assignment order matters: later labels overwrite earlier ones, so the weather
    # and "no requests" labels take precedence over the action-based labels
    off_shift = (costs.action == "delay") & (costs.additional.isin(non_shift_hours))
    costs.loc[off_shift, "display_reason"] = "Not in Shift"
    action_labels = {
        "repair": "Repair",
        "maintenance": "Maintenance",
        "transferring crew": "Crew Transfer",
        "traveling": "Site Travel",
        "towing": "Towing",
        "mobilization": "Mobilization",
    }
    for action_name, label in action_labels.items():
        costs.loc[costs.action == action_name, "display_reason"] = label
    costs.loc[costs.additional.isin(weather_hours), "display_reason"] = (
        "Weather Delay"
    )
    costs.loc[costs.reason == "no requests", "display_reason"] = "No Requests"
    costs.reason = costs.display_reason

    # Collapse onto the display reason by dropping the helper columns
    drop_columns = [self._materials_cost, "display_reason", "additional", "action"]
    if not by_category:
        drop_columns += [
            self._hourly_cost,
            self._salary_cost,
            self._labor_cost,
            self._equipment_cost,
        ]
    group_filter.remove("additional")
    group_filter.remove("action")
    costs = costs.drop(columns=drop_columns).groupby(group_filter).sum().reset_index()

    # Append an all-zero row for each time/reason combination that has no data
    comparison_values: product[tuple[Any, Any]] | product[tuple[Any, Any, Any]]
    month_year = frequency == "month-year"
    if frequency in ("annual", "month-year"):
        years = costs.year.unique()
        reasons = costs.reason.unique()
        if month_year:
            comparison_values = product(years, costs.month.unique(), reasons)
        else:
            comparison_values = product(years, reasons)
        zeros = np.zeros(costs.shape[1] - 2).tolist()
        for _year, *_month, _reason in comparison_values:
            present = costs.year.values == _year
            if month_year:
                present &= costs.month.values == _month[0]
                # One fewer zero because the month takes up an extra key column
                row = [_year, _month[0], _reason] + zeros[:-1]
            else:
                row = [_year, _reason] + zeros
            present &= costs.reason.values == _reason
            if not present.any():
                costs.loc[costs.shape[0]] = row
    elif frequency == "monthly":
        months = costs.month.unique()
        reasons = costs.reason.unique()
        zeros = np.zeros(costs.shape[1] - 2).tolist()
        for _month, _reason in product(months, reasons):
            present = (costs.month.values == _month) & (
                costs.reason.values == _reason
            )
            if not present.any():
                costs.loc[costs.shape[0]] = [_month, _reason] + zeros

    # An ordered categorical makes the reasons sort in display order
    new_sort = [
        "Maintenance",
        "Repair",
        "Crew Transfer",
        "Site Travel",
        "Towing",
        "Mobilization",
        "Weather Delay",
        "No Requests",
        "Not in Shift",
    ]
    costs.reason = pd.Categorical(costs.reason, new_sort)
    costs = costs.set_index(group_filter)

    sort_order = ["reason"]
    if by_equipment:
        costs = costs.loc[costs.index.get_level_values("agent").isin(equipment)]
        costs.index = costs.index.set_names({"agent": "equipment_name"})
        sort_order = ["equipment_name", "reason"]
    return costs.sort_values(by=time_cols + sort_order)
[docs]
def emissions(
    self,
    emissions_factors: dict,
    maneuvering_factor: float = 0.1,
    port_engine_on_factor: float = 0.25,
) -> pd.DataFrame:
    """Calculates the emissions, typically in tons, per hour of operations for
    transiting, maneuvering (calculated as a % of transiting), idling at the site
    (repairs, crew transfer, weather delays), and idling at port (weather delays),
    excluding waiting overnight between shifts.

    Parameters
    ----------
    emissions_factors : dict
        Dictionary of emissions per hour for "transit", "maneuvering",
        "idle at site", and "idle at port" for each of the servicing equipment in
        the simulation.
    maneuvering_factor : float, optional
        The proportion of transit time that can be attributed to
        maneuvering/positioning, by default 0.1.
    port_engine_on_factor : float, optional
        The proportion of idling at port time that can be attributed to having the
        engine on and producing emissions, by default 0.25.

    Returns
    -------
    pd.DataFrame
        DataFrame of "duration" (hours), "distance_km", and "emissions" (tons) for
        each servicing equipment in the simulation for each emissions category,
        indexed by ("agent", "category").

    Raises
    ------
    KeyError
        Raised if any of the servicing equipment are missing from the
        ``emissions_factors`` dictionary.
    KeyError
        Raised if any of the emissions categories are missing from each servicing
        equipment definition in ``emissions_factors``.
    """
    if missing := set(self.service_equipment_names).difference(
        [*emissions_factors]
    ):
        raise KeyError(
            f"`emissions_factors` is missing the following keys: {missing}"
        )

    # Every required category must appear, and every category must appear the same
    # number of times across the equipment definitions
    valid_categories = ("transit", "maneuvering", "idle at port", "idle at site")
    emissions_categories = list(
        chain(*[[*val] for val in emissions_factors.values()])
    )
    emissions_input = Counter(emissions_categories)
    if (
        len(set(valid_categories).difference(emissions_input.keys())) > 0
        or len(set(emissions_input.values())) > 1
    ):
        raise KeyError(
            "Each servicing equipment's emissions factors must have inputs for: "
            f"{valid_categories}"
        )

    # Create the agent/duration subset
    equipment_usage = (
        self.events.loc[
            self.events.agent.isin(self.service_equipment_names),
            ["agent", "action", "reason", "location", "duration", "distance_km"],
        ]
        .groupby(["agent", "action", "reason", "location"])
        .sum()
        .reset_index(drop=False)
    )
    equipment_usage = equipment_usage.loc[
        ~(
            (equipment_usage.action == "delay")
            & equipment_usage.reason.isin(("no requests", "work is complete"))
        )
    ]

    # Map each of the locations to new categories and filter out unnecessary ones
    conditions = [
        equipment_usage.location.eq("site").astype(bool),
        equipment_usage.location.eq("system").astype(bool),
        equipment_usage.location.eq("port").astype(bool),
        equipment_usage.location.eq("enroute").astype(bool),
    ]
    values = ["idle at site", "idle at site", "idle at port", "transit"]
    equipment_usage = (
        equipment_usage.assign(
            category=np.select(conditions, values, default="invalid")
        )
        .drop(["action", "reason", "location"], axis=1)
        .groupby(["agent", "category"])
        .sum()
        # ``errors="ignore"`` so that an event log where every location was
        # recognized (no "invalid" rows to remove) does not raise a KeyError
        .drop("invalid", level="category", errors="ignore")
    )

    # Create a new emissions factor DataFrame and mapping
    categories = list(set().union(emissions_categories))
    emissions_summary = pd.DataFrame(
        [],
        index=pd.MultiIndex.from_product(
            [[*emissions_factors], categories], names=["agent", "category"]
        ),
    )
    factors = [
        [(eq, cat), ef]
        for eq, d in emissions_factors.items()
        for cat, ef in d.items()
    ]
    emissions_summary.loc[[ix for (ix, _) in factors], "emissions_factors"] = [
        ef for (_, ef) in factors
    ]

    # Combine the emissions factors and the calculate the total distribution
    equipment_usage = equipment_usage.join(emissions_summary, how="outer").fillna(0)

    # Adjust the transiting time to account for maneuvering. The positional
    # ``.values`` assignment relies on each agent having exactly one "transit" row
    # and one "maneuvering" row, in the same agent order, after the outer join.
    transiting = equipment_usage.index.get_level_values("category") == "transit"
    manuevering = (
        equipment_usage.index.get_level_values("category") == "maneuvering"
    )
    equipment_usage.loc[manuevering, "duration"] = (
        equipment_usage.loc[transiting, "duration"].values * maneuvering_factor
    )
    equipment_usage.loc[transiting, "duration"] = equipment_usage.loc[
        transiting, "duration"
    ] * (1 - maneuvering_factor)

    # Adjust the idling at port time to only account for when the engine is on.
    # This scales the port idle time itself; it must not be derived from the
    # transit time.
    port = equipment_usage.index.get_level_values("category") == "idle at port"
    equipment_usage.loc[port, "duration"] = (
        equipment_usage.loc[port, "duration"] * port_engine_on_factor
    )

    equipment_usage = (
        equipment_usage.fillna(0)
        .assign(
            emissions=equipment_usage.duration * equipment_usage.emissions_factors
        )
        .drop(columns=["emissions_factors"])
        .fillna(0, axis=1)
    )
    return equipment_usage
[docs]
def component_costs(
    self, frequency: str, by_category: bool = False, by_action: bool = False
) -> pd.DataFrame:
    """Calculates the component costs for the simulation at a project, annual,
    monthly, or month-year level that can be broken out by cost categories. This
    will not sum to the total cost because it does not include times where there is
    no work being done, but costs are being accrued.

    .. note:: It should be noted that the costs will include costs accrued from both
        weather delays and shift-to-shift delays. In the future these will be
        disentangled.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by_category : bool, optional
        Indicates whether the values are with respect to the various cost
        categories (True) or not (False), by default False.
    by_action : bool, optional
        Indicates whether component costs are going to be further broken out by the
        action being performed--repair, maintenance, and delay--(True) or not
        (False), by default False.

    Returns
    -------
    pd.DataFrame
        A pandas ``DataFrame`` indexed by the time columns (if appropriate for
        frequency), ``component``, and ``action`` (if broken out), with columns:

        - materials_cost (if broken out)
        - total_labor_cost (if broken out)
        - equipment_cost (if broken out)
        - total_cost

        Combinations of the index values that have no events are included with
        zero costs.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``by_category`` is not one of ``True`` or ``False``.
    ValueError
        If ``by_action`` is not one of ``True`` or ``False``.
    """
    frequency = _check_frequency(frequency, which="all")
    if not isinstance(by_category, bool):
        raise ValueError("``by_category`` must be one of ``True`` or ``False``")
    if not isinstance(by_action, bool):
        raise ValueError("``by_action`` must be one of ``True`` or ``False``")

    # Only events tied to a part contribute to the component costs
    part_filter = ~self.events.part_id.isna() & ~self.events.part_id.isin([""])
    events = self.events.loc[part_filter].copy()

    # Need to simplify the cable identifiers to exclude the connection information
    events.loc[:, "component"] = [el.split("::")[0] for el in events.part_id.values]

    time_cols = {
        "project": [],
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]
    group_filter = [*time_cols, "component"]

    cost_cols = ["total_cost"]
    if by_category:
        cost_cols = [
            self._materials_cost,
            self._labor_cost,
            self._equipment_cost,
            "total_cost",
        ]

    if by_action:
        # Fold the request/complete variants into their base action; any action
        # not listed maps to NaN and is dropped by the groupby below
        action_map = {
            "repair request": "repair",
            "repair": "repair",
            "repair_complete": "repair",
            "maintenance request": "maintenance",
            "maintenance": "maintenance",
            "maintenance_complete": "maintenance",
            "delay": "delay",
        }
        events.action = events.action.map(action_map)
        group_filter.append("action")

    costs = (
        events[group_filter + cost_cols].groupby(group_filter).sum().reset_index()
    )
    if not by_action:
        # Placeholder action column (all zeros) directly after "component" so the
        # filler logic below always sees the same key layout; it is dropped again
        # when the final columns are selected
        costs.insert(
            costs.columns.get_loc("component") + 1,
            "action",
            np.zeros(costs.shape[0]),
        )

    # Add an all-zero row for every (time, component, action) combination that has
    # no events, so each combination is represented in the output
    key_cols = [*time_cols, "component", "action"]
    zeros = np.zeros(len(cost_cols)).tolist()
    existing = set(zip(*(costs[col].tolist() for col in key_cols)))
    for combination in product(*(costs[col].unique() for col in key_cols)):
        if combination not in existing:
            costs.loc[costs.shape[0]] = [*combination, *zeros]

    costs = costs.sort_values(group_filter).loc[:, group_filter + cost_cols]
    return costs.set_index(group_filter)
[docs]
def port_fees(self, frequency: str) -> pd.DataFrame:
    """Calculates the port fees for the simulation at a project, annual, monthly,
    or month-year level. This excludes any equipment or labor costs, which are
    included in: ``equipment_costs``.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".

    Returns
    -------
    pd.DataFrame
        The port fees in a single ``port_fees`` column: one row for "project",
        otherwise indexed by the time columns of the chosen frequency. If there are
        no "monthly lease fee" events, a 1x1 frame holding 0 is returned for every
        frequency.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    """
    frequency = _check_frequency(frequency, which="all")
    column = "port_fees"

    # Port fees are logged as the equipment cost of the "monthly lease fee" events
    is_lease_fee = self.events.action == "monthly lease fee"
    port_fee = self.events.loc[
        is_lease_fee, ["year", "month", "equipment_cost"]
    ].rename(columns={"equipment_cost": column})

    if port_fee.shape[0] == 0:
        return pd.DataFrame([[0]], columns=[column])
    if frequency == "project":
        return pd.DataFrame([port_fee.sum(axis=0).loc[column]], columns=[column])

    time_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]
    return port_fee[time_cols + [column]].groupby(time_cols).sum()
[docs]
def project_fixed_costs(self, frequency: str, resolution: str) -> pd.DataFrame:
    """Calculates the fixed costs of a project at a project, annual, monthly, or
    month-year frequency for a given cost breakdown resolution.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    resolution : str
        One of "low", "medium", or "high", where the values correspond to:

        - low: ``FixedCosts.resolution["low"]``, corresponding to itemized costs.
        - medium: ``FixedCosts.resolution["medium"]``, corresponding to the
          overarching cost categories.
        - high: ``FixedCosts.resolution["high"]``, corresponding to a lump sum.

        These values can also be seen through the ``FixedCosts.hierarchy``

    Returns
    -------
    pd.DataFrame
        The project's fixed costs as a sum or broken out by the time columns of the
        chosen frequency, with one column per cost key of the chosen resolution.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``resolution`` is not one of "low", "medium", or "high".
    """
    frequency = _check_frequency(frequency, which="all")
    resolution = resolution.lower().strip()
    if resolution not in ("low", "medium", "high"):
        raise ValueError(
            '``resolution`` must be one of "low", "medium", or "high".'
        )

    # Get the appropriate values and convert to the currency base
    keys = self.fixed_costs.resolution[resolution]
    rates = [getattr(self.fixed_costs, key) for key in keys]
    vals = np.array([rates]) * self.project_capacity * 1000

    # Number of operations records logged in each (year, month)
    operation_hours = (
        self.operations[["year", "month", "env_time"]]
        .groupby(["year", "month"])
        .count()
        .rename(columns={"env_time": "N"})
    )
    n_periods = operation_hours.shape[0]

    # Start every period at the full value, then scale it by the period's share of
    # 8760 (the number of hours in a non-leap year)
    costs = pd.DataFrame(
        np.ones((n_periods, 1)) * vals, index=operation_hours.index, columns=keys
    )
    costs *= operation_hours.values.reshape(-1, 1) / 8760.0

    # The inflation exponent steps up by one after every 12 rows
    adjusted_inflation = np.array(
        [self.inflation_rate ** (i // 12) for i in range(n_periods)]
    )
    costs *= adjusted_inflation.reshape(-1, 1)

    # "month-year" is already the native resolution and is returned unchanged
    if frequency == "project":
        return pd.DataFrame(costs.reset_index(drop=True).sum()).T
    if frequency == "annual":
        return costs.reset_index().groupby("year").sum().drop(columns=["month"])
    if frequency == "monthly":
        return costs.reset_index().groupby("month").sum().drop(columns=["year"])
    return costs
[docs]
def opex(self, frequency: str, by_category: bool = False) -> pd.DataFrame:
    """Calculates the project's OpEx for the simulation at a project, annual,
    monthly, or month-year level.

    Parameters
    ----------
    frequency : str
        One of project, annual, monthly, or month-year.
    by_category : bool, optional
        Indicates whether the values are with respect to the various cost
        categories (True) or not (False), by default False.

    Returns
    -------
    pd.DataFrame
        The project's OpEx broken out at the desired time and category resolution.
        The "OpEx" column is the row-wise sum of the fixed costs, port fees,
        equipment costs, labor costs, and materials costs; the component columns
        are only included when ``by_category`` is True.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    """
    frequency = _check_frequency(frequency, which="all")
    time_cols = {
        "project": [],
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]

    # Get the materials costs and remove the component-level breakdown
    materials = self.component_costs(frequency=frequency, by_category=True)
    materials = materials.loc[:, ["materials_cost"]].reset_index()
    if frequency == "project":
        materials = pd.DataFrame(materials.loc[:, ["materials_cost"]].sum()).T
    else:
        materials = (
            materials[time_cols + ["materials_cost"]].groupby(time_cols).sum()
        )

    # ``port_fees`` returns an un-indexed 1x1 placeholder when there are no lease
    # fee events. The placeholder is recognized by its index names rather than its
    # shape, because a genuine result covering a single period (e.g. a one-year
    # simulation at the "annual" frequency) is also 1x1 and must be kept.
    port_fees = self.port_fees(frequency=frequency)
    is_placeholder = list(port_fees.index.names) != time_cols
    if frequency != "project" and is_placeholder:
        port_fees = pd.DataFrame(0, columns=["port_fees"], index=materials.index)
    port_fees = port_fees.fillna(0)

    # Create a list of data frames for the OpEx components
    opex_items = [
        self.project_fixed_costs(frequency=frequency, resolution="low"),
        port_fees,
        self.equipment_costs(frequency=frequency),
        self.labor_costs(frequency=frequency),
        materials,
    ]

    # Join the data frames and sum along the time axis and return
    column = "OpEx"
    opex = pd.concat(opex_items, axis=1)
    opex.loc[:, column] = opex.sum(axis=1)
    if by_category:
        return opex
    return opex[[column]]
[docs]
def process_times(self) -> pd.DataFrame:
    """Calculates the time, in hours, to complete a repair/maintenance request, on
    both a request to completion basis, and the actual time to complete the repair.

    Returns
    -------
    pd.DataFrame
        Totals per category, with the columns:

        - category (index): repair/maintenance category, taken from the ``reason``
          of the first event logged for each request
        - time_to_completion: total number of hours from the first to the last
          event of each request
        - process_time: total of the event durations of each request
        - downtime: total number of hours between the first and last event of each
          request where the system operating level was below 100%.
        - time_to_start: total number of hours from the submission of each request
          to its first repair/maintenance event
        - N: total number of processes in the category.
    """
    request_cols = ["request_id", "env_time", "duration"]
    valid = self.events.loc[self.events.request_id != "na"]

    def _elapsed(frame: pd.DataFrame, label: str) -> pd.DataFrame:
        """Last minus first ``env_time`` per request, as a one-column frame."""
        per_request = frame[request_cols].groupby("request_id")
        first = per_request.min().sort_index()["env_time"]
        last = per_request.max().sort_index()["env_time"]
        return last.sub(first).astype(float).to_frame(label)

    # Request-level summaries: full span, summed durations, and degraded span
    completion = _elapsed(valid, "time_to_completion")
    process = (
        valid[request_cols]
        .groupby("request_id")
        .sum()
        .sort_index()[["duration"]]
        .rename(columns={"duration": "process_time"})
    )
    degraded = valid.loc[valid.system_operating_level < 1][request_cols]
    downtime = _elapsed(degraded, "downtime")

    # The category is the reason attached to the first event of each request
    category = (
        valid.drop_duplicates(subset=["request_id"])[["request_id", "reason"]]
        .set_index("request_id")
        .sort_index()
        .rename(columns={"reason": "category"})
    )

    # Summarize the time to first repair/maintenance activity
    is_submission = valid.action.isin(("repair request", "maintenance request"))
    is_work = valid.action.isin(("repair", "maintenance"))
    submitted = (
        valid.loc[is_submission, ["request_id", "env_time"]]
        .set_index("request_id")
        .sort_index()
    )
    first_work = (
        valid.loc[is_work, ["request_id", "env_time"]]
        .groupby("request_id")
        .min()
        .sort_index()
    )
    time_to_start = first_work.subtract(submitted, axis="index").rename(
        columns={"env_time": "time_to_start"}
    )

    # Create the timing dataframe, one row per request
    timing = pd.DataFrame([], index=completion.index)
    for piece in (category, completion, process, downtime, time_to_start):
        timing = timing.join(piece)
    timing["N"] = 1

    # Return only the categorically summed data
    return timing.groupby("category").sum().sort_index()
[docs]
def power_production(
    self, frequency: str, by: str = "windfarm", units: str = "gwh"
) -> float | pd.DataFrame:
    """Calculates the power production for the simulation at a project, annual,
    monthly, or month-year level that can be broken out by turbine.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    by : str
        One of "windfarm" or "turbine". Case and surrounding white space are
        ignored.
    units : str
        One of "gwh", "mwh", or "kwh". Case and surrounding white space are
        ignored.

    Returns
    -------
    float | pd.DataFrame
        A pandas ``DataFrame`` of energy production in the requested units with
        the columns:

        - windfarm
        - <turbine_id> for each entry of ``self.turbine_id`` (only when ``by`` is
          "turbine")

        For a ``frequency`` of "project" the result is a single row indexed by
        "Project Energy Production (<units>)"; otherwise the rows are indexed by
        year and/or month as appropriate for ``frequency``.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    ValueError
        If ``by`` is not one of "windfarm" or "turbine".
    ValueError
        If ``units`` is not one of "gwh", "mwh", or "kwh".
    """
    frequency = _check_frequency(frequency, which="all")

    by = by.lower().strip()
    if by not in ("windfarm", "turbine"):
        raise ValueError('``by`` must be one of "windfarm" or "turbine".')

    # Normalize ``units`` the same way as ``frequency`` and ``by`` so that inputs
    # such as "GWh" or " mwh " are accepted rather than rejected.
    units = units.lower().strip()
    # The divisors scale ``self.production`` down to the requested unit; a divisor
    # of 1 for kWh implies the stored production data are in kWh.
    unit_options = {
        "gwh": (1e6, "Project Energy Production (GWh)"),
        "mwh": (1e3, "Project Energy Production (MWh)"),
        "kwh": (1, "Project Energy Production (kWh)"),
    }
    if units not in unit_options:
        raise ValueError('``units`` must be one of "gwh", "mwh", or "kwh".')
    divisor, label = unit_options[units]

    # The windfarm total is always reported; turbine columns are opt-in
    col_filter = ["windfarm"]
    if by == "turbine":
        col_filter.extend(self.turbine_id)

    if frequency == "project":
        # Collapse the whole timeseries into a single labeled row
        totals = self.production[col_filter].sum(axis=0)
        return (
            pd.DataFrame(
                totals.values.reshape(1, -1),
                columns=col_filter,
                index=[label],
            )
            / divisor
        )

    # ``frequency`` has already been validated, so it must be one of these keys
    group_cols = {
        "annual": ["year"],
        "monthly": ["month"],
        "month-year": ["year", "month"],
    }[frequency]
    return (
        self.production[group_cols + col_filter].groupby(by=group_cols).sum()
        / divisor
    )
# Windfarm Financials
[docs]
def npv(
    self, frequency: str, discount_rate: float = 0.025, offtake_price: float = 80
) -> pd.DataFrame:
    """Calculates the net present value of the windfarm at a project, annual,
    monthly, or month-year resolution given a base discount rate and offtake
    price.

    .. note:: This function will be improved over time to incorporate more of the
        financial parameters at play, such as PPAs.

    Parameters
    ----------
    frequency : str
        One of "project", "annual", "monthly", or "month-year".
    discount_rate : float, optional
        The rate of return that could be earned on alternative investments, by
        default 0.025. The rate is compounded once per month-year row of the
        underlying data, i.e., row ``i`` is discounted by
        ``(1 + discount_rate) ** i``.
    offtake_price : float, optional
        Price of energy, per MWh, by default 80.

    Returns
    -------
    pd.DataFrame
        The project net present value at the desired time resolution, in a single
        "NPV" column.

    Raises
    ------
    ValueError
        If ``frequency`` is not one of "project", "annual", "monthly", or
        "month-year".
    """
    frequency = _check_frequency(frequency, which="all")

    # Gather the OpEx and revenues at the finest supported resolution. Production
    # is requested explicitly in MWh so that it can be multiplied directly by the
    # per-MWh offtake price; the default of GWh would understate revenue.
    expenditures = self.opex("month-year")
    production = self.power_production("month-year", units="mwh")
    revenue: pd.DataFrame = production * offtake_price

    # Instantiate the NPV with the required calculated data and compute the result
    npv = revenue.join(expenditures).rename(columns={"windfarm": "revenue"})
    n_periods = npv.shape[0]
    # NOTE(review): the discount factor compounds per month-year row; confirm that
    # ``discount_rate`` is intended as a per-period rather than an annual rate.
    npv.loc[:, "discount"] = (1 + discount_rate) ** np.arange(n_periods)
    npv.loc[:, "NPV"] = (npv.revenue.values - npv.OpEx.values) / npv.discount.values

    # Aggregate the results to the required resolution, summing only the NPV
    # column so the index columns and discount factors are never accumulated
    if frequency == "project":
        return pd.DataFrame(npv[["NPV"]].sum()).T
    if frequency == "annual":
        return npv.reset_index().groupby("year")[["NPV"]].sum()
    if frequency == "monthly":
        return npv.reset_index().groupby("month")[["NPV"]].sum()
    return npv[["NPV"]]