Source code for aviary.core.aviary_problem

import csv
import json
import os
import subprocess
import warnings
from copy import deepcopy
from datetime import datetime
from enum import Enum
from itertools import count
from pathlib import Path

import dymos as dm
import numpy as np
import openmdao
import openmdao.api as om
import openmdao.utils.hooks as hooks
from openmdao.utils.reports_system import _default_reports
from openmdao.utils.units import convert_units
from packaging import version

from aviary.core.aviary_group import AviaryGroup
from aviary.interface.utils import set_warning_format
from aviary.utils.aviary_values import AviaryValues
from aviary.utils.csv_data_file import write_data_file
from aviary.utils.functions import convert_strings_to_data, get_path
from aviary.utils.merge_variable_metadata import merge_meta_data
from aviary.utils.named_values import NamedValues
from aviary.variable_info.enums import EquationsOfMotion, LegacyCode, ProblemType, Verbosity
from aviary.variable_info.functions import setup_model_options
from aviary.variable_info.variable_meta_data import CoreMetaData
from aviary.variable_info.variables import Aircraft, Dynamic, Mission, Settings

FLOPS = LegacyCode.FLOPS
GASP = LegacyCode.GASP


class AviaryProblem(om.Problem):
    """
    Main class for instantiating, formulating, and solving Aviary problems.

    This Problem object is a specialized OpenMDAO Problem that has additional methods to help
    users create and solve Aviary problems.

    Parameters
    ----------
    problem_type : ProblemType, optional
        The type of problem to solve (e.g., SIZING, OFF_DESIGN_MIN_FUEL, OFF_DESIGN_MAX_RANGE,
        MULTI_MISSION). If None, it will be set from the aircraft data when ``load_inputs`` is
        called.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output. If None, the verbosity will be determined from
        the aircraft input data.
    meta_data : dict, optional
        Variable metadata used throughout the problem. Defaults to a copy of the Aviary base
        metadata.
    **kwargs : dict
        Additional keyword arguments passed to ``om.Problem.__init__``.

    Attributes
    ----------
    timestamp : datetime
        Timestamp recorded when the problem was created.
    verbosity : Verbosity or None
        Controls the level of terminal output for the problem.
    problem_type : ProblemType or None
        The type of problem being solved.
    aviary_inputs : AviaryValues or None
        The loaded aircraft and mission input data.
    aviary_groups_dict : dict
        Dictionary mapping names to AviaryGroup instances, used for multi-mission problems.
    meta_data : dict
        Variable metadata used throughout the problem.
    generate_payload_range : bool
        Flag indicating whether a payload-range diagram should be generated after a sizing run.
    """
def __init__(
    self,
    problem_type: ProblemType = None,
    verbosity=None,
    meta_data=None,
    **kwargs,
):
    """
    Initialize the problem, register Aviary's reports, and create the top-level model.

    Parameters
    ----------
    problem_type : ProblemType, optional
        The type of problem to solve. ``ProblemType.MULTI_MISSION`` makes the top-level model
        a plain ``om.Group``; any other value (including None) makes it an ``AviaryGroup``.
    verbosity : Verbosity or int, optional
        Problem-level verbosity. Stored as given and passed to ``set_warning_format``.
    meta_data : dict, optional
        Variable metadata used throughout the problem. If None, a fresh
        ``CoreMetaData.copy()`` is created for this instance.
    **kwargs : dict
        Additional keyword arguments passed to ``om.Problem.__init__``.
    """
    # A ``CoreMetaData.copy()`` default in the signature is evaluated only once, when the
    # function is defined, so every problem created without explicit metadata would share
    # (and could mutate) the same dictionary. Build the copy per instance instead.
    if meta_data is None:
        meta_data = CoreMetaData.copy()

    # Modify OpenMDAO's default_reports for this session.
    new_reports = [
        'subsystems',
        'mission',
        'timeseries_csv',
        'run_status',
        'sizing_results',
        'input_checks',
        'overridden_variables',
    ]
    for report in new_reports:
        if report not in _default_reports:
            _default_reports.append(report)

    super().__init__(**kwargs)

    self.timestamp = datetime.now()

    # If verbosity is set to anything but None, this defines how warnings are formatted for the
    # whole problem - warning format won't be updated if user requests a different verbosity
    # level for a specific method
    self.verbosity = verbosity
    set_warning_format(verbosity)

    self.problem_type = problem_type

    if problem_type == ProblemType.MULTI_MISSION:
        self.model = om.Group()
    else:
        self.model = AviaryGroup()

    # This causes problems where aviary_inputs are stored in different places for
    # multi-mission vs. standard mission
    self.aviary_inputs = None

    self.aviary_groups_dict = {}
    self.meta_data = meta_data

    # TODO try and find a better solution than a new custom flag - the issue is multimission
    # problems don't have a consistent variable path to check the inputs later on
    self.generate_payload_range = False
def load_inputs(
    self,
    aircraft_data,
    phase_info=None,
    problem_configurator=None,
    phase_info_modifier=None,
    meta_data=None,
    verbosity=None,
):
    """
    Load aircraft data and phase information for the Aviary problem.

    The work is delegated to ``self.model.load_inputs``; the returned inputs and verbosity are
    then stored on the problem. Aircraft data can be provided as a file path or an existing
    AviaryValues object.

    Parameters
    ----------
    aircraft_data : str, Path, or AviaryValues
        Aircraft configuration data. Can be a path to a CSV file or an existing AviaryValues
        object.
    phase_info : dict, str, or Path, optional
        Dictionary defining mission phases, or a path to a Python file containing a
        ``phase_info`` dictionary. If None, the default phase_info for the selected equations
        of motion is used.
    problem_configurator : ProblemConfigurator, optional
        Custom problem configurator, required when using custom equations of motion.
    phase_info_modifier : callable, optional
        A function that takes the ``phase_info`` dictionary and aviary_inputs and returns a
        modified phase_info.
    meta_data : dict, optional
        Custom variable metadata. If provided, replaces the metadata currently stored on this
        problem.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Returns
    -------
    AviaryValues
        The processed aircraft and mission input data, also stored as ``self.aviary_inputs``.
    """
    # The input data has not been read yet, so the desired run verbosity may still be unknown.
    # `self.verbosity` is the "true" verbosity for the entire run; `verbosity` only overrides
    # it for this call. Ints are accepted and converted; the fallback is usually None here.
    requested_verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    # Support for a custom meta_data set.
    if meta_data is not None:
        self.meta_data = meta_data

    # TODO: We cannot pass self.verbosity back up from load inputs for multi-mission because
    # there could be multiple .csv files
    self.model.meta_data = self.meta_data

    self.aviary_inputs, self.verbosity = self.model.load_inputs(
        aircraft_data=aircraft_data,
        phase_info=phase_info,
        problem_configurator=problem_configurator,
        phase_info_modifier=phase_info_modifier,
        verbosity=requested_verbosity,
    )
    loaded = self.aviary_inputs

    # If there are multiple load_inputs() calls, only the problem type from the first
    # aviary_values is used.
    if self.problem_type is None:
        self.problem_type = loaded.get_val(Settings.PROBLEM_TYPE)

    # TODO try and find a better solution than a new custom flag - the issue is multimission
    # problems don't have a consistent variable path to check the inputs later on
    # BUG you can't specify generating payload-range diagram via aviary_inputs after load_inputs
    if Settings.PAYLOAD_RANGE in loaded:
        self.generate_payload_range = loaded.get_val(Settings.PAYLOAD_RANGE)

    return loaded
def load_external_subsystems(self, external_subsystems: list = None, verbosity=None):
    """
    Add external subsystems to the Aviary problem.

    For multi-mission problems, this adds the provided external subsystems to every
    AviaryGroup. Subsystem metadata is merged into the problem's metadata after loading.

    Parameters
    ----------
    external_subsystems : list of SubsystemBuilder, optional
        List of external subsystems to be added. If None, an empty list is used.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # Use a fresh list per call rather than a shared mutable default argument, so nothing
    # downstream can accumulate state across calls.
    if external_subsystems is None:
        external_subsystems = []

    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity  # may still be None if inputs have not been loaded

    if self.problem_type == ProblemType.MULTI_MISSION:
        for group in self.aviary_groups_dict.values():
            group.load_external_subsystems(
                external_subsystems=external_subsystems, verbosity=verbosity
            )
            self.meta_data = merge_meta_data([self.meta_data, group.meta_data])
    else:
        self.model.load_external_subsystems(
            external_subsystems=external_subsystems, verbosity=verbosity
        )
        self.meta_data = merge_meta_data([self.meta_data, self.model.meta_data])
def add_aviary_group(
    self,
    name: str,
    aircraft: AviaryValues,
    phase_info: dict,
    problem_configurator=None,
    phase_info_modifier=None,
    verbosity: Verbosity = Verbosity.BRIEF,
):
    """
    Create and add an AviaryGroup for a multi-mission problem.

    This method creates an AviaryGroup for a specific aircraft and mission combination, loads
    and preprocesses its inputs, and registers it in ``self.aviary_groups_dict``.

    Parameters
    ----------
    name : str
        A unique name identifying this group, used to reference it later.
    aircraft : AviaryValues
        Defines the aircraft configuration for this mission.
    phase_info : dict
        Defines the mission phases the aircraft will fly.
    problem_configurator : ProblemConfigurator, optional
        Required when using custom equations of motion. See ``TwoDOFProblemConfigurator`` for
        an example.
    phase_info_modifier : callable, optional
        A function that takes the ``phase_info`` dictionary and aviary_inputs and returns a
        modified phase_info.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. Defaults to
        ``Verbosity.BRIEF``; passing None explicitly uses the problem-level verbosity.

    Returns
    -------
    AviaryGroup
        The AviaryGroup object containing the specified aircraft and mission configuration.

    Raises
    ------
    ValueError
        If ``problem_type`` is not ``ProblemType.MULTI_MISSION``.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    if self.problem_type is not ProblemType.MULTI_MISSION:
        # The exception was previously constructed but never raised, so this guard had no
        # effect and the group was added to a non-multi-mission model anyway.
        raise ValueError(
            'add_aviary_group() should only be called when ProblemType is MULTI_MISSION.'
        )

    sub = self.model.add_subsystem(name, AviaryGroup())
    # The new group shares the problem-level metadata object.
    sub.meta_data = self.meta_data
    sub.load_inputs(
        aircraft_data=aircraft,
        phase_info=phase_info,
        problem_configurator=problem_configurator,
        phase_info_modifier=phase_info_modifier,
        verbosity=verbosity,
    )
    sub.check_and_preprocess_inputs()
    self.aviary_groups_dict[name] = sub

    if self.verbosity is None:
        # If problem-level verbosity was not defined, use the verbosity specified in the first
        # added AviaryGroup
        self.verbosity = sub.verbosity

    # TODO try and find a better solution than a new custom flag - the issue is multimission
    # problems don't have a consistent variable path to check the inputs later on
    if Settings.PAYLOAD_RANGE in sub.aviary_inputs:
        self.generate_payload_range = sub.aviary_inputs.get_val(Settings.PAYLOAD_RANGE)

    return sub
