import csv
import json
import os
import subprocess
import warnings
from copy import deepcopy
from datetime import datetime
from enum import Enum
from itertools import count
from pathlib import Path
import dymos as dm
import numpy as np
import openmdao
import openmdao.api as om
import openmdao.utils.hooks as hooks
from openmdao.utils.reports_system import _default_reports
from openmdao.utils.units import convert_units
from packaging import version
from aviary.core.aviary_group import AviaryGroup
from aviary.interface.utils import set_warning_format
from aviary.utils.aviary_values import AviaryValues
from aviary.utils.csv_data_file import write_data_file
from aviary.utils.functions import convert_strings_to_data, get_path
from aviary.utils.merge_variable_metadata import merge_meta_data
from aviary.utils.named_values import NamedValues
from aviary.variable_info.enums import EquationsOfMotion, LegacyCode, ProblemType, Verbosity
from aviary.variable_info.functions import setup_model_options
from aviary.variable_info.variable_meta_data import CoreMetaData
from aviary.variable_info.variables import Aircraft, Dynamic, Mission, Settings
# Module-level shorthand aliases for the two LegacyCode enum members.
FLOPS = LegacyCode.FLOPS
GASP = LegacyCode.GASP
[docs]
class AviaryProblem(om.Problem):
"""
Main class for instantiating, formulating, and solving Aviary problems.
This Problem object is a specialized OpenMDAO Problem that has additional methods to help users
create and solve Aviary problems.
Parameters
----------
problem_type : ProblemType, optional
The type of problem to solve (e.g., SIZING, OFF_DESIGN_MIN_FUEL, OFF_DESIGN_MAX_RANGE,
MULTI_MISSION). If None, it will be set from the aircraft data when ``load_inputs`` is called.
verbosity : Verbosity or int, optional
Controls the level of terminal output. If None, the verbosity will be determined from the
aircraft input data.
meta_data : dict, optional
Variable metadata used throughout the problem. Defaults to a copy of the Aviary base
metadata.
**kwargs : dict
Additional keyword arguments passed to ``om.Problem.__init__``.
Attributes
----------
timestamp : datetime
Timestamp recorded when the problem was created.
verbosity : Verbosity or None
Controls the level of terminal output for the problem.
problem_type : ProblemType or None
The type of problem being solved.
aviary_inputs : AviaryValues or None
The loaded aircraft and mission input data.
aviary_groups_dict : dict
Dictionary mapping names to AviaryGroup instances, used for multi-mission problems.
meta_data : dict
Variable metadata used throughout the problem.
generate_payload_range : bool
Flag indicating whether a payload-range diagram should be generated after a sizing run.
"""
[docs]
def __init__(
    self,
    problem_type: ProblemType = None,
    verbosity=None,
    meta_data=None,
    **kwargs,
):
    """
    Initialize the problem, its top-level model, and Aviary's default reports.

    Parameters
    ----------
    problem_type : ProblemType, optional
        The type of problem to solve. When it is ``ProblemType.MULTI_MISSION`` the top-level
        model is a plain ``om.Group``; otherwise it is an ``AviaryGroup``.
    verbosity : Verbosity or int, optional
        Problem-level verbosity. It is stored as given and used to set the warning format.
    meta_data : dict, optional
        Variable metadata used throughout the problem. If None, a new copy of ``CoreMetaData``
        is made for this instance.
    **kwargs : dict
        Additional keyword arguments passed to ``om.Problem.__init__``.
    """
    # Modify OpenMDAO's default_reports for this session. Reports that are already listed are
    # skipped so that creating several problems does not register duplicates.
    new_reports = [
        'subsystems',
        'mission',
        'timeseries_csv',
        'run_status',
        'sizing_results',
        'input_checks',
        'overridden_variables',
    ]
    for report in new_reports:
        if report not in _default_reports:
            _default_reports.append(report)

    super().__init__(**kwargs)

    self.timestamp = datetime.now()

    # If verbosity is set to anything but None, this defines how warnings are formatted for the
    # whole problem - warning format won't be updated if user requests a different verbosity
    # level for a specific method
    self.verbosity = verbosity
    set_warning_format(verbosity)

    self.problem_type = problem_type
    if problem_type == ProblemType.MULTI_MISSION:
        self.model = om.Group()
    else:
        self.model = AviaryGroup()

    # This causes problems where aviary_inputs are stored in different places for
    # multi-mission vs. standard mission
    self.aviary_inputs = None
    self.aviary_groups_dict = {}

    # The copy is made here, per instance, rather than in the signature: a default evaluated in
    # the signature is created once at definition time and would be shared by every problem.
    if meta_data is None:
        meta_data = CoreMetaData.copy()
    self.meta_data = meta_data

    # TODO try and find a better solution than a new custom flag - the issue is multimission
    # problems don't have a consistent variable path to check the inputs later on
    self.generate_payload_range = False
[docs]
def load_external_subsystems(self, external_subsystems: list = None, verbosity=None):
    """
    Add external subsystems to the Aviary problem.

    For multi-mission problems, this adds the provided external subsystems to every AviaryGroup.
    Subsystem metadata is merged into the problem's metadata after loading.

    Parameters
    ----------
    external_subsystems : list of SubsystemBuilder, optional
        List of external subsystems to be added. If None, an empty list is used.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A new list is created per call so that no list object is shared between calls.
    if external_subsystems is None:
        external_subsystems = []

    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    if self.problem_type == ProblemType.MULTI_MISSION:
        for group in self.aviary_groups_dict.values():
            group.load_external_subsystems(
                external_subsystems=external_subsystems, verbosity=verbosity
            )
            # Fold each group's metadata into the problem-level metadata as it is loaded.
            self.meta_data = merge_meta_data([self.meta_data, group.meta_data])
    else:
        self.model.load_external_subsystems(
            external_subsystems=external_subsystems, verbosity=verbosity
        )
        self.meta_data = merge_meta_data([self.meta_data, self.model.meta_data])
[docs]
def add_aviary_group(
    self,
    name: str,
    aircraft: AviaryValues,
    phase_info: dict,
    problem_configurator=None,
    phase_info_modifier=None,
    verbosity: Verbosity = Verbosity.BRIEF,
):
    """
    Create and add an AviaryGroup for a multi-mission problem.

    This method creates an AviaryGroup for a specific aircraft and mission combination, loads
    and preprocesses its inputs, and registers it in ``aviary_groups_dict`` under ``name``.

    Parameters
    ----------
    name : str
        A unique name identifying this group, used to reference it later.
    aircraft : AviaryValues
        Defines the aircraft configuration for this mission.
    phase_info : dict
        Defines the mission phases the aircraft will fly.
    problem_configurator : ProblemConfigurator, optional
        Required when using custom equations of motion. See ``TwoDOFProblemConfigurator`` for an
        example.
    phase_info_modifier : callable, optional
        A function that takes the ``phase_info`` dictionary and aviary_inputs and returns a
        modified phase_info.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. Defaults to ``Verbosity.BRIEF``.
        If None is passed explicitly, the problem-level verbosity is used.

    Returns
    -------
    AviaryGroup
        The AviaryGroup object containing the specified aircraft and mission configuration.

    Raises
    ------
    ValueError
        If ``problem_type`` is not ``ProblemType.MULTI_MISSION``.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    if self.problem_type is not ProblemType.MULTI_MISSION:
        # The exception must actually be raised; constructing it alone has no effect.
        raise ValueError(
            'add_aviary_group() should only be called when ProblemType is MULTI_MISSION.'
        )

    sub = self.model.add_subsystem(name, AviaryGroup())
    # The new group starts from the problem-level metadata.
    sub.meta_data = self.meta_data
    sub.load_inputs(
        aircraft_data=aircraft,
        phase_info=phase_info,
        problem_configurator=problem_configurator,
        phase_info_modifier=phase_info_modifier,
        verbosity=verbosity,
    )
    sub.check_and_preprocess_inputs()
    self.aviary_groups_dict[name] = sub

    if self.verbosity is None:
        # If problem-level verbosity was not defined, use the verbosity specified in the first
        # added AviaryGroup
        self.verbosity = sub.verbosity

    # TODO try and find a better solution than a new custom flag - the issue is multimission
    # problems don't have a consistent variable path to check the inputs later on
    if Settings.PAYLOAD_RANGE in sub.aviary_inputs:
        self.generate_payload_range = sub.aviary_inputs.get_val(Settings.PAYLOAD_RANGE)

    return sub
# TODO the third paragraph (talking about FLOPS vs GASP method) should be greatly simplified.
# Eventually, that functionality will be moved into mission (issue #361)
[docs]
def add_pre_mission_systems(self, verbosity=None):
    """
    Add pre-mission systems to the Aviary problem.

    The work is delegated to ``add_pre_mission_systems`` on the model: on every AviaryGroup in
    ``aviary_groups_dict`` for a multi-mission problem, otherwise on ``self.model``.

    Pre-mission systems are executed before the mission and typically include aircraft sizing,
    geometry, mass estimation, and takeoff calculations.

    Depending on the mission model specified (`FLOPS` or `GASP`), various subsystems are added
    to the aircraft model. For the `FLOPS` mission model, a takeoff phase is added using the
    Takeoff class with the number of engines and airport altitude specified. For the `GASP`
    mission model, three subsystems are added: a TaxiSegment subsystem, an ExecComp to calculate
    the time to initiate gear and flaps, and an ExecComp to calculate the speed at which to
    initiate rotation. All subsystems are promoted with aircraft and mission inputs and outputs
    as appropriate.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override (an int is accepted) wins over the problem-level setting.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type != ProblemType.MULTI_MISSION:
        self.model.add_pre_mission_systems(verbosity=verbosity)
        return

    for group in self.aviary_groups_dict.values():
        group.add_pre_mission_systems(verbosity=verbosity)
[docs]
def add_phases(
    self,
    parallel_phases=True,
    verbosity=None,
):
    """
    Add mission phases to the problem trajectory.

    Delegates to ``add_phases`` on the AviaryGroup (single-mission) or on every AviaryGroup in
    ``aviary_groups_dict`` (multi-mission), passing along the problem's ``comm``.

    Parameters
    ----------
    parallel_phases : bool, optional
        If True, the top-level container of all phases will be a ParallelGroup; otherwise it
        will be a standard OpenMDAO Group. Defaults to True.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Returns
    -------
    dm.Trajectory or None
        The object returned by the group's ``add_phases``. For multi-mission problems this is
        the value returned for the last AviaryGroup processed, or None if no AviaryGroup has
        been added yet.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    # Initialized up front so an empty multi-mission problem returns None instead of failing
    # with an UnboundLocalError at the return statement.
    traj = None

    if self.problem_type == ProblemType.MULTI_MISSION:
        for group in self.aviary_groups_dict.values():
            traj = group.add_phases(
                parallel_phases=parallel_phases,
                verbosity=verbosity,
                comm=self.comm,
            )
    else:
        traj = self.model.add_phases(
            parallel_phases=parallel_phases,
            verbosity=verbosity,
            comm=self.comm,
        )

    return traj
