"""The servicing equipment module provides a small number of utility functions specific
to the operations of servicing equipment and the `ServiceEquipment` class that provides
the repair and transportation logic for scheduled, unscheduled, and unscheduled towing
servicing equipment.
"""
# TODO: NEED A SPECIFIC STARTUP METHOD
from __future__ import annotations
from copy import deepcopy
from math import ceil
from typing import TYPE_CHECKING, Any
from pathlib import Path
from datetime import timedelta
from functools import cache
from itertools import zip_longest
from collections.abc import Generator
import numpy as np
import pandas as pd
import polars as pl
from simpy.events import Event, Process, Timeout
from pandas.core.indexes.datetimes import DatetimeIndex
from wombat.core import (
Maintenance,
RepairManager,
RepairRequest,
WombatEnvironment,
ServiceEquipmentData,
)
from wombat.windfarm import Windfarm
from wombat.utilities import HOURS_IN_DAY, hours_until_future_hour
from wombat.core.mixins import RepairsMixin
from wombat.core.library import load_yaml
from wombat.windfarm.system import System
from wombat.core.data_classes import (
UNSCHEDULED_STRATEGIES,
ScheduledServiceEquipmentData,
UnscheduledServiceEquipmentData,
)
from wombat.windfarm.system.cable import Cable
from wombat.windfarm.system.subassembly import Subassembly
if TYPE_CHECKING:
from wombat.core import Port
[docs]
def consecutive_groups(data: np.ndarray, step_size: int = 1) -> list[np.ndarray]:
"""Generates the subgroups of an array where the difference between two sequential
elements is equal to the ``step_size``. The intent is to find the length of delays
in a weather forecast.
Parameters
----------
data : np.ndarray
An array of integers.
step_size : int, optional
The step size to be considered a consecutive number, by default 1.
Returns
-------
list[np.ndarray]
A list of arrays of the consecutive elements of ``data``.
"""
# Consecutive groups in delay
groups = np.split(data, np.where(np.diff(data) != step_size)[0] + 1)
return groups
[docs]
def calculate_delay_from_forecast(
forecast: np.ndarray, hours_required: int | float
) -> tuple[bool, int]:
"""Calculates the delay from the binary weather forecast for if that hour is all
clear for operations.
Parameters
----------
forecast : np.ndarray
Truth array to indicate if that hour satisfies the weather limit requirements.
hours_required : np.ndarray
The minimum clear weather window required, in hours.
Returns
-------
tuple[bool, int]
Indicator if a window is found (``True``) or not (``False``), and the number
of hours the event needs to be delayed in order to start.
"""
safe_operating_windows = consecutive_groups(np.where(forecast)[0])
window_lengths = np.array([window.size for window in safe_operating_windows])
clear_windows = np.where(window_lengths >= hours_required)[0]
if clear_windows.size == 0:
return False, forecast.shape[0]
return True, safe_operating_windows[clear_windows[0]][0]
[docs]
def validate_end_points(start: str, end: str, no_intrasite: bool = False) -> None:
"""Checks the starting and ending locations for traveling and towing.
Parameters
----------
start : str
The starting location; should be on of: "site", "system", or "port".
end : str
The ending location; should be on of: "site", "system", or "port".
no_intrasite : bool
A flag to disable intrasite travel, so that ``start`` and ``end`` cannot
both be "system", by default False.
Raises
------
ValueError
Raised if the starting location is invalid.
ValueError
Raised if the ending location is invalid
ValueError
Raised if "system" is provided to both ``start`` and ``end``, but
``no_intrasite`` is set to ``True``.
"""
if start not in ("port", "site", "system"):
raise ValueError(
"``start`` location must be one of 'port', 'site', or 'system'!"
)
if end not in ("port", "site", "system"):
raise ValueError("``end`` location must be one of 'port', 'site', or 'system'!")
if no_intrasite and (start == end == "system"):
raise ValueError("No travel within the site is allowed for this process")
[docs]
def reset_system_operations(system: System, subassembly_resets: list[str]) -> None:
"""Completely resets the failure and maintenance events for a given system
and its subassemblies, and puts each ``Subassembly.operating_level`` back to 100%.
.. note:: This is only intended to be used in conjunction with a tow-to-port
repair where a turbine will be completely serviced.
Parameters
----------
system : System
The turbine to be reset.
subassembly_resets : list[str]
The `subassembly_id`s to reset to good as new, if not assuming all
subassemblies.
"""
for subassembly in system.subassemblies:
if subassembly.name in subassembly_resets:
subassembly.operating_level = 1.0
subassembly.recreate_processes()
[docs]
class ServiceEquipment(RepairsMixin):
"""Provides a servicing equipment object that can handle various maintenance and
repair tasks.
Parameters
----------
env : WombatEnvironment
The simulation environment.
windfarm : Windfarm
The ``Windfarm`` object.
repair_manager : RepairManager
The ``RepairManager`` object.
equipment_data_file : str
The equipment settings file name with extension.
Attributes
----------
env : WombatEnvironment
The simulation environment instance.
windfarm : Windfarm
The simulation windfarm instance.
manager : RepairManager
The simulation repair manager instance.
settings : ScheduledServiceEquipmentData | UnscheduledServiceEquipmentData
The servicing equipment's configuration settings, as provided by the user.
onsite : bool
Indicates if the servicing equipment is at the site (``True``), or not
(``False``).
enroute : bool
Indicates if the servicing equipment is on its way to the site (``True``),
or not (``False``).
at_port : bool
Indicates if the servicing equipment is at the port, or similar location for
land-based, (``True``), or not (``False``).
at_system : bool
Indications if the servicing equipment is at a cable, substation, or turbine
while on the site (``True``), or not (``False``).
current_system : str | None
Either the ``System.id`` if ``at_system``, or ``None`` if not.
transferring_crew : bool
Indications if the servicing equipment is at a cable, substation, or turbine
and transferring the crew to or from that system (``True``), or not
(``False``).
"""
def __init__(
self,
env: WombatEnvironment,
windfarm: Windfarm,
repair_manager: RepairManager,
equipment_data_file: str | Path | dict,
):
"""Initializes the ``ServiceEquipment`` class.
Parameters
----------
env : WombatEnvironment
The simulation environment.
windfarm : Windfarm
The ``Windfarm`` object.
repair_manager : RepairManager
The ``RepairManager`` object.
equipment_data_file : str
The equipment settings file name with extension.
"""
self.env = env
self.windfarm = windfarm
self.manager = repair_manager
self.onsite = False # True: mobilized to the site, but not necessarily at_site
self.dispatched = False
self.enroute = False
self.at_port = False
self.at_site = False
self.at_system = False
self.transferring_crew = False
self.current_system = None # type: str | None
self.port_based = False # Changed to False if port is registered
self.settings: ScheduledServiceEquipmentData | UnscheduledServiceEquipmentData
if isinstance(equipment_data_file, (str, Path)):
data = load_yaml(env.data_dir / "vessels", equipment_data_file)
else:
data = equipment_data_file
try:
if data["start_year"] < self.env.start_year:
data["start_year"] = self.env.start_year
if data["end_year"] > self.env.end_year:
data["end_year"] = self.env.end_year
except KeyError:
# Ignores for unscheduled maintenace equipment that won't have this input
pass
# NOTE: mypy is not caught up with attrs yet :(
self.settings = ServiceEquipmentData(data).determine_type() # type: ignore
# Register servicing equipment with the repair manager if it is using an
# unscheduled maintenance scenario, so it can be dispatched as needed
if self.settings.strategy in UNSCHEDULED_STRATEGIES:
self.manager._register_equipment(self)
# Only run the equipment if it is on a scheduled basis, otherwise wait
# for it to be dispatched
if self.settings.strategy == "scheduled":
self.env.process(self.run_scheduled_in_situ())
# Create partial functions for the labor and equipment costs for clarity
self.initialize_cost_calculators(which="equipment")
[docs]
def finish_setup_with_environment_variables(self) -> None:
"""A post-initialization step that will override unset parameters with those
from the the environemt that may have already been set.
"""
self._check_working_hours(which="env")
self.settings._set_port_distance(self.env.port_distance)
self.settings.set_non_operational_dates(
self.env.non_operational_start,
self.env.start_year,
self.env.non_operational_end,
self.env.end_year,
)
self.settings.set_reduced_speed_parameters(
self.env.reduced_speed_start,
self.env.start_year,
self.env.reduced_speed_end,
self.env.end_year,
self.env.reduced_speed,
)
[docs]
def _register_port(self, port: Port) -> None:
"""Method for a tugboat at attach the port for two-way communications. This also
sets the vessel to be at the port, and updates the port_distance.
Parameters
----------
port : Port
The port where the tugboat is based.
"""
self.port = port
self.port_based = True
self.at_port = True
self.settings._set_port_distance(port.settings.site_distance) # type: ignore
self._check_working_hours(which="port")
# Set the non-operational start/end dates if needed
self.settings.set_non_operational_dates(
port.settings.non_operational_start,
self.env.start_year,
port.settings.non_operational_end,
self.env.end_year,
)
# Set the reduced speed start/end dates if needed
self.settings.set_reduced_speed_parameters(
port.settings.reduced_speed_start,
self.env.start_year,
port.settings.reduced_speed_end,
self.env.end_year,
port.settings.reduced_speed,
)
[docs]
def _set_location(self, end: str, set_current: str | None = None) -> None:
"""Keeps track of the servicing equipment by setting the location at either:
site, port, or a specific system.
Parameters
----------
end : str
The ending location; one of "site", or "port"
set_current : str
The ``System.id`` for the new current location, if one is to be set.
"""
if end == "port":
self.at_port = True
self.at_site = False
else:
self.at_port = False
self.at_site = True
self.at_system = True if set_current is not None else False
self.current_system = set_current
[docs]
def _weather_forecast(
self, hours: int | float, which: str
) -> tuple[pl.Series, pl.Series, pl.Series]:
"""Retrieves the weather forecast from the simulation environment, and
translates it to a boolean for satisfactory (True) and unsatisfactory (False)
weather conditions.
Parameters
----------
hours : int | float
The number of hours of weather data that should be retrieved.
which : str
One of "repair" or "transport" to indicate which weather limits to be using.
Returns
-------
tuple[pl.Series, pl.Series, pl.Series]
The datetime Series, the hour of day Series, and the boolean Series of
where the weather conditions are within safe operating limits for the
servicing equipment (True) or not (False).
"""
if which == "repair":
max_wind = self.settings.max_windspeed_repair
max_wave = self.settings.max_waveheight_repair
elif which == "transport":
max_wind = self.settings.max_windspeed_transport
max_wave = self.settings.max_waveheight_transport
else:
raise ValueError("`which` must be one of `repair` or `transport`.")
dt, hour, wind, wave = self.env.weather_forecast(hours)
all_clear = (wind <= max_wind) & (wave <= max_wave)
return dt, hour, all_clear
[docs]
def get_speed(self, tow: bool = False) -> float:
"""Determines the appropriate speed that the servicing equipment should be
traveling at for towing or traveling, and if the timeframe is during a reduced
speed scenario.
Parameters
----------
tow : bool, optional
True indicates the servicing equipment should be using the towing speed,
and if False, then the traveling speed should be used, by default False.
Returns
-------
float
The maximum speed the servicing equipment should be traveling/towing at.
"""
if TYPE_CHECKING:
assert hasattr(self.settings, "reduced_speed_dates_set")
assert hasattr(self.settings, "tow_speed")
speed = self.settings.tow_speed if tow else self.settings.speed
if self.env.simulation_time.date() in self.settings.reduced_speed_dates_set:
if speed > self.settings.reduced_speed:
speed = self.settings.reduced_speed
return speed
[docs]
def get_next_request(self):
"""Gets the next request by the rig's method for processing repairs.
Returns
-------
simpy.resources.store.FilterStoreGet
The next ``RepairRequest`` to be processed.
"""
# Wait between 2 and 10 seconds to ensure a tow-to-port repair is always first
yield self.env.timeout(self.env.get_random_seconds(low=2))
if self.settings.method == "turbine":
request = self.manager.get_request_by_system(self.settings.capability)
if self.settings.method == "severity":
request = self.manager.get_request_by_severity(self.settings.capability)
if request is None:
yield request
else:
request = request.value
self.manager.invalidate_system(request.system_id)
yield request
[docs]
def enable_string_operations(self, cable: Cable) -> None:
"""Traverses the upstream cable and turbine connections and resets the
``System.cable_failure`` and ``Cable.downstream_failure`` until it hits
another cable failure, then the loop exits.
Parameters
----------
subassembly : Cable
The `Cable` or `System`
"""
farm = self.manager.windfarm
if cable.connection_type == "array":
# If there is another failure downstream of the repaired cable, do nothing
if not cable.downstream_failure.triggered:
return
# For each upstream turbine and cable, reset their operations
nodes = deepcopy(cable.upstream_nodes)
cables = cable.upstream_cables
tid = nodes.pop(0)
turbine = farm.system(tid)
turbine.cable_failure.succeed()
for tid, cid in zip_longest(nodes, cables, fillvalue=None): # type: ignore
if cid is not None: # type: ignore
cable = farm.cable(cid)
cable.downstream_failure.succeed()
if not cable.broken.triggered:
break
if tid is not None: # only None for last cable on string
turbine = farm.system(tid)
turbine.cable_failure.succeed()
if cable.connection_type == "export":
# Reset the substation's cable failure
farm.system(cable.end_node).cable_failure.succeed()
# For each string connected to the substation reset all the turbines and
# cables until another cable failure is encoountered, then move to the next
# string
for t_list, c_list in zip(cable.upstream_nodes, cable.upstream_cables):
for t, c in zip_longest(t_list, c_list, fillvalue=None):
if c is not None:
cable = farm.cable(c)
if not cable.broken.triggered:
break
cable.downstream_failure.succeed()
farm.system(t).cable_failure.succeed()
[docs]
def register_repair_with_subassembly(
self,
subassembly: Subassembly | Cable,
repair: RepairRequest,
starting_operating_level: float,
) -> None:
"""Goes into the repaired subassembly, component, or cable and returns its
``operating_level`` back to good as new for the specific repair. For fatal cable
failures, all upstream turbines are turned back on unless there is another fatal
cable failure preventing any more from operating.
Parameters
----------
subassembly : Subassembly | Cable
The subassembly or cable that was repaired.
repair : RepairRequest
The request for repair that was submitted.
starting_operating_level : float
The operating level before a repair was started.
"""
operation_reduction = repair.details.operation_reduction
# Put the subassembly/component back to good as new condition and restart
if repair.details.replacement:
subassembly.operating_level = 1.0
_ = self.manager.purge_subassembly_requests(
repair.system_id, repair.subassembly_id
)
subassembly.recreate_processes()
elif operation_reduction == 1:
subassembly.operating_level = repair.prior_operating_level
try:
subassembly.broken.succeed()
except RuntimeError as e:
raise e
elif operation_reduction == 0:
subassembly.operating_level = starting_operating_level
else:
subassembly.operating_level /= 1 - operation_reduction
if isinstance(subassembly, Subassembly):
self.manager.enable_requests_for_system(subassembly.system)
elif isinstance(subassembly, Cable):
# If the system is a cable, re-enable the upstream systems and cables
self.manager.enable_requests_for_system(subassembly)
if operation_reduction == 1:
self.enable_string_operations(subassembly)
else:
raise TypeError("`subassembly` was neither a `Cable` nor `Subassembly`.")
self.env.process(self.manager.register_repair(repair))
[docs]
def wait_until_next_operational_period(
self, *, less_mobilization_hours: int = 0
) -> Generator[Timeout, None, None]:
"""Delays the crew and equipment until the start of the next operational
period.
TODO: Need a custom error if weather doesn't align with the equipment dates.
Parameters
----------
less_mobilization_hours : int
The number of hours required for mobilization that will be subtracted from
the waiting period to account for mobilization, by default 0.
Yields
------
Generator[Timeout, None, None]
A Timeout event for the number of hours between when the function is called
and when the next operational period starts.
"""
if TYPE_CHECKING:
assert isinstance(self.settings, ScheduledServiceEquipmentData)
current = self.env.simulation_time.date()
ix_match = np.where(current < self.settings.operating_dates)[0]
if ix_match.size > 0:
still_in_operation = True
next_operating_date = self.settings.operating_dates[ix_match[0]]
hours_to_next_shift = (
self.env.date_ix(next_operating_date)
- self.env.now
+ self.settings.workday_start
- less_mobilization_hours
)
else:
still_in_operation = False
hours_to_next_shift = self.env.max_run_time - self.env.now
if still_in_operation:
# TODO: This message needs to account for unscheduled equipment as well, but
# the post processor filtering needs to be updated as well
additional = "will return next year"
else:
additional = "no more return visits will be made"
# Ensures that the statuses are correct
self.at_port = False
self.at_site = False
self.enroute = False
self.onsite = False
self.env.log_action(
agent=self.settings.name,
action="delay",
reason="work is complete",
additional=additional,
duration=hours_to_next_shift,
location="enroute",
)
yield self.env.timeout(hours_to_next_shift)
[docs]
def mobilize_scheduled(self) -> Generator[Timeout, None, None]:
"""Mobilizes the ServiceEquipment object by waiting for the next operational
period, less the mobilization time, then logging the mobiliztion cost.
NOTE: weather delays are not accounted for in this process.
Yields
------
Generator[Timeout, None, None]
A Timeout event for the number of hours between when the function is called
and when the next operational period starts.
"""
mobilization_hours = self.settings.mobilization_days * HOURS_IN_DAY
yield self.env.process(
self.wait_until_next_operational_period(
less_mobilization_hours=mobilization_hours
)
)
self.enroute = True
self.env.log_action(
agent=self.settings.name,
action="mobilization",
reason=f"{self.settings.name} is being mobilized",
additional="mobilization",
location="enroute",
duration=mobilization_hours,
)
yield self.env.timeout(mobilization_hours)
self.onsite = True
self.enroute = False
self.env.log_action(
agent=self.settings.name,
action="mobilization",
reason=f"{self.settings.name} has arrived on site",
additional="mobilization",
equipment_cost=self.settings.mobilization_cost,
location="site",
)
[docs]
def mobilize(self) -> Generator[Timeout, None, None]:
"""Mobilizes the ServiceEquipment object.
NOTE: weather delays are not accounted for at this stage.
Yields
------
Generator[Timeout, None, None]
A Timeout event for the number of hours the ServiceEquipment requires for
mobilizing to the windfarm site.
"""
self.enroute = True
mobilization_hours = self.settings.mobilization_days * HOURS_IN_DAY
self.env.log_action(
agent=self.settings.name,
action="mobilization",
reason=f"{self.settings.name} is being mobilized",
additional="mobilization",
duration=mobilization_hours,
location="enroute",
)
yield self.env.timeout(mobilization_hours)
self.onsite = True
self.enroute = False
self.env.log_action(
agent=self.settings.name,
action="mobilization",
reason=f"{self.settings.name} has arrived on site",
additional="mobilization",
equipment_cost=self.settings.mobilization_cost,
location="site",
)
[docs]
def find_uninterrupted_weather_window(
self, hours_required: int | float
) -> tuple[int | float, bool]:
"""Finds the delay required before starting on a process that won't be able to
be interrupted by a weather delay.
TODO: WEATHER FORECAST NEEDS TO BE DONE FOR ``math.floor(self.now)``, not the
ceiling or there will be a whole lot of rounding up errors on process times.
Parameters
----------
hours_required : int | float
The number of uninterrupted of hours that a process requires for completion.
Returns
-------
tuple[int | float, bool]
The number of hours in weather delays before a process can be completed, and
an indicator for if the process has to be delayed until the next shift for
a safe transfer.
"""
# If the hours required for the window is 0, then return 0 and indicate there is
# no shift delay to be processed
if hours_required == 0:
return 0, False
current = self.env.simulation_time
# If the time required for a transfer is longer than the time left in the shift,
# then return the hours left in the shift and indicate a shift delay
max_hours = hours_until_future_hour(
self.env.simulation_time, self.settings.workday_end
)
if hours_required > max_hours:
return max_hours, True
_, hour, all_clear = self._weather_forecast(max_hours, which="repair")
all_clear &= self._is_workshift(hour)
safe_operating_windows = consecutive_groups(np.where(all_clear)[0])
window_lengths = np.array([window.size for window in safe_operating_windows])
# If all the safe operating windows are shorter than the time required, then
# return the time until the end of the shift and indicate a shift delay
if all(window < hours_required for window in window_lengths):
return max_hours, True
# Find the length of the delay
delay = safe_operating_windows[
np.where(window_lengths >= hours_required)[0][0]
][0]
# If there is no delay, simply return 0 and the shift delay indicator
if delay == 0:
return delay, False
# If the delay is non-zero, ensure we get the correct hour difference from now
# until the top of the first available hour
delay = hours_until_future_hour(current, current.hour + delay)
return delay, False
[docs]
def find_interrupted_weather_window(
self, hours_required: int | float
) -> tuple[DatetimeIndex, np.ndarray, bool]:
"""Assesses at which points in the repair window, the wind (and wave)
constraints for safe operation are met.
The initial search looks for a weather window of length ``hours_required``, and
adds 24 hours to the window for the proceeding 9 days. If no satisfactory window
is found, then the calling process must make another call to this function, but
likely there is something wrong with the constraints or weather conditions if
no window is found within 10 days.
Parameters
----------
hours_required : int | float
The number of hours required to complete the repair.
Returns
-------
tuple[DatetimeIndex, np.ndarray, bool]
The pandas DatetimeIndex, and a corresponding boolean array for what points
in the time window are safe to complete a maintenance task or repair.
"""
window_found = False
hours_required = ceil(hours_required)
for i in range(10): # no more than 10 attempts to find a window
dt_ix, hour_ix, weather = self._weather_forecast(
hours_required + (i * 24), which="repair"
)
working_hours = self._is_workshift(hour_ix)
window = weather & working_hours
if window.sum() >= hours_required:
window_found = True
break
return dt_ix, weather, window_found
[docs]
def weather_delay(self, hours: int | float, **kwargs) -> Generator[Event, Any, Any]:
"""Processes a weather delay of length ``hours`` hours. If ``hours`` = 0, then
a Timeout is still processed, but not logging is done (this is to increase
consistency and do better typing validation across the codebase).
Parameters
----------
hours : int | float
The lenght, in hours, of the weather delay.
Yields
------
Generator[Event, Any, Any]
If the delay is more than 0 hours, then a ``Timeout`` is yielded of length
``hours``.
"""
if hours < 0:
raise ValueError(
f"`hours` must be greater than 0 for {self.settings.name} to process"
" a weather delay"
)
if hours == 0:
return
salary_cost = self.calculate_salary_cost(hours)
hourly_cost = 0 # contractors not paid for delays
equipment_cost = self.calculate_equipment_cost(hours)
self.env.log_action(
duration=hours,
action="delay",
additional="weather delay",
salary_labor_cost=salary_cost,
hourly_labor_cost=hourly_cost,
equipment_cost=equipment_cost,
**kwargs,
)
yield self.env.timeout(hours)
[docs]
@cache
def _calculate_intra_site_time(
self, start: str | None, end: str | None
) -> tuple[float, float]:
"""Calculates the time it takes to travel between port and site or between
systems on site.
Parameters
----------
start : str | None
The starting onsite location. If ``None``, then 0 is returned.
end : str | None
The ending onsite location. If ``None``, then 0 is returned.
Returns
-------
tuple[float, float]
The travel time and distance between two locations.
"""
distance = 0.0 # setting for invalid cases to have no traveling
valid_sys = self.windfarm.distance_matrix.columns
intra_site = start in valid_sys and end in valid_sys
if intra_site:
distance = self.windfarm.distance_matrix.loc[start, end]
# if no speed is set, then there is no traveling time
speed = self.get_speed()
travel_time = distance / speed if speed > 0 else 0.0
# Infinity is the result of "traveling" between a system twice in a row
if travel_time == float("inf"):
travel_time = 0
return travel_time, distance
[docs]
def _calculate_uninterrupted_travel_time(
self, distance: float, tow: bool = False
) -> tuple[float, float]:
"""Calculates the delay to the start of traveling and the amount of time it
will take to travel between two locations.
Parameters
----------
distance : float
The distance to be traveled.
tow : bool
Indicates if this travel is for towing (True), or not (False), by default
False.
Returns
-------
tuple[float, float]
The length of the delay and the length of travel time, in hours.` -1 is
returned if there no weather windows, and the process will have to be
attempted again.
"""
# Return 0 delay and travel time in the distance is zero
if distance == 0:
return 0, 0
speed = self.get_speed(tow=tow)
hours_required = distance / speed
n = 1
max_extra_days = 4
while n <= max_extra_days: # max tries before failing
hours = hours_required + (24 * n)
*_, all_clear = self._weather_forecast(hours, which="repair")
found, delay = calculate_delay_from_forecast(all_clear, hours_required)
if found:
return delay, hours_required
n += 1
# Return -1 for delay if no weather window was found
return -1, hours_required
[docs]
def _calculate_interrupted_travel_time(
self, distance: float, tow: bool = False
) -> float:
"""Calculates the travel time with speed reductions for inclement weather, but
without shift interruptions.
Parameters
----------
distance : flaot
The total distance to be traveled, in km.
tow : bool
Indicates if this travel is for towing (True), or not (False), by default
False.
Returns
-------
float
_description_
"""
speed = self.get_speed(tow=tow)
reduction_factor = 1 - self.settings.speed_reduction_factor
reduction_factor = 0.01 if reduction_factor == 0 else reduction_factor
# get the weather forecast with this time for the max travel time
max_hours = 1 + distance / speed * (1 / reduction_factor)
dt, _, all_clear = self._weather_forecast(max_hours, which="transport")
# calculate the distance able to be traveled in each 1-hour window
distance_traveled = speed * all_clear.cast(float)
distance_traveled[distance_traveled == 0] = speed * reduction_factor
# Reduce the first time step by the time lapsed since the start of the hour
# before proceeding
distance_traveled[0] *= 1 - self.env.now % 1
# Cumulative sum at the end of each full hour
distance_traveled_sum = distance_traveled.cum_sum()
# Get the index for the end of the hour where the distance requred to be
# traveled is reached.
try:
ix_hours = int(np.where(distance_traveled_sum >= distance)[0][0])
except IndexError as e:
# If an error occurs because an index maxes out the weather window, check
# that it's not due to having reached the end of the simulation. If so,
# return the max amount of time, but if that's not the case re-raise the
# error.
if self.env.end_datetime in dt:
ix_hours = distance_traveled.shape[0] - 1
else:
raise e
# Shave off the extra timing to get the exact travel time
total_hours = ix_hours + 1 # add 1 for 0-indexing
traveled = distance_traveled_sum.gather(ix_hours).item()
if traveled > distance:
difference = traveled - distance
speed_at_hour = distance_traveled.gather(ix_hours).item()
reduction = difference / speed_at_hour
total_hours -= reduction
return total_hours
[docs]
def travel(
self,
start: str,
end: str,
set_current: str | None = None,
hours: float | None = None,
distance: float | None = None,
**kwargs,
) -> Generator[Timeout | Process, None, None]:
"""The process for traveling between port and site, or two systems onsite.
NOTE: This does not currently take the weather conditions into account.
Parameters
----------
start : str
The starting location, one of "site", "port", or "system".
end : str
The starting location, one of "site", "port", or "system".
set_current : str, optional
Where to set ``current_system`` to be if traveling to site or a different
system onsite, by default None.
hours : float, optional
The number hours required for traveling between ``start`` and ``end``.
If provided, no internal travel time will be calculated.
distance : float, optional
The distance, in km, to be traveled. Only used if hours is provided
Yields
------
Generator[Timeout | Process, None, None]
The timeout event for traveling.
"""
validate_end_points(start, end)
if hours is None:
hours = 0.0
distance = 0.0
additional = f"traveling from {start} to {end}"
if start == end == "system":
additional = f"traveling from {self.current_system} to {set_current}"
hours, distance = self._calculate_intra_site_time(
self.current_system, set_current
)
elif {start, end} == {"site", "port"}:
additional = f"traveling from {start} to {end}"
distance = self.settings.port_distance
try:
hours = self._calculate_interrupted_travel_time(distance)
except IndexError:
# If the end of the simulation is hit while finding a suitable
# window exit, so the simulation can finish.
return None
if distance is None:
raise ValueError("`distance` must be provided if `hours` is provided.")
# MyPy helpers
if TYPE_CHECKING:
assert isinstance(distance, (int, float))
assert isinstance(hours, (float, int))
# If the the equipment will arive after the shift is over, then it must travel
# back to port (if needed), and wait for the next shift
if self.settings.non_stop_shift:
hours_to_shift_end = hours
else:
hours_to_shift_end = hours_until_future_hour(
self.env.simulation_time, self.settings.workday_end
)
future_time = self.env.simulation_time + timedelta(hours=hours)
is_shift = self._is_workshift(future_time.hour)
if (
(not is_shift or hours > hours_to_shift_end)
and end != "port"
and not self.at_port
):
kw = {
"additional": "insufficient time to complete travel before end of the shift" # noqa: E501
}
kw.update(kwargs) # type: ignore
yield self.env.process(
self.travel(start=start, end="port", **kw) # type: ignore
)
yield self.env.process(self.wait_until_next_shift(**kwargs))
yield self.env.process(
self.travel(start=start, end=end, set_current=set_current, **kwargs)
)
return
elif not is_shift and self.at_port:
kw = {
"additional": "insufficient time to complete travel before end of the shift" # noqa: E501
}
kw.update(kwargs)
yield self.env.process(self.wait_until_next_shift(**kw))
yield self.env.process(
self.travel(start=start, end=end, set_current=set_current, **kwargs)
)
return
salary_cost = self.calculate_salary_cost(hours)
hourly_cost = 0 # contractors not paid for traveling
equipment_cost = self.calculate_equipment_cost(hours)
kwargs.update({"additional": additional})
self.env.log_action(
action="traveling",
duration=hours,
distance_km=distance,
salary_labor_cost=salary_cost,
hourly_labor_cost=hourly_cost,
equipment_cost=equipment_cost,
location="enroute",
**kwargs,
)
# Unregister current_system during travel <- partial fix, still need to figure
# out where current_system needs to be set to None to allow things to "work"
self._set_location("site")
yield self.env.timeout(hours)
self._set_location(end, set_current)
where = set_current if set_current is not None else end
kwargs.update({"additional": f"arrived at {where}"})
self.env.log_action(
action="complete travel",
location=end,
**kwargs,
)
[docs]
def tow(
self,
start: str,
end: str,
set_current: str | None = None,
**kwargs,
) -> Generator[Timeout | Process, None, None]:
"""The process for towing a turbine to/from port.
Parameters
----------
start : str
The starting location; one of "site" or "port".
end : str
The ending location; one of "site" or "port".
set_current : str | None, optional
The ``System.id`` if the turbine is being returned to site, by default None
Yields
------
Generator[Timeout | Process, None, None]
The series of SimPy events that will be processed for the actions to occur.
"""
validate_end_points(start, end, no_intrasite=True)
# Get the distance that needs to be traveled, then calculate the delay and time
# traveling, and log each of them
distance = self.settings.port_distance
delay, hours = self._calculate_uninterrupted_travel_time(distance, tow=True)
if delay == -1:
if start == "site":
kw = deepcopy(kwargs)
kw.update({"reason": "Insufficient weather window, return to port"})
yield self.env.process(self.travel("site", "port", **kw))
yield self.env.timeout(HOURS_IN_DAY * 4)
yield self.env.process(
self.tow(start, end, set_current=set_current, **kwargs)
)
elif start == "port":
kw = deepcopy(kwargs)
kw.update(
{"reason": "Insufficient weather window, will try again later"}
)
yield self.env.timeout(HOURS_IN_DAY * 4)
yield self.env.process(
self.tow(start, end, set_current=set_current, **kwargs)
)
else:
self.env.process(self.weather_delay(delay, location=end, **kwargs))
salary_labor_cost = self.calculate_salary_cost(hours)
hourly_labor_cost = self.calculate_hourly_cost(hours)
equipment_cost = self.calculate_equipment_cost(hours)
self.env.log_action(
duration=hours,
distance_km=distance,
action="towing",
salary_labor_cost=salary_labor_cost,
hourly_labor_cost=hourly_labor_cost,
equipment_cost=equipment_cost,
location="enroute",
**kwargs,
)
yield self.env.timeout(hours)
self._set_location(end, set_current)
self.env.log_action(
action="complete towing",
salary_labor_cost=salary_labor_cost,
hourly_labor_cost=hourly_labor_cost,
equipment_cost=equipment_cost,
additional="complete",
location=end,
**kwargs,
)
[docs]
def crew_transfer(
self,
system: System | Cable,
subassembly: Subassembly,
request: RepairRequest,
to_system: bool,
) -> Generator[Timeout | Process, None, None]:
"""The process of transfering the crew from the equipment to the ``System``
for servicing using an uninterrupted weather window to ensure safe transfer.
Parameters
----------
system : System | Cable
The System where the crew needs to be transferred to.
subassembly : Subassembly
The Subassembly that is being worked on.
request : RepairRequest
The repair to be processed.
to_system : bool
True if the crew is being transferred to the system, or False if the crew
is being transferred off the system.
Returns
-------
None
None is returned when this is run recursively to not repeat the crew
transfer process.
Yields
------
Generator[Timeout | Process, None, None]
Yields a timeout event for the crew transfer once an uninterrupted weather
window can be found.
"""
hours_to_process = self.settings.crew_transfer_time
salary_cost = self.calculate_salary_cost(hours_to_process)
hourly_cost = self.calculate_hourly_cost(hours_to_process)
equipment_cost = self.calculate_equipment_cost(hours_to_process)
shared_logging = {
"system_id": system.id,
"system_name": system.name,
"part_id": subassembly.id,
"part_name": subassembly.name,
"system_ol": system.operating_level,
"part_ol": subassembly.operating_level,
"agent": self.settings.name,
"reason": request.details.description,
"request_id": request.request_id,
}
delay, shift_delay = self.find_uninterrupted_weather_window(hours_to_process)
# If there is a shift delay, then travel to port, wait, and travel back, and
# try again, until no shift delay is required.
while shift_delay:
travel_time = self.env.now
yield self.env.process(
self.travel(start="site", end="port", **shared_logging)
)
travel_time -= self.env.now # will be negative value, but is flipped
delay -= abs(travel_time) # decrement the delay by the travel time
# If removing the travel time from the delay puts it past the delay, then
# do not process an additional delay, and simply go to the next step
if delay >= 0:
yield self.env.process(
self.weather_delay(delay, location="port", **shared_logging)
)
yield self.env.process(
self.wait_until_next_shift(
**shared_logging,
**{"additional": "weather unsuitable to transfer crew"},
)
)
yield self.env.process(
self.travel(
start="port", end="site", set_current=system.id, **shared_logging
)
)
delay, shift_delay = self.find_uninterrupted_weather_window(
hours_to_process
)
yield self.env.process(
self.weather_delay(delay, location="system", **shared_logging)
)
if to_system:
additional = f"transferring crew from {self.settings.name} to {system.id}"
else:
additional = f"transferring crew from {system.id} to {self.settings.name}"
self.env.log_action(
action="transferring crew",
additional=additional,
duration=hours_to_process,
salary_labor_cost=salary_cost,
hourly_labor_cost=hourly_cost,
equipment_cost=equipment_cost,
location="system",
**shared_logging,
)
self.transferring_crew = True
yield self.env.timeout(hours_to_process)
self.transferring_crew = False
self.env.log_action(
action="complete transfer",
additional="complete",
location="system",
**shared_logging,
)
[docs]
def mooring_connection(
self,
system: System,
request: RepairRequest,
which: str,
) -> Generator[Timeout | Process, None, None]:
"""The process of either umooring a floating turbine to prepare for towing it to
port, or reconnecting it after its repairs have been completed.
Parameters
----------
system : System
The System that needs unmooring/reconnecting.
request : RepairRequest
The repair to be processed.
which : bool
"unmoor" for unmooring the turbine, "reconnect" for reconnecting the
turbine.
Returns
-------
None
None is returned when this is run recursively to not repeat the process.
Yields
------
Generator[Timeout | Process, None, None]
Yields a timeout event for the unmooring/reconnection once an uninterrupted
weather window can be found.
"""
if TYPE_CHECKING:
assert isinstance(self.settings, UnscheduledServiceEquipmentData)
which = which.lower().strip()
if which == "unmoor":
hours_to_process = self.settings.unmoor_hours
elif which == "reconnect":
hours_to_process = self.settings.reconnection_hours
else:
raise ValueError(
f"Only `unmoor` and `reconnect` are allowable inputs, not {which}"
)
salary_cost = self.calculate_salary_cost(hours_to_process)
hourly_cost = self.calculate_hourly_cost(hours_to_process)
equipment_cost = self.calculate_equipment_cost(hours_to_process)
shared_logging = {
"system_id": system.id,
"system_name": system.name,
"system_ol": system.operating_level,
"agent": self.settings.name,
"reason": request.details.description,
"request_id": request.request_id,
}
which_text = "unmooring" if which == "unmoor" else "mooring reconnection"
delay, shift_delay = self.find_uninterrupted_weather_window(hours_to_process)
# If there is a shift delay, then wait try again.
if shift_delay:
yield self.env.process(
self.weather_delay(delay, location="site", **shared_logging)
)
yield self.env.process(
self.wait_until_next_shift(
**shared_logging,
**{
"location": "site",
"additional": f"weather unsuitable for {which_text}",
},
)
)
yield self.env.process(
self.mooring_connection(system, request, which=which)
)
return
# If no shift delay, then process any weather delays before dis/connection
yield self.env.process(
self.weather_delay(delay, location="site", **shared_logging)
)
if which == "unmoor":
additional = f"Unmooring {system.id} to tow to port"
else:
additional = f"Reconnecting the mooring to {system.id}"
self.env.log_action(
action=which_text,
additional=additional,
duration=hours_to_process,
salary_labor_cost=salary_cost,
hourly_labor_cost=hourly_cost,
equipment_cost=equipment_cost,
location="system",
**shared_logging, # type: ignore
)
if which == "unmoor":
yield self.env.timeout(self.settings.unmoor_hours)
else:
yield self.env.timeout(self.settings.reconnection_hours)
self.env.log_action(
action=f"complete {which_text}",
additional="complete",
**shared_logging, # type: ignore
)
[docs]
def in_situ_repair(
self,
request: RepairRequest,
time_processed: int | float = 0,
prior_operation_level: float = -1.0,
initial: bool = False,
) -> Generator[Timeout | Process, None, None]:
"""Processes the repair including any weather and shift delays.
Parameters
----------
request : RepairRequest
The ``Maintenance`` or ``Failure`` receiving attention.
time_processed : int | float, optional
Time that has already been processed, by default 0.
prior_operation_level : float, optional
The operating level of the ``System`` just before the repair has begun, by
default -1.0.
initial : bool, optional
True for first step in a potentially-recursive logic, otherwise False. When
True, the repair manager will turn off the system being worked on, but if
done multiple times, the simulation will error out.
Yields
------
Generator[Timeout | Process, None, None]
Timeouts for the repair process.
"""
"""
NOTE: THE PROCESS
1. Travel to site/turbine, if not there already
2. Transfer Crew
3. Number of hours required, or hours until the end of the shift
4. Do repairs for the above
5. When shift/repair is complete, transfer crew
6. Travel to next turbine, back to port
7. Repeat until the repair is complete
8. Exit this function
"""
shift_delay = False
if request.cable:
system = self.windfarm.cable(request.system_id)
cable = request.subassembly_id.split("::")[1:]
subassembly = self.windfarm.graph.edges[cable]["cable"]
else:
system = self.windfarm.system(request.system_id) # type: ignore
subassembly = getattr(system, request.subassembly_id)
starting_operational_level = max(
prior_operation_level, subassembly.operating_level
)
shared_logging = {
"system_id": system.id,
"system_name": system.name,
"part_id": subassembly.id,
"part_name": subassembly.name,
"agent": self.settings.name,
"reason": request.details.description,
"request_id": request.request_id,
}
# Ensure there is enough time to transfer the crew back and forth with a buffer
# of twice the travel time or travel back to port and try again the next shift
start_shift = self.settings.workday_start
end_shift = self.settings.workday_end
current = self.env.simulation_time
hours_required = request.details.time - time_processed
if self.settings.non_stop_shift:
# Default the available to a buffered amount for initial processing
hours_available = hours_required * 2
else:
hours_available = hours_until_future_hour(current, end_shift)
if hours_available <= self.settings.crew_transfer_time * 4:
shared_logging.update(
system_ol=system.operating_level, part_ol=subassembly.operating_level
)
yield self.env.process(
self.travel(
start="site",
end="port",
**shared_logging,
)
)
yield self.env.process(self.wait_until_next_shift(**shared_logging))
yield self.env.process(
self.in_situ_repair(
request,
prior_operation_level=starting_operational_level,
initial=initial,
)
)
return
# Travel to site or the next system on site
if not self.at_system and self.at_port:
shared_logging.update(
system_ol=system.operating_level, part_ol=subassembly.operating_level
)
yield self.env.process(
self.travel(
start="port", end="site", set_current=system.id, **shared_logging
)
)
elif self.at_system is not None and not self.at_port:
shared_logging.update(
system_ol=system.operating_level, part_ol=subassembly.operating_level
)
yield self.env.process(
self.travel(
start="system",
end="system",
set_current=system.id,
**shared_logging,
)
)
else:
raise RuntimeError(f"{self.settings.name} is lost!")
if initial:
replacement = (
request.subassembly_id if request.details.replacement else None
)
self.manager.interrupt_system(system, replacement=replacement)
yield self.env.process(
self.crew_transfer(system, subassembly, request, to_system=True)
)
current = self.env.simulation_time
# If the hours required is longer than the shift time, then reset the available
# number of hours and the appropriate weather forecast, accounting for crew
# transfer, otherwise get an adequate weather window, allowing for interruptions
if not self.settings.non_stop_shift:
hours_available = hours_until_future_hour(current, end_shift)
hours_available -= self.settings.crew_transfer_time
*_, weather_forecast = self._weather_forecast(
hours_available, which="repair"
)
else:
_, weather_forecast, _ = self.find_interrupted_weather_window(
hours_required
)
# Check that all times are within the windfarm's working hours and cut off any
# time points that fall outside of the work shifts
if hours_required > hours_available:
shift_delay = True
hours_processed = 0
weather_delay_groups = consecutive_groups(np.where(~weather_forecast)[0])
while weather_forecast.shape[0] > 0 and hours_available > 0:
delays = np.where(~weather_forecast)[0]
if delays.size > 0:
hours_to_process = delays[0]
delay = weather_delay_groups.pop(0).size
else:
hours_to_process = weather_forecast.shape[0]
delay = 0
weather_forecast = weather_forecast.slice(hours_to_process + delay)
# If the delay is at the start, hours_to_process is 0, and a delay gets
# processed, otherwise the crew works for the minimum of
# ``hours_to_process`` or maximum time that can be worked until the shift's
# end, and maxed out by the hours required for the actual repair.
if hours_to_process > 0:
if hours_available <= hours_to_process:
hours_to_process = hours_available
else:
current = self.env.simulation_time
hours_to_process = hours_until_future_hour(
current, current.hour + int(hours_to_process)
)
if hours_required < hours_to_process:
hours_to_process = hours_required
# Ensure this gets the correct float hours to the start of the target
# hour, unless the hours to process is between (0, 1]
shared_logging.update(
system_ol=system.operating_level,
part_ol=subassembly.operating_level,
)
yield self.env.process(
self.process_repair(
hours_to_process, request.details, **shared_logging
)
)
hours_processed += hours_to_process
hours_available -= hours_to_process
hours_required -= hours_to_process
# If a delay is the first part of the process or a delay occurs after the
# some work is performed, then that delay is processed here.
if delay > 0 and hours_required > 0:
current = self.env.simulation_time
hours_to_process = hours_until_future_hour(
current, current.hour + delay
)
shared_logging.update(
system_ol=system.operating_level,
part_ol=subassembly.operating_level,
)
yield self.env.process(
self.weather_delay(
hours_to_process, location="system", **shared_logging
)
)
hours_available -= hours_to_process
# Reached the end of the shift or the end of the repair, and need to unload crew
# from the system
yield self.env.process(
self.crew_transfer(system, subassembly, request, to_system=False)
)
if shift_delay or hours_required > 0:
# For 24-hour shifts, we just need to start back at the top
if self.settings.non_stop_shift:
yield self.env.process(
self.in_situ_repair(
request,
time_processed=hours_processed + time_processed,
prior_operation_level=starting_operational_level,
)
)
return
shared_logging.update(
system_ol=system.operating_level, part_ol=subassembly.operating_level
)
yield self.env.process(
self.travel(start="site", end="port", **shared_logging)
)
shared_logging.update(
system_ol=system.operating_level, part_ol=subassembly.operating_level
)
yield self.env.process(self.wait_until_next_shift(**shared_logging))
yield self.env.process(
self.in_situ_repair(
request,
time_processed=hours_processed + time_processed,
prior_operation_level=starting_operational_level,
)
)
return
# Register the repair
self.register_repair_with_subassembly(
subassembly, request, starting_operational_level
)
action = "maintenance" if isinstance(request.details, Maintenance) else "repair"
self.env.log_action(
system_id=system.id,
system_name=system.name,
part_id=subassembly.id,
part_name=subassembly.name,
system_ol=system.operating_level,
part_ol=subassembly.operating_level,
agent=self.settings.name,
action=f"{action} complete",
reason=request.details.description,
materials_cost=request.details.materials,
additional="complete",
request_id=request.request_id,
location="system",
)
# If this is the end of the shift, ensure that we're traveling back to port
if not self.env.is_workshift(start_shift, end_shift):
shared_logging.update(
system_ol=system.operating_level, part_ol=subassembly.operating_level
)
yield self.env.process(
self.travel(start="site", end="port", **shared_logging)
)
[docs]
def run_scheduled_in_situ(self) -> Generator[Process, None, None]:
"""Runs the simulation of in situ repairs for scheduled servicing equipment
that have the `onsite` designation or don't require mobilization.
Yields
------
Generator[Process, None, None]
The simulation.
"""
if TYPE_CHECKING:
assert isinstance(self.settings, ScheduledServiceEquipmentData)
# If the starting operation date is the same as the simulations, set to onsite
if self.settings.operating_dates[0] == self.env.simulation_time.date():
yield self.env.process(self.mobilize_scheduled())
while True:
# Wait for a valid operational period to start
if self.env.simulation_time.date() not in self.settings.operating_dates_set:
yield self.env.process(self.mobilize_scheduled())
# Wait for next shift to start
is_workshift = self.env.is_workshift(
workday_start=self.settings.workday_start,
workday_end=self.settings.workday_end,
)
if not is_workshift:
yield self.env.process(
self.wait_until_next_shift(
**{
"agent": self.settings.name,
"reason": "not in working hours",
"additional": "not in working hours",
}
)
)
yield self.env.timeout(self.env.get_random_seconds(low=2))
_, request = self.get_next_request()
if request is None:
if not self.at_port:
yield self.env.process(
self.travel(
start="site",
end="port",
reason="no requests",
agent=self.settings.name,
)
)
yield self.env.process(
self.wait_until_next_shift(
**{
"agent": self.settings.name,
"reason": "no requests",
"additional": "no work requests, waiting until the next shift", # noqa: E501
}
)
)
else:
yield self.manager.in_process_requests.put(request)
self.manager.request_status_map["pending"].difference_update(
[request.request_id]
)
self.manager.request_status_map["processing"].update(
[request.request_id]
)
if request.cable:
system = self.windfarm.cable(request.system_id)
else:
system = self.windfarm.system(request.system_id) # type: ignore
yield system.servicing
yield self.env.process(self.in_situ_repair(request, initial=True))
[docs]
def run_unscheduled_in_situ(self) -> Generator[Process, None, None]:
"""Runs an in situ repair simulation for unscheduled servicing equipment, or
those that have to be mobilized before performing repairs and maintenance.
Yields
------
Generator[Process, None, None]
The simulation
"""
self.dispatched = True
if TYPE_CHECKING:
assert isinstance(self.settings, UnscheduledServiceEquipmentData)
mobilization_days = self.settings.mobilization_days
charter_end_env_time = self.settings.charter_days * HOURS_IN_DAY
charter_end_env_time += mobilization_days * HOURS_IN_DAY
charter_end_env_time += self.env.now
current = self.env.simulation_time
charter_start = current + pd.Timedelta(mobilization_days, "D")
charter_end = (
current
+ pd.Timedelta(mobilization_days, "D")
+ pd.Timedelta(self.settings.charter_days, "D")
)
charter_period = set(pd.date_range(charter_start, charter_end).date)
# If the charter period contains any non-operational date, then, try again
# at the next available date after the end of the attempted charter period
intersection = charter_period.intersection(
self.settings.non_operational_dates_set
)
if intersection:
intersection_end = max(intersection)
sim_end = self.env.end_datetime.date()
if intersection_end != sim_end:
hours_to_next = self.hours_to_next_operational_date(
start_search_date=intersection_end,
exclusion_days=mobilization_days,
)
else:
hours_to_next = self.env.max_run_time - self.env.now
self.env.log_action(
agent=self.settings.name,
action="delay",
reason="non-operational period",
additional="waiting for next operational period",
duration=hours_to_next,
)
yield self.env.timeout(hours_to_next)
yield self.env.process(self.run_unscheduled_in_situ())
return
while True and self.env.now < charter_end_env_time:
_, request = self.get_next_request()
if request is None:
yield self.env.process(
self.wait_until_next_shift(
**{
"agent": self.settings.name,
"reason": "no requests",
"additional": "no work requests submitted by start of shift", # noqa: E501
}
)
)
else:
break
if self.env.now >= charter_end_env_time:
self.dispatched = False
return
yield self.manager.in_process_requests.put(request)
self.manager.request_status_map["pending"].difference_update(
[request.request_id]
)
self.manager.request_status_map["processing"].update([request.request_id])
# Ensure the system isn't in service already during the checking stages, then
# halt its ongoing processes for the current repair.
# NOTE: port-based equipment will have already halted the system's processes.
if not hasattr(self, "port"):
if request.cable:
system = self.windfarm.cable(request.system_id)
else:
system = self.windfarm.system(request.system_id) # type: ignore
yield system.servicing
yield self.env.timeout(self.env.get_random_seconds(low=2))
yield system.servicing
while True:
if self.env.now >= charter_end_env_time:
self.onsite = False
self.at_port = False
self.at_site = False
self.at_system = False
self.dispatched = False
self.current_system = None
self.env.log_action(
agent=self.settings.name,
action="leaving site",
reason="charter period has ended",
)
break
if not self.onsite:
# Mobilize and immediately run the repair logic for the inital request
yield self.env.process(self.mobilize())
if request.cable:
system = self.windfarm.cable(request.system_id)
else:
system = self.windfarm.system(request.system_id) # type: ignore
yield system.servicing
yield self.env.process(self.in_situ_repair(request, initial=True))
# Wait for next shift to start
is_workshift = self.env.is_workshift(
workday_start=self.settings.workday_start,
workday_end=self.settings.workday_end,
)
if not is_workshift:
yield self.env.process(
self.wait_until_next_shift(
**{
"agent": self.settings.name,
"reason": "not in working hours",
"additional": "not in working hours",
}
)
)
_, request = self.get_next_request()
if request is None:
yield self.env.process(
self.wait_until_next_shift(
**{
"agent": self.settings.name,
"reason": "no requests",
"additional": "no work requests submitted by start of shift", # noqa: E501
}
)
)
else:
if request.cable:
system = self.windfarm.cable(request.system_id)
else:
system = self.windfarm.system(request.system_id) # type: ignore
yield system.servicing
yield self.env.process(self.in_situ_repair(request, initial=True))
self.dispatched = False
[docs]
def run_tow_to_port(self, request: RepairRequest) -> Generator[Process, None, None]:
"""Runs the tow to port logic, so a turbine can be repaired at port.
Parameters
----------
request : RepairRequest
The request the triggered the tow-to-port strategy.
Yields
------
Generator[Process, None, None]
The series of events that simulate the complete towing logic.
Raises
------
ValueError
Raised if the equipment is not currently at port
"""
self.dispatched = True
system = self.windfarm.system(request.system_id)
shared_logging = {
"system_id": request.system_id,
"system_name": request.system_name,
"request_id": request.request_id,
"agent": self.settings.name,
}
# Travel to the turbine
yield self.env.process(
self.travel(
"port",
"site",
set_current=system.id,
reason=request.details.description,
**shared_logging, # type: ignore
)
)
# Turn off the turbine
replacement = request.subassembly_id if request.details.replacement else None
self.manager.interrupt_system(system, replacement=replacement)
# Unmoor the turbine and tow it back to port
yield self.env.process(self.mooring_connection(system, request, which="unmoor"))
yield self.env.process(
self.tow("site", "port", reason="towing turbine to port", **shared_logging)
)
self.dispatched = False
[docs]
def run_tow_to_site(
self, request: RepairRequest, subassembly_resets: list[str] = []
) -> Generator[Process, None, None]:
"""Runs the tow to site logic for after a turbine has had its repairs completed
at port.
Parameters
----------
request : RepairRequest
The request the triggered the tow-to-port strategy.
subassembly_resets : list[str]
The `subassembly_id`s to reset to good as new. Defaults to [].
Yields
------
Generator[Process, None, None]
The series of events that simulate the complete towing logic.
Raises
------
ValueError
Raised if the equipment is not currently at port
"""
self.dispatched = True
system = self.windfarm.system(request.system_id)
shared_logging = {
"agent": self.settings.name,
"request_id": request.request_id,
"system_id": request.system_id,
"system_name": request.system_name,
}
# Tow the turbine back to site and reconnect it to the mooring system
yield self.env.process(
self.tow(
"port",
"site",
set_current=system.id,
reason="towing turbine back to site",
**shared_logging,
)
)
yield self.env.process(
self.mooring_connection(system, request, which="reconnect")
)
# Reset the turbine back to operating and return to port
reset_system_operations(system, subassembly_resets)
self.manager.enable_requests_for_system(system, tow=True)
yield self.env.process(
self.travel(
"site",
"port",
reason="traveling back to port after returning turbine",
**shared_logging, # type: ignore
)
)
self.dispatched = False