def check_and_preprocess_inputs(self, verbosity=None):
    """
    Check and preprocess user-supplied input values.

    The call is forwarded to ``check_and_preprocess_inputs`` on the top-level model, or on
    every registered AviaryGroup when the problem type is MULTI_MISSION, so each group is
    checked and preprocessed independently.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override (ints accepted) wins; otherwise fall back to the run-wide setting.
    level = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type == ProblemType.MULTI_MISSION:
        targets = self.aviary_groups_dict.values()
    else:
        targets = (self.model,)

    for target in targets:
        target.check_and_preprocess_inputs(verbosity=level)
# TODO the third paragraph (talking about FLOPS vs GASP method) should be greatly simplified. # Eventually, that functionality will be moved into mission (issue #361)
def add_pre_mission_systems(self, verbosity=None):
    """
    Add pre-mission systems to the Aviary problem.

    The call is forwarded to ``add_pre_mission_systems`` on the top-level model, or on every
    registered AviaryGroup when the problem type is MULTI_MISSION, so each group receives its
    pre-mission systems independently.

    Per the original documentation, these systems run before the mission and typically
    include aircraft sizing, geometry, mass estimation, and takeoff calculations; the actual
    subsystems added depend on the mission model (`FLOPS` or `GASP`) and are chosen inside the
    group, not in this method.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override (ints accepted) wins; otherwise fall back to the run-wide setting.
    level = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type != ProblemType.MULTI_MISSION:
        self.model.add_pre_mission_systems(verbosity=level)
        return

    for group in self.aviary_groups_dict.values():
        group.add_pre_mission_systems(verbosity=level)
def add_phases(
    self,
    parallel_phases=True,
    verbosity=None,
):
    """
    Add mission phases to the problem trajectory.

    Delegates to ``add_phases`` on the top-level model, or on every registered AviaryGroup
    when the problem type is MULTI_MISSION.

    Parameters
    ----------
    parallel_phases : bool, optional
        If True, the top-level container of all phases will be a ParallelGroup; otherwise it
        will be a standard OpenMDAO Group. Defaults to True.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Returns
    -------
    dm.Trajectory or None
        The trajectory returned by the group's ``add_phases``. For multi-mission problems
        this is the trajectory of the last group processed, or None if no groups have been
        added yet.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    # Pre-bind the result so an empty multi-mission problem returns None instead of failing
    # with UnboundLocalError at the return statement.
    traj = None

    if self.problem_type == ProblemType.MULTI_MISSION:
        for group in self.aviary_groups_dict.values():
            traj = group.add_phases(
                parallel_phases=parallel_phases,
                verbosity=verbosity,
                comm=self.comm,
            )
    else:
        traj = self.model.add_phases(
            parallel_phases=parallel_phases,
            verbosity=verbosity,
            comm=self.comm,
        )

    return traj
# TODO the later paragraph (talking about FLOPS vs GASP method) should be greatly simplified. # Eventually, that functionality will be moved into mission (issue #361)
def add_post_mission_systems(self, verbosity=None):
    """
    Add post-mission systems to the Aviary problem.

    The call is forwarded to ``add_post_mission_systems`` on the top-level model, or on every
    registered AviaryGroup when the problem type is MULTI_MISSION, so each group receives its
    post-mission systems independently.

    Per the original documentation, these systems run after the mission in the model
    execution order and typically include landing calculations, fuel burn summation, reserve
    fuel calculations, and mission constraints; the actual subsystems added depend on the
    mission model (`FLOPS` or `GASP`) and are chosen inside the group, not in this method.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override (ints accepted) wins; otherwise fall back to the run-wide setting.
    level = self.verbosity if verbosity is None else Verbosity(verbosity)

    multi_mission = self.problem_type == ProblemType.MULTI_MISSION
    receivers = self.aviary_groups_dict.values() if multi_mission else [self.model]

    for receiver in receivers:
        receiver.add_post_mission_systems(verbosity=level)
