""""Defines the Cable class and cable simulations."""
from __future__ import annotations
from typing import TYPE_CHECKING
from itertools import zip_longest
from collections.abc import Generator
import numpy as np
import simpy
from wombat.core import (
Failure,
Maintenance,
RepairRequest,
SubassemblyData,
WombatEnvironment,
)
[docs]
class Cable:
"""The cable system/asset class.
Parameters
----------
windfarm : ``wombat.windfarm.Windfarm``
The ``Windfarm`` object.
env : WombatEnvironment
The simulation environment.
cable_id : str
The unique identifier for the cable.
connection_type : str
The type of cable. Must be one of "array" or "export".
start_node : str
The starting point (``system.id``) (turbine or substation) of the cable segment.
cable_data : dict
The dictionary defining the cable segment.
"""
def __init__(
self,
windfarm,
env: WombatEnvironment,
connection_type: str,
start_node: str,
end_node: str,
cable_data: dict,
name: str | None = None,
) -> None:
"""Initializes the ``Cable`` class.
Parameters
----------
windfarm : ``wombat.windfarm.Windfarm``
The ``Windfarm`` object.
env : WombatEnvironment
The simulation environment.
connection_type : str
One of "export" or "array".
cable_id : str
The unique identifier for the cable.
start_node : str
The starting point (``system.id``) (turbine or substation) of the cable
segment.
end_node : str
The ending point (``system.id``) (turbine or substation) of the cable
segment.
cable_data : dict
The dictionary defining the cable segment.
name : str | None
The name of the cable to use during logging.
"""
self.env = env
self.windfarm = windfarm
self.connection_type = connection_type
self.start_node = start_node
self.end_node = end_node
self.id = f"cable::{start_node}::{end_node}"
self.system = windfarm.system(start_node)
if self.connection_type not in ("array", "export"):
raise ValueError(
f"Input to `connection_type` for {self.id} must be one of 'array'"
" or 'export'."
)
cable_data = {
**cable_data,
"system_value": self.system.value,
"rng": self.env.random_generator,
}
self.data = SubassemblyData.from_dict(cable_data)
self.name = self.data.name if name is None else name
self.operating_level = 1.0
self.servicing = self.env.event()
self.servicing_queue = self.env.event()
self.downstream_failure = self.env.event()
self.broken = self.env.event()
# Ensure events start as processed and inactive
self.servicing.succeed()
self.servicing_queue.succeed()
self.downstream_failure.succeed()
self.broken.succeed()
# TODO: need to get the time scale of a distribution like this
self.processes = dict(self._create_processes())
[docs]
def set_string_details(self, start_node: str, substation: str):
"""Sets the starting turbine for the string to be used for traversing the
correct upstream connections when resetting after a failure.
Parameters
----------
start_node : str
The ``System.id`` for the starting turbine on a string.
substation : str
The ``System.id`` for the string's connecting substation.
"""
self.string_start = start_node
self.substation = substation
[docs]
def finish_setup(self) -> None:
"""Creates the ``upstream_nodes`` and ``upstream_cables`` attributes for use by
the cable when triggering usptream failures and resetting them after the repair
is complete.
"""
wf_map = self.windfarm.wind_farm_map
if self.connection_type == "array":
turbines = [self.end_node]
if self.end_node == self.string_start:
_turbines, cables = wf_map.get_upstream_connections(
self.substation, self.string_start, self.string_start
)
else:
_turbines, cables = wf_map.get_upstream_connections(
self.substation, self.string_start, self.end_node
)
turbines.extend(_turbines)
if self.connection_type == "export":
turbines, cables = wf_map.get_upstream_connections_from_substation(
self.substation
)
self.upstream_nodes = turbines
self.upstream_cables = cables
[docs]
def _create_processes(self):
"""Creates the processes for each of the failure and maintenance types.
Yields
------
Tuple[Union[str, int], simpy.events.Process]
Creates a dictionary to keep track of the running processes within the
subassembly.
"""
for failure in self.data.failures:
desc = failure.description
yield desc, self.env.process(self.run_single_failure(failure))
for i, maintenance in enumerate(self.data.maintenance):
desc = maintenance.description
yield desc, self.env.process(self.run_single_maintenance(maintenance))
[docs]
def recreate_processes(self) -> None:
"""If a cable is being reset after a replacement, then all processes are
assumed to be reset to 0, and not pick back up where they left off.
"""
self.processes = dict(self._create_processes())
[docs]
def interrupt_processes(self, replacement: str | None = None) -> None:
"""Interrupts all of the running processes within the subassembly except for the
process associated with failure that triggers the catastrophic failure.
Parameters
----------
subassembly : Subassembly
The subassembly that should have all processes interrupted.
replacement: bool, optional
If a subassebly `id` is provided, this indicates the interruption is caused
by its replacement event. Defaults to None.
"""
cause = "failure"
if self.id == replacement:
cause = "replacement"
for _, process in self.processes.items():
try:
process.interrupt(cause=cause)
except RuntimeError:
# This error occurs for the process halting all other processes.
pass
[docs]
def interrupt_all_subassembly_processes(
self, replacement: str | None = None
) -> None:
"""Thin wrapper for ``interrupt_processes`` for consistent usage with system.
Parameters
----------
replacement: bool, optional
If a subassebly `id` is provided, this indicates the interruption is caused
by its replacement event. Defaults to None.
"""
self.interrupt_processes(replacement=replacement)
[docs]
def stop_all_upstream_processes(self, failure: Failure | Maintenance) -> None:
"""Stops all upstream turbines and cables from producing power by creating a
``env.event()`` for each ``System.cable_failure`` and
``Cable.downstream_failure``, respectively. In the case of an export cable, each
string is traversed to stop the substation and upstream turbines and cables.
Parameters
----------
failure : Failre
The ``Failure`` that is causing a string shutdown.
"""
shared_logging = {
"agent": self.id,
"action": "repair request",
"reason": failure.description,
"additional": "downstream cable failure",
"request_id": failure.request_id,
}
upstream_nodes = self.upstream_nodes
upstream_cables = self.upstream_cables
if self.connection_type == "export":
# Flatten the list of lists for shutting down the upstream connections
upstream_nodes = [el for string in upstream_nodes for el in string]
upstream_cables = [el for string in upstream_cables for el in string]
# Turn off the subation
substation = self.windfarm.system(self.end_node)
substation.cable_failure = self.env.event()
substation.interrupt_all_subassembly_processes()
self.env.log_action(
system_id=self.end_node,
system_name=substation.name,
system_ol=substation.operating_level,
part_ol=np.nan,
**shared_logging, # type: ignore
)
for t_id, c_id in zip_longest(upstream_nodes, upstream_cables, fillvalue=None):
if TYPE_CHECKING:
assert isinstance(t_id, str)
turbine = self.windfarm.system(t_id)
turbine.cable_failure = self.env.event()
turbine.interrupt_all_subassembly_processes()
self.env.log_action(
system_id=t_id,
system_name=turbine.name,
system_ol=turbine.operating_level,
part_ol=np.nan,
**shared_logging, # type: ignore
)
# If at the end of the string, skip any operations on non-existent cables
if c_id is None:
continue
cable = self.windfarm.cable(c_id)
cable.downstream_failure = self.env.event()
cable.interrupt_all_subassembly_processes()
self.env.log_action(
system_id=c_id,
system_name=cable.name,
part_id=c_id,
part_name=cable.name,
system_ol=np.nan,
part_ol=cable.operating_level,
**shared_logging, # type: ignore
)
[docs]
def trigger_request(self, action: Maintenance | Failure):
"""Triggers the actual repair or maintenance logic for a failure or maintenance
event, respectively.
Parameters
----------
action : Maintenance | Failure
The maintenance or failure event that triggers a ``RepairRequest``.
"""
which = "maintenance" if isinstance(action, Maintenance) else "repair"
current_ol = self.operating_level
self.operating_level *= 1 - action.operation_reduction
# Automatically submit a repair request
# NOTE: mypy is not caught up with attrs yet :(
repair_request = RepairRequest( # type: ignore
system_id=self.id,
system_name=self.name,
subassembly_id=self.id,
subassembly_name=self.name,
severity_level=action.level,
details=action,
cable=True,
upstream_turbines=self.upstream_nodes,
upstream_cables=self.upstream_cables,
prior_operating_level=current_ol,
)
repair_request = self.system.repair_manager.register_request(repair_request)
self.env.log_action(
system_id=self.id,
system_name=self.name,
part_id=self.id,
part_name=self.name,
system_ol=self.operating_level,
part_ol=self.operating_level,
agent=self.name,
action=f"{which} request",
reason=action.description,
additional=f"severity level {action.level}",
request_id=repair_request.request_id,
)
if action.operation_reduction == 1:
self.broken = self.env.event()
self.interrupt_all_subassembly_processes()
self.stop_all_upstream_processes(action)
# Remove previously submitted requests as a replacement is required
if action.replacement:
_ = self.system.repair_manager.purge_subassembly_requests(
self.id, self.id, exclude=[repair_request.request_id]
)
self.system.repair_manager.submit_request(repair_request)
[docs]
def run_single_maintenance(self, maintenance: Maintenance) -> Generator:
"""Runs a process to trigger one type of maintenance request throughout the
simulation.
Parameters
----------
maintenance : Maintenance
A maintenance category.
Yields
------
simpy.events.Timeout
Time between maintenance requests.
"""
while True:
hours_to_next = maintenance.frequency
if hours_to_next == 0:
remainder = self.env.max_run_time - self.env.now
try:
yield self.env.timeout(remainder)
except simpy.Interrupt:
remainder -= self.env.now
else:
while hours_to_next > 0:
start = -1 # Ensure an interruption before processing is caught
try:
# If the replacement has not been completed, then wait
yield self.servicing & self.downstream_failure & self.broken
start = self.env.now
yield self.env.timeout(hours_to_next)
hours_to_next = 0
self.trigger_request(maintenance)
except simpy.Interrupt as i:
if i.cause == "replacement":
return
hours_to_next -= 0 if start == -1 else self.env.now - start
[docs]
def run_single_failure(self, failure: Failure) -> Generator:
"""Runs a process to trigger one type of failure repair request throughout the
simulation.
Parameters
----------
failure : Failure
A failure classification.
Yields
------
simpy.events.Timeout
Time between failure events that need to request a repair.
"""
while True:
hours_to_next = failure.hours_to_next_failure()
if hours_to_next is None:
remainder = self.env.max_run_time - self.env.now
try:
yield self.env.timeout(remainder)
except simpy.Interrupt:
remainder -= self.env.now
else:
if TYPE_CHECKING:
assert isinstance(hours_to_next, (int, float)) # mypy helper
while hours_to_next > 0: # type: ignore
start = -1 # Ensure an interruption before processing is caught
try:
yield self.servicing & self.downstream_failure & self.broken
start = self.env.now
yield self.env.timeout(hours_to_next)
hours_to_next = 0
self.trigger_request(failure)
except simpy.Interrupt as i:
if i.cause == "replacement":
return
hours_to_next -= 0 if start == -1 else self.env.now - start