Source code for ard.cost.orbit_wrap

from pathlib import Path
import shutil
import warnings

import numpy as np
import pandas as pd

import openmdao.api as om

import wisdem.orbit.orbit_api as orbit_wisdem

from ORBIT.core.library import default_library
from ORBIT.core.library import initialize_library

from ard.cost.wisdem_wrap import ORBIT_setup_latents


[docs] def generate_orbit_location_from_graph( graph, # TODO: replace with a terse_links representation X_turbines, Y_turbines, X_substations, Y_substations, allow_branching_approximation=False, ): """ go from a optiwindnet graph to an ORBIT input CSV convert a optiwindnet graph representation of a collection system and get to a best-possible approximation of the same collection system for compatibility with ORBIT. ORBIT doesn't allow branching and optiwindnet does by default, so we allow some cable duplication if necessary to get a conservative approximation of the BOS costs if the graph isn't compatible with ORBIT Parameters ---------- graph : networkx.Graph the graph representation of the collection system design X_turbines : np.array the cartesian X locations, in kilometers, of the turbines Y_turbines : np.array the cartesian Y locations, in kilometers, of the turbines X_substations : np.array the cartesian X locations, in kilometers, of the substations Y_substations : np.array the cartesian Y locations, in kilometers, of the substations Returns ------- pandas.DataFrame a dataframe formatted for ORBIT to specify a farm layout Raises ------ RecursionError if the recursive setup seems to be stuck in a loop """ # get all edges, sorted by the first node then the second node edges_to_process = [edge for edge in graph.edges] edges_to_process.sort(key=lambda x: (x[0], x[1])) # get the edges with a negative index node (a substation) edges_inclsub = [edge for edge in edges_to_process if edge[0] < 0 or edge[1] < 0] edges_inclsub.sort(key=lambda x: (x[0], x[1])) # check to see if any nodes appear more than twice # (i.e. once destination and possibly one source) node_countmap = dict.fromkeys( list(set(node for edge in edges_to_process for node in edge)), 0 ) for edge in edges_to_process: node_countmap[edge[0]] += 1 node_countmap[edge[1]] += 1 # if this has branching, handle it if np.any( ( np.array(list(node_countmap.values())) > 2 ) # multiple turbine appearances indicates a branch & ( np.array(list(node_countmap.keys())) >= 0 ) # but substations do appear so "mask" them ): if allow_branching_approximation: warnings.warn( "The provided collection system design graph includes branching, " "which ORBIT does not support. Proceeding with an approximate " "radial collection system for cost modeling." ) else: raise ValueError( "The graph has branching. ORBIT does not support this. " "By modifying the approximate_branches option to True in the " "ORBITDetail component, you can allow ORBIT to approximate the " "BOS costs by a close radial-layout collection system " "approximation." ) # data for ORBIT data_orbit = { "id": [], "substation_id": [], "name": [], "longitude": [], "latitude": [], "string": [], "order": [], "cable_length": [], "bury_speed": [], } idx_string = 0 order = 0 for edge in edges_inclsub: # every edge w/ a substation starts a string def handle_edge( edge, turbine_origination, idx_string, order, recursion_level=0 ): # recursively handle the edges if recursion_level > 10: # for safe recursion raise RecursionError("Recursion limit reached") # get the target turbine index turbine_tgt_index = edge[0] if edge[0] != turbine_origination else edge[1] # get the turbine name turbine_name = turbine_id = f"t{turbine_tgt_index:03d}" # add the turbine to the dataset data_orbit["id"].append(turbine_id) data_orbit["substation_id"].append(substation_id) data_orbit["name"].append(turbine_name) data_orbit["longitude"].append(X_turbines[turbine_tgt_index]) data_orbit["latitude"].append(Y_turbines[turbine_tgt_index]) data_orbit["string"].append(int(idx_string)) data_orbit["order"].append(int(order)) data_orbit["cable_length"].append(0) # ORBIT computes automatically data_orbit["bury_speed"].append(0) # ORBIT computes automatically # pop this edge out of the edges list edges_to_process.remove(edge) # get the set of remaining edges that include the terminal turbine edges_turbine = [e for e in edges_to_process if (turbine_tgt_index in e)] order += 1 for new_string, edge_next in enumerate(edges_turbine): if new_string: idx_string += 1 order = 0 idx_string, order = handle_edge( edge_next, turbine_tgt_index, idx_string, order, recursion_level=recursion_level + 1, ) return idx_string, order # get the substation id as a one-liner substation_index = len(X_substations) + (edge[0] if edge[0] < 0 else edge[1]) # get the substation name substation_name = substation_id = f"oss{substation_index:01d}" # add the substation to the dataset if not substation_id in data_orbit["id"]: data_orbit["id"].append(substation_id) data_orbit["substation_id"].append(substation_id) data_orbit["name"].append(substation_name) data_orbit["longitude"].append(X_substations[substation_index] / 1.0e3) data_orbit["latitude"].append(Y_substations[substation_index] / 1.0e3) data_orbit["string"].append(None) data_orbit["order"].append(None) data_orbit["cable_length"].append(None) data_orbit["bury_speed"].append(None) # handle the edge that we get idx_string, order = handle_edge( edge, substation_index - len(X_substations), idx_string, order ) order = 0 idx_string += 1 df_orbit = pd.DataFrame(data_orbit).fillna("") df_orbit.string = [int(v) if v != "" else "" for v in df_orbit.string] df_orbit.order = [int(v) if v != "" else "" for v in df_orbit.order] return df_orbit
[docs] class ORBITDetail(orbit_wisdem.Orbit): """ Wrapper for WISDEM's ORBIT offshore BOS calculators. A thicker wrapper of `wisdem.orbit_api` that 1) replaces capabilities that assume a grid farm layout that is default in WISDEM's ORBIT with a custom array layout, and 2) traps warning messages that are recognized not to be issues. See: https://github.com/WISDEM/ORBIT """
[docs] def initialize(self): """Initialize for API connections.""" super().initialize() self.options.declare("case_title", default="working") self.options.declare("modeling_options") self.options.declare("approximate_branches", default=False)
[docs] def setup(self): """Define all input variables from all models.""" # load modeling options self.modeling_options = self.options["modeling_options"] self.N_turbines = self.modeling_options["layout"]["N_turbines"] self.N_substations = self.modeling_options["layout"]["N_substations"] self.set_input_defaults("wtiv", "example_wtiv") self.set_input_defaults("feeder", "example_feeder") # self.set_input_defaults("num_feeders", 1) # self.set_input_defaults("num_towing", 1) # self.set_input_defaults("num_station_keeping", 3) # self.set_input_defaults( # "oss_install_vessel", "example_heavy_lift_vessel", # ) self.set_input_defaults("site_distance", 40.0, units="km") self.set_input_defaults("site_distance_to_landfall", 40.0, units="km") self.set_input_defaults("interconnection_distance", 40.0, units="km") self.set_input_defaults("plant_turbine_spacing", 7) self.set_input_defaults("plant_row_spacing", 7) self.set_input_defaults("plant_substation_distance", 1, units="km") # self.set_input_defaults("num_port_cranes", 1) # self.set_input_defaults("num_assembly_lines", 1) self.set_input_defaults("takt_time", 170.0, units="h") self.set_input_defaults("port_cost_per_month", 2e6, units="USD/mo") self.set_input_defaults("construction_insurance", 44.0, units="USD/kW") self.set_input_defaults("construction_financing", 183.0, units="USD/kW") self.set_input_defaults("contingency", 316.0, units="USD/kW") self.set_input_defaults("commissioning_cost_kW", 44.0, units="USD/kW") self.set_input_defaults("decommissioning_cost_kW", 58.0, units="USD/kW") self.set_input_defaults("site_auction_price", 100e6, units="USD") self.set_input_defaults("site_assessment_cost", 50e6, units="USD") self.set_input_defaults("construction_plan_cost", 1e6, units="USD") self.set_input_defaults("installation_plan_cost", 2.5e5, units="USD") self.set_input_defaults("boem_review_cost", 0.0, units="USD") self.add_subsystem( "orbit", ORBITWisdemDetail( modeling_options=self.modeling_options, case_title=self.options["case_title"], approximate_branches=self.options["approximate_branches"], floating=self.modeling_options["floating"], jacket=self.modeling_options.get("jacket"), jacket_legs=self.modeling_options.get("jacket_legs"), ), promotes=["*"], )
[docs] class ORBITWisdemDetail(orbit_wisdem.OrbitWisdem): """ORBIT-WISDEM Fixed Substructure API, modified for detailed layouts""" _path_library = None
[docs] def initialize(self): super().initialize() self.options.declare("case_title", default="working") self.options.declare("modeling_options") self.options.declare("approximate_branches", default=False)
[docs] def setup(self): """Define all the inputs.""" # call the superclass method super().setup() # load modeling options self.modeling_options = self.options["modeling_options"] self.N_turbines = self.modeling_options["layout"]["N_turbines"] self.N_substations = self.modeling_options["layout"]["N_substations"] # bring in collection system design self.add_discrete_input("graph", None) # add the detailed turbine and substation locations self.add_input("x_turbines", np.zeros((self.N_turbines,)), units="km") self.add_input("y_turbines", np.zeros((self.N_turbines,)), units="km") self.add_input("x_substations", np.zeros((self.N_substations,)), units="km") self.add_input("y_substations", np.zeros((self.N_substations,)), units="km") # copy the default ORBIT library to a local directory under case_files path_library_default = Path(default_library).absolute() self._path_library = ( Path("case_files") / self.options["case_title"] / "ORBIT_library" ).absolute() if path_library_default.exists(): shutil.copytree( path_library_default, self._path_library, dirs_exist_ok=True ) else: raise FileNotFoundError( f"Can not find default ORBIT library at {path_library_default}." )
[docs] def compile_orbit_config_file( self, inputs, outputs, discrete_inputs, discrete_outputs, ): config = super().compile_orbit_config_file( inputs, outputs, discrete_inputs, discrete_outputs, ) # run the superclass # remove the grid plant option, and replace with a custom plant config["plant"] = { "layout": "custom", "num_turbines": int(discrete_inputs["number_of_turbines"]), "turbine_spacing": inputs["plant_turbine_spacing"], "row_spacing": inputs["plant_row_spacing"], } # switch to the custom array system design if not ("ArraySystemDesign" in config["design_phases"]): raise KeyError( "I assumed that 'ArraySystemDesign' would be in the config. Something changed." ) config["design_phases"][ config["design_phases"].index("ArraySystemDesign") ] = "CustomArraySystemDesign" # add a turbine location csv on the config basename_farm_location = "wisdem_detailed_array" config["array_system_design"]["distance"] = True # don't use WGS84 lat/long config["array_system_design"]["location_data"] = basename_farm_location config["array_system_design"]["cables"] = [ f"XLPE_185mm_66kV{'_dynamic' if self.options['floating'] else ''}", f"XLPE_500mm_132kV{'_dynamic' if self.options['floating'] else ''}", f"XLPE_630mm_66kV{'_dynamic' if self.options['floating'] else ''}", f"XLPE_1000mm_220kV{'_dynamic' if self.options['floating'] else ''}", ] # we require bigger cables than the standard WISDEM wrap # create the csv file that holds the farm layout path_farm_location = ( self._path_library / "cables" / (basename_farm_location + ".csv") ) # generate the csv data needed to locate the farm elements generate_orbit_location_from_graph( discrete_inputs["graph"], inputs["x_turbines"], inputs["y_turbines"], inputs["x_substations"], inputs["y_substations"], allow_branching_approximation=self.options["approximate_branches"], ).to_csv(path_farm_location, index=False) self._orbit_config = config # reinstall- probably not needed due to reference return config # and return
[docs] def compute(self, inputs, outputs, discrete_inputs, discrete_outputs): """Creates and runs the project, then gathers the results.""" # setup the custom-location library if self._path_library: initialize_library(self._path_library) # send it back to the superclass compute super().compute( inputs, outputs, discrete_inputs, discrete_outputs, )
[docs] class ORBITDetailedGroup(om.Group): """wrapper for ORBIT-WISDEM Fixed Substructure API, allowing manual IVC incorporation"""
[docs] def initialize(self): """Initialize the group and declare options.""" self.options.declare("case_title", default="working") self.options.declare( "modeling_options", types=dict, desc="Ard modeling options" ) self.options.declare("approximate_branches", default=False)
[docs] def setup(self): # add IVCs for landbosse variable_mapping = ORBIT_setup_latents( modeling_options=self.options["modeling_options"] ) # create source independent variable components for LandBOSSE inputs for key, meta in variable_mapping.items(): if key in ["number_of_turbines", "number_of_blades", "num_mooring_lines"]: comp = om.IndepVarComp() comp.add_discrete_output(name=key, val=meta["val"]) self.add_subsystem(f"IVC_orbit_{key}", comp, promotes=["*"]) else: self.add_subsystem( f"IVC_orbit_{key}", om.IndepVarComp(key, val=meta["val"], units=meta["units"]), promotes=["*"], ) # add orbit self.add_subsystem( "orbit", ORBITDetail( case_title=self.options["case_title"], modeling_options=self.options["modeling_options"], approximate_branches=self.options["approximate_branches"], ), promotes=[ "total_capex", "total_capex_kW", "bos_capex", "installation_capex", "graph", "x_turbines", "y_turbines", "x_substations", "y_substations", # "plant_turbine_spacing", # "plant_row_spacing", ], ) # connect for key in variable_mapping.keys(): self.connect(key, f"orbit.{key}")