def add_driver(self, optimizer=None, use_coloring=None, max_iter=None, verbosity=None):
    """
    Add an optimization driver to the Aviary problem.

    Depending on the provided optimizer, the method instantiates the relevant driver
    (``ScipyOptimizeDriver`` or ``pyOptSparseDriver``) and sets optimizer-specific options.
    Options for SNOPT, IPOPT, and SLSQP are preconfigured; any other optimizer name is passed
    to ``pyOptSparseDriver`` without extra settings (``max_iter`` is not applied to it). The
    method also declares coloring and sets debug print options based on verbosity.

    Parameters
    ----------
    optimizer : str, optional
        The name of the optimizer to use. Can be ``'SLSQP'``, ``'SNOPT'``, ``'IPOPT'``, or
        others supported by OpenMDAO/pyOptSparse. If ``'SLSQP'``, a ``ScipyOptimizeDriver``
        is used; otherwise a ``pyOptSparseDriver`` is used. Defaults to ``'IPOPT'``.
    use_coloring : bool, optional
        If True, the driver will declare coloring, which can speed up derivative
        computations. Defaults to True.
    max_iter : int, optional
        The maximum number of iterations allowed for the optimization process. Defaults
        to 50.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity, and ``Verbosity.BRIEF`` if that is also unset.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    # The problem-level verbosity is None until inputs are loaded. The ordering comparisons
    # below cannot handle None, so fall back to BRIEF explicitly.
    if verbosity is None:
        verbosity = Verbosity.BRIEF

    # Set defaults for optimizer, use_coloring and max_iter
    if optimizer is None:
        optimizer = 'IPOPT'
    if use_coloring is None:
        use_coloring = True
    if max_iter is None:
        max_iter = 50

    # SLSQP comes from SciPy; everything else is routed through pyOptSparse
    if optimizer == 'SLSQP':
        driver = self.driver = om.ScipyOptimizeDriver()
    else:
        driver = self.driver = om.pyOptSparseDriver()

    driver.options['optimizer'] = optimizer

    if use_coloring:
        # define coloring options by verbosity
        if verbosity < Verbosity.VERBOSE:  # QUIET, BRIEF
            driver.declare_coloring(show_summary=False)
        elif verbosity == Verbosity.VERBOSE:
            driver.declare_coloring(show_summary=True)
        else:  # DEBUG
            driver.declare_coloring(show_summary=True, show_sparsity=True)

    if driver.options['optimizer'] == 'SNOPT':
        # Print Options #
        if verbosity == Verbosity.QUIET:
            isumm, iprint = 0, 0
        elif verbosity == Verbosity.BRIEF:
            isumm, iprint = 6, 0
        elif verbosity > Verbosity.BRIEF:  # VERBOSE, DEBUG
            isumm, iprint = 6, 9
        driver.opt_settings['iSumm'] = isumm
        driver.opt_settings['iPrint'] = iprint
        # Optimizer Settings #
        driver.opt_settings['Major iterations limit'] = max_iter
        driver.opt_settings['Major optimality tolerance'] = 1e-4
        driver.opt_settings['Major feasibility tolerance'] = 1e-6

    elif driver.options['optimizer'] == 'IPOPT':
        # Print Options #
        if verbosity == Verbosity.QUIET:
            print_level = 0
            driver.opt_settings['print_user_options'] = 'no'
        elif verbosity == Verbosity.BRIEF:
            print_level = 3  # minimum to get exit status
            driver.opt_settings['print_user_options'] = 'no'
            driver.opt_settings['print_frequency_iter'] = 10
        elif verbosity == Verbosity.VERBOSE:
            print_level = 5
        else:  # DEBUG
            print_level = 7
        driver.opt_settings['print_level'] = print_level
        # Optimizer Settings #
        driver.opt_settings['tol'] = 1.0e-6
        driver.opt_settings['mu_init'] = 1e-5
        driver.opt_settings['max_iter'] = max_iter
        # for faster convergence
        driver.opt_settings['nlp_scaling_method'] = 'gradient-based'
        driver.opt_settings['alpha_for_y'] = 'safer-min-dual-infeas'
        driver.opt_settings['mu_strategy'] = 'monotone'
        # Shugo's recommended settings for robustness
        # driver.opt_settings['mu_init'] = 1.0
        # driver.opt_settings['nlp_scaling_method'] = 'none'
        # driver.opt_settings['limited_memory_max_history'] = 50

    elif driver.options['optimizer'] == 'SLSQP':
        # Print Options #
        if verbosity == Verbosity.QUIET:
            disp = False
        else:
            disp = True
        driver.options['disp'] = disp
        # Optimizer Settings #
        driver.options['tol'] = 1e-9
        driver.options['maxiter'] = max_iter

    # pyoptsparse print settings for both SNOPT, IPOPT
    if optimizer in ('SNOPT', 'IPOPT'):
        if verbosity == Verbosity.QUIET:
            driver.options['print_results'] = False
        elif verbosity < Verbosity.DEBUG:  # BRIEF, VERBOSE
            driver.options['print_results'] = 'minimal'
        else:  # DEBUG
            driver.options['print_opt_prob'] = True

    # optimizer agnostic settings
    if verbosity == Verbosity.DEBUG:
        driver.options['debug_print'] = [
            'desvars',
            'ln_cons',
            'nl_cons',
            'objs',
        ]
# TODO docstring is much, much too long. This information should get moved to documentation
def add_design_variables(self, verbosity=None):
    """
    Add design variables and related constraints to the Aviary problem.

    The call is forwarded, together with this problem's ``problem_type``, to
    ``add_design_variables`` on the top-level model, or on every registered AviaryGroup when
    the problem type is MULTI_MISSION. Which design variables and constraints are actually
    added is decided inside the group, not in this method.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Notes
    -----
    Per the original documentation, the group-level selection depends on the mission method
    and problem type: FLOPS-based models add a gross-mass design variable, GASP-based models
    add takeoff/ascent-related design variables and gear/flap altitude constraints, and
    sizing or off-design problems add gross-mass variables with a zero-residual-range
    constraint. Confirm the details against ``AviaryGroup.add_design_variables``.
    """
    # A per-call override (ints accepted) wins; otherwise fall back to the run-wide setting.
    level = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type == ProblemType.MULTI_MISSION:
        recipients = self.aviary_groups_dict.values()
    else:
        recipients = (self.model,)

    for recipient in recipients:
        recipient.add_design_variables(problem_type=self.problem_type, verbosity=level)
def add_objective(self, objective_type=None, ref=None, verbosity=None):
    """
    Add the objective function to the Aviary problem.

    Two helper ``ExecComp`` subsystems, ``fuel_obj`` and ``range_obj``, are always added to
    the model first. The objective is then selected based on the provided ``objective_type``
    string, or inferred from ``problem_type`` if no explicit type is given.

    .. note::
        The ``ref`` value should be positive for values being minimized and negative for
        values being maximized.

    Parameters
    ----------
    objective_type : str, optional
        The type of objective to add. Valid options are ``'mass'``, ``'time'``,
        ``'hybrid_objective'``, ``'fuel_burned'``, and ``'fuel'``. If None, the objective is
        chosen based on ``problem_type``.
    ref : float, optional
        The reference value for scaling the objective. If None, a default value is used based
        on the objective type (see ``default_ref_values`` in the method body; 1 otherwise).
        Not applied to the ``'hybrid_objective'`` objective, which is added without ``ref``.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Raises
    ------
    ValueError
        If an invalid ``objective_type`` or ``problem_type`` is provided.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    # NOTE(review): the resolved value is not referenced again in this method.
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity  # may be None if never set

    # This isn't really a fuel objective, it's a hybrid or compound objective
    self.model.add_subsystem(
        'fuel_obj',
        om.ExecComp(
            'reg_objective = overall_fuel/10000 + ascent_duration/30.',
            reg_objective={'val': 0.0, 'units': 'unitless'},
            ascent_duration={'units': 's', 'shape': 1},
            overall_fuel={'units': 'lbm'},
        ),
        promotes_inputs=[
            ('ascent_duration', Mission.Takeoff.ASCENT_DURATION),
            ('overall_fuel', Mission.TOTAL_FUEL),
        ],
        promotes_outputs=[('reg_objective', Mission.Objectives.FUEL)],
    )

    # TODO: All references to self.model. will need to be updated
    # Range enters with a negative sign, so minimizing this output increases range.
    self.model.add_subsystem(
        'range_obj',
        om.ExecComp(
            'reg_objective = -actual_range/1000 + ascent_duration/30.',
            reg_objective={'val': 0.0, 'units': 'unitless'},
            ascent_duration={'units': 's', 'shape': 1},
            actual_range={'val': self.model.target_range, 'units': 'NM'},
        ),
        promotes_inputs=[
            ('actual_range', Mission.RANGE),
            ('ascent_duration', Mission.Takeoff.ASCENT_DURATION),
        ],
        promotes_outputs=[('reg_objective', Mission.Objectives.RANGE)],
    )

    # Dictionary for default reference values
    default_ref_values = {
        'mass': -5e4,
        'hybrid_objective': -5e4,
        'fuel_burned': 1e4,
        'fuel': 1e4,
    }

    # Check if an objective type is specified
    if objective_type is not None:
        # An explicit ref wins; otherwise use the per-type default, or 1 for unlisted types
        ref = ref if ref is not None else default_ref_values.get(objective_type, 1)

        # Timeseries-based objectives read the last point (index=-1) of the final phase
        final_phase_name = self.model.regular_phases[-1]

        if objective_type == 'mass':
            self.model.add_objective(
                f'traj.{final_phase_name}.timeseries.{Dynamic.Vehicle.MASS}',
                index=-1,
                ref=ref,
            )
        elif objective_type == 'time':
            self.model.add_objective(
                f'traj.{final_phase_name}.timeseries.time', index=-1, ref=ref
            )
        elif objective_type == 'hybrid_objective':
            self._add_hybrid_objective(self.model.mission_info)
            self.model.add_objective('obj_comp.obj')
        elif objective_type == 'fuel_burned':
            self.model.add_objective(Mission.FUEL, ref=ref)
        elif objective_type == 'fuel':
            self.model.add_objective(Mission.Objectives.FUEL, ref=ref)
        else:
            raise ValueError(
                f"{objective_type} is not a valid objective. 'objective_type' must "
                'be one of the following: mass, time, hybrid_objective, '
                'fuel_burned, or fuel'
            )

    else:
        # If no 'objective_type' is specified, we handle based on 'problem_type'
        # If 'ref' is not specified, assign a default value
        ref = ref if ref is not None else 1

        if self.problem_type is ProblemType.SIZING:
            self.model.add_objective(Mission.Objectives.FUEL, ref=ref)
        elif self.problem_type is ProblemType.OFF_DESIGN_MIN_FUEL:
            self.model.add_objective(Mission.Objectives.FUEL, ref=ref)
        elif self.problem_type is ProblemType.OFF_DESIGN_MAX_RANGE:
            # if ref > 0:
            #     # Maximize range.
            #     ref = -ref
            self.model.add_objective(Mission.Objectives.RANGE, ref=ref)
        else:
            raise ValueError(f'{self.problem_type} is not a valid problem type.')
def add_design_var_default(
    self,
    name: str,
    lower: float = None,
    upper: float = None,
    units: str = None,
    src_shape=None,
    default_val: float = None,
    ref: float = None,
):
    """
    Register a design variable on the model and optionally seed its default value.

    The design variable is always added. Input defaults are only set when ``default_val`` is
    given; the value can still be overwritten after setup using ``prob.set_val()``.

    Parameters
    ----------
    name : str
        The promoted name of the design variable.
    lower : float, optional
        The lower bound for the design variable.
    upper : float, optional
        The upper bound for the design variable.
    units : str, optional
        Units for the design variable (also used for the default value).
    src_shape : int or tuple, optional
        Assumed shape of any connected source or higher-level promoted input. Only used when
        ``default_val`` is provided.
    default_val : float or ndarray, optional
        The default value to assign to this design variable.
    ref : float or ndarray, optional
        Value of design variable that scales to 1.0 in the driver.
    """
    model = self.model
    model.add_design_var(name=name, lower=lower, upper=upper, units=units, ref=ref)

    # Nothing to initialize when no default value was requested.
    if default_val is None:
        return

    model.set_input_defaults(name=name, val=default_val, units=units, src_shape=src_shape)
def set_design_range(self, missions: list[str], range: str):
    """
    Set the design range for a multi-mission problem.

    Reads the ``target_range`` of every named mission group, converts each to nautical miles,
    and sets the largest one on the variable named by ``range``.

    Parameters
    ----------
    missions : list of str
        The names of missions instantiated via ``add_aviary_group()``. Names that are not in
        ``self.aviary_groups_dict`` are ignored.
    range : str
        The promoted name of the range variable to set (e.g., ``'Aircraft1.Range'``).

    Raises
    ------
    ValueError
        If none of the names in ``missions`` matches a registered AviaryGroup.
    """
    # TODO: What happens if design range is specified in CSV??? should be able to access from
    # group.aviary_values

    # loop through all the phase_info and extract target ranges
    design_ranges = []
    for name, group in self.aviary_groups_dict.items():
        if name not in missions:
            continue
        target_range, units = group.post_mission_info['target_range']
        design_ranges.append(convert_units(target_range, units, 'nmi'))

    # np.max on an empty list also raises ValueError, but with an opaque "zero-size array"
    # message; fail early with something actionable instead.
    if not design_ranges:
        raise ValueError(
            f'None of the requested missions {list(missions)} match a group added with '
            f'add_aviary_group(); available groups are {list(self.aviary_groups_dict)}.'
        )

    # TODO: loop through all the .csv files and extract Aircraft.Design.RANGE
    design_range_max = np.max(design_ranges)
    self.set_val(range, val=design_range_max, units='nmi')
