Source code for wombat.windfarm.windfarm

"""Creates the Windfarm class/model."""

from __future__ import annotations

import csv
import datetime as dt
import itertools
from math import fsum
from functools import cache

import numpy as np
import pandas as pd
import networkx as nx
from geopy import distance

from wombat.core import RepairManager, WombatEnvironment
from wombat.core.library import load_yaml
from wombat.windfarm.system import Cable, System
from wombat.core.data_classes import String, SubString, WindFarmMap, SubstationMap


[docs] class Windfarm: """The primary class for operating on objects within a windfarm. The substations, cables, and turbines are created as a network object to be more appropriately accessed and controlled. """ def __init__( self, env: WombatEnvironment, windfarm_layout: str, repair_manager: RepairManager, ) -> None: self.env = env self.repair_manager = repair_manager # Set up the layout and instantiate all windfarm objects self.configs: dict[str, dict] = {"turbine": {}, "substation": {}, "cable": {}} self._create_graph_layout(windfarm_layout) self._create_turbines_and_substations() self._create_cables() self.capacity: int | float = sum( self.system(turb).capacity for turb in self.turbine_id ) self._create_substation_turbine_map() self._create_wind_farm_map() self.finish_setup() self.calculate_distance_matrix() # Create the logging items self.system_list = list(self.graph.nodes) self._setup_logger() # Register the windfarm and start the logger self.repair_manager._register_windfarm(self) self.env._register_windfarm(self) self.env.process(self._log_operations())
[docs] def _create_graph_layout(self, windfarm_layout: str) -> None: """Creates a network layout of the windfarm start from the substation(s) to be able to capture downstream turbines that can be cut off in the event of a cable failure. Parameters ---------- windfarm_layout : str Filename to use for reading in the windfarm layout; must be a csv file. """ # Read in the layout CSV file, then sort it by string, then order to ensure # it can be traversed in sequential order later layout_path = str(self.env.data_dir / "project/plant" / windfarm_layout) layout = ( pd.read_csv(layout_path) .sort_values(by=["string", "order"]) .reset_index(drop=True) ) layout.subassembly = layout.subassembly.fillna("") layout.upstream_cable = layout.upstream_cable.fillna("") windfarm = nx.DiGraph() windfarm.add_nodes_from(layout.id.values) # Assign the data attributes to the graph nodes for col in ("name", "latitude", "longitude", "subassembly"): nx.set_node_attributes(windfarm, dict(layout[["id", col]].values), name=col) # Determine which nodes are substations and which are turbines if "type" in layout.columns: # Extract the type directly from the layout file if layout.loc[~layout.type.isin(("substation", "turbine"))].size > 0: raise ValueError( "At least one value in the 'type' column are not one of:" " 'substation' or 'turbine'." ) substation_filter = layout.type == "substation" nx.set_node_attributes( windfarm, dict(layout[["id", "type"]].values), name="type" ) else: # Deduce substations by their self-connected setting for interconnection substation_filter = layout.id == layout.substation_id _type = {True: "substation", False: "turbine"} d = {i: _type[val] for i, val in zip(layout.id, substation_filter.values)} nx.set_node_attributes(windfarm, d, name="type") self.turbine_id: np.ndarray = layout.loc[~substation_filter, "id"].values self.substation_id = layout.loc[substation_filter, "id"].values for substation in self.substation_id: windfarm.nodes[substation]["connection"] = layout.loc[ layout.id == substation, "substation_id" ].values[0] # Create a mapping for each substation to all connected turbines (subgraphs) substations = layout[substation_filter].copy() turbines = layout[~substation_filter].copy() substation_sections = [ turbines[turbines.substation_id == substation] for substation in substations.id ] # For each subgraph, create the edge connections between substations and # turbines. Note: these are pre-sorted in the layout creation step. for section in substation_sections: for _, row in section.iterrows(): if row.order == 0: start: str = row.substation_id else: start = current # noqa: F821 current: str = row.id windfarm.add_edge( start, current, length=row.distance, cable=row.upstream_cable ) # Create the substation to substation and substation to self connections for substation in self.substation_id: row = layout.loc[layout.id == substation] windfarm.add_edge( row.substation_id.values[0], substation, length=row.distance.values[0], cable=row.upstream_cable.values[0], ) self.graph: nx.DiGraph = windfarm self.layout_df = layout
[docs] def _create_turbines_and_substations(self) -> None: """Instantiates the turbine and substation models as defined in the user-provided layout file, and connects these models to the appropriate graph nodes to create a fully representative windfarm network model. Raises ------ ValueError Raised if the subassembly data is not provided in the layout file. """ # Loop through all nodes in the graph, and create the actual simulation objects for system_id, data in self.graph.nodes(data=True): name = data["subassembly"] node_type = data["type"] if name == "": raise ValueError( "A 'subassembly' file must be specified for all nodes in the" " windfarm layout!" ) # Read in unique system configuration files only once, and reference # the existing dictionary when possible to reduce I/O if (subassembly_dict := self.configs[node_type].get(name)) is None: subassembly_dict = load_yaml(self.env.data_dir / f"{node_type}s", name) self.configs[node_type][name] = subassembly_dict # Create the turbine or substation simulation object self.graph.nodes[system_id]["system"] = System( self.env, self.repair_manager, system_id, data["name"], subassembly_dict, node_type, )
[docs] def _create_cables(self) -> None: """Instantiates the cable models as defined in the user-provided layout file, and connects these models to the appropriate graph edges to create a fully representative windfarm network model. Raises ------ ValueError Raised if the cable model is not specified. """ get_name = "upstream_cable_name" in self.layout_df bad_data_location_messages = [] # Loop over all the edges in the graph and create cable objects for start_node, end_node, data in self.graph.edges(data=True): name = data["cable"] # Check that the cable data is provided if name == "": raise ValueError( "An 'upstream_cable' file must be specified for all nodes in the" " windfarm layout!" ) # Read in unique cable configuration files once to reduce I/O if (cable_dict := self.configs["cable"].get(name)) is None: try: cable_dict = load_yaml(self.env.data_dir / "cables", data["cable"]) except FileNotFoundError: cable_dict = load_yaml( self.env.data_dir / "windfarm", data["cable"] ) bad_data_location_messages.append( "In v0.7, all cable configurations must be located in:" " '<library>/cables/" ) self.configs["cable"][name] = cable_dict # Get the lat, lon pairs for the start and end points start_coordinates = ( self.graph.nodes[start_node]["latitude"], self.graph.nodes[start_node]["longitude"], ) end_coordinates = ( self.graph.nodes[end_node]["latitude"], self.graph.nodes[end_node]["longitude"], ) # Get the unique naming of the cable connection if it's configured name = None if get_name: name, *_ = self.layout_df.loc[ self.layout_df.id == end_node, "upstream_cable_name" ] # If the real distance/cable length is not input, then the geodesic distance # is calculated if data["length"] == 0: data["length"] = distance.geodesic( start_coordinates, end_coordinates, ellipsoid="WGS-84" ).km # Encode whether it is an array cable or an export cable if self.graph.nodes[end_node]["type"] == "substation": data["type"] = "export" else: data["type"] = "array" # Create the Cable simulation object data["cable"] = Cable( self, self.env, data["type"], start_node, end_node, cable_dict, name ) # Calaculate the geometric center point of the cable for later # determining travel distances to cables end_points = np.array((start_coordinates, end_coordinates)) data["latitude"], data["longitude"] = end_points.mean(axis=0)
[docs] def calculate_distance_matrix(self) -> None: """Calculates the geodesic distance, in km, between all of the windfarm's nodes, e.g., substations and turbines, and cables. """ ids = list(self.graph.nodes()) ids.extend([data["cable"].id for *_, data in self.graph.edges(data=True)]) coords = [ (data["latitude"], data["longitude"]) for *_, data in (*self.graph.nodes(data=True), *self.graph.edges(data=True)) ] dist = [ distance.geodesic(c1, c2).km for c1, c2 in itertools.combinations(coords, 2) ] dist_arr = np.ones((len(ids), len(ids))) triangle_ix = np.triu_indices_from(dist_arr, 1) dist_arr[triangle_ix] = dist_arr.T[triangle_ix] = dist # Set the self distance to infinity, so that only one crew can be dropped off # at a single point np.fill_diagonal(dist_arr, np.inf) self.distance_matrix = pd.DataFrame(dist_arr, index=ids, columns=ids)
[docs] def _create_substation_turbine_map(self) -> None: """Creates ``substation_turbine_map``, a dictionary, that maps substation(s) to the dependent turbines in the windfarm, and the weighting of each turbine in the windfarm. """ # Get all turbines connected to each substation, excepting any connected via # export cables that connect substations as these operate independently s_t_map: dict = {s: {"turbines": [], "weights": []} for s in self.substation_id} for substation_id in self.substation_id: nodes = set( nx.bfs_tree(self.graph, substation_id, depth_limit=1).nodes ).difference(self.substation_id) for node in list(nodes): nodes.update( list(itertools.chain(*nx.dfs_successors(self.graph, node).values())) ) s_t_map[substation_id]["turbines"] = np.array(list(nodes)) # Reorient the mapping to have the turbine list and the capacity-based weighting # of each turbine for s_id in s_t_map: s_t_map[s_id]["weights"] = ( np.array([self.system(t).capacity for t in s_t_map[s_id]["turbines"]]) / self.capacity ) self.substation_turbine_map: dict[str, dict[str, np.ndarray]] = s_t_map # Calculate the turbine weights self.turbine_weights: pd.DataFrame = ( pd.concat([pd.DataFrame(val) for val in s_t_map.values()]) .set_index("turbines") .T )
[docs] def _create_wind_farm_map(self) -> None: """Creates a secondary graph object strictly for traversing the windfarm to turn on/off the appropriate turbines, substations, and cables more easily. """ substations = self.substation_id graph = self.graph wind_map = dict(zip(substations, itertools.repeat({}))) export = [el for el in graph.edges if el[1] in substations] for cable_tuple in export: self.cable(cable_tuple).set_string_details(*cable_tuple[::-1]) for s_id in self.substation_id: start_nodes = list( set(nx.bfs_tree(graph, s_id, depth_limit=1).nodes).difference( substations ) ) wind_map[s_id] = {"strings": dict(zip(start_nodes, itertools.repeat({})))} for start_node in start_nodes: upstream = list( itertools.chain(*nx.dfs_successors(graph, start_node).values()) ) wind_map[s_id]["strings"][start_node] = { start_node: SubString( downstream=s_id, # type: ignore upstream=upstream, # type: ignore ) } self.cable((s_id, start_node)).set_string_details(start_node, s_id) downstream = start_node for node in upstream: wind_map[s_id]["strings"][start_node][node] = SubString( downstream=downstream, # type: ignore upstream=list( # tye: ignore itertools.chain(*nx.dfs_successors(graph, node).values()) ), ) self.cable((downstream, node)).set_string_details(start_node, s_id) downstream = node wind_map[s_id]["strings"][start_node] = String( # type: ignore start=start_node, upstream_map=wind_map[s_id]["strings"][start_node] ) wind_map[s_id] = SubstationMap( # type: ignore string_starts=start_nodes, string_map=wind_map[s_id]["strings"], downstream=graph.nodes[s_id]["connection"], ) self.wind_farm_map = WindFarmMap( substation_map=wind_map, export_cables=export, # type: ignore )
[docs] def finish_setup(self) -> None: """Final initialization hook for any substations, turbines, or cables.""" for start_node, end_node in self.graph.edges(): self.cable((start_node, end_node)).finish_setup()
[docs] def _setup_logger(self, initial: bool = True): self._log_columns = [ "datetime", "env_datetime", "env_time", ] + self.system_list self.env._operations_writer = csv.DictWriter( self.env._operations_csv, delimiter="|", fieldnames=self._log_columns ) if initial: self.env._operations_writer.writeheader()
[docs] def _log_operations(self): """Logs the operational data for a simulation.""" message = { "datetime": dt.datetime.now(), "env_datetime": self.env.simulation_time, "env_time": self.env.now, } message.update( {system: self.system(system).operating_level for system in self.system_list} ) self.env._operations_writer.writerow(message) HOURS = 1 while True: # Loop 10K times, to limit the number of times we write to the operations # log file. 10K was a crude optimization decision, so performance can vary # dependending on the simulation for _ in range(10000): yield self.env.timeout(HOURS) message = { "datetime": dt.datetime.now(), "env_datetime": self.env.simulation_time, "env_time": self.env.now, } message.update( { system: self.system(system).operating_level for system in self.system_list } ) self.env._operations_buffer.append(message) self.env._operations_writer.writerows(self.env._operations_buffer) self.env._operations_buffer.clear()
[docs] @cache def system(self, system_id: str) -> System: """Convenience function to returns the desired `System` object for a turbine or substation in the windfarm. Parameters ---------- system_id : str The system's unique identifier, ``wombat.windfarm.System.id``. Returns ------- System The ``System`` object. """ return self.graph.nodes[system_id]["system"]
[docs] @cache def cable(self, cable_id: tuple[str, str] | str) -> Cable: """Convenience function to returns the desired `Cable` object for a cable in the windfarm. Parameters ---------- cable_id : tuple[str, str] | str The cable's unique identifier, of the form: (``wombat.windfarm.System.id``, ``wombat.windfarm.System.id``), for the (downstream node id, upstream node id), or the ``Cable.id``. Returns ------- Cable The ``Cable`` object. """ if isinstance(cable_id, str): edge_id = tuple(cable_id.split("::")) if len(edge_id) == 3: edge_id = edge_id[1:] else: edge_id = cable_id try: return self.graph.edges[edge_id]["cable"] except KeyError: raise KeyError(f"Edge {edge_id} is invalid.")
@property def current_availability(self) -> float: r"""Calculates the product of all system ``operating_level`` variables across the windfarm using the following forumation. .. math:: \sum{ OperatingLevel_{substation_{i}} * \sum{OperatingLevel_{turbine_{j}} * Weight_{turbine_{j}}} } where the :math:``{OperatingLevel}`` is the product of the operating level of each subassembly on a given system (substation or turbine), and the :math:``{Weight}`` is the proportion of one turbine's capacity relative to the whole windfarm. """ # noqa: W605 operating_levels = { s_id: [ self.system(t).operating_level for t in self.substation_turbine_map[s_id]["turbines"] # type: ignore ] for s_id in self.substation_turbine_map } availability = fsum( [ self.system(s_id).operating_level * fsum( operating_levels[s_id] * self.substation_turbine_map[s_id]["weights"] # type: ignore ) for s_id in self.substation_turbine_map ] ) return availability @property def current_availability_wo_servicing(self) -> float: r"""Calculates the product of all system ``operating_level`` variables across the windfarm using the following forumation, ignoring 0 operating level due to ongoing servicing. .. math:: \sum{ OperatingLevel_{substation_{i}} * \sum{OperatingLevel_{turbine_{j}} * Weight_{turbine_{j}}} } where the :math:``{OperatingLevel}`` is the product of the operating level of each subassembly on a given system (substation or turbine), and the :math:``{Weight}`` is the proportion of one turbine's capacity relative to the whole windfarm. """ # noqa: W605 operating_levels = { s_id: [ self.system(t).operating_level_wo_servicing for t in self.substation_turbine_map[s_id]["turbines"] # type: ignore ] for s_id in self.substation_turbine_map } availability = fsum( [ self.system(s_id).operating_level_wo_servicing * fsum( operating_levels[s_id] * self.substation_turbine_map[s_id]["weights"] # type: ignore ) for s_id in self.substation_turbine_map ] ) return availability