Source code for wombat.core.port

"""Creates the `Port` class that provies the tow-to-port repair capabilities for
offshore floating wind farms. The `Port` will control a series of tugboats enabled
through the "TOW" capability that get automatically dispatched once a tow-to-port repair
is submitted and a tugboat is available (`ServiceEquipment.at_port`). The `Port` also
controls any mooring repairs through the "AHV" capability, which operates similarly to
the tow-to-port except that it will not be released until the repair is completed, and
operates on a strict shift scheduling basis.
"""

from __future__ import annotations

from typing import TYPE_CHECKING
from pathlib import Path
from collections.abc import Generator

import numpy as np
import simpy
import pandas as pd
import polars as pl
from simpy.events import Process, Timeout
from simpy.resources.store import FilterStore, FilterStoreGet

from wombat.windfarm import Windfarm
from wombat.core.mixins import RepairsMixin
from wombat.core.library import load_yaml
from wombat.utilities.time import hours_until_future_hour
from wombat.core.environment import WombatEnvironment
from wombat.core.data_classes import PortConfig, Maintenance, RepairRequest
from wombat.core.repair_management import RepairManager
from wombat.core.service_equipment import ServiceEquipment


[docs] class Port(RepairsMixin, FilterStore): """The offshore wind base port that operates tugboats and performs tow-to-port repairs. .. note:: The operating costs for the port are incorporated into the ``FixedCosts`` functionality in the high-levl cost bucket: ``operations_management_administration`` or the more granula cost bucket: ``marine_management`` Parameters ---------- env : WombatEnvironment The simulation environment instance. windfarm : Windfarm The simulation windfarm instance. repair_manager : RepairManager The simulation repair manager instance. config : dict | str | Path A path to a YAML object or dictionary encoding the port's configuration settings. This will be loaded into a ``PortConfig`` object during initialization. Attributes ---------- env : WombatEnvironment The simulation environment instance. windfarm : Windfarm The simulation windfarm instance. manager : RepairManager The simulation repair manager instance. settings : PortConfig The port's configuration settings, as provided by the user. requests_serviced : set[str] The set of requests that have already been serviced to ensure there are no duplications of labor when splitting out the repair requests to be processed. turbine_manager : simpy.Resource A SimPy ``Resource`` object that limits the number of turbines that can be towed to port, so as not to overload the quayside waters, which is controlled by ``settings.max_operations``. crew_manager : simpy.Resource A SimPy ``Resource`` object that limts the number of repairs that can be occurring at any given time, which is controlled by ``settings.n_crews``. service_equipment_manager : simpy.FilterStore A SimPy ``FilterStore`` object that acts as a coordination system for the registered tugboats to tow turbines between port and site. In order to tow in either direction they must be filtered by ``ServiceEquipment.at_port``. This is generated from the tugboat definitions in ``settings.tugboats``. active_repairs : dict[str, dict[str, simpy.events.Event]] A nested dictionary of turbines, and its associated request IDs with a SimPy ``Event``. The use of events allows them to automatically succeed at the end of repairs, and once all repairs are processed on a turbine, the tow-to-site process can commence. """ def __init__( self, env: WombatEnvironment, windfarm: Windfarm, repair_manager: RepairManager, config: dict | str | Path, ) -> None: super().__init__(env, np.inf) self.env = env self.windfarm = windfarm self.manager = repair_manager self.system_request_map: dict[str, list[RepairRequest]] = {} self.requests_serviced: set[str] = set() self.invalid_systems: list[str] = [] self.manager._register_port(self) if not isinstance(config, dict): config = load_yaml(env.data_dir / "project/port", config) if TYPE_CHECKING: assert isinstance(config, dict) self.settings = PortConfig.from_dict(config) self._check_working_hours(which="env") self.settings.set_non_operational_dates( self.env.non_operational_start, self.env.start_year, self.env.non_operational_end, self.env.end_year, ) self.settings.set_reduced_speed_parameters( self.env.reduced_speed_start, self.env.start_year, self.env.reduced_speed_end, self.env.end_year, self.env.reduced_speed, ) # Instantiate the crews, tugboats, and turbine availability if TYPE_CHECKING: assert isinstance(self.settings, PortConfig) self.turbine_manager = simpy.Resource(env, self.settings.max_operations) self.crew_manager = simpy.Resource(env, self.settings.n_crews) service_equipment = [] for t in self.settings.tugboats: tugboat = ServiceEquipment(self.env, self.windfarm, repair_manager, t) tugboat._register_port(self) service_equipment.append(tugboat) self.service_equipment_manager = simpy.FilterStore(env, len(service_equipment)) self.service_equipment_manager.items = service_equipment self.active_repairs: dict[str, dict[str, simpy.events.Event]] = {} self.subassembly_resets: dict[str, list[str]] = {} # Create partial functions for the labor and equipment costs for clarity self.initialize_cost_calculators(which="port") # Run the annualized fee logger if self.settings.annual_fee > 0: self.env.process(self._log_annual_fee())
[docs] def _log_annual_fee(self): """Logs the annual port lease fee on a monthly-basis.""" if TYPE_CHECKING: assert isinstance(self.settings, PortConfig) monthly_fee = self.settings.annual_fee / 12.0 ix_month_starts = self.env.weather.filter( (pl.col("datetime").dt.day() == 1) & (pl.col("datetime").dt.hour() == 0) & (pl.col("index") > 0) ).select(pl.col("index")) # At time 0 log the first monthly fee self.env.log_action( agent=self.settings.name, action="monthly lease fee", reason="port lease", equipment_cost=monthly_fee, ) for i, (ix_month,) in enumerate(ix_month_starts.rows()): # Get the time to the start of the next month time_to_next = ( ix_month if i == 0 else ix_month - ix_month_starts.slice(i - 1, 1).item() ) # Log the fee at the start of each month at midnight yield self.env.timeout(time_to_next) self.env.log_action( agent=self.settings.name, action="monthly lease fee", reason="port lease", equipment_cost=monthly_fee, )
[docs] def repair_single( self, request: RepairRequest ) -> Generator[Timeout | Process, None, None]: """Simulation logic to process a single repair request. Parameters ---------- request : RepairRequest The submitted repair or maintenance request. """ # Request a service crew crew_request = self.crew_manager.request() yield crew_request # Once a crew is available, process the acutal repair end_shift = self.settings.workday_end # Set the default hours to process and remaining hours for the repair hours_to_process = hours_remaining = request.details.time # Create the shared logging among the processes system = self.windfarm.system(request.system_id) subassembly = getattr(system, request.subassembly_id) shared_logging = { "agent": self.settings.name, "reason": request.details.description, "system_id": system.id, "system_name": system.name, "part_id": subassembly.id, "part_name": subassembly.name, "request_id": request.request_id, } # Continue repairing and waiting a shift until the remaining hours to complete # the repair is zero while hours_remaining > 0: current = self.env.simulation_time # Check if the workday is limited by shifts and adjust to stay within shift if not self.settings.non_stop_shift: hours_to_process = hours_until_future_hour(current, end_shift) # Delay until the next shift if we're at the end if hours_to_process == 0: additional = "end of shift; will resume work in the next shift" yield self.env.process( self.wait_until_next_shift(additional=additional, **shared_logging) ) continue # Process the repair for the minimum of remaining hours to completion and # hours available in the shift hours_to_process = min(hours_to_process, hours_remaining) yield self.env.process( self.process_repair(hours_to_process, request.details, **shared_logging) ) # Decrement the remaining hours and reset the default hours to process back # to the remaining repair time hours_remaining -= hours_to_process hours_to_process = hours_remaining # Log the completion of the repair action = "maintenance" if isinstance(request.details, Maintenance) else "repair" self.env.log_action( system_id=system.id, part_id=subassembly.id, part_name=subassembly.name, agent=self.settings.name, action=f"{action} complete", reason=request.details.description, materials_cost=request.details.materials, additional="complete", request_id=request.request_id, ) # Make the crew available again self.crew_manager.release(crew_request) self.active_repairs[request.system_id][request.request_id].succeed() yield self.env.process(self.manager.register_repair(request))
[docs] def transfer_requests_from_manager( self, system_id: str ) -> None | list[RepairRequest] | Generator: """Gets all of a given system's repair requests from the simulation's repair manager, removes them from that queue, and puts them in the port's queue. Parameters ---------- system_id : str The ``System.id`` attribute from the system that will be repaired at port. Returns ------- None | list[RepairRequest] The list of repair requests that need to be completed at port. """ requests = self.manager.get_all_requests_for_system( self.settings.name, system_id ) if requests is None: return requests requests = [r.value for r in requests] # type: ignore self.items.extend(requests) for request in requests: self.env.log_action( system_id=request.system_id, system_name=request.system_name, part_id=request.subassembly_id, part_name=request.subassembly_name, system_ol=float("nan"), part_ol=float("nan"), agent=self.settings.name, action="requests moved to port", reason="at-port repair can now proceed", request_id=request.request_id, ) _ = yield self.manager.in_process_requests.put(request) self.active_repairs[system_id][request.request_id] = self.env.event() request_ids = {el.request_id for el in requests} self.manager.request_status_map["pending"].difference_update(request_ids) self.manager.request_status_map["processing"].update(request_ids)
[docs] def run_repairs(self, system_id: str) -> Generator | None: """Method that transfers the requests from the repair manager and initiates the repair sequence. Parameters ---------- system_id : str The ``System.id`` that is has been towed to port. """ self.active_repairs[system_id] = {} self.subassembly_resets[system_id] = [] yield self.env.process(self.transfer_requests_from_manager(system_id)) # Get all the requests and run them. # NOTE: this will all fail if there are somehow no requests, which also means # something else is completely wrong with the simulation request_list = self.get_all_requests_for_system(system_id) for request in request_list: # type: ignore if TYPE_CHECKING: assert isinstance(request, FilterStoreGet) request = request.value self.requests_serviced.update([request.request_id]) self.subassembly_resets[system_id].append(request.subassembly_id) yield self.env.process(self.repair_single(request))
[docs] def get_all_requests_for_system( self, system_id: str ) -> None | Generator[FilterStoreGet, None, None]: """Gets all repair requests for a specific ``system_id``. Parameters ---------- system_id : Optional[str], optional ID of the turbine or OSS; should correspond to ``System.id``. the first repair requested. Returns ------- Optional[Generator[FilterStoreGet]] All repair requests for a given system. If no matching requests are found, or there aren't any items in the queue yet, then None is returned. """ if not self.items: return None # Filter the requests by system requests = self.items if system_id is not None: requests = [el for el in self.items if el.system_id == system_id] if requests == []: return None # Loop the requests and pop them from the queue for request in requests: _ = yield self.get(lambda x: x is request) # pylint: disable=W0640 return requests
[docs] def run_tow_to_port(self, request: RepairRequest) -> Generator[Process, None, None]: """The method to initiate a tow-to-port repair sequence. The process follows the following following routine: 1. Request a tugboat from the tugboat resource manager and wait 2. Runs ``ServiceEquipment.tow_to_port``, which encapsulates the traveling to site, unmooring, and return tow with a turbine 3. Transfers the the turbine's repair log to the port, and gets all available crews to work on repairs immediately 4. Requests a tugboat to return the turbine to site 5. Runs ``ServiceEquipment.tow_to_site()``, which encapsulates the tow back to site, reconnection, resetting the operating status, and returning back to port Parameters ---------- request : RepairRequest The request that initiated the process. This is primarily used for logging purposes. Yields ------ Generator[Process, None, None] The series of events constituting the tow-to-port repairs """ # If the request has already been addressed, return if request.request_id in self.requests_serviced: return system_id = request.system_id request_id = request.request_id # Double check in case a delay causes multiple vessels to be interacting with # the same turbine # Add the requested system to the list of systems undergoing or registered to be # undergoing repairs, so this method can't be run again on the same system self.invalid_systems.append(system_id) # If the system is already undergoing repairs from other servicing equipment, # then wait until it's done being serviced servicing = self.windfarm.system(system_id).servicing # Wait for a spot to open up in the port queue turbine_request = self.turbine_manager.request() yield turbine_request & servicing yield self.env.timeout(self.env.get_random_seconds()) yield self.windfarm.system(system_id).servicing # Request a tugboat to retrieve the turbine tugboat = yield self.service_equipment_manager.get( lambda x: x.at_port and (not x.dispatched) and "TOW" in x.settings.capability ) # Check that there is enough time to complete towing, connection, and repairs # before starting the process, otherwise, wait until the next operational period # TODO: use a more sophisticated guess on timing, other than 20 days current = self.env.simulation_time.date() check_range = set( pd.date_range(current, current + pd.Timedelta(days=20), freq="D").date ) intersection = check_range.intersection(self.settings.non_operational_dates_set) if intersection: hours_to_next = self.hours_to_next_operational_date( start_search_date=max(intersection) ) self.env.log_action( agent=self.settings.name, action="delay", reason="non-operational period", additional="waiting for next operational period", duration=hours_to_next, ) yield self.env.timeout(hours_to_next) self.requests_serviced.update([request_id]) if TYPE_CHECKING: assert isinstance(tugboat, ServiceEquipment) yield self.env.process(tugboat.run_tow_to_port(request)) # Make the tugboat available again yield self.service_equipment_manager.put(tugboat) # Transfer the repairs to the port queue, which will initiate the repair process yield self.env.process(self.run_repairs(system_id)) # Wait for the repairs to complete yield simpy.AllOf(self.env, self.active_repairs[system_id].values()) # Request a tugboat to tow the turbine back to site, and open the turbine queue tugboat = yield self.service_equipment_manager.get( lambda x: x.at_port and (not x.dispatched) and "TOW" in x.settings.capability ) self.turbine_manager.release(turbine_request) self.subassembly_resets[system_id] = list( set(self.subassembly_resets[system_id]) ) yield self.env.process( tugboat.run_tow_to_site(request, self.subassembly_resets[system_id]) ) self.invalid_systems.pop(self.invalid_systems.index(system_id)) # Make the tugboat available again yield self.service_equipment_manager.put(tugboat)
[docs] def run_unscheduled_in_situ( self, request: RepairRequest, initial: bool = False ) -> Generator[Process, None, None]: """Runs the in-situ repair processes for port-based servicing equipment such as tugboats that will always return back to port, but are not necessarily a feature of the windfarm itself, such as a crew transfer vessel. Parameters ---------- request : RepairRequest The request that triggered the non tow-to-port, but port-based servicing equipment repair. Yields ------ Generator[Process, None, None] The travel and repair processes. """ # If the request has already been addressed, return if request.request_id in self.requests_serviced: return system = self.windfarm.system(request.system_id) if initial: _ = self.manager.get(lambda x: x is request) self.manager.in_process_requests.put(request) self.manager.request_status_map["pending"].difference_update( [request.request_id] ) self.manager.request_status_map["processing"].update([request.request_id]) # If the system is already undergoing repairs from other servicing equipment, # then wait until it's done being serviced, then double check yield system.servicing seconds_to_wait, *_ = ( self.env.random_generator.integers(low=0, high=30, size=1) / 3600.0 ) yield self.env.timeout(seconds_to_wait) yield system.servicing # Halt the turbine before going further to avoid issue with requests being # being submitted between now and when the tugboat gets to the turbine self.requests_serviced.update([request.request_id]) # Request a vessel that isn't solely a towing vessel vessel = yield self.service_equipment_manager.get( lambda x: x.at_port and (not x.dispatched) and x.settings.capability != ["TOW"] ) if TYPE_CHECKING: assert isinstance(vessel, ServiceEquipment) request = yield self.manager.get(lambda x: x is request) yield self.env.process(vessel.in_situ_repair(request, initial=True)) # If the tugboat finished mid-shift, the in-situ repair logic will keep it # there, so ensure it returns back to port once it's complete if not vessel.at_port: yield self.env.process( vessel.travel( start="site", end="port", agent=vessel.settings.name, reason=f"{request.details.description} complete", system_id=request.system_id, system_name=request.system_name, part_id=request.subassembly_id, part_name=request.subassembly_name, request_id=request.request_id, ) ) # Make the tugboat available again yield self.service_equipment_manager.put(vessel)