# Source code for wombat.core.repair_management

"""Creates the necessary repair classes."""


from __future__ import annotations

from typing import TYPE_CHECKING
from itertools import chain
from collections import Counter
from collections.abc import Generator

import numpy as np
from simpy.resources.store import FilterStore, FilterStoreGet

from wombat.core import (
    Failure,
    Maintenance,
    StrategyMap,
    RepairRequest,
    WombatEnvironment,
    UnscheduledServiceEquipmentData,
)


if TYPE_CHECKING:
    from wombat.core import Port, ServiceEquipment
    from wombat.windfarm import Windfarm
    from wombat.windfarm.system import Cable, System


class RepairManager(FilterStore):
    """Provides a class to manage repair and maintenance tasks.

    The manager is itself a ``simpy`` ``FilterStore``: pending repair requests are
    the store's ``items``, and servicing equipment retrieves them through the
    ``get_request_by_*`` methods.

    Parameters
    ----------
    env : wombat.core.WombatEnvironment
        The simulation environment.
    capacity : float
        The maximum number of tasks that can be submitted to the manager, by default
        ``np.inf``.

    Attributes
    ----------
    env : wombat.core.WombatEnvironment
        The simulation environment.
    windfarm : wombat.windfarm.Windfarm
        The simulated windfarm, available only after ``_register_windfarm`` is
        called. Used to look up systems and cables and to read the current
        availability.
    port : wombat.core.Port
        The port handling tow-to-port and port-based repairs, available only after
        ``_register_port`` is called.
    _current_id : int
        The logged and auto-incremented integer base for the ID generated for each
        submitted repair request.
    invalid_systems : list[str]
        IDs of the systems and cables that have been invalidated via
        ``invalidate_system`` and not yet re-enabled.
    systems_in_tow : list[str]
        IDs of the systems invalidated for a tow-to-port repair.
    systems_waiting_for_tow : list[str]
        IDs of the systems with a tow request that is waiting to be started.
    downtime_based_equipment : StrategyMap
        The mapping between downtime-based servicing equipment and their
        capabilities.
    request_based_equipment : StrategyMap
        The mapping between request-based servicing equipment and their
        capabilities.
    completed_requests : simpy.resources.store.FilterStore
        Store receiving every request passed to ``register_repair``.
    in_process_requests : simpy.resources.store.FilterStore
        Store that ``register_repair`` removes the finished request from; it is not
        populated anywhere in this class.
    request_status_map : dict[str, set]
        The request IDs grouped by status: "pending", "processing", or "completed".
    """

    # Populated by the ``_register_*`` methods, not by ``__init__``.
    windfarm: Windfarm
    port: Port

    def __init__(self, env: WombatEnvironment, capacity: float = np.inf) -> None:
        super().__init__(env, capacity)

        self.env = env
        self._current_id = 0
        self.invalid_systems: list[str] = []
        self.systems_in_tow: list[str] = []
        self.systems_waiting_for_tow: list[str] = []

        self.downtime_based_equipment = StrategyMap()
        self.request_based_equipment = StrategyMap()

        self.completed_requests = FilterStore(self.env)
        self.in_process_requests = FilterStore(self.env)
        self.request_status_map: dict[str, set] = {
            "pending": set(),
            "processing": set(),
            "completed": set(),
        }

    def _update_equipment_map(self, service_equipment: ServiceEquipment) -> None:
        """Adds the servicing equipment to the strategy mapping that matches its
        dispatching strategy, once for each of its capabilities.

        Parameters
        ----------
        service_equipment : ServiceEquipment
            The unscheduled servicing equipment to be registered.

        Raises
        ------
        ValueError
            Raised if the equipment's strategy is neither "downtime" nor "requests".
        """
        capability = service_equipment.settings.capability
        strategy = service_equipment.settings.strategy

        if strategy == "downtime":
            mapping = self.downtime_based_equipment
        elif strategy == "requests":
            mapping = self.request_based_equipment
        else:
            # Shouldn't be possible to get here!
            raise ValueError("Invalid servicing equipment!")

        if TYPE_CHECKING:
            assert isinstance(
                service_equipment.settings, UnscheduledServiceEquipmentData
            )
        strategy_threshold = service_equipment.settings.strategy_threshold

        # NOTE(review): a capability that is not a list is silently not registered;
        # presumably the settings always provide a list -- confirm.
        if isinstance(capability, list):
            for c in capability:
                mapping.update(c, strategy_threshold, service_equipment)

    def _register_windfarm(self, windfarm: Windfarm) -> None:
        """Adds the simulation windfarm to the class attributes."""
        self.windfarm = windfarm

    def _register_equipment(self, service_equipment: ServiceEquipment) -> None:
        """Adds the servicing equipment to the class attributes and adds it to the
        capabilities mapping.
        """
        self._update_equipment_map(service_equipment)

    def _register_port(self, port: Port) -> None:
        """Registers the port with the repair manager, so that they can communicate as
        needed.

        Parameters
        ----------
        port : Port
            The port where repairs will occur.
        """
        self.port = port

    def _create_request_id(self, request: RepairRequest) -> str:
        """Creates a unique ``request_id`` to be logged in the ``request``.

        Parameters
        ----------
        request : RepairRequest
            The request object.

        Returns
        -------
        str
            An 11-character identifier: "MNT" for maintenance tasks or "RPR" for
            repairs, followed by the zero-padded 8-digit request counter.

        Raises
        ------
        ValueError
            If the ``request.details`` property is not a ``Failure`` or
            ``Maintenance`` object, then a ValueError will be raised.
        """
        if isinstance(request.details, Failure):
            prefix = "RPR"
        elif isinstance(request.details, Maintenance):
            prefix = "MNT"
        else:
            # Note this is a safety, and shouldn't be able to be reached
            raise ValueError("A valid ``RepairRequest`` must be submitted")

        request_id = f"{prefix}{str(self._current_id).zfill(8)}"
        self._current_id += 1
        return request_id

    def _is_request_processing(self, request: RepairRequest) -> bool:
        """Checks if a repair is being performed, or has already been completed.

        Parameters
        ----------
        request : RepairRequest
            The request that is about to be submitted to servicing equipment, but
            needs to be double-checked against ongoing processes.

        Returns
        -------
        bool
            True if the request is ongoing or completed, False, if it's ok to proceed
            with the operation. Always False when there are no queued requests.
        """
        if not self.items:
            return False

        rid = request.request_id
        status = self.request_status_map
        return rid in status["processing"] or rid in status["completed"]

    def _run_equipment_downtime(self, request: RepairRequest) -> None | Generator:
        """Run any equipment that has a pending request where the current windfarm
        operating capacity is less than or equal to the servicing equipment's
        threshold.

        Requests requiring a tow ("TOW" capability) are handed to the port instead,
        and do not have an operating reduction threshold to meet.

        TODO: This methodology needs to better resolve dispatching every equipment
        relating to a request vs just the one(s) that are required. Basically, don't
        dispatch every available HLV, but just one plus one of every other capability
        category that has pending requests

        Parameters
        ----------
        request : RepairRequest
            The newly submitted request that triggered the dispatch check.

        Yields
        ------
        Generator
            The timeouts, servicing events, and port processes that are waited on.
        """
        # Add an initial check to help avoid simultaneous dispatching
        seconds_to_wait, *_ = (
            self.env.random_generator.integers(low=0, high=10, size=1) / 3600.0
        )
        yield self.env.timeout(seconds_to_wait)

        # Port-based servicing equipment should be handled by the port and does not
        # have an operating reduction threshold to meet at this time
        if "TOW" in request.details.service_equipment:
            if request.system_id not in self.port.invalid_systems:
                system = self.windfarm.system(request.system_id)
                yield system.servicing_queue & system.servicing
                yield self.env.timeout(self.env.get_random_seconds(high=1))
                if request.system_id in self.systems_in_tow:
                    return
                self.invalidate_system(system, tow=True)
                yield self.env.process(self.port.run_tow_to_port(request))
            return

        # Wait for the actual system or cable to be available
        if request.cable:
            yield self.windfarm.cable(request.system_id).servicing
        else:
            yield self.windfarm.system(request.system_id).servicing

        operating_capacity = self.windfarm.current_availability_wo_servicing
        for capability in self.request_map:
            equipment_mapping = self.downtime_based_equipment.get_mapping(capability)
            for i, equipment in enumerate(equipment_mapping):
                if operating_capacity > equipment.strategy_threshold:
                    continue

                # Avoid simultaneous dispatches by waiting a random number of seconds
                seconds_to_wait, *_ = (
                    self.env.random_generator.integers(low=0, high=30, size=1) / 3600.0
                )
                yield self.env.timeout(seconds_to_wait)

                equipment_obj = equipment.equipment
                if equipment_obj.dispatched:
                    continue

                # Equipment-based logic does not manage system availability, so
                # ensure it's available prior to dispatching, and double check in
                # case delays causing a timing collision
                if equipment_obj.port_based:
                    if request.system_id in self.port.invalid_systems:
                        break
                    yield self.windfarm.system(request.system_id).servicing

                    self.env.process(
                        self.port.run_unscheduled_in_situ(request, initial=True)
                    )
                else:
                    self.env.process(equipment_obj.run_unscheduled_in_situ())

                # Move the dispatched capability to the end of list to ensure proper
                # cycling of available servicing equipment
                self.downtime_based_equipment.move_equipment_to_end(capability, i)

    def _run_equipment_requests(self, request: RepairRequest) -> None | Generator:
        """Run the first piece of equipment (if none are onsite) for each equipment
        capability category where the number of requests is greater than or equal to
        the equipment's threshold.

        Requests requiring a tow ("TOW" capability) are handed to the port instead,
        and do not have a requests-based threshold to meet.

        Parameters
        ----------
        request : RepairRequest
            The newly submitted request that triggered the dispatch check.

        Yields
        ------
        Generator
            The timeouts, servicing events, and dispatching processes that are
            waited on.
        """
        # Add an initial check to help avoid simultaneous dispatching
        yield self.env.timeout(self.env.get_random_seconds(high=30))

        # Port-based servicing equipment should be handled by the port and does not
        # have a requests-based threshold to meet at this time
        if "TOW" in request.details.service_equipment:
            if request.system_id not in self.port.invalid_systems:
                self.systems_waiting_for_tow.append(request.system_id)
                system = self.windfarm.system(request.system_id)
                yield system.servicing_queue & system.servicing
                yield self.env.timeout(self.env.get_random_seconds(high=1))
                if request.system_id in self.systems_in_tow:
                    return
                self.invalidate_system(system, tow=True)
                yield self.env.process(self.port.run_tow_to_port(request))
            return

        # Wait for the actual system or cable to be available
        if request.cable:
            yield self.windfarm.cable(request.system_id).servicing
        else:
            yield self.windfarm.system(request.system_id).servicing

        dispatched = None
        for capability, n_requests in self.request_map.items():
            # For a requests basis, the capability and submitted request must match
            if capability not in request.details.service_equipment:
                continue
            equipment_mapping = self.request_based_equipment.get_mapping(capability)
            for i, equipment in enumerate(equipment_mapping):
                if n_requests < equipment.strategy_threshold:
                    continue

                # Avoid simultaneous dispatches by waiting a random number of seconds
                yield self.env.timeout(self.env.get_random_seconds(high=30))

                # Run only the first piece of equipment in the mapping list, but
                # ensure that it moves to the back of the line after being used
                equipment_obj = equipment.equipment
                if equipment_obj.dispatched:
                    break

                # Either run the repair logic from the port for port-based servicing
                # equipment, so that it can self-manage or dispatch the servicing
                # equipment directly, when port is an implicitly modeled aspect
                if equipment_obj.port_based:
                    # Equipment-based logic does not manage system availability, so
                    # ensure it's available prior to dispatching
                    if request.system_id in self.port.invalid_systems:
                        break
                    yield self.env.process(
                        self.port.run_unscheduled_in_situ(request, initial=True)
                    )
                else:
                    yield self.env.process(equipment_obj.run_unscheduled_in_situ())

                # Move the dispatched capability to the end of list to ensure proper
                # cycling of available servicing equipment
                self.request_based_equipment.move_equipment_to_end(capability, i)
                dispatched = capability
                break

        # NOTE(review): the nesting of this wait was reconstructed from a listing
        # without indentation; it is assumed to run once, after both loops -- confirm.
        yield self.env.timeout(1 / 60)  # wait one minute for repair to register

        # Double check that the number of requests is still below the threshold
        # following the dispatching of a piece of servicing equipment. This mostly
        # pertains to highly frequent request with long repair times and low
        # thresholds. ``equipment_obj`` is always bound when ``dispatched`` is set, so
        # the short-circuiting order of this check matters.
        if (
            dispatched is None
            or equipment_obj.port_based
            or dispatched not in self.request_map
        ):
            return

        n_requests = self.request_map[dispatched]
        threshold_check = [
            n_requests >= eq.strategy_threshold
            for eq in self.request_based_equipment.get_mapping(dispatched)
        ]
        if any(threshold_check):
            new_request_check = [
                x
                for x in self.items
                if dispatched in x.details.service_equipment
                and not self._is_request_processing(x)
                and x is not request
            ]
            if new_request_check:
                new_request = self.get(lambda x: x == new_request_check[0]).value
                yield self.env.process(self._run_equipment_requests(new_request))

    def register_request(self, request: RepairRequest) -> RepairRequest:
        """The method to submit requests to the repair manager and adds a unique
        identifier for logging.

        Parameters
        ----------
        request : RepairRequest
            The request that needs to be tracked and logged.

        Returns
        -------
        RepairRequest
            The same request as passed into the method, but with a unique identifier
            used for logging.
        """
        request_id = self._create_request_id(request)
        request.assign_id(request_id)
        self.put(request)
        self.request_status_map["pending"].add(request.request_id)
        return request

    def submit_request(self, request: RepairRequest) -> None:
        """Starts the dispatching checks for a registered request with each of the
        unscheduled servicing equipment strategies that is currently running.

        Parameters
        ----------
        request : RepairRequest
            The request, as returned from ``register_request``, that may require
            servicing equipment to be dispatched.
        """
        if self.downtime_based_equipment.is_running:
            self.env.process(self._run_equipment_downtime(request))

        if self.request_based_equipment.is_running:
            self.env.process(self._run_equipment_requests(request))

    def get_request_by_system(
        self, equipment_capability: list[str], system_id: str | None = None
    ) -> FilterStoreGet | None:
        """Gets the first eligible repair request for a certain turbine given a
        sequence of ``equipment_capability``, as long as the system isn't registered
        as unable to be serviced.

        Parameters
        ----------
        equipment_capability : list[str]
            The capability of the servicing equipment requesting repairs to process.
        system_id : Optional[str], optional
            ID of the turbine or OSS; should correspond to ``System.id``, by default
            None. If None, then the requests for all systems are considered in the
            order they were submitted.

        Returns
        -------
        Optional[FilterStoreGet]
            The first repair request for the focal system that is not already being
            processed, is not waiting for a tow, and matches at least one of the
            capabilities, or None if there is no such request.
        """
        if not self.items:
            return None
        if system_id in self.invalid_systems:
            return None

        capabilities = set(equipment_capability)

        # Filter the requests by system
        requests = self.items
        if system_id is not None:
            requests = [el for el in self.items if el.system_id == system_id]
        if not requests:
            return None

        # Filter the requests by equipment capability and return the first valid
        # request
        for request in requests:
            if self._is_request_processing(request):
                continue
            if request.system_id in self.systems_waiting_for_tow:
                continue
            if capabilities.intersection(request.details.service_equipment):
                # Mark and retrieve the request that actually matched, so that no
                # other servicing equipment can access it. The first request in the
                # list may have been skipped by the checks above.
                self.request_status_map["pending"].discard(request.request_id)
                self.request_status_map["processing"].add(request.request_id)
                return self.get(lambda x: x is request)

        # There were no matching equipment requirements to match the equipment
        # attempting to retrieve its next request
        return None

    def get_request_by_severity(
        self,
        equipment_capability: list[str] | set[str],
        severity_level: int | None = None,
    ) -> FilterStoreGet | None:
        """Gets the next repair request by ``severity_level``.

        Parameters
        ----------
        equipment_capability : list[str] | set[str]
            The capability of the equipment requesting possible repairs to make.
        severity_level : int
            Severity level of focus, default None.

        Returns
        -------
        Optional[FilterStoreGet]
            Repair request meeting the required criteria, or None if there isn't one.
        """
        if not self.items:
            return None

        capabilities = set(equipment_capability)

        requests = self.items
        if severity_level is not None:
            # Capture only the desired severity level, if specified
            requests = [el for el in self.items if el.severity_level == severity_level]
        if not requests:
            return None

        # Re-order requests by severity (high to low) and the order they were
        # submitted. The sort is stable, so submission order is kept within a
        # severity level in case any get put back.
        requests = sorted(requests, key=lambda x: x.severity_level, reverse=True)
        for request in requests:
            if self._is_request_processing(request):
                continue
            if request.system_id in self.systems_waiting_for_tow:
                continue

            # Skip any system or cable that is currently being serviced
            if request.cable:
                if not self.windfarm.cable(request.system_id).servicing.triggered:
                    continue
            else:
                if not self.windfarm.system(request.system_id).servicing.triggered:
                    continue

            if request.system_id not in self.invalid_systems:
                if capabilities.intersection(request.details.service_equipment):
                    self.request_status_map["pending"].discard(request.request_id)
                    self.request_status_map["processing"].add(request.request_id)
                    return self.get(lambda x: x is request)

        # Ensure None is returned if nothing is found in the loop just as a FilterGet
        # would if allowed to operate without the above looping to identify multiple
        # criteria and acting on the request before it's processed
        return None

    def invalidate_system(
        self, system: System | Cable | str, tow: bool = False
    ) -> None:
        """Disables the ability for servicing equipment to service a specific system
        by resetting its ``servicing_queue`` event and registering it as invalid.

        Parameters
        ----------
        system : System | Cable | str
            The system, cable, or ``str`` id of one to disable repairs. IDs
            containing "::" are looked up as cables.
        tow : bool, optional
            Set to True if this is for a tow-to-port request.

        Raises
        ------
        RuntimeError
            Raised if the system is already invalidated or is being serviced.
        """
        if isinstance(system, str):
            if "::" in system:
                system = self.windfarm.cable(system)
            else:
                system = self.windfarm.system(system)

        if system.id in self.invalid_systems or not system.servicing.triggered:
            raise RuntimeError(
                f"{self.env.simulation_time} {system.id} already being serviced"
            )
        system.servicing_queue = self.env.event()
        self.invalid_systems.append(system.id)

        if tow:
            self.systems_in_tow.append(system.id)
            # Only the requests-based dispatching registers a system as waiting for a
            # tow, so it may be absent when the downtime-based dispatching started
            # the tow; an unconditional removal would raise a ValueError.
            if system.id in self.systems_waiting_for_tow:
                self.systems_waiting_for_tow.remove(system.id)

    def interrupt_system(
        self, system: System | Cable, replacement: str | None = None
    ) -> None:
        """Sets the turbine status to be in servicing, and interrupts all the
        processes to turn off operations.

        Parameters
        ----------
        system : System | Cable
            The system to disable repairs; it must already be invalidated.
        replacement: str | None, optional
            If a subassembly `id` is provided, this indicates the interruption is
            caused by its replacement event. Defaults to None.

        Raises
        ------
        RuntimeError
            Raised if the system is already being serviced or was not invalidated
            beforehand.
        """
        if not (system.servicing.triggered and system.id in self.invalid_systems):
            raise RuntimeError(
                f"{self.env.simulation_time} {system.id} already being serviced"
            )
        system.servicing = self.env.event()
        system.interrupt_all_subassembly_processes(replacement=replacement)

    def register_repair(self, repair: RepairRequest) -> Generator:
        """Registers the repair as complete with the repair manager.

        Parameters
        ----------
        repair : RepairRequest
            The repair that has been completed.

        Yields
        ------
        Generator
            The ``completed_requests.put()`` that registers completion, followed by
            the ``in_process_requests.get()`` that removes the repair from the
            in-process store.
        """
        self.request_status_map["processing"].discard(repair.request_id)
        self.request_status_map["completed"].add(repair.request_id)
        yield self.completed_requests.put(repair)
        yield self.in_process_requests.get(lambda x: x is repair)

    def enable_requests_for_system(
        self, system: System | Cable, tow: bool = False
    ) -> None:
        """Reenables service equipment operations on the provided system.

        Parameters
        ----------
        system : System | Cable
            The ``System`` or ``Cable`` that can be operated on again.
        tow : bool, optional
            Set to True if this is for a tow-to-port request.

        Raises
        ------
        RuntimeError
            Raised if the system's ``servicing`` event was already triggered.
        ValueError
            Raised if the system was not registered as invalid, or, when ``tow`` is
            True, as being in tow.
        """
        if system.servicing.triggered:
            raise RuntimeError(
                f"{self.env.simulation_time} Repairs were already completed"
                f" at {system.id}"
            )
        self.invalid_systems.remove(system.id)
        if tow:
            self.systems_in_tow.remove(system.id)
        system.servicing.succeed()
        system.servicing_queue.succeed()
        system.interrupt_all_subassembly_processes()

    def get_all_requests_for_system(
        self, agent: str, system_id: str
    ) -> list[RepairRequest] | None | Generator:
        """Gets all repair requests for a specific ``system_id``.

        This is a generator: each matching request is logged, marked as processing,
        and removed from the queue one at a time, and the list of requests is the
        generator's return value.

        Parameters
        ----------
        agent : str
            The name of the entity requesting all of a system's repair requests.
        system_id : Optional[str], optional
            ID of the turbine or OSS; should correspond to ``System.id``. If None,
            all queued requests are retrieved.

        Returns
        -------
        Optional[list[RepairRequest]]
            All repair requests for a given system. If no matching requests are found,
            or there aren't any items in the queue yet, then None is returned.
        """
        if not self.items:
            return None

        # Filter the requests by system
        requests = self.items
        if system_id is not None:
            requests = [el for el in self.items if el.system_id == system_id]
        if not requests:
            return None

        # Loop the requests and pop them from the queue
        for request in requests:
            self.env.log_action(
                system_id=request.system_id,
                system_name=request.system_name,
                part_id=request.subassembly_id,
                part_name=request.subassembly_name,
                system_ol=float("nan"),
                part_ol=float("nan"),
                agent=agent,
                action="requests being moved",
                reason="",
                request_id=request.request_id,
            )
            self.request_status_map["pending"].discard(request.request_id)
            self.request_status_map["processing"].add(request.request_id)
            # Bind the current request so the filter can't drift to a later one
            _ = yield self.get(lambda x, target=request: x is target)

        return requests

    def purge_subassembly_requests(
        self, system_id: str, subassembly_id: str, exclude: list[str] | None = None
    ) -> list[RepairRequest] | None:
        """Removes and returns all the requests for a system/subassembly combination.
        This is intended to be used to remove erroneous requests after a subassembly
        has been replaced.

        Parameters
        ----------
        system_id : str
            Either the ``System.id`` or ``Cable.id``.
        subassembly_id : str
            Either the ``Subassembly.id`` or the ``Cable.id`` repeated for cables.
        exclude : list[str] | None
            A list of ``request_id`` to exclude from the purge. This is a specific use
            case for the combined cable system/subassembly, but can be to exclude
            certain requests from the purge. By default None, which excludes nothing.

        Returns
        -------
        Optional[list[RepairRequest]]
            All requests made to the repair manager for the provided
            system/subassembly combination. Returns None if self.items is empty or
            there are no matching requests.
        """
        if not self.items:
            return None
        if exclude is None:
            exclude = []

        # First check the system matches because we'll need these separated later to
        # ensure we don't incorrectly remove towing requests from other subassemblies
        system_requests = [
            request
            for request in self.items
            if request.system_id == system_id and request.request_id not in exclude
        ]
        if not system_requests:
            return None

        subassembly_requests = [
            request
            for request in system_requests
            if request.subassembly_id == subassembly_id
        ]
        if not subassembly_requests:
            return None

        for request in subassembly_requests:
            which = "repair" if isinstance(request.details, Failure) else "maintenance"
            self.env.log_action(
                system_id=request.system_id,
                system_name=request.system_name,
                part_id=request.subassembly_id,
                part_name=request.subassembly_name,
                system_ol=float("nan"),
                part_ol=float("nan"),
                agent="RepairManager",
                action=f"{which} canceled",
                reason="replacement required",
                request_id=request.request_id,
            )
            self.request_status_map["pending"].discard(request.request_id)
            self.request_status_map["processing"].add(request.request_id)
            # Bind the current request so the filter can't drift to a later one
            _ = self.get(lambda x, target=request: x is target)

            sid = request.system_id

            # Ensure that if it was reset, and a tow was waiting, that it gets cleared,
            # unless a separate subassembly required the tow
            # NOTE(review): ``system_requests`` still contains the requests being
            # purged, so a purged tow request also counts as a match here and keeps
            # the system waiting -- confirm this is intended.
            if sid in self.systems_waiting_for_tow:
                other_subassembly_match = [
                    r for r in system_requests if "TOW" in r.details.service_equipment
                ]
                if sid not in self.systems_in_tow and other_subassembly_match == []:
                    self.systems_waiting_for_tow.remove(sid)

        return subassembly_requests

    @property
    def request_map(self) -> dict[str, int]:
        """Creates an updated mapping between the servicing equipment capabilities and
        the number of requests that fall into each capability category (nonzero values
        only).
        """
        capabilities = chain.from_iterable(
            r.details.service_equipment for r in self.items
        )
        return dict(Counter(capabilities))