# TODO the later paragraph (talking about FLOPS vs GASP method) should be greatly simplified.
# Eventually, that functionality will be moved into mission (issue #361)
[docs]
def add_post_mission_systems(self, verbosity=None):
    """
    Add post-mission systems to the Aviary problem.

    The work is delegated to ``add_post_mission_systems`` on the model: on every AviaryGroup in
    ``aviary_groups_dict`` for a multi-mission problem, otherwise on ``self.model``.

    Post-mission systems are executed after the mission in the model execution order and
    typically include landing calculations, fuel burn summation, reserve fuel calculations, and
    mission constraints.

    Depending on the mission model specified (`FLOPS` or `GASP`), various subsystems are added
    to the aircraft model. For the `FLOPS` mission model, a landing phase is added using the
    Landing class with the wing area and lift coefficient specified, and a takeoff constraints
    ExecComp is added to enforce mass, range, velocity, and altitude continuity between the
    takeoff and climb phases. The landing subsystem is promoted with aircraft and mission inputs
    and outputs as appropriate, while the takeoff constraints ExecComp is only promoted with
    mission inputs and outputs.

    For the `GASP` mission model, four subsystems are added: a LandingSegment subsystem, an
    ExecComp to calculate the reserve fuel required, an ExecComp to calculate the overall fuel
    burn, and three ExecComps to calculate various mission objectives and constraints. All
    subsystems are promoted with aircraft and mission inputs and outputs as appropriate.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override (an int is accepted) wins over the problem-level setting.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type != ProblemType.MULTI_MISSION:
        self.model.add_post_mission_systems(verbosity=verbosity)
        return

    for group in self.aviary_groups_dict.values():
        group.add_post_mission_systems(verbosity=verbosity)
[docs]
def link_phases(self, verbosity=None):
    """
    Link the mission phases that have been added to the problem.

    Delegates to ``link_phases`` on the AviaryGroup (single-mission) or on every AviaryGroup in
    ``aviary_groups_dict`` (multi-mission), passing along the problem's ``comm``.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity

    if self.problem_type == ProblemType.MULTI_MISSION:
        for group in self.aviary_groups_dict.values():
            group.link_phases(verbosity=verbosity, comm=self.comm)
    else:
        self.model.link_phases(verbosity=verbosity, comm=self.comm)
