import datetime
import json
from pathlib import Path
import sys
import time
import pandas as pd
import numpy as np
from openmdao.utils.mpi import MPI
from openmdao.utils.reports_system import register_report
from openmdao.visualization.tables.table_builder import generate_table
from aviary.interface.utils.markdown_utils import write_markdown_variable_table
from aviary.utils.named_values import NamedValues
from aviary.utils.functions import wrapped_convert_units
[docs]
def register_custom_reports():
    """
    Register the Aviary reports with OpenMDAO's report system.

    Each report listed below is registered against the ``run_driver`` method of
    ``AviaryProblem`` as a 'post' hook, so they are automatically generated and
    added to the same reports folder as other default reports.
    """
    # TODO top-level aircraft report?
    # TODO add flag to skip registering reports?

    # (report name, generating function, description)
    report_specs = (
        # per-subsystem report generation
        ('subsystems',
         subsystem_report,
         'Generates reports for each subsystem builder in the '
         'Aviary Problem'),
        ('mission',
         mission_report,
         'Generates report for mission results from Aviary problem'),
        ('timeseries_csv',
         timeseries_csv,
         'Generates an output .csv file for variables in the timeseries of the trajectory'),
        ('run_status',
         run_status,
         'Generates a report on the status of the run'),
    )

    # All reports share the same hook location; only name/func/desc differ.
    for report_name, report_func, description in report_specs:
        register_report(name=report_name,
                        func=report_func,
                        desc=description,
                        class_name='AviaryProblem',
                        method='run_driver',
                        pre_or_post='post')
[docs]
def run_status(prob):
    """
    Create a JSON file that contains a high level overview of the run.

    The file is named 'status.json' and is written to the problem's reports
    directory. When running under MPI, only rank 0 writes the file.

    Parameters
    ----------
    prob : AviaryProblem
        The AviaryProblem used to generate this report
    """
    if MPI and MPI.COMM_WORLD.rank != 0:
        return

    report_file = Path(prob.get_reports_dir()) / 'status.json'

    result = prob.driver.result
    runtime = result.runtime

    # Split the elapsed seconds by hand instead of going through time.gmtime(),
    # which wraps the hour field back to 00 once a run reaches 24 hours.
    hours, remainder = divmod(int(runtime), 3600)
    minutes, seconds = divmod(remainder, 60)
    runtime_ms = (runtime * 1000.0) % 1000.0
    runtime_formatted = (
        f"{hours:02d} hours {minutes:02d} minutes {seconds:02d} seconds "
        f"{runtime_ms:.1f} milliseconds"
    )

    # A naive datetime renders %Z as an empty string; astimezone() attaches the
    # local timezone so the stamp actually carries the zone name.
    time_stamp = datetime.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")

    status = {
        'Problem': prob._name,
        'Script': sys.argv[0],
        'Optimizer': prob.driver._get_name(),
        'Number of driver iterations': result.iter_count,
        'Number of model evals': result.model_evals,
        'Number of deriv evals': result.deriv_evals,
        'Wall clock run time': runtime_formatted,
        'Exit status': result.exit_status,
        'Report generation date and time': time_stamp,
    }

    # ensure_ascii=False may emit non-ASCII characters, so pin the encoding
    # rather than relying on the platform default.
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(status, f, indent=1, ensure_ascii=False)
        f.write('\n')  # avoid 'no newline at end of file' message
[docs]
def subsystem_report(prob, **kwargs):
    """
    Loop through all core subsystem builders in the AviaryProblem and call their
    ``report`` method. The builders are handed the "subsystems" folder inside the
    problem's reports directory as the output location.

    Parameters
    ----------
    prob : AviaryProblem
        The AviaryProblem used to generate this report
    kwargs : dict
        Additional keyword arguments, forwarded unchanged to each builder's
        ``report`` method
    """
    # Convert to Path *before* joining so this works whether get_reports_dir()
    # returns a str or a Path (matches the other report functions in this file).
    reports_folder = Path(prob.get_reports_dir()) / 'subsystems'
    reports_folder.mkdir(parents=True, exist_ok=True)

    # TODO external subsystems??
    for subsystem in prob.core_subsystems.values():
        subsystem.report(prob, reports_folder, **kwargs)