def add_composite_objective(self, *args, ref: float = None):
    """
    Add a composite objective by combining multiple model outputs.

    Creates a composite objective output by assembling an ``ExecComp`` based on a
    variety of AviaryGroup outputs selected by the user. Multiple outputs from the same
    or different aircraft models can be combined (e.g., fuel burn plus noise emissions).
    Each output can include a weight; if omitted, equal weighting is assumed. Weights
    are normalized by their sum.

    Parameters
    ----------
    *args : str, tuple of (str, float), tuple of (str, str), or tuple of (str, str, float)
        Each argument specifies an objective component in one of the following forms:

        - ``output`` *str* - A single output variable name.
        - ``(output, weight)`` *tuple of (str, float)* - An output with its weight.
        - ``(model, output)`` *tuple of (str, str)* - A model name and output variable.
        - ``(model, output, weight)`` *tuple of (str, str, float)* - A model name,
          output variable, and weight.

        If no arguments are provided, the objective is inferred from ``problem_type``.
        Recognized shorthand strings for outputs include ``'fuel'``,
        ``'fuel_burned'``, ``'mass'``, ``'time'``, and ``'range'``.
    ref : float, optional
        Reference value for scaling the final composite objective. When a single
        argument is given and it is one of the shorthands ``'fuel'``, ``'fuel_burned'``
        or ``'mass'``, a default reference value is used if ``ref`` is None.

    Raises
    ------
    ValueError
        If an argument cannot be parsed into a valid (model, output, weight) tuple, if
        a model name is provided without an associated output, if no objective can be
        inferred from the problem type, or if the weights sum to zero.
    """
    # There are LOTS of different ways for the users to input str, 2-tuple, or 3-tuple
    # into *args. Correct combinations are (output), (output, weight), (model, output),
    # or (model, output, weight). We have to catch every case and advise the user on
    # how to correct their errors and add defaults as needed.
    default_model = 'model'
    default_weight = 1.0

    format_help = (
        'Each argument must be one of the following: '
        '(output), (output, weight), (model, output), or (model, output, weight). '
        'Outputs can be from the variable meta data, or can be: fuel_burned, fuel. '
        'Or problem type must be set to SIZING, OFF_DESIGN_MIN_FUEL, or '
        'OFF_DESIGN_MAX_RANGE.'
    )

    def _objective_from_problem_type():
        # Returns None when the problem type has no associated default objective.
        problem_type = self.model.problem_type
        if problem_type is ProblemType.SIZING:
            return Mission.Objectives.FUEL
        if problem_type is ProblemType.OFF_DESIGN_MIN_FUEL:
            return Mission.Objectives.FUEL
        if problem_type is ProblemType.OFF_DESIGN_MAX_RANGE:
            return Mission.Objectives.RANGE
        return None

    objectives = []

    # No user input at all: derive the objective from the problem type. This must be
    # handled outside the loop below, which never executes for empty ``args``.
    if not args:
        output = _objective_from_problem_type()
        if output is None:
            raise ValueError(
                'No objective was specified and none can be inferred from the problem '
                'type. ' + format_help
            )
        objectives.append((default_model, output, default_weight))

    for arg in args:
        if isinstance(arg, tuple) and len(arg) == 3:
            model, output, weight = arg
            if model not in self.aviary_groups_dict:
                raise ValueError(
                    f'The first element specified in {arg} must be the model name.'
                )

        elif isinstance(arg, tuple) and len(arg) == 2:
            first, second = arg
            if isinstance(first, str) and isinstance(second, str):
                if first in self.aviary_groups_dict:
                    # we have the model and output but no weight
                    model, output, weight = first, second, default_weight
                else:
                    raise ValueError(
                        f'The first element specified in {arg} must be the model name.'
                    )
            elif isinstance(first, str) and isinstance(second, (float, int)):
                if first in self.aviary_groups_dict:
                    raise ValueError(
                        f'When specifying {arg}, the user specified a model name and a weight '
                        f'but did not specify what output from that model the weight should be applied to.'
                    )
                else:
                    # we have the output and the weight but not model
                    model, output, weight = default_model, first, second
            else:
                raise ValueError(
                    f'The user specified {arg} which is not a 2-tuple of (model, output) or (output, weight).'
                )

        elif isinstance(arg, str):
            if arg in self.aviary_groups_dict:
                raise ValueError(
                    f"When specifying '{arg}', the user provided only a model name "
                    f'but did not specify what output from that model should be used as the objective.'
                )
            else:
                # we have an output and we use the default model and weights
                model, output, weight = default_model, arg, default_weight

        else:
            # Unrecognized argument format: fall back to the problem-type default when
            # one exists, otherwise tell the user what is accepted.
            output = _objective_from_problem_type()
            if output is None:
                raise ValueError(f'Unrecognized objective format: {arg}. ' + format_help)
            model, weight = default_model, default_weight

        objectives.append((model, output, weight))
        # objectives = [
        #   ('model1', Mission.FUEL, 1),
        #   ('model2', Mission.CO2, 1),
        #   ...
        # ]

    # Dictionary for default reference values
    default_ref_values = {
        'mass': -5e4,
        'hybrid_objective': -5e4,
        'fuel_burned': 1e4,
        'fuel': 1e4,
    }

    # Default scaling is valid only if exactly one argument was given and the ref has
    # not yet been set by the user.
    use_default_ref = len(args) == 1 and ref is None

    # Now check the outputs for recognizable shorthand strings and replace them with the
    # variable meta data name
    objectives_cleaned = []
    for model, output, weight in objectives:
        if output == 'fuel_burned':
            output = Mission.FUEL
            if use_default_ref:
                ref = default_ref_values['fuel_burned']
        elif output == 'fuel':
            output = Mission.Objectives.FUEL
            if use_default_ref:
                ref = default_ref_values['fuel']
        elif output == 'mass':
            output = Mission.FINAL_MASS
            if use_default_ref:
                ref = default_ref_values['mass']
        elif output == 'time':
            output = Mission.FINAL_TIME
        elif output == 'range':
            output = Mission.RANGE  # Unsure if this will work
        objectives_cleaned.append((model, output, weight))

    total_weight = sum(weight for _, _, weight in objectives_cleaned)
    if total_weight == 0:
        # Each term is divided by the total weight inside the ExecComp expression.
        raise ValueError('The weights of a composite objective must not sum to zero.')

    # Create the calculation string for the ExecComp() and the promotion reference values
    weighted_exprs = []
    connection_names = []
    obj_inputs = []
    for model, output, weight in objectives_cleaned:
        # we use "_" here because ExecComp() cannot intake "." or ":"
        output_safe = output.replace(':', '_')
        obj_input = f'{model}_{output_safe}'
        obj_inputs.append(obj_input)
        weighted_exprs.append(f'{obj_input}*{weight}/{total_weight}')
        connection_names.append([f'{model}.{output}', f'composite_function.{obj_input}'])
    final_expr = ' + '.join(weighted_exprs)
    # final_expr looks like: 'model1_fuelburn*1.0/2.0 + model2_fuelburn*1.0/2.0'

    kwargs = {}
    if version.parse(openmdao.__version__) >= version.parse('3.40'):
        # We can get the correct unit from the source. This prevents a warning.
        kwargs = {k: {'units_by_conn': True} for k in obj_inputs}

    # adding composite execComp to super problem
    self.model.add_subsystem(
        'composite_function',
        om.ExecComp('composite_objective = ' + final_expr, **kwargs),
        promotes_outputs=['composite_objective'],
    )

    # connect from inside of the models to the composite objective
    for source, target in connection_names:
        self.model.connect(source, target)

    # finally add the objective
    self.model.add_objective('composite_objective', ref=ref)
def add_composite_objective_adv(
    self,
    missions: list[str],
    outputs: list[str],
    mission_weights: list[float] = None,
    output_weights: list[float] = None,
    ref: float = 1.0,
):
    """
    Add a composite objective with independent mission and output weights.

    Aggregates output values across multiple mission models with independent weighting
    for both missions and outputs. This is useful when historical data is available on
    how often each mission is flown (``mission_weights``) and multiple objectives are
    desired per flight (``output_weights``).

    Parameters
    ----------
    missions : list of str
        Subsystem names corresponding to different missions
        (e.g., ``['model1', 'model2']``).
    outputs : list of str
        Output variable names to include from each mission
        (e.g., ``[Mission.FUEL, Mission.GROSS_MASS]``).
    mission_weights : list of float, optional
        Weights assigned to each mission. Normalized internally to sum to 1.0.
        If None, equal weighting is assumed.
    output_weights : list of float, optional
        Weights assigned to each output variable. Normalized internally to sum
        to 1.0. If None, equal weighting is assumed.
    ref : float, optional
        Reference value for scaling the final composite objective. Defaults to 1.0.

    Raises
    ------
    ValueError
        If ``missions`` or ``outputs`` is empty, or if a weight list does not have the
        same length as the list it applies to.
    """
    if len(missions) == 0 or len(outputs) == 0:
        raise ValueError(
            'At least one mission and one output are required to build a composite '
            'objective.'
        )

    # Setup mission and output weights if they are not already given
    if mission_weights is None:
        mission_weights = np.ones(len(missions))
    if output_weights is None:
        output_weights = np.ones(len(outputs))

    # zip() below would otherwise silently drop missions/outputs without a weight.
    if len(mission_weights) != len(missions):
        raise ValueError(
            f'Expected {len(missions)} mission weights but received {len(mission_weights)}.'
        )
    if len(output_weights) != len(outputs):
        raise ValueError(
            f'Expected {len(outputs)} output weights but received {len(output_weights)}.'
        )

    # weights are normalized - e.g. for given weights 3:1, the normalized
    # weights are 0.75:0.25
    output_total = sum(output_weights)
    mission_total = sum(mission_weights)
    output_weights = [float(weight / output_total) for weight in output_weights]
    mission_weights = [float(weight / mission_total) for weight in mission_weights]

    weighted_exprs = []
    connection_names = []
    obj_inputs = []
    for mission, mission_weight in zip(missions, mission_weights):
        for output, output_weight in zip(outputs, output_weights):
            # ExecComp variable names cannot contain ":", so sanitize the input name
            # the same way add_composite_objective does.
            output_safe = output.replace(':', '_')
            obj_input = f'{mission}_{output_safe}'
            obj_inputs.append(obj_input)
            # (source, target): the mission output feeds the ExecComp input.
            connection_names.append(
                (f'{mission}.{output}', f'composite_function.{obj_input}')
            )
            weighted_exprs.append(f'{obj_input}*{output_weight}*{mission_weight}')
    final_expr = ' + '.join(weighted_exprs)
    # final_expr looks like: 'model1_fuelburn*0.67*0.5 + model1_gross_mass*0.33*0.5
    # + model2_fuelburn*0.67*0.5 + model2_gross_mass*0.33*0.5'

    kwargs = {}
    if version.parse(openmdao.__version__) >= version.parse('3.40'):
        # We can get the correct unit from the source. This prevents a warning.
        kwargs = {k: {'units_by_conn': True} for k in obj_inputs}

    # adding composite execComp to super problem
    self.model.add_subsystem(
        'composite_function',
        om.ExecComp('composite_objective = ' + final_expr, has_diag_partials=True, **kwargs),
        promotes_outputs=['composite_objective'],
    )

    # connect from inside of the models to the composite objective
    for source, target in connection_names:
        self.model.connect(source, target)

    # finally add the objective
    self.model.add_objective('composite_objective', ref=ref)