[docs]
def add_driver(self, optimizer=None, use_coloring=None, max_iter=None, verbosity=None):
    """
    Add an optimization driver to the Aviary problem.

    A ``ScipyOptimizeDriver`` is created when the optimizer is ``'SLSQP'``; any other optimizer
    name gets a ``pyOptSparseDriver``. Optimizer-specific settings are preconfigured for SNOPT,
    IPOPT, and SLSQP, and coloring and print options are chosen from the verbosity level.

    Parameters
    ----------
    optimizer : str, optional
        The name of the optimizer to use. Can be ``'SLSQP'``, ``'SNOPT'``, ``'IPOPT'``, or
        others supported by OpenMDAO/pyOptSparse. Defaults to ``'IPOPT'``.
    use_coloring : bool, optional
        If True, the driver will declare coloring, which can speed up derivative computations.
        Defaults to True.
    max_iter : int, optional
        The maximum number of iterations allowed for the optimization process. Defaults to 50.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override (an int is accepted) wins over the problem-level setting.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    # Fill in the documented defaults for anything the caller left unset.
    optimizer = 'IPOPT' if optimizer is None else optimizer
    use_coloring = True if use_coloring is None else use_coloring
    max_iter = 50 if max_iter is None else max_iter

    # SLSQP is served by the scipy driver; everything else goes through pyOptSparse.
    if optimizer == 'SLSQP':
        driver = om.ScipyOptimizeDriver()
    else:
        driver = om.pyOptSparseDriver()
    self.driver = driver
    driver.options['optimizer'] = optimizer

    if use_coloring:
        # The amount of coloring output grows with verbosity.
        if verbosity < Verbosity.VERBOSE:  # QUIET, BRIEF
            coloring_options = {'show_summary': False}
        elif verbosity == Verbosity.VERBOSE:
            coloring_options = {'show_summary': True}
        else:  # DEBUG
            coloring_options = {'show_summary': True, 'show_sparsity': True}
        driver.declare_coloring(**coloring_options)

    selected = driver.options['optimizer']

    if selected == 'SNOPT':
        # Print options: (iSumm, iPrint)
        if verbosity == Verbosity.QUIET:
            snopt_print = (0, 0)
        elif verbosity == Verbosity.BRIEF:
            snopt_print = (6, 0)
        elif verbosity > Verbosity.BRIEF:  # VERBOSE, DEBUG
            snopt_print = (6, 9)
        driver.opt_settings['iSumm'], driver.opt_settings['iPrint'] = snopt_print

        # Optimizer settings
        driver.opt_settings['Major iterations limit'] = max_iter
        driver.opt_settings['Major optimality tolerance'] = 1e-4
        driver.opt_settings['Major feasibility tolerance'] = 1e-6

    elif selected == 'IPOPT':
        # Settings are collected in the order they are applied to the driver.
        ipopt_settings = {}

        # Print options
        if verbosity == Verbosity.QUIET:
            ipopt_settings['print_user_options'] = 'no'
            print_level = 0
        elif verbosity == Verbosity.BRIEF:
            ipopt_settings['print_user_options'] = 'no'
            ipopt_settings['print_frequency_iter'] = 10
            print_level = 3  # minimum to get exit status
        elif verbosity == Verbosity.VERBOSE:
            print_level = 5
        else:  # DEBUG
            print_level = 7
        ipopt_settings['print_level'] = print_level

        # Optimizer settings
        ipopt_settings['tol'] = 1.0e-6
        ipopt_settings['mu_init'] = 1e-5
        ipopt_settings['max_iter'] = max_iter
        # for faster convergence
        ipopt_settings['nlp_scaling_method'] = 'gradient-based'
        ipopt_settings['alpha_for_y'] = 'safer-min-dual-infeas'
        ipopt_settings['mu_strategy'] = 'monotone'
        # Shugo's recommended settings for robustness (not enabled):
        #   mu_init = 1.0
        #   nlp_scaling_method = 'none'
        #   limited_memory_max_history = 50

        for setting, value in ipopt_settings.items():
            driver.opt_settings[setting] = value

    elif selected == 'SLSQP':
        # Print option: only QUIET silences the scipy output.
        driver.options['disp'] = not (verbosity == Verbosity.QUIET)

        # Optimizer settings
        driver.options['tol'] = 1e-9
        driver.options['maxiter'] = max_iter

    # pyoptsparse print settings shared by SNOPT and IPOPT
    if optimizer in ('SNOPT', 'IPOPT'):
        if verbosity == Verbosity.QUIET:
            driver.options['print_results'] = False
        elif verbosity < Verbosity.DEBUG:  # BRIEF, VERBOSE
            driver.options['print_results'] = 'minimal'
        elif verbosity >= Verbosity.DEBUG:
            driver.options['print_opt_prob'] = True

    # optimizer agnostic settings
    if verbosity == Verbosity.DEBUG:
        driver.options['debug_print'] = ['desvars', 'ln_cons', 'nl_cons', 'objs']
# TODO docstring is much, much too long. This information should get moved to documentation
[docs]
def add_design_variables(self, verbosity=None):
    """
    Add design variables and related constraints to the Aviary problem.

    The work is delegated to ``add_design_variables`` on the model, passing the problem type
    and verbosity: on every AviaryGroup in ``aviary_groups_dict`` for a multi-mission problem,
    otherwise on ``self.model``.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Notes
    -----
    If using the FLOPS model, a design variable is added for the gross mass of the aircraft,
    with a lower bound of 10 lbm and an upper bound of 900,000 lbm.

    If using the GASP model, the following design variables are added depending on the mission
    type:

    - the initial thrust-to-weight ratio of the aircraft during ascent
    - the duration of the ascent phase
    - the time constant for the landing gear actuation
    - the time constant for the flaps actuation

    In addition, two constraints are added for the GASP model:

    - the initial altitude of the aircraft with gear extended is constrained to be 50 ft
    - the initial altitude of the aircraft with flaps extended is constrained to be 400 ft

    If solving a sizing problem, a design variable is added for the gross mass of the aircraft,
    and another for the gross mass of the aircraft computed during the mission. A constraint is
    also added to ensure that the residual range is zero.

    If solving an OFF_DESIGN_MIN_FUEL problem, only a design variable for the gross mass of the
    aircraft computed during the mission is added. A constraint is also added to ensure that the
    residual range is zero.

    In all cases, a design variable is added for the final cruise mass of the aircraft, with no
    upper bound, and a residual mass constraint is added to ensure that the mass balances.
    """
    # A per-call override (an int is accepted) wins over the problem-level setting.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type != ProblemType.MULTI_MISSION:
        self.model.add_design_variables(problem_type=self.problem_type, verbosity=verbosity)
        return

    for group in self.aviary_groups_dict.values():
        group.add_design_variables(problem_type=self.problem_type, verbosity=verbosity)
[docs]
def add_objective(self, objective_type=None, ref=None, verbosity=None):
    """
    Add the objective function to the Aviary problem.

    Two helper components (``fuel_obj`` and ``range_obj``) are always added to the model. The
    objective itself is then selected from ``objective_type`` or, when that is None, from
    ``problem_type``.

    .. note::
        The ``ref`` value should be positive for values being minimized and negative for values
        being maximized. The sign is not adjusted automatically.

    Parameters
    ----------
    objective_type : str, optional
        The type of objective to add. Valid options are ``'mass'``, ``'time'``,
        ``'hybrid_objective'``, ``'fuel_burned'``, and ``'fuel'``. If None, the objective is
        chosen based on ``problem_type``.
    ref : float, optional
        The reference value for scaling the objective. If None, a default is used: the entry for
        ``objective_type`` in the defaults table in the method body (1 if it has no entry), or 1
        when the objective is chosen from ``problem_type``.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.

    Raises
    ------
    ValueError
        If an invalid ``objective_type`` or ``problem_type`` is provided.
    """
    # Resolved the same way as in the other methods; nothing below reads it.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    # TODO: All references to the top-level model will need to be updated
    model = self.model

    # This isn't really a fuel objective, it's a hybrid or compound objective
    fuel_comp = om.ExecComp(
        'reg_objective = overall_fuel/10000 + ascent_duration/30.',
        reg_objective={'val': 0.0, 'units': 'unitless'},
        ascent_duration={'units': 's', 'shape': 1},
        overall_fuel={'units': 'lbm'},
    )
    model.add_subsystem(
        'fuel_obj',
        fuel_comp,
        promotes_inputs=[
            ('ascent_duration', Mission.Takeoff.ASCENT_DURATION),
            ('overall_fuel', Mission.TOTAL_FUEL),
        ],
        promotes_outputs=[('reg_objective', Mission.Objectives.FUEL)],
    )

    # Range counterpart of the component above; range enters with a negative sign.
    range_comp = om.ExecComp(
        'reg_objective = -actual_range/1000 + ascent_duration/30.',
        reg_objective={'val': 0.0, 'units': 'unitless'},
        ascent_duration={'units': 's', 'shape': 1},
        actual_range={'val': model.target_range, 'units': 'NM'},
    )
    model.add_subsystem(
        'range_obj',
        range_comp,
        promotes_inputs=[
            ('actual_range', Mission.RANGE),
            ('ascent_duration', Mission.Takeoff.ASCENT_DURATION),
        ],
        promotes_outputs=[('reg_objective', Mission.Objectives.RANGE)],
    )

    if objective_type is None:
        # No explicit objective: choose one from the problem type, scaling by 1 unless the
        # caller supplied a reference value.
        scale = 1 if ref is None else ref
        if (
            self.problem_type is ProblemType.SIZING
            or self.problem_type is ProblemType.OFF_DESIGN_MIN_FUEL
        ):
            objective_name = Mission.Objectives.FUEL
        elif self.problem_type is ProblemType.OFF_DESIGN_MAX_RANGE:
            objective_name = Mission.Objectives.RANGE
        else:
            raise ValueError(f'{self.problem_type} is not a valid problem type.')
        model.add_objective(objective_name, ref=scale)
        return

    # Default reference values per objective type
    default_ref_values = {
        'mass': -5e4,
        'hybrid_objective': -5e4,
        'fuel_burned': 1e4,
        'fuel': 1e4,
    }
    scale = default_ref_values.get(objective_type, 1) if ref is None else ref

    # 'mass' and 'time' are read from the last point of the final regular phase.
    last_phase = model.regular_phases[-1]

    if objective_type == 'mass':
        mass_path = f'traj.{last_phase}.timeseries.{Dynamic.Vehicle.MASS}'
        model.add_objective(mass_path, index=-1, ref=scale)
    elif objective_type == 'time':
        time_path = f'traj.{last_phase}.timeseries.time'
        model.add_objective(time_path, index=-1, ref=scale)
    elif objective_type == 'hybrid_objective':
        # The hybrid objective component is added without a reference value.
        self._add_hybrid_objective(model.mission_info)
        model.add_objective('obj_comp.obj')
    elif objective_type == 'fuel_burned':
        model.add_objective(Mission.FUEL, ref=scale)
    elif objective_type == 'fuel':
        model.add_objective(Mission.Objectives.FUEL, ref=scale)
    else:
        raise ValueError(
            f"{objective_type} is not a valid objective. 'objective_type' must "
            'be one of the following: mass, time, hybrid_objective, '
            'fuel_burned, or fuel'
        )
[docs]
def add_design_var_default(
    self,
    name: str,
    lower: float = None,
    upper: float = None,
    units: str = None,
    src_shape=None,
    default_val: float = None,
    ref: float = None,
):
    """
    Add a design variable and, optionally, give it a default value.

    The design variable is always added to the model. Input defaults are only set when
    ``default_val`` is provided; that value can be overwritten after setup using
    ``prob.set_val()``.

    Parameters
    ----------
    name : str
        The promoted name of the design variable.
    lower : float, optional
        The lower bound for the design variable.
    upper : float, optional
        The upper bound for the design variable.
    units : str, optional
        Units for the design variable. Also used for the default value.
    src_shape : int or tuple, optional
        Assumed shape of any connected source or higher-level promoted input. Only used when
        ``default_val`` is provided.
    default_val : float or ndarray, optional
        The default value to assign to this design variable.
    ref : float or ndarray, optional
        Value of design variable that scales to 1.0 in the driver.
    """
    model = self.model
    model.add_design_var(name=name, lower=lower, upper=upper, units=units, ref=ref)

    # Nothing more to do when the caller did not ask for a default value.
    if default_val is None:
        return

    model.set_input_defaults(name=name, val=default_val, units=units, src_shape=src_shape)
[docs]
def set_design_range(self, missions: list[str], range: str):
    """
    Set the design range for a multi-mission problem.

    Reads the target range of every requested mission, converts it to nautical miles, and sets
    the largest one on the variable named by ``range``. The design range is used within Aviary
    for sizing subsystems (e.g., avionics, air conditioning).

    Parameters
    ----------
    missions : list of str
        The names of missions instantiated via ``add_aviary_group()``. Names that do not match
        an added group are ignored.
    range : str
        The promoted name of the range variable to set (e.g., ``'Aircraft1.Range'``).

    Raises
    ------
    ValueError
        If none of the names in ``missions`` matches an added AviaryGroup.
    """
    # TODO: What happens if design range is specified in CSV??? should be able to access from
    # group.aviary_values
    # TODO: loop through all the .csv files and extract Aircraft.Design.RANGE

    # Collect the target range of each requested mission, expressed in nautical miles.
    design_ranges = []
    for name, group in self.aviary_groups_dict.items():
        if name not in missions:
            continue
        target_range, units = group.post_mission_info['target_range']
        design_ranges.append(convert_units(target_range, units, 'nmi'))

    # Taking the maximum of an empty list fails with an unhelpful "zero-size array" error, so
    # report the real cause instead.
    if not design_ranges:
        raise ValueError(
            f'None of the requested missions {missions} match a group added with '
            f'add_aviary_group(). Available groups: {list(self.aviary_groups_dict)}.'
        )

    self.set_val(range, val=np.max(design_ranges), units='nmi')
[docs]
def add_composite_objective(self, *args, ref: float = None):
    """
    Add a composite objective by combining multiple model outputs.

    Creates a composite objective output by assembling an ``ExecComp`` based on a variety of
    AviaryGroup outputs selected by the user. Multiple outputs from the same or different
    aircraft models can be combined (e.g., fuel burn plus noise emissions). Each output can
    include a weight; if omitted, a weight of 1.0 is used. Weights are normalized by their sum.

    Parameters
    ----------
    *args : str, tuple of (str, float), tuple of (str, str), or tuple of (str, str, float)
        Each argument specifies an objective component in one of the following forms:

        - ``output`` *str* - A single output variable name.
        - ``(output, weight)`` *tuple of (str, float)* - An output with its weight.
        - ``(model, output)`` *tuple of (str, str)* - A model name and output variable.
        - ``(model, output, weight)`` *tuple of (str, str, float)* - A model name, output
          variable, and weight.

        A model name must be a key of ``aviary_groups_dict``. When no model is given, the model
        name ``'model'`` is used.

        If no arguments are provided, the objective is inferred from ``problem_type``.

        Recognized shorthand strings for outputs include ``'fuel'``, ``'fuel_burned'``,
        ``'mass'``, ``'time'``, and ``'range'``.
    ref : float, optional
        Reference value for scaling the final composite objective. If None and exactly one
        argument was given using the ``'fuel'``, ``'fuel_burned'``, or ``'mass'`` shorthand, a
        default reference value for that shorthand is used.

    Raises
    ------
    ValueError
        If an argument cannot be parsed into a valid (model, output, weight) tuple, if a model
        name is provided without an associated output, if no arguments are given and no
        objective can be inferred from the problem type, or if the weights sum to zero.
    """
    # There are LOTS of different ways for the users to input str, 2-tuple, or 3-tuple into
    # *args. Correct combinations are (output), (output, weight), (model, output), or
    # (model, output, weight). We have to catch every case and advise the user on how to
    # correct their errors and add defaults as needed.
    default_model = 'model'
    default_weight = 1.0

    def _output_from_problem_type():
        """Return the default objective output for the problem type, or None if there is none."""
        # Prefer the model's problem type when it has one; a plain om.Group (multi-mission)
        # does not, in which case the problem-level value is used.
        problem_type = getattr(self.model, 'problem_type', self.problem_type)
        if (
            problem_type is ProblemType.SIZING
            or problem_type is ProblemType.OFF_DESIGN_MIN_FUEL
        ):
            return Mission.Objectives.FUEL
        if problem_type is ProblemType.OFF_DESIGN_MAX_RANGE:
            return Mission.Objectives.RANGE
        return None

    objectives = []

    # With no arguments the loop below never runs, so the objective has to be derived from the
    # problem type here.
    if not args:
        output = _output_from_problem_type()
        if output is None:
            raise ValueError(
                'No objective was specified and one cannot be inferred from the problem type. '
                'Provide at least one objective, or set the problem type to SIZING, '
                'OFF_DESIGN_MIN_FUEL, or OFF_DESIGN_MAX_RANGE.'
            )
        objectives.append((default_model, output, default_weight))

    for arg in args:
        if isinstance(arg, tuple) and len(arg) == 3:
            model, output, weight = arg
            if model not in self.aviary_groups_dict:
                raise ValueError(
                    f'The first element specified in {arg} must be the model name.'
                )

        elif isinstance(arg, tuple) and len(arg) == 2:
            first, second = arg
            if isinstance(first, str) and isinstance(second, str):
                if first in self.aviary_groups_dict:
                    # we have the model and output but no weight
                    model, output, weight = first, second, default_weight
                else:
                    raise ValueError(
                        f'The first element specified in {arg} must be the model name.'
                    )
            elif isinstance(first, str) and isinstance(second, (float, int)):
                if first in self.aviary_groups_dict:
                    raise ValueError(
                        f'When specifying {arg}, the user specified a model name and a weight '
                        f'but did not specify what output from that model the weight should be applied to.'
                    )
                else:
                    # we have the output and the weight but not model
                    model, output, weight = default_model, first, second
            else:
                raise ValueError(
                    f'The user specified {arg} which is not a 2-tuple of (model, output) or (output, weight).'
                )

        elif isinstance(arg, str):
            if arg in self.aviary_groups_dict:
                raise ValueError(
                    f"When specifying '{arg}', the user provided only a model name "
                    f'but did not specify what output from that model should be used as the objective.'
                )
            else:
                # we have an output and we use the default model and weights
                model, output, weight = default_model, arg, default_weight

        else:
            # NOTE(review): kept for backward compatibility - an argument in an unrecognized
            # form is replaced by the problem-type default objective when one exists, and only
            # rejected otherwise. Confirm whether this should always be an error.
            output = _output_from_problem_type()
            if output is None:
                raise ValueError(
                    f'Unrecognized objective format: {arg}. '
                    f'Each argument must be one of the following: '
                    f'(output), (output, weight), (model, output), or (model, output, weight). '
                    f'Outputs can be from the variable meta data, or can be: fuel_burned, fuel. '
                    f'Or problem type must be set to SIZING, OFF_DESIGN_MIN_FUEL, or OFF_DESIGN_MAX_RANGE.'
                )
            model, weight = default_model, default_weight

        objectives.append((model, output, weight))
    # objectives = [
    #     ('model1', Mission.FUEL, 1),
    #     ('model2', Mission.CO2, 1),
    #     ...
    # ]

    # Dictionary for default reference values
    default_ref_values = {
        'mass': -5e4,
        'hybrid_objective': -5e4,
        'fuel_burned': 1e4,
        'fuel': 1e4,
    }

    # Replace recognized shorthand strings with the corresponding variable names.
    objectives_cleaned = []
    for model, output, weight in objectives:
        # default scaling is valid only if this is the only argument and the ref has not yet
        # been set
        use_default_ref = len(args) == 1 and ref is None
        if output == 'fuel_burned':
            output = Mission.FUEL
            if use_default_ref:
                ref = default_ref_values['fuel_burned']
        elif output == 'fuel':
            output = Mission.Objectives.FUEL
            if use_default_ref:
                ref = default_ref_values['fuel']
        elif output == 'mass':
            output = Mission.FINAL_MASS
            if use_default_ref:
                ref = default_ref_values['mass']
        elif output == 'time':
            output = Mission.FINAL_TIME
        elif output == 'range':
            output = Mission.RANGE  # Unsure if this will work
        objectives_cleaned.append((model, output, weight))

    # Create the calculation string for the ExecComp() and the promotion reference values
    total_weight = sum(weight for _, _, weight in objectives_cleaned)
    if total_weight == 0:
        # The generated expression divides by the total weight.
        raise ValueError(
            'The objective weights sum to zero, so they cannot be normalized into a '
            'composite objective.'
        )

    weighted_exprs = []
    connection_names = []
    obj_inputs = []
    for model, output, weight in objectives_cleaned:
        # we use "_" here because ExecComp() cannot intake "."
        output_safe = output.replace(':', '_')
        obj_input = f'{model}_{output_safe}'
        obj_inputs.append(obj_input)
        weighted_exprs.append(f'{obj_input}*{weight}/{total_weight}')
        connection_names.append((f'{model}.{output}', f'composite_function.{obj_input}'))
    # final_expr has the form '<input1>*<weight1>/<total> + <input2>*<weight2>/<total> + ...'
    final_expr = ' + '.join(weighted_exprs)

    kwargs = {}
    if version.parse(openmdao.__version__) >= version.parse('3.40'):
        # We can get the correct unit from the source. This prevents a warning.
        kwargs = {k: {'units_by_conn': True} for k in obj_inputs}

    # adding composite execComp to super problem
    self.model.add_subsystem(
        'composite_function',
        om.ExecComp('composite_objective = ' + final_expr, **kwargs),
        promotes_outputs=['composite_objective'],
    )
    # connect from inside of the models to the composite objective
    for source, target in connection_names:
        self.model.connect(source, target)
    # finally add the objective
    self.model.add_objective('composite_objective', ref=ref)
[docs]
def add_composite_objective_adv(
    self,
    missions: list[str],
    outputs: list[str],
    mission_weights: list[float] = None,
    output_weights: list[float] = None,
    ref: float = 1.0,
):
    """
    Add a composite objective with independent mission and output weights.

    Aggregates output values across multiple mission models with independent weighting for both
    missions and outputs. This is useful when historical data is available on how often each
    mission is flown (``mission_weights``) and multiple objectives are desired per flight
    (``output_weights``).

    Parameters
    ----------
    missions : list of str
        Subsystem names corresponding to different missions (e.g., ``['model1', 'model2']``).
    outputs : list of str
        Output variable names to include from each mission (e.g., ``[Mission.FUEL,
        Mission.GROSS_MASS]``).
    mission_weights : list of float, optional
        Weights assigned to each mission. Normalized internally to sum to 1.0. If None, equal
        weighting is assumed.
    output_weights : list of float, optional
        Weights assigned to each output variable. Normalized internally to sum to 1.0. If None,
        equal weighting is assumed.
    ref : float, optional
        Reference value for scaling the final composite objective. Defaults to 1.0.

    Raises
    ------
    ValueError
        If ``missions`` or ``outputs`` is empty, or if a weight list does not have the same
        length as the list it weights.
    """
    if len(missions) == 0 or len(outputs) == 0:
        raise ValueError('At least one mission and one output are required.')

    # Default to equal weighting when no weights are given
    if mission_weights is None:
        mission_weights = np.ones(len(missions))
    if output_weights is None:
        output_weights = np.ones(len(outputs))

    # zip() below would silently drop entries on a length mismatch, so reject it up front
    if len(mission_weights) != len(missions):
        raise ValueError(
            f'mission_weights has {len(mission_weights)} entries, but {len(missions)} '
            'missions were provided.'
        )
    if len(output_weights) != len(outputs):
        raise ValueError(
            f'output_weights has {len(output_weights)} entries, but {len(outputs)} '
            'outputs were provided.'
        )

    # weights are normalized - e.g. for given weights 3:1, the normalized
    # weights are 0.75:0.25
    mission_total = sum(mission_weights)
    output_total = sum(output_weights)
    mission_weights = [float(weight / mission_total) for weight in mission_weights]
    output_weights = [float(weight / output_total) for weight in output_weights]

    weighted_exprs = []
    connection_names = []
    obj_inputs = []
    for mission, mission_weight in zip(missions, mission_weights):
        for output, output_weight in zip(outputs, output_weights):
            # ExecComp() cannot intake "." or ":" in a variable name, so build a safe name
            output_safe = output.replace(':', '_')
            obj_input = f'{mission}_{output_safe}'
            obj_inputs.append(obj_input)
            weighted_exprs.append(f'{obj_input}*{output_weight}*{mission_weight}')
            # (source, target): the mission output feeds the matching ExecComp input
            connection_names.append(
                (f'{mission}.{output}', f'composite_function.{obj_input}')
            )
    final_expr = ' + '.join(weighted_exprs)
    # final_expr looks like: 'model1_fuelburn*0.67*0.5 + model1_gross_mass*0.33*0.5 +
    # model2_fuelburn*0.67*0.5 + model2_gross_mass*0.33*0.5'

    input_options = {}
    if version.parse(openmdao.__version__) >= version.parse('3.40'):
        # We can get the correct unit from the source. This prevents a warning.
        input_options = {name: {'units_by_conn': True} for name in obj_inputs}

    # adding composite execComp to super problem
    self.model.add_subsystem(
        'composite_function',
        om.ExecComp(
            'composite_objective = ' + final_expr, has_diag_partials=True, **input_options
        ),
        promotes_outputs=['composite_objective'],
    )
    # connect from inside of the models to the composite objective
    for source, target in connection_names:
        self.model.connect(source, target)
    # finally add the objective
    self.model.add_objective('composite_objective', ref=ref)
[docs]
def build_model(self, verbosity=None):
    """
    Assemble the full Aviary model with one call.

    Runs ``add_pre_mission_systems()``, ``add_phases()``, ``add_post_mission_systems()`` and
    ``link_phases()`` in that order. For a multi-mission problem the four steps are applied to
    every group in ``self.aviary_groups_dict``; otherwise they are applied to ``self.model``.
    Call the four methods individually when finer control is needed.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # An explicit argument overrides the problem-level verbosity for this call only; it is
    # passed through Verbosity() so plain ints are accepted.
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    if self.problem_type == ProblemType.MULTI_MISSION:
        targets = self.aviary_groups_dict.values()
    else:
        targets = (self.model,)

    for target in targets:
        target.add_pre_mission_systems(verbosity=verbosity)
        target.add_phases(verbosity=verbosity, comm=self.comm)
        target.add_post_mission_systems(verbosity=verbosity)
        target.link_phases(verbosity=verbosity, comm=self.comm)
