"""Provides various utility functions that don't fit within a common theme."""
from __future__ import annotations
import math
from string import digits, punctuation
from typing import TYPE_CHECKING, Callable
from functools import cache
import numpy as np
import pandas as pd
# Don't repeat the most common inputs that occur when there are no state changes, but
# the result is needed again for logging
[docs]
@cache
def _mean(*args) -> float:
"""Multiplies two numbers. Used for a reduce operation.
Parameters
----------
args : int | float
The values to compute the mean over
Returns
-------
float
The average of the values provided
"""
return np.mean(args)
[docs]
def create_variable_from_string(string: str) -> str:
"""Creates a valid Python variable style string from a passed string.
Parameters
----------
string : str
The string to convert into a Python-friendly variable name.
Returns
-------
str
A Python-valid variable name.
Examples
--------
>>> example_string = "*Electrical!*_ _System$*_"
>>> print(create_variable_from_string(example_string))
'electrical_system'
"""
new_string = (
string.lstrip(punctuation + digits)
.translate(str.maketrans("", "", punctuation.replace("_", " ")))
.lower()
)
return new_string
[docs]
def IEC_power_curve(
windspeed_column: np.ndarray | pd.Series,
power_column: np.ndarray | pd.Series,
bin_width: float = 0.5,
windspeed_start: float = 0.0,
windspeed_end: float = 30.0,
) -> Callable:
"""Direct copyfrom OpenOA:
https://github.com/NREL/OpenOA/blob/main/operational_analysis/toolkits/power_curve/functions.py#L16-L57
Use IEC 61400-12-1-2 method for creating wind-speed binned power curve.
Parameters
----------
windspeed_column : np.ndarray | pandas.Series
The power curve's windspeed values, in m/s.
power_column : np.ndarray | pandas.Series
The power curve's output power values, in kW.
bin_width : float
Width of windspeed bin, default is 0.5 m/s according to standard, by default
0.5.
windspeed_start : float
Left edge of first windspeed bin, where all proceeding values will be 0.0,
by default 0.0.
windspeed_end : float
Right edge of last windspeed bin, where all following values will be 0.0, by
default 30.0.
Returns
-------
Callable
Python function of the power curve, of type (Array[float] -> Array[float]),
that maps input windspeed value(s) to ouptut power value(s).
"""
if not isinstance(windspeed_column, pd.Series):
windspeed_column = pd.Series(windspeed_column)
if not isinstance(power_column, pd.Series):
power_column = pd.Series(power_column)
# Set up evenly spaced bins of fixed width, with np.inf for any value over the max
n_bins = int(np.ceil((windspeed_end - windspeed_start) / bin_width)) + 1
bins = np.append(np.linspace(windspeed_start, windspeed_end, n_bins), [np.inf])
# Initialize an array which will hold the mean values of each bin
P_bin = np.ones(len(bins) - 1) * np.nan
# Compute the mean of each bin and set corresponding P_bin
for ibin in range(0, len(bins) - 1):
indices = (windspeed_column >= bins[ibin]) & (windspeed_column < bins[ibin + 1])
P_bin[ibin] = power_column.loc[indices].mean()
# Linearly interpolate any missing bins
P_bin = pd.Series(data=P_bin).interpolate(method="linear").bfill().values
# Create a closure over the computed bins which computes the power curve value for
# arbitrary array-like input
def pc_iec(x):
P = np.zeros(np.shape(x))
for i in range(0, len(bins) - 1):
idx = np.where((x >= bins[i]) & (x < bins[i + 1]))
P[idx] = P_bin[i]
cutoff_idx = (x < windspeed_start) | (x > windspeed_end)
P[cutoff_idx] = 0.0
return P
return pc_iec
[docs]
def calculate_stack_current(
power: np.ndarray | pd.Series,
p1: int | float,
p2: int | float,
p3: int | float,
p4: int | float,
p5: int | float,
) -> np.ndarray | pd.Series:
"""Convert power produced, in kW to current, in amperes (I) using the efficiency
curve from the
`H2Integrage PEM electrolysis model <https://github.com/NREL/H2Integrate/blob/main/h2integrate/simulation/technologies/hydrogen/electrolysis/PEM_H2_LT_electrolyzer_Clusters.py>`_.
.. math::
i_{stack} = p1 * power^{3} + p2 * power^{2} + p3 * power + p4 * power^{1/2} + p5
Parameters
----------
power : np.ndarray | pd.Series
Power produced, in kW.
p1 : int | float
First coefficient.
p2 : int | float
Second coefficient.
p3 : int | float
Third coefficient
p4 : int | float
Fourth coefficient.
p5 : int | float
Intercept.
Returns
-------
np.ndarray | pd.Series
Converted current from the farm's power production.
"""
return p1 * power**3 + p2 * power**2 + p3 * power + p4 * np.sqrt(power) + p5
[docs]
def calculate_hydrogen_production(
rated_capacity: float,
FE: float = 0.9999999,
n_cells: int = 135,
turndown_ratio: float = 0.1,
*,
efficiency_rate: float | None = None,
p1: int | float | None = None,
p2: int | float | None = None,
p3: int | float | None = None,
p4: int | float | None = None,
p5: int | float | None = None,
) -> Callable:
"""Create the hydrogen production curve for an electrolyzer. One of
:py:attr:`efficiency_rate` or a complete set of polynomial values (:py:attr:`p1`,
:py:attr:`p2`, :py:attr:`p3`, :py:attr:`p4`, and :py:attr:`p5`) for the power
to current conversion must be provided.
kWh are converted to current using the following efficiency formulation when the
polynomial inputs are all provided, and the :py:attr:`efficiency_rate` is nonzero.
.. math::
i_{stack} = p1 * power^{3} + p2 * power^{2} + p3 * power + p4 * power^{1/2} + p5
Parameters
----------
rated_capacity : float
Rated maximum input power of the electrolyzer, in kW.
FE : float, optional
Faradic efficiency, by default 0.9999999.
n_cells : int, optional
Number of cells per 1 MW stack, by default 135.
turndown_ratio : float, optional
Minimum input power as a ratio of the rated capacity, by default 0.1.
efficiency_rate : float, optional
Energy efficiency in kWh per kg H2. Required if ``p1`` through ``p5 are not
provided.
p1 : int | float | None
First coefficient of the efficiency polynomial curve. Required if
:py:attr:`efficiency_rate` is not provided, by default None.
p2 : int | float | None
Second coefficient of the efficiency polynomial curve. Required if
:py:attr:`efficiency_rate` is not provided, by default None.
p3 : int | float | None
Third coefficient of the efficiency polynomial curve. Required if
:py:attr:`efficiency_rate` is not provided, by default None.
p4 : int | float | None
Fourth coefficient of the efficiency polynomial curve. Required if
:py:attr:`efficiency_rate` is not provided, by default None.
p5 : int | float | None
Intercept of the efficiency polynomial curve. Required if
:py:attr:`efficiency_rate` is not provided, by default None.
Returns
-------
Callable
Function to calculate H₂ production (kg/hr) from power input (kW).
Raises
------
ValueError
Raised when :py:attr:`turndown_ratio` is outside the range [0, 1].
ValueError
Raised when :py:attr:`efficiency_rate` is provided and less than or equal to 0.
ValueError
Raised when neither the :py:attr:`efficiency_rate` nor polynomial values (
:py:attr:`p1` through :py:attr:`p5`) are provided.
ValueError
Raised both the :py:attr:`efficiency_rate` and polynomial values (
:py:attr:`p1` through :py:attr:`p5`) are provided.
"""
if turndown_ratio > 1 or turndown_ratio < 0:
msg = f"`turndown_ratio` must be between 0 and 1, not: {turndown_ratio}"
raise ValueError(msg)
use_efficiency = False
if efficiency_rate is not None:
if efficiency_rate > 0:
use_efficiency = True
else:
raise ValueError("`efficiency_rate` must be a positive number.")
poly_names = ("p1", "p2", "p3", "p4", "p5")
poly_vals = (p1, p2, p3, p4, p5)
invalid_polynomials = [n for n, v in zip(poly_names, poly_vals) if v is None]
use_polynomial = len(invalid_polynomials) == 0
if not use_efficiency and not use_polynomial:
msg = (
"One of `efficiency_rate` or all polynomial values (`p1`, `p2`, `p3`,"
f" `p4`, `p5`) must be provided. Received {efficiency_rate=},"
f" {p1=}, {p2=}, {p3=}, {p4=}, {p5=}."
)
raise ValueError(msg)
if use_efficiency and use_polynomial:
msg = (
"Only one of `efficiency_rate` or all polynomial values (`p1`, `p2`, `p3`,"
f" `p4`, `p5`) must be provided. Received {efficiency_rate=},"
f" {p1=}, {p2=}, {p3=}, {p4=}, {p5=}."
)
raise ValueError(msg)
F = 96485.34 # Faraday's Constant (C/mol)
SECONDS = 3600 # seconds per hour
molar_mass_h2 = 2.016 # molecular weight of H2 (grams/mol)
min_power = rated_capacity * turndown_ratio
def h2(power):
power = np.where(power > rated_capacity, rated_capacity, power)
h2_rate_kg_hr = np.zeros_like(power)
if use_efficiency:
h2_rate_kg_hr = power / efficiency_rate
else:
if TYPE_CHECKING:
assert isinstance(p1, int | float)
assert isinstance(p2, int | float)
assert isinstance(p3, int | float)
assert isinstance(p4, int | float)
assert isinstance(p5, int | float)
i = calculate_stack_current(power, p1, p2, p3, p4, p5)
h2_rate_g_s = i * n_cells * FE * molar_mass_h2 / (2 * F)
h2_rate_kg_hr = h2_rate_g_s * SECONDS / 1e3
h2_rate_kg_hr[h2_rate_kg_hr < 0] = 0
h2_rate_kg_hr[power < min_power] = 0
return h2_rate_kg_hr
return h2
[docs]
def calculate_windfarm_operational_level(
operations: pd.DataFrame,
turbine_id: np.ndarray | list[str],
turbine_weights: pd.DataFrame,
substation_turbine_map: dict[str, dict[str, np.ndarray]],
) -> pd.DataFrame:
"""Calculates the overall wind farm operational level, accounting for substation
downtime by multiplying the sum of all downstream turbine operational levels by
the substation's operational level.
Parameters
----------
operations : pd.DataFrame
The turbine and substation operational level DataFrame.
turbine_id : np.ndarray | list[str]
The turbine ids that match :py:attr:`Windfarm.turbine_id`.
turbine_weights : pd.DataFrame
The turbine weights, coming from :py:attr:`Windfarm.turbine_weights`.
substation_turbine_map : dict[str, dict[str, np.ndarray]]
The :py:attr:`Windfarm.substation_turbine_map`.
Notes
-----
This is a crude cap on the operations, and so a smarter way of capping
the availability should be added in the future.
Returns
-------
pd.DataFrame
The aggregate wind farm operational level.
"""
turbines = turbine_weights[turbine_id].values * operations[turbine_id]
total = np.sum(
[
np.array(
[[math.fsum(row)] for _, row in turbines[val["turbines"]].iterrows()]
).reshape(-1, 1)
for sub, val in substation_turbine_map.items()
],
axis=0,
)
return total