[docs]
def mission_report(prob, **kwargs):
    """
    Create a basic mission summary report, 'mission_summary.md', that is placed in
    the problem's reports directory.

    The report lists total fuel burn, time and ground distance for the mission,
    followed by the same three quantities for each phase in ``prob.phase_info``.
    A single trajectory named "traj" is assumed. When running under MPI, all ranks
    gather the values but only rank 0 writes the file.

    Parameters
    ----------
    prob : AviaryProblem
        The AviaryProblem used to generate this report
    kwargs : dict
        Additional keyword arguments (unused)
    """
    def _get_phase_value(traj, phase, var_name, units, indices=None):
        # Look the variable up in the phase timeseries first, then directly on
        # the phase. Returns None if neither location has it.
        try:
            vals = prob.get_val(f"{traj}.{phase}.timeseries.{var_name}",
                                units=units,
                                indices=indices,
                                get_remote=True)
        except KeyError:
            try:
                vals = prob.get_val(f"{traj}.{phase}.{var_name}",
                                    units=units,
                                    indices=indices,
                                    get_remote=True)
            # 2DOF breguet range cruise uses time integration to track mass
            except TypeError:
                vals = prob.get_val(f"{traj}.{phase}.timeseries.time",
                                    units=units,
                                    indices=indices,
                                    get_remote=True)
            except KeyError:
                vals = None
        return vals

    def _get_phase_diff(traj, phase, var_name, units, indices=None):
        # Difference between the last and first of the requested entries.
        # A fresh list is built per call instead of sharing a mutable default.
        if indices is None:
            indices = [0, -1]
        vals = _get_phase_value(traj, phase, var_name, units, indices)
        if vals is None:
            return None
        diff = vals[-1] - vals[0]
        if isinstance(diff, np.ndarray):
            diff = diff[0]
        return diff

    report_file = Path(prob.get_reports_dir()) / 'mission_summary.md'

    phase_names = list(prob.phase_info)

    # read per-phase data from trajectory
    data = {}
    for phase in phase_names:
        # TODO for traj in trajectories, currently assuming single one named "traj"
        # TODO delta mass and fuel consumption need to be tracked separately
        # Fuel burn is negative of delta mass, hence the reversed indices
        fuel_burn = _get_phase_diff('traj', phase, 'mass', 'lbm', [-1, 0])
        elapsed_time = _get_phase_diff('traj', phase, 't', 'min')
        ground_distance = _get_phase_diff('traj', phase, 'distance', 'nmi')

        outputs = NamedValues()
        outputs.set_val('Fuel Burn', fuel_burn, 'lbm')
        outputs.set_val('Elapsed Time', elapsed_time, 'min')
        outputs.set_val('Ground Distance', ground_distance, 'nmi')
        data[phase] = outputs

    first_phase = phase_names[0]
    last_phase = phase_names[-1]

    # NOTE(review): the time values are not indexed with [0] while mass and
    # distance are; kept exactly as before - confirm whether that is intended.
    # get initial values, first in traj
    initial_mass = _get_phase_value('traj', first_phase, 'mass', 'lbm', 0)[0]
    initial_time = _get_phase_value('traj', first_phase, 't', 'min', 0)
    initial_range = _get_phase_value('traj', first_phase, 'distance', 'nmi', 0)[0]

    # get final values, last in traj
    final_mass = _get_phase_value('traj', last_phase, 'mass', 'lbm', -1)[0]
    final_time = _get_phase_value('traj', last_phase, 't', 'min', -1)
    final_range = _get_phase_value('traj', last_phase, 'distance', 'nmi', -1)[0]

    totals = NamedValues()
    totals.set_val('Total Fuel Burn', initial_mass - final_mass, 'lbm')
    totals.set_val('Total Time', final_time - initial_time, 'min')
    totals.set_val('Total Ground Distance', final_range - initial_range, 'nmi')

    # All get_val(get_remote=True) calls are done; only rank 0 writes the file.
    if MPI and MPI.COMM_WORLD.rank != 0:
        return

    with open(report_file, mode='w') as f:
        f.write('# MISSION SUMMARY')
        write_markdown_variable_table(f, totals,
                                      ['Total Fuel Burn',
                                       'Total Time',
                                       'Total Ground Distance'],
                                      {'Total Fuel Burn': {'units': 'lbm'},
                                       'Total Time': {'units': 'min'},
                                       'Total Ground Distance': {'units': 'nmi'}})

        f.write('\n# MISSION SEGMENTS')
        for phase, outputs in data.items():
            f.write(f'\n## {phase}')
            write_markdown_variable_table(f, outputs,
                                          ['Fuel Burn', 'Elapsed Time', 'Ground Distance'],
                                          {'Fuel Burn': {'units': 'lbm'},
                                           'Elapsed Time': {'units': 'min'},
                                           'Ground Distance': {'units': 'nmi'}})