[docs]
def setup(self, **kwargs):
    """
    Set up the Aviary problem.

    Configures model options, calls OpenMDAO's ``setup()``, and applies initial guesses to seed
    the problem with reasonable starting values.

    Parameters
    ----------
    **kwargs : dict
        Additional keyword arguments passed to ``om.Problem.setup()``. Note: a ``verbosity``
        keyword, if present, will be ignored, as this method does not require it.
    """
    # verbosity is not used in this method, but it is understandable that a user might try and
    # include it (only method that doesn't accept it).
    kwargs.pop('verbosity', None)

    # Use OpenMDAO's model options to pass all options through the system hierarchy.
    if self.problem_type == ProblemType.MULTI_MISSION:
        for name, group in self.aviary_groups_dict.items():
            setup_model_options(
                self, group.aviary_inputs, group.meta_data, prefix=name, group=group
            )
            # group.aviary_inputs and group.phase_info are already set; meta_data is the same
            # for all groups
            group.meta_data = self.meta_data
    else:
        setup_model_options(self, self.aviary_inputs, self.meta_data)
        # there is only one aviary_inputs in this case; self.model.phase_info is already set
        self.model.aviary_inputs = self.aviary_inputs
        self.model.meta_data = self.meta_data

    # Suppress OpenMDAO warnings raised during setup, e.g.
    # "input variable '...' promoted using '*' was already promoted using 'aircraft:*'".
    # The filters only need to be active around the call that can actually emit them.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', om.OpenMDAOWarning)
        warnings.simplefilter('ignore', om.PromotionWarning)
        super().setup(**kwargs)

    self.set_initial_guesses(verbosity=None)