def build_model(self, verbosity=None):
    """
    Build the complete Aviary model in a single call.

    Convenience wrapper that runs ``add_pre_mission_systems()``, ``add_phases()``,
    ``add_post_mission_systems()``, and ``link_phases()`` in that order. For
    multi-mission problems the sequence is applied to every AviaryGroup; otherwise it
    is applied to ``self.model``. Call the four methods individually when finer control
    is needed.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the
        problem-level verbosity.
    """
    # A per-call override (possibly given as an int) wins over the problem-level setting.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type == ProblemType.MULTI_MISSION:
        targets = self.aviary_groups_dict.values()
    else:
        targets = (self.model,)

    for target in targets:
        target.add_pre_mission_systems(verbosity=verbosity)
        target.add_phases(verbosity=verbosity, comm=self.comm)
        target.add_post_mission_systems(verbosity=verbosity)
        target.link_phases(verbosity=verbosity, comm=self.comm)
def promote_inputs(self, mission_names: list[str], var_pairs: list[tuple[str, str]]):
    """
    Promote inputs from specific AviaryGroups to the top-level model.

    For every registered AviaryGroup whose name appears in ``mission_names``, the given
    variable pairs are promoted on the top-level model via ``self.model.promotes``.

    Parameters
    ----------
    mission_names : list of str
        The subsystem names of the AviaryGroups whose inputs should be promoted.
    var_pairs : list of tuple of (str, str)
        Each pair is ``(input_name_in_group, top_level_name_to_use)``.
    """
    for group_name in self.aviary_groups_dict:
        for requested in mission_names:
            if group_name != requested:
                continue
            # Registered group matches a requested mission: promote on the top level.
            self.model.promotes(requested, inputs=var_pairs)
def setup(self, **kwargs):
    """
    Set up the Aviary problem.

    Passes the Aviary inputs through the model options, shares the problem metadata
    with the model (or with every AviaryGroup for multi-mission problems), calls
    OpenMDAO's ``setup()`` with OpenMDAO and promotion warnings silenced, and finally
    applies initial guesses.

    Parameters
    ----------
    **kwargs : dict
        Additional keyword arguments passed to ``om.Problem.setup()``. A ``verbosity``
        keyword, if present, is discarded because this method does not use it.
    """
    # This is the only method that doesn't accept verbosity, so tolerate (and drop) it
    # when a user passes it anyway.
    kwargs.pop('verbosity', None)

    # Use OpenMDAO's model options to pass all options through the system hierarchy.
    multi_mission = self.problem_type == ProblemType.MULTI_MISSION
    if multi_mission:
        for group_name, group in self.aviary_groups_dict.items():
            setup_model_options(
                self, group.aviary_inputs, group.meta_data, prefix=group_name, group=group
            )
            with warnings.catch_warnings():
                # group.aviary_inputs and group.phase_info are already set;
                # meta_data is the same for all groups
                group.meta_data = self.meta_data
    else:
        setup_model_options(self, self.aviary_inputs, self.meta_data)
        with warnings.catch_warnings():
            # there is only one aviary_inputs in this case;
            # self.model.phase_info is already set
            self.model.aviary_inputs = self.aviary_inputs
            self.model.meta_data = self.meta_data

    # suppress warnings such as:
    # "input variable '...' promoted using '*' was already promoted using 'aircraft:*'
    with warnings.catch_warnings():
        for category in (om.OpenMDAOWarning, om.PromotionWarning):
            warnings.simplefilter('ignore', category)
        super().setup(**kwargs)

    self.set_initial_guesses(verbosity=None)
def set_initial_guesses(self, parent_prob=None, parent_prefix='', verbosity=None):
    """
    Set initial guesses for trajectory states and controls.

    Delegates to ``set_initial_guesses`` on every AviaryGroup for multi-mission
    problems, or on ``self.model`` otherwise, forwarding all arguments unchanged.

    Parameters
    ----------
    parent_prob : om.Problem, optional
        Forwarded to the underlying group. Used internally for nested problem setups.
    parent_prefix : str, optional
        Forwarded to the underlying group. Defaults to ``''``.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the
        problem-level verbosity.
    """
    # A per-call override (possibly given as an int) wins over the problem-level setting.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type == ProblemType.MULTI_MISSION:
        targets = self.aviary_groups_dict.values()
    else:
        targets = (self.model,)

    for target in targets:
        target.set_initial_guesses(
            parent_prob=parent_prob,
            parent_prefix=parent_prefix,
            verbosity=verbosity,
        )