[docs]
def timeseries_csv(prob, **kwargs):
    """
    Generate a CSV file containing timeseries data for variables from an Aviary mission.

    This function extracts timeseries data from the provided problem object, processes
    the data to unify units across different phases of the mission, and then outputs the
    result to a CSV file. For each variable, the units of the first phase that reports it
    are used for all phases; phases that do not report the variable are filled with NaN
    (and a warning is issued). The 'time' variable is moved to the beginning of the
    dataset so it's always the leftmost column, and the remaining columns are sorted by
    name. Duplicate rows are eliminated.

    The output CSV file is named 'mission_timeseries_data.csv' and is saved in the
    reports directory. The first row of the CSV file contains headers with variable
    names and units. Each subsequent row represents the mission outputs at a different
    time step.

    Parameters
    ----------
    prob : AviaryProblem
        The AviaryProblem used to generate this report
    kwargs : dict
        Additional keyword arguments (unused)
    """
    # Function-scope import so this block does not depend on a file-level change.
    import warnings

    timeseries_outputs = prob.model.list_outputs(
        includes='*timeseries*', out_stream=None, return_format='dict', units=True)
    phase_names = prob.model.traj._phases.keys()

    # There are no more collective calls, so we can exit.
    if MPI and MPI.COMM_WORLD.rank != 0:
        return

    # Re-key the outputs by promoted name and drop entries ending in '_phase'.
    timeseries_outputs = {
        meta['prom_name']: meta
        for meta in timeseries_outputs.values()
        if not meta['prom_name'].endswith('_phase')
    }

    unique_variable_names = {name.split('.')[-1] for name in timeseries_outputs}

    timeseries_data = {}
    for variable_name in unique_variable_names:
        units = None
        phase_vals = []  # one array per phase, stacked after the loop

        for phase_name in phase_names:
            variable_str = f'traj.{phase_name}.timeseries.{variable_name}'

            if variable_str not in timeseries_outputs:
                warnings.warn(
                    f'Variable {variable_str} not found in timeseries_outputs for phase {phase_name}.')
                # Pad with NaN, using the phase's time array as the shape template.
                time_str = f'traj.{phase_name}.timeseries.time'
                val = np.full_like(timeseries_outputs[time_str]['val'], np.nan, dtype=float)
            else:
                val = timeseries_outputs[variable_str]['val']
                phase_units = timeseries_outputs[variable_str]['units']
                # grab the units from the first phase that uses this variable;
                # use these units for all others
                if units is None:
                    units = phase_units
                elif phase_units != units:
                    val = wrapped_convert_units((val, phase_units), units)

            phase_vals.append(val)

        val_full_traj = phase_vals[0] if len(phase_vals) == 1 else np.vstack(phase_vals)
        timeseries_data[variable_name] = {'val': val_full_traj, 'units': units}

    # Create a DataFrame from timeseries_data
    df = pd.DataFrame({name: pd.Series(entry['val'].flatten())
                       for name, entry in timeseries_data.items()})

    # Keep 'time' as the leftmost column and sort the rest
    columns = ['time'] + sorted(col for col in df.columns if col != 'time')
    df = df[columns]

    # Add units to column names
    df.columns = [f'{col} ({timeseries_data[col]["units"]})' for col in df.columns]

    # drop_duplicates() returns a new frame; the result must be kept
    df = df.drop_duplicates()

    # Write the DataFrame to a CSV file in the reports directory
    report_file = Path(prob.get_reports_dir()) / 'mission_timeseries_data.csv'
    df.to_csv(report_file, index=False)