[docs]
def set_initial_guesses(self, parent_prob=None, parent_prefix='', verbosity=None):
    """
    Seed the trajectory states and controls with initial guesses.

    Delegates to ``set_initial_guesses`` on ``self.model``, or on every group in
    ``self.aviary_groups_dict`` for a multi-mission problem, forwarding all arguments unchanged.
    Reasonable starting values are especially important for collocation methods.

    Parameters
    ----------
    parent_prob : om.Problem, optional
        If provided along with ``parent_prefix``, initial guesses are set on this parent problem
        instead of ``self``. Used internally for nested problem setups.
    parent_prefix : str, optional
        Prefix prepended to variable paths when setting values on ``parent_prob``. Defaults to
        ``''``.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    """
    # A per-call override wins over the problem-level verbosity; ints are coerced to the enum
    verbosity = self.verbosity if verbosity is None else Verbosity(verbosity)

    forwarded = {
        'parent_prob': parent_prob,
        'parent_prefix': parent_prefix,
        'verbosity': verbosity,
    }

    if self.problem_type != ProblemType.MULTI_MISSION:
        self.model.set_initial_guesses(**forwarded)
        return

    # multi-mission: each AviaryGroup is seeded independently
    for group in self.aviary_groups_dict.values():
        group.set_initial_guesses(**forwarded)
[docs]
def run_aviary_problem(
    self,
    restart_filename=None,
    suppress_solver_print=True,
    run_driver=True,
    simulate=False,
    make_plots=True,
    verbosity=None,
    real_time_plotting=False,
):
    """
    Run the Aviary problem.

    Executes the problem, which may be a simulation, optimization, or single-pass model
    evaluation depending on the arguments provided. After execution, an N2 diagram is generated
    and, if applicable, a payload-range analysis is run.

    Parameters
    ----------
    restart_filename : str, optional
        Path to a file containing previously computed solutions to use as starting points.
    suppress_solver_print : bool, optional
        If True, all solver print statements are suppressed. Defaults to True.
    run_driver : bool, optional
        If True, the optimizer is executed. If False, the problem runs a single pass (equivalent
        to ``run_model``). Defaults to True.
    simulate : bool, optional
        If True, an explicit Dymos simulation is performed after optimization. Defaults to False.
    make_plots : bool, optional
        If True, Dymos HTML timeseries plots are generated. Defaults to True.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for this method. If None, uses the problem-level
        verbosity.
    real_time_plotting : bool, optional
        If True, enables real-time plotting of the optimization progress.

    Raises
    ------
    RuntimeError
        If ``real_time_plotting`` is requested but the problem has no driver, or if the
        real-time plot server process exits with an error immediately after being launched.
    """
    # `self.verbosity` is "true" verbosity for entire run. `verbosity` is verbosity
    # override for just this method
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity  # defaults to BRIEF

    # The realtime plot server reads from a driver recorder file, so one is needed for
    # real_time_plotting as well as for verbose runs
    if verbosity >= Verbosity.VERBOSE or real_time_plotting:
        recorder = om.SqliteRecorder('optimization_history.db')
        self.driver.add_recorder(recorder)

    self.final_setup()

    if verbosity >= Verbosity.VERBOSE:  # VERBOSE, DEBUG
        # force=True so the directory exists even when no reports are active
        input_list_path = Path(self.get_reports_dir(force=True)) / 'input_list.txt'
        with open(input_list_path, 'w') as outfile:
            self.model.list_inputs(out_stream=outfile)

    def _view_realtime_plot_hook(driver):
        case_recorder_file = str(driver._rec_mgr._recorders[0]._filepath)
        cmd = ['openmdao', 'realtime_plot', '--pid', str(os.getpid()), case_recorder_file]
        cp = subprocess.Popen(cmd)  # nosec: trusted input

        # Do a quick non-blocking check to see if it immediately failed
        # This will catch immediate failures but won't wait for the process to finish
        quick_check = cp.poll()
        if quick_check is not None and quick_check != 0:
            # Process already terminated with an error. The child's stderr is not piped
            # (it goes straight to the terminal), so cp.stderr is None unless a pipe was
            # requested; only read it when it is actually available.
            message = f'Failed to start up the realtime plot server with code {quick_check}'
            if cp.stderr is not None:
                message += f': {cp.stderr.read().decode()}'
            raise RuntimeError(message + '.')

    # register the hook to start up the real-time plot server
    if real_time_plotting:
        if not self.driver:
            raise RuntimeError(
                'Unable to run realtime optimization progress plot because no Driver'
            )
        hooks._register_hook(
            '_setup_recording', 'Driver', post=_view_realtime_plot_hook, ncalls=1
        )
        hooks._setup_hooks(self.driver)

    if suppress_solver_print:
        self.set_solver_print(level=0)

    # and run mission, and dynamics
    if run_driver:
        self.result = dm.run_problem(
            self,
            run_driver=run_driver,
            simulate=simulate,
            make_plots=make_plots,
            solution_record_file='problem_history.db',
            restart=restart_filename,
        )
        # Manually print out a failure message for low verbosity modes that suppress
        # optimizer printouts, which may include the results message. Assumes success,
        # alerts user on a failure
        if not self.result.success and verbosity <= Verbosity.BRIEF:  # QUIET, BRIEF
            warnings.warn('\nAviary run failed. See the dashboard for more details.\n')
    else:
        self.run_model()
        self.result = self.driver.result

    # update n2 diagram after run.
    outdir = Path(self.get_reports_dir(force=True))
    om.n2(
        self,
        outfile=os.path.join(outdir, 'n2.html'),
        show_browser=False,
    )

    if verbosity >= Verbosity.VERBOSE:  # VERBOSE, DEBUG
        with open(outdir / 'output_list.txt', 'w') as outfile:
            self.model.list_outputs(out_stream=outfile)

    if self.generate_payload_range and self.problem_type == ProblemType.SIZING:
        self.run_payload_range()