def run_aviary_problem(
    self,
    restart_filename=None,
    suppress_solver_print=True,
    run_driver=True,
    simulate=False,
    make_plots=True,
    verbosity=None,
    real_time_plotting=False,
):
    """
    Run the Aviary problem.

    Executes the problem, which may be a simulation, optimization, or single-pass model
    evaluation depending on the arguments provided. After execution, an N2 diagram is
    generated and, if applicable, a payload-range analysis is run.

    Parameters
    ----------
    restart_filename : str, optional
        Path to a file containing previously computed solutions to use as starting
        points.
    suppress_solver_print : bool, optional
        If True, all solver print statements are suppressed. Defaults to True.
    run_driver : bool, optional
        If True, the optimizer is executed. If False, the problem runs a single pass
        (equivalent to ``run_model``). Defaults to True.
    simulate : bool, optional
        If True, an explicit Dymos simulation is performed after optimization.
        Defaults to False.
    make_plots : bool, optional
        If True, Dymos HTML timeseries plots are generated. Defaults to True.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the
        problem-level verbosity.
    real_time_plotting : bool, optional
        If True, enables real-time plotting of the optimization progress.

    Raises
    ------
    RuntimeError
        If ``real_time_plotting`` is requested but the problem has no driver, or if the
        realtime plot server process exits with an error immediately after launch.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    # Validate before the driver is touched below, so the user gets this message
    # rather than a failure from inside add_recorder.
    if real_time_plotting and not self.driver:
        raise RuntimeError(
            'Unable to run realtime optimization progress plot because no Driver'
        )

    # real_time_plotting needs a driver recorder file to run the realtime plot server
    if verbosity >= Verbosity.VERBOSE or real_time_plotting:
        recorder = om.SqliteRecorder('optimization_history.db')
        self.driver.add_recorder(recorder)

    self.final_setup()

    if verbosity >= Verbosity.VERBOSE:  # VERBOSE, DEBUG
        with open(self.get_reports_dir() / 'input_list.txt', 'w') as outfile:
            self.model.list_inputs(out_stream=outfile)

    def _view_realtime_plot_hook(driver):
        case_recorder_file = str(driver._rec_mgr._recorders[0]._filepath)

        cmd = ['openmdao', 'realtime_plot', '--pid', str(os.getpid()), case_recorder_file]
        cp = subprocess.Popen(cmd)  # nosec: trusted input

        # Do a quick non-blocking check to see if it immediately failed
        # This will catch immediate failures but won't wait for the process to finish
        quick_check = cp.poll()
        if quick_check is not None and quick_check != 0:
            # Process already terminated with an error. stderr is not piped by the
            # Popen call above, so cp.stderr is None and the child's error output went
            # straight to the terminal; only read it if a pipe actually exists.
            stderr = cp.stderr.read().decode() if cp.stderr is not None else ''
            raise RuntimeError(
                f'Failed to start up the realtime plot server with code {quick_check}: {stderr}.'
            )

    # register the hook to start up the real-time plot server
    if real_time_plotting:
        hooks._register_hook(
            '_setup_recording', 'Driver', post=_view_realtime_plot_hook, ncalls=1
        )
        hooks._setup_hooks(self.driver)

    if suppress_solver_print:
        self.set_solver_print(level=0)

    # and run mission, and dynamics
    if run_driver:
        self.result = dm.run_problem(
            self,
            run_driver=run_driver,
            simulate=simulate,
            make_plots=make_plots,
            solution_record_file='problem_history.db',
            restart=restart_filename,
        )

        # Manually print out a failure message for low verbosity modes that suppress
        # optimizer printouts, which may include the results message. Assumes success,
        # alerts user on a failure
        if (
            not self.result.success and verbosity <= Verbosity.BRIEF  # QUIET, BRIEF
        ):
            warnings.warn('\nAviary run failed. See the dashboard for more details.\n')
    else:
        self.run_model()
        self.result = self.driver.result

    # update n2 diagram after run.
    outdir = Path(self.get_reports_dir(force=True))
    n2_file = os.path.join(outdir, 'n2.html')
    om.n2(
        self,
        outfile=n2_file,
        show_browser=False,
    )

    if verbosity >= Verbosity.VERBOSE:  # VERBOSE, DEBUG
        with open(Path(self.get_reports_dir()) / 'output_list.txt', 'w') as outfile:
            self.model.list_outputs(out_stream=outfile)

    if self.generate_payload_range and self.problem_type == ProblemType.SIZING:
        self.run_payload_range()
def run_off_design_mission(
    self,
    problem_type: ProblemType,
    phase_info=None,
    equations_of_motion: EquationsOfMotion = None,
    problem_configurator=None,
    num_first_class=None,
    num_business=None,
    num_economy=None,
    num_pax=None,
    wing_cargo=None,
    misc_cargo=None,
    cargo_mass=None,
    mission_gross_mass=None,
    mission_range=None,
    optimizer=None,
    name=None,
    fill_cargo=False,
    fill_fuel=False,
    verbosity=None,
):
    """
    Run an off-design mission using a previously sized aircraft.

    Creates a new AviaryProblem configured for the specified off-design mission type,
    inheriting the aircraft inputs and design gross mass from the current problem, then
    builds, sets up and runs it.

    Parameters
    ----------
    problem_type : ProblemType or str
        The type of off-design mission to fly. ``SIZING`` is not allowed.
        ``OFF_DESIGN_MIN_FUEL`` and ``OFF_DESIGN_MAX_RANGE`` receive type-specific
        handling of range and gross mass.
    phase_info : dict, optional
        Phase info for the off-design mission. If None, it is rebuilt from the mission,
        pre-mission and post-mission info of the current problem's model.
    equations_of_motion : EquationsOfMotion, optional
        Equations of motion for the off-design mission. If None, the setting inherited
        from the current problem's inputs is left unchanged.
    problem_configurator : ProblemConfigurator, optional
        Problem configurator for the off-design mission. If None, the configurator of
        the new problem's model is left unchanged.
    num_first_class : int, optional
        Number of first-class passengers. FLOPS mass method only.
    num_business : int, optional
        Number of business-class passengers. FLOPS mass method only.
    num_economy : int, optional
        Number of economy-class passengers. FLOPS mass method only.
    num_pax : int, optional
        Total number of passengers. When using the FLOPS mass method and any per-class
        count is provided, this defaults to the sum of the provided per-class counts.
    wing_cargo : float, optional
        Mass of wing cargo, in lbm. FLOPS mass method only.
    misc_cargo : float, optional
        Mass of miscellaneous cargo, in lbm. FLOPS mass method only.
    cargo_mass : float, optional
        Total cargo mass, in lbm.
    mission_gross_mass : float, optional
        Gross mass for the off-design mission, in lbm. Defaults to the design gross
        mass. For OFF_DESIGN_MIN_FUEL missions, 90% of this value is used as the
        initial guess for ``Mission.GROSS_MASS``.
    mission_range : float, optional
        Fixed range for OFF_DESIGN_MIN_FUEL missions, in nautical miles. Defaults to the
        range of the current problem. Unused for other mission types.
    optimizer : str, optional
        Optimizer to use. If None, the optimizer option of the current problem's driver
        is reused; if that option is unavailable, None is passed on to ``add_driver``.
    name : str, optional
        Name of the off-design problem. Defaults to ``'{original_name}_off_design'``,
        with an incrementing suffix if the output directory already exists.
    fill_cargo : bool, optional
        If True, a cargo variable is added as a design variable. Cannot be used with
        ``fill_fuel``. Defaults to False.
    fill_fuel : bool, optional
        If True, takeoff gross mass is added as a design variable. Cannot be used with
        ``fill_cargo``. Defaults to False.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for the off-design run. If None, uses
        the problem-level verbosity.

    Returns
    -------
    AviaryProblem
        The completed off-design AviaryProblem after running.

    Raises
    ------
    UserWarning
        If ``problem_type`` is ``SIZING``, or if both ``fill_cargo`` and ``fill_fuel``
        are True.
    """
    # For off-design missions, provided verbosity will be used for all L2 method calls
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    # accept str for problem type
    problem_type = ProblemType(problem_type)

    if problem_type is ProblemType.SIZING:
        raise UserWarning('Off-design missions cannot be SIZING missions.')

    if fill_cargo and fill_fuel:
        raise UserWarning(
            'Cannot run an off-design mission with both "fill_cargo" and "fill_fuel" flags '
            'active.'
        )

    if name is None:
        # increment the name if the output directory already exists
        # to avoid overwriting previous off-design runs
        base_name = self._name + '_off_design'
        for design_number in count(0):
            name = base_name if design_number == 0 else f'{base_name}_{design_number}'
            output_path = Path(f'{name}_out')
            if not output_path.is_dir():
                break

    off_design_prob = AviaryProblem(name=name)

    # Set up problem for mission, such as equations of motion, configurators, etc.
    inputs = deepcopy(self.aviary_inputs)

    design_gross_mass = self.get_val(Aircraft.Design.GROSS_MASS, units='lbm')[0]
    inputs.set_val(Aircraft.Design.GROSS_MASS, design_gross_mass, units='lbm')

    # problem_type was converted to a ProblemType above, so it is always set here.
    inputs.set_val(Settings.PROBLEM_TYPE, problem_type)

    if equations_of_motion is not None:
        inputs.set_val(Settings.EQUATIONS_OF_MOTION, equations_of_motion)

    if problem_configurator is not None:
        off_design_prob.model.configurator = problem_configurator

    if phase_info is None:
        # model phase_info only contains mission information, recreate the whole thing here
        phase_info = self.model.mission_info.copy()
        phase_info['pre_mission'] = self.model.pre_mission_info.copy()
        phase_info['post_mission'] = self.model.post_mission_info.copy()

    # update passenger count and cargo masses
    mass_method = inputs.get_val(Settings.MASS_METHOD)

    # Sanity check passenger counts - cover specific edge case that preprocessors won't catch
    # Since we inherit mission pax count from sizing mission, we need to overwrite it
    # whenever the caller supplied per-class counts (including explicit zeros) without a
    # total. If no per-class count was supplied, the inherited total is left alone.
    class_counts = (num_economy, num_business, num_first_class)
    if (
        mass_method is FLOPS
        and num_pax is None
        and any(class_count is not None for class_count in class_counts)
    ):
        num_pax = sum(class_count for class_count in class_counts if class_count is not None)

    # only FLOPS cares about seat class or specific cargo categories
    if mass_method == LegacyCode.FLOPS:
        if num_first_class is not None:
            inputs.set_val(Aircraft.CrewPayload.NUM_FIRST_CLASS, num_first_class)
        if num_business is not None:
            inputs.set_val(Aircraft.CrewPayload.NUM_BUSINESS_CLASS, num_business)
        if num_economy is not None:
            inputs.set_val(Aircraft.CrewPayload.NUM_ECONOMY_CLASS, num_economy)
        if wing_cargo is not None:
            inputs.set_val(Aircraft.CrewPayload.WING_CARGO, wing_cargo, 'lbm')
        if misc_cargo is not None:
            inputs.set_val(Aircraft.CrewPayload.MISC_CARGO, misc_cargo, 'lbm')
    else:
        warnings.warn(
            'Off-design functionality is in beta for GASP-mass based aircraft. Please manually '
            'verify your results.'
        )

    if num_pax is not None:
        inputs.set_val(Aircraft.CrewPayload.NUM_PASSENGERS, num_pax)
    if cargo_mass is not None:
        inputs.set_val(Aircraft.CrewPayload.CARGO_MASS, cargo_mass, 'lbm')

    # NOTE once load_inputs is run, phase info details are stored in prob.model.configurator,
    # meaning any phase_info changes that happen after load inputs is ignored
    if problem_type is ProblemType.OFF_DESIGN_MIN_FUEL:
        # Set mission range, aviary will calculate required fuel
        if mission_range is None:
            if verbosity >= Verbosity.VERBOSE:
                warnings.warn(
                    'OFF_DESIGN_MIN_FUEL problem type requested with no specified range. Using design '
                    'mission range for the off-design mission.'
                )
            mission_range = self.get_val(Mission.RANGE, units='NM')[0]

        phase_info['post_mission']['target_range'] = (
            mission_range,
            'nmi',
        )

    # reset the AviaryProblem to run the new mission
    off_design_prob.load_inputs(inputs, phase_info, verbosity=verbosity)

    # Update inputs that are specific to problem type
    # Some OFF_DESIGN_MIN_FUEL problem changes had to happen before load_inputs, all
    # OFF_DESIGN_MAX_RANGE problem changes must come after load_inputs
    if problem_type is ProblemType.OFF_DESIGN_MIN_FUEL:
        off_design_prob.aviary_inputs.set_val(Mission.RANGE, mission_range, units='NM')
        # set initial guess for Mission.GROSS_MASS to help optimizer with new design
        # variable bounds.
        if mission_gross_mass is None:
            mission_gross_mass = off_design_prob.aviary_inputs.get_val(
                Aircraft.Design.GROSS_MASS, 'lbm'
            )
        off_design_prob.aviary_inputs.set_val(
            Mission.GROSS_MASS, mission_gross_mass * 0.9, units='lbm'
        )

    elif problem_type is ProblemType.OFF_DESIGN_MAX_RANGE:
        # Set mission fuel and calculate gross weight, aviary will calculate range
        if mission_gross_mass is None:
            if verbosity >= Verbosity.VERBOSE:
                warnings.warn(
                    'OFF_DESIGN_MAX_RANGE problem type requested with no specified gross mass. Using design '
                    'takeoff gross mass for the off-design mission.'
                )
            mission_gross_mass = self.get_val(Aircraft.Design.GROSS_MASS, units='lbm')[0]

        off_design_prob.aviary_inputs.set_val(
            Mission.GROSS_MASS, mission_gross_mass, units='lbm'
        )

    off_design_prob.check_and_preprocess_inputs(verbosity=verbosity)
    off_design_prob.build_model(verbosity=verbosity)

    if optimizer is None:
        try:
            optimizer = self.driver.options['optimizer']
        except KeyError:
            optimizer = None

    # Reuse the iteration limit of the current driver when its setting can be found.
    try:
        if optimizer == 'SNOPT':
            max_iter = self.driver.opt_settings['Major iterations limit']
        elif optimizer == 'IPOPT':
            max_iter = self.driver.opt_settings['max_iter']
        elif optimizer == 'SLSQP':
            max_iter = self.driver.opt_settings['maxiter']
        else:
            max_iter = None
    except KeyError:
        max_iter = None

    off_design_prob.add_driver(optimizer=optimizer, max_iter=max_iter, verbosity=verbosity)
    off_design_prob.add_design_variables(verbosity=verbosity)

    # Handle edge case for payload-range diagrams
    # Select which cargo variable makes the most sense to float, and then set a tolerance
    # based on rough guesses on what is sufficient to get the problem to converge without
    # setting design variable bounds too large
    if fill_cargo:
        # GASP cargo mass is an input, can directly use as control variable
        if mass_method is GASP:
            control_var = Aircraft.CrewPayload.CARGO_MASS
            val = cargo_mass
            tol = 1.05
        # FLOPS cargo mass is an output, not valid for control variable. Pick control var.
        else:
            # See if misc_cargo is being used, float that as a backup
            if misc_cargo is None or misc_cargo == 0:
                # We aren't using cargo_mass OR misc_mass - try wing cargo as last resort
                if wing_cargo is None or wing_cargo == 0:
                    # We don't know enough about the aircraft to make any informed guesses.
                    # Use arbitrary values
                    control_var = Aircraft.CrewPayload.MISC_CARGO
                    val = self.get_val(Aircraft.Design.GROSS_MASS)
                    tol = 0.05
                    inputs.set_val(Aircraft.CrewPayload.CARGO_MASS, 0, 'lbm')
                else:
                    control_var = Aircraft.CrewPayload.WING_CARGO
                    val = wing_cargo
                    tol = 1.1
            else:
                control_var = Aircraft.CrewPayload.MISC_CARGO
                val = misc_cargo
                tol = 1.1

        off_design_prob.model.add_design_var(
            control_var,
            lower=0,
            upper=val * tol,
            ref=val,
        )

    if fill_fuel:
        off_design_prob.model.add_design_var(
            Mission.GROSS_MASS,
            lower=0,
            upper=off_design_prob.aviary_inputs.get_val(
                Aircraft.Design.GROSS_MASS, units='lbm'
            ),
            ref=off_design_prob.aviary_inputs.get_val(Aircraft.Design.GROSS_MASS, units='lbm'),
        )

    off_design_prob.add_objective(verbosity=verbosity)
    off_design_prob.setup(verbosity=verbosity)
    off_design_prob.set_initial_guesses(verbosity=verbosity)
    off_design_prob.run_aviary_problem(verbosity=verbosity)

    return off_design_prob
[docs] def run_payload_range(self, verbosity=None): """ Run payload-range analysis for the sized aircraft. For a successfully converged sizing mission, this method runs additional off-design missions to generate payload-range diagram data points: zero fuel, design mission, max fuel + payload range, and ferry range. Results are saved to a CSV file in the reports directory. Parameters ---------- verbosity : Verbosity or int, optional Controls the level of terminal output for the payload-range analysis. If None, uses the problem-level verbosity. Returns ------- tuple of AviaryProblem, or empty tuple A tuple ``(max_fuel_pyld_range_prob, ferry_range_prob)`` containing the off-design AviaryProblems for the max fuel + payload and ferry range points. Returns an empty tuple if the sizing run did not converge or the problem type is not supported. Notes ----- Currently only supported for the energy state equations of motion. Reserve fuel is not yet accounted for in the analysis. """ # For off-design missions, provided verbosity will be used for all L2 method calls if verbosity is not None: # compatibility with being passed int for verbosity verbosity = Verbosity(verbosity) else: verbosity = self.verbosity # defaults to BRIEF if not self.result.success and verbosity > Verbosity.QUIET: warnings.warn( 'Payload Range Diagrams cannot be generated for unconverged Aviary problems.' ) return () elif self.problem_type is ProblemType.MULTI_MISSION and verbosity > Verbosity.QUIET: warnings.warn( 'Payload Range Diagrams currently cannot be generated for aircraft designed ' 'using multimission capability.' ) return () # Off-design missions are not tested with 2DOF missions. mass_method = self.model.aviary_inputs.get_val(Settings.MASS_METHOD) equations_of_motion = self.model.aviary_inputs.get_val(Settings.EQUATIONS_OF_MOTION) if equations_of_motion is EquationsOfMotion.ENERGY_STATE: # make a copy of the phase_info to avoid modifying the original. 
phase_info = self.model.mission_info.copy() phase_info['pre_mission'] = self.model.pre_mission_info.copy() phase_info['post_mission'] = self.model.post_mission_info.copy() # This checks if the 'cruise' phase exists, then automatically extends duration bounds # of the cruise stage to allow for the longer off design missions. if phase_info['cruise']: min_duration = phase_info['cruise']['user_options']['time_duration_bounds'][0][0] max_duration = phase_info['cruise']['user_options']['time_duration_bounds'][0][1] cruise_units = phase_info['cruise']['user_options']['time_duration_bounds'][1] phase_info['cruise']['user_options'].update( {'time_duration_bounds': ((min_duration, 2 * max_duration), cruise_units)} ) # TODO Verify that previously run point is actually max payload/fuel point, and if not # run off-design mission for that point # Point 1 is along the y axis (range=0) payload_1 = float(self.get_val(Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS)[0]) range_1 = 0 fuel_1 = 0 # Point 2 (Design Range): sizing mission which is assumed to be the point of max # payload + fuel on the payload and range diagram payload_2 = payload_1 range_2 = float(self.get_val(Mission.RANGE)[0]) gross_mass = float(self.get_val(Mission.GROSS_MASS)[0]) # NOTE this operating mass is based on the previously run mission - assumed this is the # design mission!! Includes cargo containers needed for design (max payload) operating_mass = float(self.get_val(Mission.OPERATING_MASS)[0]) fuel_capacity = float(self.get_val(Aircraft.Fuel.TOTAL_CAPACITY)[0]) unusable_fuel = float(self.get_val(Aircraft.Fuel.UNUSABLE_FUEL_MASS)[0]) max_payload = float(self.get_val(Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS)[0]) fuel_2 = self.get_val(Mission.FUEL)[0] # Operating mass includes unusable fuel, don't double count max_usable_fuel = fuel_capacity - unusable_fuel # An aircraft may be designed with fuel tank capacity that, if fully filled, would # exceed MTOW. 
In that scenario, 'Max Fuel + Payload' range and 'Ferry' range are the same, and # the point only needs to be run once. if operating_mass + max_usable_fuel < gross_mass: # Point 3 (Max Fuel + Payload Range): max fuel and remaining payload capacity # Assume proportional decrease in all cargo types (including number of passengers) # to make room for maximum fuel. Round pax count down to avoid loading over TOGW max_fuel_pyld_total_payload = gross_mass - operating_mass - max_usable_fuel payload_frac = max_fuel_pyld_total_payload / max_payload # Calculates Different payload quantities max_fuel_pyld_wing_cargo = ( self.model.aviary_inputs.get_val(Aircraft.CrewPayload.WING_CARGO, 'lbm') * payload_frac ) max_fuel_pyld_misc_cargo = ( self.model.aviary_inputs.get_val(Aircraft.CrewPayload.MISC_CARGO, 'lbm') * payload_frac ) max_fuel_pyld_num_first = int( self.model.aviary_inputs.get_val(Aircraft.CrewPayload.Design.NUM_FIRST_CLASS) * payload_frac ) max_fuel_pyld_num_bus = int( self.model.aviary_inputs.get_val(Aircraft.CrewPayload.Design.NUM_BUSINESS_CLASS) * payload_frac ) max_fuel_pyld_num_economy = int( self.model.aviary_inputs.get_val(Aircraft.CrewPayload.Design.NUM_ECONOMY_CLASS) * payload_frac ) # Passenger number rounding and potentially cargo container mass changing means # we don't know if we actually filled the aircraft to exactly TOGW yet. 
Need to use # "fill_cargo" flag in off-design call max_fuel_pyld_range_prob = self.max_fuel_pyld_range_prob = ( self.run_off_design_mission( problem_type=ProblemType.OFF_DESIGN_MAX_RANGE, phase_info=phase_info, num_first_class=max_fuel_pyld_num_first, num_business=max_fuel_pyld_num_bus, num_economy=max_fuel_pyld_num_economy, wing_cargo=max_fuel_pyld_wing_cargo, misc_cargo=max_fuel_pyld_misc_cargo, name=self._name + '_max_fuel_plus_payload_range', fill_cargo=True, verbosity=verbosity, ) ) # Pull the payload and range values from the OFF_DESIGN_MAX_RANGE mission payload_3 = max_fuel_pyld_range_prob.get_val( Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS )[0] range_3 = max_fuel_pyld_range_prob.get_val(Mission.RANGE)[0] fuel_3 = max_fuel_pyld_range_prob.get_val(Mission.FUEL)[0] prob_3_skip = False else: prob_3_skip = True # only fill fuel until hit TOGW max_usable_fuel = gross_mass - operating_mass # Point 4 (Ferry Range): maximum fuel and 0 payload # Total cargo mass is an input in GASP, but an output in FLOPS. 
Avoid overriding cargo # mass to 0 if not using GASP if mass_method is GASP: ferry_cargo_mass = 0 else: ferry_cargo_mass = None ferry_range_gross_mass = operating_mass + max_usable_fuel ferry_range_prob = self.ferry_range_prob = self.run_off_design_mission( problem_type=ProblemType.OFF_DESIGN_MAX_RANGE, phase_info=phase_info, num_first_class=0, num_business=0, num_economy=0, wing_cargo=0, misc_cargo=0, cargo_mass=ferry_cargo_mass, mission_gross_mass=ferry_range_gross_mass, name=self._name + '_ferry_range', fill_fuel=True, verbosity=verbosity, ) payload_4 = ferry_range_prob.get_val(Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS)[0] range_4 = ferry_range_prob.get_val(Mission.RANGE)[0] fuel_4 = ferry_range_prob.get_val(Mission.FUEL)[0] # if max fuel + payload mission was skipped, max_fuel_pyld_range_prob is the same as ferry_range_prob if prob_3_skip: max_fuel_pyld_range_prob = ferry_range_prob payload_3 = payload_4 range_3 = range_4 # Check if OFF_DESIGN_MAX_RANGE missions ran successfully before writing to csv file # If both missions ran successfully, writes the payload/range data to a csv file self.payload_range_data = payload_range_data = NamedValues() if ferry_range_prob.result.success and max_fuel_pyld_range_prob.result.success: payload_range_data.set_val( 'Mission Name', ['Zero Fuel', 'Design Mission', 'Max Fuel + Payload Mission', 'Ferry Mission'], ) payload_range_data.set_val( 'Payload', [payload_1, payload_2, payload_3, payload_4], 'lbm' ) payload_range_data.set_val('Fuel', [fuel_1, fuel_2, fuel_3, fuel_4], 'lbm') payload_range_data.set_val('Range', [range_1, range_2, range_3, range_4], 'NM') write_data_file( Path(self.get_reports_dir(force=True)) / 'payload_range_data.csv', payload_range_data, ) # Prints the payload/range data to the console if verbosity is set to VERBOSE or DEBUG if verbosity >= Verbosity.VERBOSE: for item in payload_range_data: print(f'{item[0]} ({item[1][1]}): {item[1][0]}') return (max_fuel_pyld_range_prob, ferry_range_prob) else: 
warnings.warn( 'One or both of the OFF_DESIGN_MAX_RANGE missions did not run successfully; payload/range ' 'diagram was not generated.' ) else: warnings.warn( 'Payload/range analysis is currently only supported for the energy equations of ' 'motion.' )
def save_results(self, json_filename='sizing_results.json'):
    """
    Save the Aviary problem results to a JSON file.

    Every entry of ``self.model.aviary_inputs`` is written as a
    ``[name, value, units, type_string]`` record, where ``type_string`` is
    ``str(type(value))`` of the stored input value. The resulting file can be reloaded later
    using ``reload_aviary_problem`` or ``_read_sizing_json``.

    The value recorded for ``Mission.GROSS_MASS`` and ``Aircraft.Design.GROSS_MASS`` is this
    problem's current ``Mission.GROSS_MASS`` (in the units of the stored input), not the
    stored input itself. If ``Aircraft.Design.GROSS_MASS`` is absent from the inputs, a record
    for it is appended using the problem's value in 'lbm'.

    The stored inputs are never modified: lists are converted into new lists rather than
    being edited in place. The file is only opened once every record has been collected, so
    a failure while gathering values does not leave a truncated file behind.

    Parameters
    ----------
    json_filename : str, optional
        File name (and relative path) for the output JSON file. Defaults to
        ``'sizing_results.json'``.
    """

    def to_serializable(item):
        # Enums are stored by member name and Paths as strings; everything else
        # (int, bool, float, str, ...) is passed through to the JSON encoder unchanged.
        if isinstance(item, Enum):
            return item.name
        if isinstance(item, Path):
            return str(item)
        return item

    aviary_input_list = []

    for name, (value, units) in self.model.aviary_inputs:
        # The recorded type always describes the stored input, even when the value written
        # to the file is replaced or converted below.
        type_value = type(value)

        if name == Mission.GROSS_MASS or name == Aircraft.Design.GROSS_MASS:
            # Use the gross mass from the solved problem instead of the stored input.
            value = self.get_val(Mission.GROSS_MASS, units=units).tolist()[0]
        elif type_value is np.ndarray:
            value = value.tolist()
        elif type_value is list:
            # Build a new list so the list held by aviary_inputs keeps its Enum/Path
            # members; this also handles empty lists.
            value = [to_serializable(item) for item in value]
        else:
            value = to_serializable(value)

        aviary_input_list.append([name, value, units, str(type_value)])

    if Aircraft.Design.GROSS_MASS not in self.model.aviary_inputs:
        aviary_input_list.append(
            [
                Aircraft.Design.GROSS_MASS,
                self.get_val(Aircraft.Design.GROSS_MASS, 'lbm')[0],
                'lbm',
                str(float),
            ]
        )

    # The context manager closes the file; no explicit close() is needed.
    with open(json_filename, 'w') as jsonfile:
        json.dump(
            aviary_input_list,
            jsonfile,
            sort_keys=True,
            indent=4,
            ensure_ascii=False,
        )
def _add_hybrid_objective(self, phase_info):
    """
    Attach an objective component that trades final mass against final time.

    An ``ExecComp`` named ``obj_comp`` is added to the model. It evaluates
    ``obj = -final_mass / <design gross mass> + final_time / 5.``, where the design gross
    mass is read from the Aviary inputs in 'lbm'. Both inputs of the component are connected
    to the last timeseries entry (``src_indices=[-1]``) of the final phase in ``phase_info``.

    Parameters
    ----------
    phase_info : dict
        Dictionary of mission phase information, keyed by phase name. The last key is
        treated as the final phase.
    """
    phase_names = [*phase_info.keys()]
    model = self.model
    design_gross_mass = model.aviary_inputs.get_val(Aircraft.Design.GROSS_MASS, units='lbm')

    # The gross mass is baked into the expression string as a constant.
    objective_expr = f'obj = -final_mass / {design_gross_mass} + final_time / 5.'
    input_options = {'final_mass': {'units': 'lbm'}, 'final_time': {'units': 'h'}}
    model.add_subsystem('obj_comp', om.ExecComp(objective_expr, **input_options))

    last_phase = phase_names[-1]
    connections = (
        (f'traj.{last_phase}.timeseries.mass', 'obj_comp.final_mass'),
        (f'traj.{last_phase}.timeseries.time', 'obj_comp.final_time'),
    )
    for source, target in connections:
        model.connect(source, target, src_indices=[-1])
# Characters removed from a stringified enum member when recovering its bare name.
_ENUM_STRING_STRIP_TABLE = str.maketrans('', '', "><]' ")


def _enum_name_from_string(text):
    """Return the part of ``text`` after its last ':' with ``> < ] '`` and spaces removed."""
    return text.split(':')[-1].translate(_ENUM_STRING_STRIP_TABLE)


