import getpass
import numpy as np
import re
import warnings

from datetime import datetime
from pathlib import Path

from openmdao.utils.units import valid_units, is_compatible

from aviary.utils.named_values import get_items, get_keys, NamedValues
from aviary.utils.functions import get_path


# multiple type annotation uses "typeA | typeB" syntax, but requires Python 3.10+
def read_data_file(filename: (str, Path), metadata=None, aliases=None, save_comments=False):
    """
    Read a data file in Aviary format, which is data delimited by commas with any
    amount of whitespace allowed between data entries.

    Spaces are not allowed in openMDAO variables, so any spaces in header entries are
    replaced with underscores.

    Parameters
    ----------
    filename : (str, Path)
        filename or filepath of data file to be read
    metadata : dict, optional
        metadata to check validity of variable names provided in data file. Columns
        with variable names that can't be found in metadata will be skipped. If not
        provided, all validly formatted columns are always read.
    aliases : dict, optional
        optional dictionary to define a mapping of variables to allowable aliases in
        the data file header. Keys are variable names to be used in openMDAO; values
        are a list of headers that correspond to that variable. Alias matching is not
        case-sensitive, and underscores and spaces are treated as equivalent.
    save_comments : bool, optional
        flag if comments in data file should be returned along with data. Defaults to
        False.

    Returns
    -------
    data : NamedValues
        data read from file in NamedValues format, including variable name, units,
        and values (stored in a numpy array)
    comments : list of str
        any comments from file, with comment characters ('#') stripped out (only if
        save_comments=True)
    """
    filepath = get_path(filename)

    data = NamedValues()
    comments = []

    # prep aliases for case-insensitive matching, with spaces == underscores
    if aliases:
        for key in aliases:
            if isinstance(aliases[key], str):
                aliases[key] = [aliases[key]]
            aliases[key] = [re.sub(r'\s', '_', item).lower() for item in aliases[key]]

    with open(filepath, newline=None, encoding='utf-8-sig') as file:
        # csv.reader() and other available packages that can read csv files are not
        # used. Manual control of file reading ensures that comments are kept intact
        # and other checks can be performed.
        check_for_header = True
        for line_count, line_data in enumerate(file):
            # if comments are present in line, strip them out
            if '#' in line_data:
                index = line_data.index('#')
                comments.append(line_data[index + 1:].strip())
                line_data = line_data[:index]
            # split by delimiters, remove whitespace and newline characters
            line_data = re.split(r'[;,]\s*', line_data.strip())
            # ignore empty lines
            if not line_data or line_data == ['']:
                continue

            # try to convert line_data to float, skip any blank strings
            try:
                line_data = [float(var) for var in line_data if var != '']
            # data contains things other than floats
            except ValueError:
                # skip checking for header data if not required
                if check_for_header:
                    # dictionary of header name: units
                    header = {}
                    # list of which column goes with each valid header entry
                    valid_indices = []
                    for index in range(len(line_data)):
                        item = re.split('[(]', line_data[index])
                        item = [item[i].strip(') ') for i in range(len(item))]
                        # openMDAO vars can't have spaces, convert to underscores
                        name = re.sub(r'\s', '_', item[0])
                        if aliases:
                            # "reverse" lookup name in alias dict
                            for key in aliases:
                                if name.lower() in aliases[key]:
                                    name = key
                                    break
                        # fall back to 'unitless' if no units information is available
                        default_units = 'unitless'
                        # if metadata is provided, ensure variable exists and update
                        # default_units
                        if metadata is not None:
                            if name not in metadata.keys():
                                warnings.warn(f'Header <{name}> was not recognized, '
                                              'and will be skipped')
                                continue
                            else:
                                default_units = metadata[name]['units']
                        # if units are provided, check that they are valid
                        if len(item) > 1:
                            units = item[-1]
                            if valid_units(units):
                                # check that units are compatible with expected units
                                if metadata is not None:
                                    if not is_compatible(units, default_units):
                                        # Raising error here, as trying to use default
                                        # units could mean accidental conversion which
                                        # would significantly impact analysis
                                        raise ValueError(
                                            f'Provided units <{units}> for column '
                                            f'<{name}> are not compatible with '
                                            f'default units of {default_units}')
                            else:
                                # Units were not recognized. Raise error
                                raise ValueError(f'Invalid units <{units}> provided '
                                                 f'for column <{name}> while reading '
                                                 f'<{filepath}>.')
                        else:
                            if metadata is not None and default_units != 'unitless':
                                # units were not provided, but variable should have
                                # them - assume default units for that variable
                                warning = f'Units were not provided for column ' \
                                          f'<{name}> while reading <{filepath}>. ' \
                                          f'Using default units of {default_units}.'
                                warnings.warn(warning)
                            units = default_units

                        header[name] = units
                        valid_indices.append(index)

                    if len(header) > 0:
                        check_for_header = False
                        raw_data = {key: [] for key in header.keys()}
                    continue

                # only raise error if not checking for header, or invalid header found
                raise ValueError(
                    f'Non-numerical value found in data file <{filepath}> on line '
                    f'{line_count}')

            # This point is reached when the first valid numerical entry in the data
            # file is found. Stop looking for header data from now on
            check_for_header = False

            # pull out data for each valid header, ignore other columns
            for idx, variable in enumerate(header.keys()):
                # valid_indices matches dictionary order, pull data from correct column
                raw_data[variable].append(line_data[valid_indices[idx]])

    # store data in NamedValues object
    for variable in header.keys():
        data.set_val(variable, val=np.array(raw_data[variable]),
                     units=header[variable])

    if save_comments:
        return data, comments
    else:
        return data
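
# Example usage of read_data_file() - an illustrative sketch only. The file name,
# column headers, units, and alias mapping below are hypothetical and not defined
# anywhere in this module. Assumes a data file such as:
#
#     # engine performance data
#     Mach, Altitude (ft), Thrust_Max (lbf)
#      0.2,           0.0,         28000.0
#      0.4,       10000.0,         21000.0
#
#     aliases = {'altitude': ['alt', 'altitude'], 'thrust': ['thrust_max']}
#     data, comments = read_data_file('engine_deck.csv', aliases=aliases,
#                                     save_comments=True)
#     # values come back as numpy arrays, with units taken from the file header
#     altitudes = data.get_val('altitude', 'ft')
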
# multiple type annotation uses "typeA | typeB" syntax, but requires Python 3.10+
def write_data_file(filename: (str, Path) = None, data: NamedValues = None,
                    comments: (str, list) = [], include_timestamp: bool = False):
    """
    Write data to a comma-separated values (csv) format file using the Aviary data
    table format.

    Parameters
    ----------
    filename : (str, Path)
        filename or filepath for data file to be written
    data : NamedValues
        NamedValues object containing data that will be written to file, which
        includes variable name, units, and values
    comments : (str, list of str), optional
        optional comments that will be included at the top of the output file, before
        data begins
    include_timestamp : bool, optional
        optional flag to set if timestamp and user should be included in file comments
    """
    if isinstance(filename, str):
        filepath = Path(filename)
    else:
        filepath = filename

    if data is None:
        raise UserWarning(f'No data provided to write to {filepath.name}')

    if isinstance(comments, str):
        comments = [comments]
    else:
        # copy to avoid mutating the caller's list (or the mutable default argument)
        comments = list(comments)

    # strip '#' from comments - np.savetxt() will automatically add them
    for idx, line in enumerate(comments):
        if len(line) > 0:
            if line[0] != '#':
                comments[idx] = '# ' + line.strip()

    # if there are comments, add some spacing afterwards - otherwise it should be empty
    if comments:
        comments.append('\n')

    if include_timestamp:
        timestamp = datetime.now().strftime('%m/%d/%y at %H:%M')
        try:
            user = ' by ' + getpass.getuser()
        except Exception:
            user = ''
        stamp = [f'# created {timestamp}{user}\n']
        comments = stamp + comments

    # assemble separate variable name and units information into single list for header
    header = []
    data_dict = {}
    for var, val_and_units in get_items(data):
        units = val_and_units[1]
        formatted_units = ''
        # only explicitly include units if there are any
        if units is not None and units != 'unitless':
            formatted_units = ' (' + units + ')'
        header.append(var + formatted_units)
        data_dict[var] = np.array([str(i) for i in data.get_val(var, units)])

    # set column widths, for more human-readable format
    col_format = []
    for i, key in enumerate(get_keys(data)):
        header_len = len(header[i])
        data_len = len(max(data_dict[key], key=len))
        # min column width is 10 - spaced out columns are visually easier to follow
        # don't pad first column
        if i > 0:
            min_width = 10
        else:
            min_width = 0
        col_len = max(header_len, data_len, min_width)
        # if headers are smaller than column, pad with leading whitespace
        if header_len < col_len:
            header[i] = ' ' * (col_len - header_len) + header[i]
        # special string to define column formatting with specific width
        format = f'%{col_len}s'
        # don't include commas for last column
        if i < len(header) - 1:
            format = format + ', '
        col_format.append(format)

    # convert data from dict to array so it can be written using savetxt
    formatted_data = np.array([data_dict[key] for key in data_dict]).transpose()

    # write to output file w/ header and comments
    np.savetxt(filepath, formatted_data, fmt=''.join(col_format),
               delimiter=',', header=', '.join(header),
               comments='\n'.join(comments))
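
# Example usage of write_data_file() - an illustrative sketch only. The variable
# names, units, values, and output file name below are hypothetical:
#
#     data = NamedValues()
#     data.set_val('mach', np.array([0.2, 0.4, 0.6]), units='unitless')
#     data.set_val('altitude', np.array([0.0, 10000.0, 20000.0]), units='ft')
#     write_data_file('flight_conditions.csv', data,
#                     comments='generated for illustration',
#                     include_timestamp=True)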