[docs]
def run_off_design_mission(
self,
problem_type: ProblemType,
phase_info=None,
equations_of_motion: EquationsOfMotion = None,
problem_configurator=None,
num_first_class=None,
num_business=None,
num_economy=None,
num_pax=None,
wing_cargo=None,
misc_cargo=None,
cargo_mass=None,
mission_gross_mass=None,
mission_range=None,
optimizer=None,
name=None,
fill_cargo=False,
fill_fuel=False,
verbosity=None,
):
"""
Run an off-design mission using a previously sized aircraft.
Creates a new AviaryProblem configured for the specified off-design mission type, inheriting
the aircraft design from the current problem. The new problem is loaded, built, given a
driver, design variables and an objective, set up, run, and then returned.
Parameters
----------
problem_type : ProblemType or str
The type of off-design mission to fly. ``SIZING`` is not allowed. This method has dedicated
handling for ``OFF_DESIGN_MIN_FUEL`` and ``OFF_DESIGN_MAX_RANGE``.
phase_info : dict, optional
Phase info for the off-design mission. If None, the phase info from the previous Aviary
run is reused (with extended cruise bounds where applicable).
equations_of_motion : EquationsOfMotion, optional
Equations of motion for the off-design mission. If None, the equations from the previous
run are used.
problem_configurator : ProblemConfigurator, optional
Problem configurator for the off-design mission. If None, the configurator from the
previous run is used.
num_first_class : int, optional
Number of first-class passengers. FLOPS mass method only.
num_business : int, optional
Number of business-class passengers. FLOPS mass method only.
num_economy : int, optional
Number of economy-class passengers. FLOPS mass method only.
num_pax : int, optional
Total number of passengers. Optional when using FLOPS mass method if per-class counts
are provided instead.
wing_cargo : float, optional
Mass of wing cargo, in lbm. FLOPS mass method only.
misc_cargo : float, optional
Mass of miscellaneous cargo, in lbm. FLOPS mass method only.
cargo_mass : float, optional
Total cargo mass, in lbm. When using FLOPS mass, this overrides the sum of wing and misc
cargo.
mission_gross_mass : float, optional
Gross mass for the off-design mission, in lbm. Defaults to the design gross mass. For
OFF_DESIGN_MIN_FUEL missions, this is the initial guess.
mission_range : float, optional
Fixed range for OFF_DESIGN_MIN_FUEL missions, in nautical miles. Unused for other mission types.
optimizer : str, optional
Optimizer to use. If None, the optimizer from the sizing run is reused; if unavailable,
defaults to ``'SLSQP'``.
name : str, optional
Name of the off-design problem. Defaults to
``'{original_name}_off_design'``, with an incrementing suffix if the output directory
already exists.
fill_cargo : bool, optional
If True, cargo mass is varied to fill the aircraft to design takeoff gross weight.
Cannot be used with ``fill_fuel``. Defaults to False.
fill_fuel : bool, optional
If True, takeoff gross mass is added as a design variable. Cannot be used with
``fill_cargo``. Defaults to False.
verbosity : Verbosity or int, optional
Controls the level of terminal output for the off-design run.
Returns
-------
AviaryProblem
The completed off-design AviaryProblem after running.
Raises
------
UserWarning
Raised (as an exception, not issued as a warning) if ``problem_type`` is ``SIZING``, or if
both ``fill_cargo`` and ``fill_fuel`` are True.
"""
# For off-design missions, provided verbosity will be used for all L2 method calls
if verbosity is not None:
# compatibility with being passed int for verbosity
verbosity = Verbosity(verbosity)
else:
verbosity = self.verbosity # defaults to BRIEF
# accept str for problem type
problem_type = ProblemType(problem_type)
if problem_type is ProblemType.SIZING:
raise UserWarning('Off-design missions cannot be SIZING missions.')
if fill_cargo and fill_fuel:
raise UserWarning(
'Cannot run an off-design mission with both "fill_cargo" and "fill_fuel" flags '
'active.'
)
if name is None:
# increment the name if the output directory already exists
# to avoid overwriting previous off-design runs
base_name = self._name + '_off_design'
for design_number in count(0):
name = base_name if design_number == 0 else f'{base_name}_{design_number}'
output_path = Path(f'{name}_out')
if not output_path.is_dir():
break
off_design_prob = AviaryProblem(name=name)
# Set up problem for mission, such as equations of motion, configurators, etc.
# Work on a deep copy so the inputs of the current (sized) problem are left untouched
inputs = deepcopy(self.aviary_inputs)
# Carry the design gross mass of the current problem over to the off-design inputs
design_gross_mass = self.get_val(Aircraft.Design.GROSS_MASS, units='lbm')[0]
inputs.set_val(Aircraft.Design.GROSS_MASS, design_gross_mass, units='lbm')
# NOTE(review): problem_type was just passed through ProblemType(...) above, so this check
# looks like it is always true - confirm whether it can be dropped.
if problem_type is not None:
inputs.set_val(Settings.PROBLEM_TYPE, problem_type)
if equations_of_motion is not None:
inputs.set_val(Settings.EQUATIONS_OF_MOTION, equations_of_motion)
if problem_configurator is not None:
off_design_prob.model.configurator = problem_configurator
if phase_info is None:
# model phase_info only contains mission information, recreate the whole thing here
phase_info = self.model.mission_info.copy()
phase_info['pre_mission'] = self.model.pre_mission_info.copy()
phase_info['post_mission'] = self.model.post_mission_info.copy()
# update passenger count and cargo masses
mass_method = inputs.get_val(Settings.MASS_METHOD)
# Sanity check passenger counts - cover specific edge case that preprocessors won't catch
# Since we inherit mission pax count from sizing mission, we need to overwrite it
# NOTE(review): when this guard passes, all three per-class counts are falsy (None or 0), so
# the sum below always evaluates to 0 - confirm that resetting num_pax to 0 is the intent.
if (
mass_method is FLOPS
and num_pax is None
and not any((num_economy, num_business, num_first_class))
):
num_pax = sum(filter(None, [num_economy, num_business, num_first_class]))
# only FLOPS cares about seat class or specific cargo categories
if mass_method == LegacyCode.FLOPS:
if num_first_class is not None:
inputs.set_val(Aircraft.CrewPayload.NUM_FIRST_CLASS, num_first_class)
if num_business is not None:
inputs.set_val(Aircraft.CrewPayload.NUM_BUSINESS_CLASS, num_business)
if num_economy is not None:
inputs.set_val(Aircraft.CrewPayload.NUM_ECONOMY_CLASS, num_economy)
if wing_cargo is not None:
inputs.set_val(Aircraft.CrewPayload.WING_CARGO, wing_cargo, 'lbm')
if misc_cargo is not None:
inputs.set_val(Aircraft.CrewPayload.MISC_CARGO, misc_cargo, 'lbm')
else:
warnings.warn(
'Off-design functionality is in beta for GASP-mass based aircraft. Please manually '
'verify your results.'
)
# Total passenger count and total cargo mass are applied for either mass method
if num_pax is not None:
inputs.set_val(Aircraft.CrewPayload.NUM_PASSENGERS, num_pax)
if cargo_mass is not None:
inputs.set_val(Aircraft.CrewPayload.CARGO_MASS, cargo_mass, 'lbm')
# NOTE once load_inputs is run, phase info details are stored in prob.model.configurator,
# meaning any phase_info changes that happen after load inputs is ignored
if problem_type is ProblemType.OFF_DESIGN_MIN_FUEL:
# Set mission range, aviary will calculate required fuel
if mission_range is None:
if verbosity >= Verbosity.VERBOSE:
warnings.warn(
'OFF_DESIGN_MIN_FUEL problem type requested with no specified range. Using design '
'mission range for the off-design mission.'
)
mission_range = self.get_val(Mission.RANGE, units='NM')[0]
phase_info['post_mission']['target_range'] = (
mission_range,
'nmi',
)
# reset the AviaryProblem to run the new mission
off_design_prob.load_inputs(inputs, phase_info, verbosity=verbosity)
# Update inputs that are specific to problem type
# Some OFF_DESIGN_MIN_FUEL problem changes had to happen before load_inputs, all OFF_DESIGN_MAX_RANGE problem
# changes must come after load_inputs
if problem_type is ProblemType.OFF_DESIGN_MIN_FUEL:
off_design_prob.aviary_inputs.set_val(Mission.RANGE, mission_range, units='NM')
# set initial guess for Mission.GROSS_MASS to help optimizer with new design
# variable bounds.
if mission_gross_mass is None:
mission_gross_mass = off_design_prob.aviary_inputs.get_val(
Aircraft.Design.GROSS_MASS, 'lbm'
)
# The initial guess is 90% of the gross mass value selected above
off_design_prob.aviary_inputs.set_val(
Mission.GROSS_MASS, mission_gross_mass * 0.9, units='lbm'
)
elif problem_type is ProblemType.OFF_DESIGN_MAX_RANGE:
# Set mission fuel and calculate gross weight, aviary will calculate range
if mission_gross_mass is None:
if verbosity >= Verbosity.VERBOSE:
warnings.warn(
'OFF_DESIGN_MAX_RANGE problem type requested with no specified gross mass. Using design '
'takeoff gross mass for the off-design mission.'
)
mission_gross_mass = self.get_val(Aircraft.Design.GROSS_MASS, units='lbm')[0]
off_design_prob.aviary_inputs.set_val(
Mission.GROSS_MASS, mission_gross_mass, units='lbm'
)
off_design_prob.check_and_preprocess_inputs(verbosity=verbosity)
# off_design_prob.add_pre_mission_systems(verbosity=verbosity)
# off_design_prob.add_phases(verbosity=verbosity)
# off_design_prob.add_post_mission_systems(verbosity=verbosity)
# off_design_prob.link_phases(verbosity=verbosity)
off_design_prob.build_model(verbosity=verbosity)
# Reuse the optimizer of the current problem's driver when none was requested
if optimizer is None:
try:
optimizer = self.driver.options['optimizer']
except KeyError:
optimizer = None
# Reuse the iteration limit from the current driver's opt_settings; the key name differs per
# optimizer. A missing key falls back to None.
try:
if optimizer == 'SNOPT':
max_iter = self.driver.opt_settings['Major iterations limit']
elif optimizer == 'IPOPT':
max_iter = self.driver.opt_settings['max_iter']
elif optimizer == 'SLSQP':
max_iter = self.driver.opt_settings['maxiter']
else:
max_iter = None
except KeyError:
max_iter = None
off_design_prob.add_driver(optimizer=optimizer, max_iter=max_iter, verbosity=verbosity)
off_design_prob.add_design_variables(verbosity=verbosity)
# Handle edge case for payload-range diagrams
# Select which cargo variable makes the most sense to float, and then set a tolerance
# based on rough guesses on what is sufficient to get the problem to converge without
# setting design variable bounds too large
if fill_cargo:
# GASP cargo mass is an input, can directly use as control variable
if mass_method is GASP:
control_var = Aircraft.CrewPayload.CARGO_MASS
# NOTE(review): cargo_mass defaults to None; if it is still None here, 'val * tol' below
# raises TypeError - confirm callers always pass cargo_mass together with fill_cargo.
val = cargo_mass
tol = 1.05
# FLOPS cargo mass is an output, not valid for control variable. Pick control var.
else:
# See if misc_cargo is being used, float that as a backup
if misc_cargo is None or misc_cargo == 0:
# We aren't using cargo_mass OR misc_mass - try wing cargo as last resort
if wing_cargo is None or wing_cargo == 0:
# We don't know enough about the aircraft to make any informed guesses. Use
# arbitrary values
control_var = Aircraft.CrewPayload.MISC_CARGO
# NOTE(review): no units are requested here, unlike the other GROSS_MASS lookups in
# this method, which ask for 'lbm' - confirm the returned units.
val = self.get_val(Aircraft.Design.GROSS_MASS)
tol = 0.05
# NOTE(review): 'inputs' was already handed to load_inputs above - confirm this late
# change still reaches off_design_prob.
inputs.set_val(Aircraft.CrewPayload.CARGO_MASS, 0, 'lbm')
else:
control_var = Aircraft.CrewPayload.WING_CARGO
val = wing_cargo
tol = 1.1
else:
control_var = Aircraft.CrewPayload.MISC_CARGO
val = misc_cargo
tol = 1.1
# The chosen cargo variable may range from 0 up to val * tol and is scaled by val
off_design_prob.model.add_design_var(
control_var,
lower=0,
upper=val * tol,
ref=val,
)
if fill_fuel:
# Mission gross mass becomes a design variable bounded above by the design gross mass
off_design_prob.model.add_design_var(
Mission.GROSS_MASS,
lower=0,
upper=off_design_prob.aviary_inputs.get_val(
Aircraft.Design.GROSS_MASS, units='lbm'
),
ref=off_design_prob.aviary_inputs.get_val(Aircraft.Design.GROSS_MASS, units='lbm'),
)
off_design_prob.add_objective(verbosity=verbosity)
off_design_prob.setup(verbosity=verbosity)
# setup() on this class already calls set_initial_guesses(verbosity=None); this call repeats it
# with the verbosity chosen for the off-design run
off_design_prob.set_initial_guesses(verbosity=verbosity)
off_design_prob.run_aviary_problem(verbosity=verbosity)
return off_design_prob
[docs]
def run_payload_range(self, verbosity=None):
    """
    Run payload-range analysis for the sized aircraft.

    For a successfully converged sizing mission, this method runs additional off-design missions
    to generate payload-range diagram data points: zero fuel, design mission, max fuel + payload
    range, and ferry range. Results are saved to a CSV file in the reports directory.

    Parameters
    ----------
    verbosity : Verbosity or int, optional
        Controls the level of terminal output for the payload-range analysis. If None, uses the
        problem-level verbosity.

    Returns
    -------
    tuple of AviaryProblem, or empty tuple
        A tuple ``(max_fuel_pyld_range_prob, ferry_range_prob)`` containing the off-design
        AviaryProblems for the max fuel + payload and ferry range points. When the max fuel +
        payload point coincides with the ferry point, both entries are the ferry problem.
        Returns an empty tuple if the sizing run did not converge, the problem type or
        equations of motion are not supported, or an off-design mission failed.

    Notes
    -----
    Currently only supported for the energy state equations of motion. Reserve fuel is not yet
    accounted for in the analysis.
    """
    # For off-design missions, provided verbosity will be used for all L2 method calls
    if verbosity is not None:
        # compatibility with being passed int for verbosity
        verbosity = Verbosity(verbosity)
    else:
        verbosity = self.verbosity  # defaults to BRIEF

    # Verbosity only decides whether the warning is shown; the analysis is skipped either way.
    if not self.result.success:
        if verbosity > Verbosity.QUIET:
            warnings.warn(
                'Payload Range Diagrams cannot be generated for unconverged Aviary problems.'
            )
        return ()
    if self.problem_type is ProblemType.MULTI_MISSION:
        if verbosity > Verbosity.QUIET:
            warnings.warn(
                'Payload Range Diagrams currently cannot be generated for aircraft designed '
                'using multimission capability.'
            )
        return ()

    # Off-design missions are not tested with 2DOF missions.
    mass_method = self.model.aviary_inputs.get_val(Settings.MASS_METHOD)
    equations_of_motion = self.model.aviary_inputs.get_val(Settings.EQUATIONS_OF_MOTION)
    if equations_of_motion is not EquationsOfMotion.ENERGY_STATE:
        warnings.warn(
            'Payload/range analysis is currently only supported for the energy equations of '
            'motion.'
        )
        return ()

    # make a copy of the phase_info to avoid modifying the original.
    phase_info = self.model.mission_info.copy()
    phase_info['pre_mission'] = self.model.pre_mission_info.copy()
    phase_info['post_mission'] = self.model.post_mission_info.copy()

    # If a 'cruise' phase exists, extend its duration bounds to allow for the longer off-design
    # missions. The copies above are shallow, so the nested dictionaries that get changed are
    # copied first; otherwise the model's own phase info would be modified on every call.
    cruise = phase_info.get('cruise')
    if cruise:
        cruise = cruise.copy()
        user_options = cruise['user_options'].copy()
        duration_bounds = user_options['time_duration_bounds']
        min_duration = duration_bounds[0][0]
        max_duration = duration_bounds[0][1]
        cruise_units = duration_bounds[1]
        user_options['time_duration_bounds'] = ((min_duration, 2 * max_duration), cruise_units)
        cruise['user_options'] = user_options
        phase_info['cruise'] = cruise

    # TODO Verify that previously run point is actually max payload/fuel point, and if not
    # run off-design mission for that point

    # Point 1 is along the y axis (range=0)
    payload_1 = float(self.get_val(Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS, units='lbm')[0])
    range_1 = 0
    fuel_1 = 0

    # Point 2 (Design Range): sizing mission which is assumed to be the point of max
    # payload + fuel on the payload and range diagram
    payload_2 = payload_1
    range_2 = float(self.get_val(Mission.RANGE, units='NM')[0])
    fuel_2 = float(self.get_val(Mission.FUEL, units='lbm')[0])

    gross_mass = float(self.get_val(Mission.GROSS_MASS, units='lbm')[0])
    # NOTE this operating mass is based on the previously run mission - assumed this is the
    # design mission!! Includes cargo containers needed for design (max payload)
    operating_mass = float(self.get_val(Mission.OPERATING_MASS, units='lbm')[0])
    fuel_capacity = float(self.get_val(Aircraft.Fuel.TOTAL_CAPACITY, units='lbm')[0])
    unusable_fuel = float(self.get_val(Aircraft.Fuel.UNUSABLE_FUEL_MASS, units='lbm')[0])
    max_payload = payload_1

    # Operating mass includes unusable fuel, don't double count
    max_usable_fuel = fuel_capacity - unusable_fuel

    # An aircraft may be designed with fuel tank capacity that, if fully filled, would
    # exceed MTOW. In that scenario, 'Max Fuel + Payload' range and 'Ferry' range are the same,
    # and the point only needs to be run once.
    prob_3_skip = operating_mass + max_usable_fuel >= gross_mass
    if not prob_3_skip:
        # Point 3 (Max Fuel + Payload Range): max fuel and remaining payload capacity
        # Assume proportional decrease in all cargo types (including number of passengers)
        # to make room for maximum fuel. Round pax count down to avoid loading over TOGW
        max_fuel_pyld_total_payload = gross_mass - operating_mass - max_usable_fuel
        # A design with no payload has nothing to scale; avoid dividing by zero
        payload_frac = max_fuel_pyld_total_payload / max_payload if max_payload else 0.0

        aviary_inputs = self.model.aviary_inputs
        max_fuel_pyld_wing_cargo = (
            aviary_inputs.get_val(Aircraft.CrewPayload.WING_CARGO, 'lbm') * payload_frac
        )
        max_fuel_pyld_misc_cargo = (
            aviary_inputs.get_val(Aircraft.CrewPayload.MISC_CARGO, 'lbm') * payload_frac
        )
        max_fuel_pyld_num_first = int(
            aviary_inputs.get_val(Aircraft.CrewPayload.Design.NUM_FIRST_CLASS) * payload_frac
        )
        max_fuel_pyld_num_bus = int(
            aviary_inputs.get_val(Aircraft.CrewPayload.Design.NUM_BUSINESS_CLASS)
            * payload_frac
        )
        max_fuel_pyld_num_economy = int(
            aviary_inputs.get_val(Aircraft.CrewPayload.Design.NUM_ECONOMY_CLASS)
            * payload_frac
        )

        # Passenger number rounding and potentially cargo container mass changing means
        # we don't know if we actually filled the aircraft to exactly TOGW yet. Need to use
        # "fill_cargo" flag in off-design call
        max_fuel_pyld_range_prob = self.max_fuel_pyld_range_prob = (
            self.run_off_design_mission(
                problem_type=ProblemType.OFF_DESIGN_MAX_RANGE,
                phase_info=phase_info,
                num_first_class=max_fuel_pyld_num_first,
                num_business=max_fuel_pyld_num_bus,
                num_economy=max_fuel_pyld_num_economy,
                wing_cargo=max_fuel_pyld_wing_cargo,
                misc_cargo=max_fuel_pyld_misc_cargo,
                name=self._name + '_max_fuel_plus_payload_range',
                fill_cargo=True,
                verbosity=verbosity,
            )
        )

        # Pull the payload, range and fuel values from the OFF_DESIGN_MAX_RANGE mission
        payload_3 = float(
            max_fuel_pyld_range_prob.get_val(
                Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS, units='lbm'
            )[0]
        )
        range_3 = float(max_fuel_pyld_range_prob.get_val(Mission.RANGE, units='NM')[0])
        fuel_3 = float(max_fuel_pyld_range_prob.get_val(Mission.FUEL, units='lbm')[0])
    else:
        # only fill fuel until hit TOGW
        max_usable_fuel = gross_mass - operating_mass

    # Point 4 (Ferry Range): maximum fuel and 0 payload
    # Total cargo mass is an input in GASP, but an output in FLOPS. Avoid overriding cargo
    # mass to 0 if not using GASP
    ferry_cargo_mass = 0 if mass_method is GASP else None
    ferry_range_gross_mass = operating_mass + max_usable_fuel
    ferry_range_prob = self.ferry_range_prob = self.run_off_design_mission(
        problem_type=ProblemType.OFF_DESIGN_MAX_RANGE,
        phase_info=phase_info,
        num_first_class=0,
        num_business=0,
        num_economy=0,
        wing_cargo=0,
        misc_cargo=0,
        cargo_mass=ferry_cargo_mass,
        mission_gross_mass=ferry_range_gross_mass,
        name=self._name + '_ferry_range',
        fill_fuel=True,
        verbosity=verbosity,
    )

    payload_4 = float(
        ferry_range_prob.get_val(Aircraft.CrewPayload.TOTAL_PAYLOAD_MASS, units='lbm')[0]
    )
    range_4 = float(ferry_range_prob.get_val(Mission.RANGE, units='NM')[0])
    fuel_4 = float(ferry_range_prob.get_val(Mission.FUEL, units='lbm')[0])

    # if max fuel + payload mission was skipped, that point is the same as the ferry point
    if prob_3_skip:
        max_fuel_pyld_range_prob = ferry_range_prob
        payload_3 = payload_4
        range_3 = range_4
        fuel_3 = fuel_4

    # Check if OFF_DESIGN_MAX_RANGE missions ran successfully before writing to csv file
    self.payload_range_data = payload_range_data = NamedValues()
    if not (ferry_range_prob.result.success and max_fuel_pyld_range_prob.result.success):
        warnings.warn(
            'One or both of the OFF_DESIGN_MAX_RANGE missions did not run successfully; payload/range '
            'diagram was not generated.'
        )
        return ()

    # Both missions ran successfully: write the payload/range data to a csv file
    payload_range_data.set_val(
        'Mission Name',
        ['Zero Fuel', 'Design Mission', 'Max Fuel + Payload Mission', 'Ferry Mission'],
    )
    payload_range_data.set_val('Payload', [payload_1, payload_2, payload_3, payload_4], 'lbm')
    payload_range_data.set_val('Fuel', [fuel_1, fuel_2, fuel_3, fuel_4], 'lbm')
    payload_range_data.set_val('Range', [range_1, range_2, range_3, range_4], 'NM')
    write_data_file(
        Path(self.get_reports_dir(force=True)) / 'payload_range_data.csv',
        payload_range_data,
    )

    # Prints the payload/range data to the console if verbosity is set to VERBOSE or DEBUG
    if verbosity >= Verbosity.VERBOSE:
        for item in payload_range_data:
            print(f'{item[0]} ({item[1][1]}): {item[1][0]}')

    return (max_fuel_pyld_range_prob, ferry_range_prob)