def _read_sizing_json(json_filename, meta_data, verbosity=Verbosity.BRIEF):
    """
    Read saved sizing results from a JSON file.

    The file is expected to hold a list of ``[name, value, units, type_string]`` records.
    Records whose type string marks them as enums (or lists containing enum-like strings)
    are converted back to data with ``convert_strings_to_data`` before being stored.

    Records whose name is not in ``meta_data``, or whose value cannot be set, are skipped;
    a warning is only issued for them when ``verbosity`` is at least ``Verbosity.VERBOSE``.

    Parameters
    ----------
    json_filename : str or Path
        Path to the JSON file containing saved sizing data.
    meta_data : dict
        Variable metadata used to validate and load file data.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output. Defaults to ``Verbosity.BRIEF``.

    Returns
    -------
    AviaryValues
        An AviaryValues object populated with input values from the JSON file.
    """
    aviary_inputs = AviaryValues()

    # The context manager closes the file; no explicit close() is needed.
    with open(json_filename) as json_data_file:
        loaded_aviary_input_list = json.load(json_data_file)

    for inputs in loaded_aviary_input_list:
        var_name, var_values, var_units, var_type = inputs

        if var_type == "<class 'list'>":
            # A list is treated as a list of enums if any string element contains '<'.
            # Elements are cleaned in place so warnings that print ``inputs`` show the
            # cleaned values.
            found_enum = False
            for i, item in enumerate(var_values):
                if isinstance(item, str) and '<' in item:
                    found_enum = True
                    var_values[i] = _enum_name_from_string(item)
            if found_enum:
                var_values = convert_strings_to_data(var_values)
        elif '<enum' in var_type:
            var_values = convert_strings_to_data([_enum_name_from_string(var_values)])

        if var_name not in meta_data:
            if verbosity >= Verbosity.VERBOSE:
                warnings.warn(
                    f'While reading json output, item was not found in MetaData: {inputs}. This '
                    'variable was skipped.'
                )
            continue

        try:
            aviary_inputs.set_val(var_name, var_values, units=var_units, meta_data=meta_data)
        except Exception:
            # Best-effort load: a bad record is skipped rather than aborting the read.
            # Exception (not BaseException) is caught so KeyboardInterrupt and SystemExit
            # still propagate.
            if verbosity >= Verbosity.VERBOSE:
                warnings.warn(
                    f'Could not add item in json output to AviaryValues: input string = '
                    f'{inputs}, attempted to set_value({var_name}, {var_values}, {var_units}). '
                    'This variable was skipped.'
                )

    return aviary_inputs