[docs]
def save_results(self, json_filename='sizing_results.json'):
    """
    Save the Aviary problem results to a JSON file.

    Serializes the aviary_inputs and key mission results (including gross mass) into a JSON file
    that can be reloaded later using ``reload_aviary_problem`` or ``_read_sizing_json``. Each
    entry is written as ``[name, value, units, type string]``.

    Parameters
    ----------
    json_filename : str, optional
        File name (and relative path) for the output JSON file. Defaults to
        ``'sizing_results.json'``.
    """
    aviary_input_list = []

    # Build the complete list before touching the file, so a failure while collecting values
    # does not leave a truncated file behind.
    for name, (value, units) in self.model.aviary_inputs:
        type_value = type(value)

        if name == Mission.GROSS_MASS or name == Aircraft.Design.GROSS_MASS:
            # Both gross mass entries are written with the Mission.GROSS_MASS value from the
            # solved problem
            value = self.get_val(Mission.GROSS_MASS, units=units).tolist()[0]
        # there are different data types we need to handle for conversion to json format
        # int, bool, float doesn't need anything special
        elif type_value is np.ndarray:
            # Convert numpy arrays to lists
            value = value.tolist()
        elif type_value is list:
            # Lists are fine except if they contain enums or Paths. A new list is built so the
            # list stored in aviary_inputs is not altered; empty lists are left as they are.
            if value and isinstance(value[0], Enum):
                value = [item.name for item in value]
            elif value and isinstance(value[0], Path):
                value = [str(item) for item in value]
        elif isinstance(value, Enum):
            # Enums and Paths need converting to a string
            value = value.name
        elif isinstance(value, Path):
            value = str(value)
        elif isinstance(value, np.generic):
            # numpy scalars (e.g. int64, bool_) are not JSON serializable
            value = value.item()

        aviary_input_list.append([name, value, units, str(type_value)])

    if Aircraft.Design.GROSS_MASS not in self.model.aviary_inputs:
        aviary_input_list.append(
            [
                Aircraft.Design.GROSS_MASS,
                float(self.get_val(Aircraft.Design.GROSS_MASS, 'lbm')[0]),
                'lbm',
                str(float),
            ]
        )

    # Write the list to a json file; the context manager closes the file
    with open(json_filename, 'w') as jsonfile:
        json.dump(
            aviary_input_list,
            jsonfile,
            sort_keys=True,
            indent=4,
            ensure_ascii=False,
        )
def _add_hybrid_objective(self, phase_info):
    """
    Add an ``obj_comp`` subsystem whose output combines final mass and final time.

    The component evaluates ``obj = -final_mass / takeoff_mass + final_time / 5.`` with
    ``final_mass`` in lbm and ``final_time`` in h, where ``takeoff_mass`` is the value of
    ``Aircraft.Design.GROSS_MASS`` (lbm) read from ``aviary_inputs``. Its two inputs are
    connected to the last entry (``src_indices=[-1]``) of the mass and time timeseries of the
    last phase in ``phase_info``.

    Parameters
    ----------
    phase_info : dict
        Dictionary of mission phase information, keyed by phase name.
    """
    model = self.model
    phase_names = list(phase_info.keys())
    gross_mass = model.aviary_inputs.get_val(Aircraft.Design.GROSS_MASS, units='lbm')

    # The gross mass is baked into the expression string as a constant
    input_options = {
        'final_mass': {'units': 'lbm'},
        'final_time': {'units': 'h'},
    }
    objective = om.ExecComp(
        f'obj = -final_mass / {gross_mass} + final_time / 5.', **input_options
    )
    model.add_subsystem('obj_comp', objective)

    # Feed the component from the end of the final phase's timeseries
    timeseries = f'traj.{phase_names[-1]}.timeseries'
    links = (
        (f'{timeseries}.mass', 'obj_comp.final_mass'),
        (f'{timeseries}.time', 'obj_comp.final_time'),
    )
    for source, target in links:
        model.connect(source, target, src_indices=[-1])
def _read_sizing_json(json_filename, meta_data, verbosity=Verbosity.BRIEF):
    """
    Read saved sizing results from a JSON file.

    Each entry in the file is expected to be a ``[name, value, units, type_string]`` list.
    Entries whose name is not in ``meta_data``, or that cannot be set on the AviaryValues
    object, are skipped; a warning is issued for them when ``verbosity`` is at least
    ``Verbosity.VERBOSE``.

    Parameters
    ----------
    json_filename : str or Path
        Path to the JSON file containing saved sizing data.
    meta_data : dict
        Variable metadata used to validate and load file data.
    verbosity : Verbosity or int, optional
        Controls the level of terminal output. Defaults to ``Verbosity.BRIEF``.

    Returns
    -------
    AviaryValues
        An AviaryValues object populated with input values from the JSON file.
    """
    aviary_inputs = AviaryValues()

    # load saved input list from json file; the context manager closes the file
    with open(json_filename) as json_data_file:
        loaded_aviary_input_list = json.load(json_data_file)

    # Loop over input list and assign aviary problem input values
    for inputs in loaded_aviary_input_list:
        var_name, var_values, var_units, var_type = inputs

        if var_type == "<class 'list'>":
            # A list is treated as a list of enums if any string item contains '<'
            is_enum = False
            for i, item in enumerate(var_values):
                if isinstance(item, str) and '<' in item:
                    is_enum = True
                    var_values[i] = _strip_enum_repr(item)
            if is_enum:
                var_values = convert_strings_to_data(var_values)
        elif '<enum' in var_type:
            # Scalar enum: reduce the stored string to the bare value before converting
            var_values = convert_strings_to_data([_strip_enum_repr(var_values)])

        if var_name in meta_data:
            try:
                aviary_inputs.set_val(var_name, var_values, units=var_units, meta_data=meta_data)
            except Exception:
                # Best effort: skip entries that cannot be loaded. Only Exception is caught
                # so that KeyboardInterrupt and SystemExit still propagate.
                if verbosity >= Verbosity.VERBOSE:
                    warnings.warn(
                        f'Could not add item in json output to AviaryValues: input string = '
                        f'{inputs}, attempted to set_value({var_name}, {var_values}, {var_units}). '
                        'This variable was skipped.'
                    )
        else:
            # Not in the MetaData
            if verbosity >= Verbosity.VERBOSE:
                warnings.warn(
                    f'While reading json output, item was not found in MetaData: {inputs}. This '
                    'variable was skipped.'
                )

    return aviary_inputs


def _strip_enum_repr(text):
    """
    Return the part of ``text`` after its last ':' with ``>``, ``<``, ``]``, ``'`` and spaces
    removed.

    Presumably ``text`` is an enum representation such as ``<Name.MEMBER: value>`` -- TODO
    confirm against the files this reader has to support.
    """
    return text.split(':')[-1].translate(str.maketrans('', '', "><]' "))
[docs]
def reload_aviary_problem(filename, phase_info=None, metadata=None, verbosity=Verbosity.QUIET):
    """
    Loads a previously sized Aviary model and returns an AviaryProblem for that model.

    Parameters
    ----------
    filename : str, Path
        User specified name and relative path of json file containing the sized aircraft data
    phase_info : dict, Path
        phase_info dictionary used by the original problem
    metadata : dict (optional)
        Custom metadata if needed to read all variables present in the json output file. If
        None (the default), a fresh copy of ``CoreMetaData`` is used.
    verbosity : Verbosity, int (optional)
        Controls level of terminal output for function call

    Returns
    -------
    Aviary Problem object with filled aviary_inputs. To use this problem for anything other than
    running off-design missions, then the full level 2 interface should be used. "load_inputs()"
    can be skipped as the "aviary_inputs" attribute is prefilled here.
    """
    # Create the default per call; a default evaluated in the signature would be a single
    # dict shared by every call of this function.
    if metadata is None:
        metadata = CoreMetaData.copy()

    # Initialize a new aviary problem and aviary_input data structure
    prob = AviaryProblem()
    filename = get_path(filename)
    aviary_inputs = _read_sizing_json(filename, metadata, verbosity)

    prob.load_inputs(aviary_inputs, phase_info, verbosity=verbosity)
    prob.check_and_preprocess_inputs(verbosity=verbosity)

    # Add Systems
    prob.add_pre_mission_systems(verbosity=verbosity)
    prob.add_phases(verbosity=verbosity)
    prob.add_post_mission_systems(verbosity=verbosity)

    # Link phases and variables
    prob.link_phases(verbosity=verbosity)

    # Load optimization problem formulation: driver, design variables and objective
    prob.add_driver(verbosity=verbosity)
    prob.add_design_variables(verbosity=verbosity)
    prob.add_objective(verbosity=verbosity)

    prob.setup(verbosity=verbosity)
    prob.final_setup()

    # some variables are normally in the problem instead, so add them there too
    prob.set_val(Mission.GROSS_MASS, aviary_inputs.get_val(Mission.GROSS_MASS, 'lbm'), 'lbm')

    return prob