def reload_aviary_problem(
    filename, phase_info=None, metadata=CoreMetaData.copy(), verbosity=Verbosity.QUIET
):
    """
    Build an AviaryProblem from a previously saved sizing JSON file.

    The saved inputs are read from ``filename``, loaded into a new ``AviaryProblem``, and
    the problem is then taken through preprocessing, system/phase creation, linking, driver,
    design variable and objective definition, ``setup`` and ``final_setup``. Finally
    ``Mission.GROSS_MASS`` from the saved inputs is set on the problem in 'lbm'.

    Parameters
    ----------
    filename : str, Path
        User specified name and relative path of json file containing the sized aircraft data
    phase_info : dict, Path
        phase_info dictionary used by the original problem
    metadata : dict (optional)
        Custom metadata if needed to read all variables present in the json output file.
        The default is a copy of ``CoreMetaData`` created once, when this function is
        defined, and shared by every call that relies on the default.
    verbosity : Verbosity, int (optional)
        Controls level of terminal output for function call

    Returns
    -------
    Aviary Problem object with filled aviary_inputs.
    To use this problem for anything other than running off-design missions, then the full
    level 2 interface should be used. "load_inputs()" can be skipped as the "aviary_inputs"
    attribute is prefilled here.
    """
    # Every build step below takes the same verbosity keyword.
    step_kwargs = {'verbosity': verbosity}

    # NOTE(review): ``metadata`` is only used to read the json file; the problem itself is
    # created with its own default metadata - confirm this is intended.
    problem = AviaryProblem()

    json_path = get_path(filename)
    sized_inputs = _read_sizing_json(json_path, metadata, verbosity)

    problem.load_inputs(sized_inputs, phase_info, **step_kwargs)
    problem.check_and_preprocess_inputs(**step_kwargs)

    # Systems and mission phases
    problem.add_pre_mission_systems(**step_kwargs)
    problem.add_phases(**step_kwargs)
    problem.add_post_mission_systems(**step_kwargs)

    # Connections between phases and variables
    problem.link_phases(**step_kwargs)

    # Optimization problem formulation
    problem.add_driver(**step_kwargs)
    problem.add_design_variables(**step_kwargs)
    problem.add_objective(**step_kwargs)

    problem.setup(**step_kwargs)
    problem.final_setup()

    # Gross mass normally lives in the problem as well, so mirror the saved value there.
    saved_gross_mass = sized_inputs.get_val(Mission.GROSS_MASS, 'lbm')
    problem.set_val(Mission.GROSS_MASS, saved_gross_mass, 'lbm')

    return problem