Source code for lib.liborganize

"""
##############################################################################################

  Project   : CRESCENDO/AEROCOM
  Filename  : liborganize.py
  Author    : Ramiro Checa-Garcia
  email     : rcheca@lsce.ipsl.fr
  Purpose   : Reorganize variables for LMDzINCAOR experiment runs.

  Revision History ----------------------------------------------------------

  Date       Author     Ref    Revision

  2018-Apr   R.Checa           First code
  2018-Jul   R.Checa           Implemented as module
  2018-Nov   R.Checa           Final first version.


  TODO LIST:

  1. Diagnostics on lev_var not in hard-code         (Improvement)
  2. Add reference values for key diagnostics        (Improvement)
  3. Add description to each function                (Documentation)
  4. Factors in new_var could be variables in netcdf (Improvement)

##############################################################################################
"""

import xarray as xr
import numpy  as np
import glob
from  netCDF4  import Dataset
import os
import json
from pprint import pprint
import shutil
import yaml
import pandas as pd
import lib.check as check
import gc

def memory_usage_psutil():
    """Return system virtual-memory statistics via psutil (imported lazily)."""
    import psutil
    return psutil.virtual_memory()

def myprint(mystr, finfo=None):
    """Print a message and, if an open file handle is given, also log it there."""
    print(mystr)
    if finfo is not None:
        finfo.write(mystr + '\n')
    return

def post_processing(mysettings, directory, list_variables, lyears, post_t,
                    study, expID, freq, finfo=None):
    '''
    This function performs the post-processing of the extracted files.

    Note that it is not the best function from a programming point of view,
    as the results depend strongly on the files found in the directory.

    - It would be important to add a test that the files in the directories
      are the files expected from the settings file.
    '''
    gc.collect()

    f_checkings = open('etc/check_cmip6_variables.yaml', 'r')
    cmip6check  = yaml.safe_load(f_checkings)

    # We define here the columns of the info-checks file ------------
    dfcolumns   = ['variable', 'method', 'value', 'units', 'testing-file']
    vars_checks = []

    l_non_processed = []
    l_yes_processed = []
    l_uni_processed = []
    l_yes_checked   = []

    for idirec, direc in enumerate(list_variables):
        myprint('\n ... Post-Processing of files related with ' + direc, finfo=finfo)
        lfiles = sorted(glob.glob(directory + direc + '/' + direc + '*.nc'))
        print(lfiles)

        # We select only those files that begin with the variable name, as they
        # were generated in the pre-processing step. We also ensure that we are
        # not including aggregations of several files, so we remove those files
        # with the '-accum' or '_accum' string in the name.
        lfiles = [item for item in lfiles
                  if '-accum' not in item and '_accum' not in item]

        for item in lfiles:
            if len(item) != len(lfiles[0]):
                myprint('\n -- Problem: not all filenames have the same length (unusual)',
                        finfo=finfo)
                if mysettings['safety']['check_files'] == True:
                    exit()

        if len(lfiles) == 1:
            first_yr   = lfiles[0].split('_')[-2]
            last_yr    = lfiles[0].split('_')[-1].split('.')[0]
            stringyr   = lyears[0]
            new_file_t = lfiles[0].replace(stringyr + '0101', first_yr)
            new_file   = new_file_t.replace('_' + stringyr + '1231',
                                            '-' + last_yr + '-accum')
            varid      = new_file.split('/')[-1].split('_')[0]

        if len(lfiles) > 1:
            first_yr   = lfiles[1].split('_')[-2]
            last_yr    = lfiles[-1].split('_')[-1].split('.')[0]
            str_files  = ' '.join(lfiles[1::])
            stringyr   = lyears[0]
            new_file_t = lfiles[0].replace(stringyr + '0101', first_yr)
            new_file   = new_file_t.replace('_' + stringyr + '1231',
                                            '-' + last_yr + '-accum')
            varid      = new_file.split('/')[-1].split('_')[0]

        # Concatenation of all files -------------------------------------------------
        if post_t == 'all':
            commandnco = 'ncrcat -O ' + str_files + ' ' + new_file
            os.system(commandnco)
            varid = new_file.split('/')[-1].split('_')[0]
            if varid != direc:
                varid = direc
            good_name, info_var = create_filename(mysettings, study, expID, varid,
                                                  new_file, freq=freq, finfo=finfo)
            if good_name != False:
                save_name = directory + direc + '/' + good_name
                unit_stat, finalname = add_global_attributes(mysettings, new_file,
                                                             save_name, study, info_var)
                l_yes_processed.append(direc)
                l_uni_processed.append(unit_stat)
                if varid in cmip6check.keys():
                    l_yes_checked.append(varid)
                    for mymethod in cmip6check[varid]['method'].keys():
                        if mymethod == 'tendency_mass':
                            a1, a2, a3 = check.tendency_mass(finalname, varid,
                                                             'etc/area_grid.nc', 'area',
                                                             finfo=finfo)
                            vars_checks.append({'variable': a1, 'method': mymethod,
                                                'value': a3, 'units': a2,
                                                'testing-file': finalname.split('/')[-1]})
                        if mymethod == 'global_mean':
                            a1, a2, a3, a4 = check.global_mean(finalname, varid,
                                                               cmip6check[varid],
                                                               finfo=finfo)
                            vars_checks.append({'variable': a1, 'method': mymethod,
                                                'value': a3, 'units': a2,
                                                'testing-file': finalname.split('/')[-1]})
                        if mymethod == 'global_mean_atlev':
                            for level in cmip6check[varid]['method'][mymethod]['levels']:
                                a1, a2, a3, a4 = check.global_mean_atlev(finalname, varid,
                                                                         cmip6check[varid],
                                                                         level=level,
                                                                         finfo=finfo)
                                vars_checks.append({'variable': a1,
                                                    'method': 'glo mean at' + a4,
                                                    'value': a3, 'units': a2,
                                                    'testing-file': finalname.split('/')[-1]})
                        if mymethod == 'total_load':
                            a1, a2, a3 = check.total_load(finalname, varid,
                                                          cmip6check[varid],
                                                          area_fname='data/area_grid.nc',
                                                          area_varid='area', finfo=finfo)
                            vars_checks.append({'variable': a1, 'method': mymethod,
                                                'value': a3, 'units': a2,
                                                'testing-file': finalname.split('/')[-1]})
            else:
                l_non_processed.append(direc)

        if post_t == 'year' or 'AEROCOM' in expID:
            for fname in lfiles:
                good_name, info_var = create_filename(mysettings, study, expID, varid,
                                                      fname.replace('1_20', '1-20'),
                                                      freq=freq, finfo=finfo)
                if good_name != False:
                    save_name = directory + direc + '/' + good_name
                    unit_stat, finalname = add_global_attributes(mysettings, fname,
                                                                 save_name, study,
                                                                 info_var)
                    l_yes_processed.append(direc)
                    l_uni_processed.append(unit_stat)
                    if varid in cmip6check.keys():
                        l_yes_checked.append(varid)
                        for mymethod in cmip6check[varid]['method'].keys():
                            if mymethod == 'tendency_mass':
                                a1, a2, a3 = check.tendency_mass(finalname, varid,
                                                                 'etc/area_grid.nc',
                                                                 'area', finfo=finfo)
                                vars_checks.append({'variable': a1, 'method': mymethod,
                                                    'value': a3, 'units': a2,
                                                    'testing-file': finalname.split('/')[-1]})
                            if mymethod == 'global_mean':
                                a1, a2, a3, a4 = check.global_mean(finalname, varid,
                                                                   cmip6check[varid],
                                                                   finfo=finfo)
                                vars_checks.append({'variable': a1, 'method': mymethod,
                                                    'value': a3, 'units': a2,
                                                    'testing-file': finalname.split('/')[-1]})
                            if mymethod == 'global_mean_atlev':
                                for level in cmip6check[varid]['method'][mymethod]['levels']:
                                    a1, a2, a3, a4 = check.global_mean_atlev(finalname,
                                                                             varid,
                                                                             cmip6check[varid],
                                                                             level=level,
                                                                             finfo=finfo)
                                    vars_checks.append({'variable': a1,
                                                        'method': 'glo mean at' + a4,
                                                        'value': a3, 'units': a2,
                                                        'testing-file': finalname.split('/')[-1]})
                else:
                    l_non_processed.append(direc)

    non_cmip6_units = [y for i, y in enumerate(l_yes_processed) if l_uni_processed[i] == False]
    yes_cmip6_units = [y for i, y in enumerate(l_yes_processed) if l_uni_processed[i] == True]

    str1 = ('\n\n Processed variables ...... [' + str(len(l_yes_processed)) + '] ' +
            str(l_yes_processed) + '\n')
    str2 = ('   without CMIP6 units .... ' + str(non_cmip6_units) + '\n')
    str3 = ('   with    CMIP6 units .... ' + str(yes_cmip6_units) + '\n')

    with open('vars_cf_cmip6.txt', 'w') as fcmip6:
        for y in yes_cmip6_units:
            fcmip6.write(y + '\n')

    str4 = (' Non processed variables ... [' + str(len(l_non_processed)) + '] ' +
            str(l_non_processed) + '\n')
    myprint(str1 + str2 + str3 + str4, finfo=finfo)
    myprint('\n Checked variables ... [' + str(len(l_yes_checked)) + '] ' +
            str(l_yes_checked) + '\n', finfo=finfo)

    l_non_checked = set(l_yes_processed) - set(l_yes_checked)
    myprint(' ... processed non checked ... [' + str(len(l_non_checked)) + '] ' +
            str(l_non_checked) + '\n', finfo=finfo)

    pd.set_option('display.width', 140)
    if freq == 'hr':
        previ_process_file = mysettings[study]['process']['hrs']
    else:
        previ_process_file = mysettings[study]['process'][freq]

    cmip6_process_file    = previ_process_file.replace('.process', '_cmip6.process')
    noncmip6_process_file = previ_process_file.replace('.process', '_noncmip6.process')

    df = pd.read_table(previ_process_file, skiprows=0, header=0, delim_whitespace=True)

    cmip6_df = df[df['varname-out'].isin(yes_cmip6_units)]
    cmip6_df.to_csv('tempf', sep=' ', index=False)
    os.system('column -t tempf > ' + cmip6_process_file)

    non_df = df[df['varname-out'].isin(non_cmip6_units + l_non_processed)]
    non_df.to_csv('tempf', sep=' ', index=False)
    os.system('column -t tempf > ' + noncmip6_process_file)

    for varid in l_non_checked:
        vars_checks.append({'variable': varid, 'method': 'not-set-up', 'value': -9.99,
                            'units': '', 'testing-file': ''})

    test_info = pd.DataFrame(vars_checks, columns=dfcolumns)
    os.remove('tempf')
    return test_info

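# Illustrative sketch of an 'etc/check_cmip6_variables.yaml' entry, inferred
# from the keys read above ('method', per-method options, 'levels'). The
# variable name and values are hypothetical, not taken from the real file:
#
#   emidust:
#     method:
#       global_mean: {}
#       total_load: {}
#       global_mean_atlev:
#         levels: [85000.0, 50000.0]
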
def find_tableID(varID, freq='mon', origin='ATM'):
    """Locate the CMIP6 CMOR table file that defines varID (False if none)."""
    table_out = False
    ltable = []
    cmip6_tables = 'etc/cmip6-cmor-tables-master/Tables/'

    # Scan the tables twice: once for the single-quoted and once for the
    # double-quoted variable name.
    for quote in ("'", '"'):
        varIDwith = quote + varID + quote
        for fname in os.listdir(cmip6_tables):          # change directory as needed
            if os.path.isfile(cmip6_tables + fname):    # it's a file, not a directory entry
                with open(cmip6_tables + fname) as f:   # open file
                    for line in f:                      # process line by line
                        if varIDwith in line:           # search for string
                            ltable.append(cmip6_tables + fname)
                            break

    if len(ltable) == 1:
        table_out = ltable[0]
    else:
        # Several tables mention the variable: filter by frequency, then
        # prefer the aerosol table, then any atmosphere table.
        l2table = [table_name for table_name in ltable if freq in table_name]
        if len(l2table) == 1:
            table_out = l2table[0]
        else:
            for table_name in l2table:
                if 'CMIP6_AER' in table_name:
                    table_out = table_name
            if table_out == False:
                for table_name in l2table:
                    if 'CMIP6_A' in table_name:
                        table_out = table_name
    return table_out

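# Minimal usage sketch for find_tableID (assumes the CMOR tables have been
# downloaded under etc/cmip6-cmor-tables-master/Tables/); 'od550aer' is just
# an illustrative variable name:
#
#   table = find_tableID('od550aer', freq='mon')
#   if table == False:
#       print('variable not defined in any CMIP6 table')
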
def create_filename(mysettings, study, expID, varID, varfile, freq='mon', finfo=''):
    """
    This function creates the new filename of the netCDF with the diagnostics,
    as derived from the current settings. For CMIP6 netCDFs the filename has
    the structure:

    **varID_tableID_sourceID_experimentID_memberID_gridlabel_timerange**

    * varID --------> the variable name: varname
    * tableID ------> depends on the variable: AERday, Amon, etc.
    * sourceID -----> IPSL-LMDZORINCAv6
    * experimentID -> CRESCWP3PD-amip
    * memberID -----> v5-r1i1p1f1 (v5 refers to our internal version)
    * gridlabel ----> gr (refers to the original grid, but we provide pressure levels)
    * timerange ----> 20000101-20121231 for all cases.

    :param mysettings:
    :param study:
    :param expID:
    :param varID: the variable name: varname
    :param varfile:
    :param freq:
    :param finfo:
    :return:
        - save_name
        - info_var
    """
    table_name = find_tableID(varID, freq=freq)
    if table_name == False:
        print(' ---> variable: ', varID, ' not found in CMIP6-cmor-tables\n')
        return False, False

    tableID      = table_name.split('/')[-1].split('.')[0].split('CMIP6')[1][1::]
    sourceID     = mysettings[study]['expCVs']['sourceID']
    experimentID = mysettings[study]['expID']
    memberID     = mysettings[study]['expCVs']['memberID']
    gridlabel    = mysettings[study]['expCVs']['gridlabel']
    timerange    = varfile.split('_')[-1]

    if mysettings[study]['expKIND'].upper() in ['CMIP6', 'CRESCENDO', 'JASPERKOK']:
        # for CMIP6 or CRESCENDO
        save_name = '_'.join([varID, tableID, sourceID, experimentID,
                              memberID, gridlabel, timerange])

    if 'aerocom' in mysettings[study]['expKIND'].lower():
        # for aerocom-phaseIII:
        # aerocom3_<ModelName>_<ExperimentName>_<VariableName>_<VerticalCoordinateType>
        #          _<Period>_<Frequency>.nc
        # expID is AP3-CTRL2016-PD or AP3-CTRL2016-PI or AP3-remotesensing etc.
        save_name = ('aerocom3_' + sourceID + '_' + mysettings[study]['expID'] + '_' +
                     varID + '_' + timerange.replace('.nc', '') + '_' + freq + '.nc')

    with open(table_name, 'r') as cmip6tb:
        tbdata = json.load(cmip6tb)
    info_var = tbdata['variable_entry'][varID]
    myprint(' ... final filename will be: ' + save_name, finfo=finfo)

    return save_name, info_var

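# With the controlled vocabularies listed in the docstring above, a
# CMIP6-style filename produced by create_filename would look like this
# (the variable and table names are illustrative):
#
#   od550aer_AERmon_IPSL-LMDZORINCAv6_CRESCWP3PD-amip_v5-r1i1p1f1_gr_20000101-20121231.nc
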
def add_global_attributes(mysettings, ncname, newname, study, info_var, finfo=None):
    """
    :param mysettings:
    :param ncname:
    :param newname:
    :param study:
    :param info_var:
    :param finfo:
    :return:
    """
    # autoclose=True is not implemented in the old xarray version on IRENE
    dataset = xr.open_dataset(ncname)

    # Global attributes ----------------------------------------------------
    dataset.attrs['activity_id']    = mysettings[study]['expCVs']['activityID']
    dataset.attrs['institution_id'] = 'IPSL-LSCE'
    dataset.attrs['source_id']      = 'IPSL-LMDZORINCAv6'
    dataset.attrs['source_type']    = 'AGCM-AER'
    dataset.attrs['realm']          = 'aerosols'
    dataset.attrs['experiment_id']  = mysettings[study]['expID']
    dataset.attrs['packagedby']     = 'pyIPSLpack (contact: rcheca@lsce.ipsl.fr)'
    dataset.attrs['contact']        = mysettings[study]['expCVs']['contactID']
    dataset.attrs['spinning-up']    = mysettings[study]['expCVs']['spin-up']

    for field in ['long_name', 'standard_name', 'modeling_realm', 'frequency',
                  'dimensions', 'comment']:
        dataset[info_var['out_name']].attrs[field] = info_var[field]

    if dataset[info_var['out_name']].attrs['units'] != info_var['units']:
        # Here it would be possible to create a file with units equivalences...
        lipsl = ['-', '-',         'kg/kg', 'kg/m2',  'kg/m2/s',    'kg/(s*m2)',
                 'kg/m3',  '-', 'm^-1', 'W/m2',  'kg/kg',   'm/s']
        l_cf  = ['%', 'mol mol-1', '1',     'kg m-2', 'kg m-2 s-1', 'kg m-2 s-1',
                 'kg m-3', '1', 'm-1',  'W m-2', 'kg kg-1', 'm s-1']
        for valipsl, valcf in zip(lipsl, l_cf):
            if (dataset[info_var['out_name']].attrs['units'] == valipsl and
                    info_var['units'] == valcf):
                dataset[info_var['out_name']].attrs['units'] = info_var['units']

    if dataset[info_var['out_name']].attrs['units'] != info_var['units']:
        str1 = ('     non CMIP6 units ' +
                dataset[info_var['out_name']].attrs['units'] + ',' + info_var['units'])
        str2 = ('     keeping non CMIP6')
        myprint(str1 + '\n' + str2 + '\n', finfo=finfo)
        unit_status = False
    else:
        myprint(' ... Units consistent with CF', finfo=finfo)
        unit_status = True

    # Remove 'bounds' attributes that point to variables absent from the file.
    ltest = ['times_bnds', 'time_bnds', 'time_instant_bounds']
    for checkbnd in ltest:
        if 'bounds' in dataset['time'].attrs.keys():
            if dataset['time'].attrs['bounds'] == checkbnd:
                if checkbnd not in dataset.data_vars.keys():
                    del dataset['time'].attrs['bounds']

    if 'AEROCOM' in newname.upper():
        # AeroCom filenames encode the dimensionality of the variable.
        if 'lat' in dataset.coords and 'lon' in dataset.coords and 'pres' in dataset.coords:
            finalname = newname.replace(info_var['out_name'] + '_',
                                        info_var['out_name'] + '_3D_')
        elif 'lat' in dataset.coords and 'lon' in dataset.coords:
            finalname = newname.replace(info_var['out_name'] + '_',
                                        info_var['out_name'] + '_2D_')
        else:
            finalname = newname
    else:
        finalname = newname

    if os.path.isfile(finalname):
        print('File ', finalname, ' is there')
        print(os.access(finalname, os.W_OK))

    if 'airmass' in finalname or 'ec550' in finalname:
        print('')
    else:
        dataset.to_netcdf(finalname, mode='w')
    dataset.close()

    return unit_status, finalname

def directory_structure(study, mysettings, clean=False, create=False, finfo=None):
    """
    :param study:
    :param mysettings:
    :param clean:
    :param create:
    :param finfo:
    :return:
    """
    import shutil
    from os.path import join as pjoin

    cwd  = mysettings['outdir']['path']
    subf = mysettings['outdir']['subfolder']

    filename1 = pjoin(cwd, subf, study, 'monthly/')
    filename2 = pjoin(cwd, subf, study, 'daily/')
    filename3 = pjoin(cwd, subf, study, 'hourly/')

    if clean == True:
        for filename in [filename1, filename2, filename3]:
            myprint(' Cleaning of: ' + filename, finfo=finfo)
            try:
                listf = os.listdir(filename)
                if mysettings['safety']['overwrite'] == False and len(listf) > 0:
                    myprint(' Safety settings stop the program: out-tree is not empty: \n\n',
                            finfo=finfo)
                    myprint(str(listf), finfo=finfo)
                    myprint('-------\n' * 2, finfo=finfo)
                else:
                    shutil.rmtree(filename)
            except OSError:
                myprint(' ... asked to clean an empty directory ...', finfo=finfo)
                pass

    if create == True:
        for filename in [filename1, filename2, filename3]:
            try:
                os.makedirs(filename)
            except OSError:
                pass

    return filename1, filename2, filename3

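# Usage sketch (with a hypothetical, already-parsed settings dict): create a
# fresh output tree and get back the monthly/daily/hourly paths used by the
# extraction routines below.
#
#   dirMO, dirDA, dirHF = directory_structure('mystudy', mysettings,
#                                             clean=True, create=True)
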
def _get_var_checkdir(outname, freqsource, dirMO, dirDA, dirHF):
    """
    :param outname:
    :param freqsource:
    :param dirMO:
    :param dirDA:
    :param dirHF:
    :return:
    """
    if freqsource == 'DA':
        try:
            os.stat(dirDA + outname)
        except OSError:
            os.mkdir(dirDA + outname)
    if freqsource == 'MO':
        try:
            os.stat(dirMO + outname)
        except OSError:
            os.mkdir(dirMO + outname)
    if freqsource == 'HF':
        try:
            os.stat(dirHF + outname)
        except OSError:
            os.mkdir(dirHF + outname)
    return True


def _get_var_checkfiles(outname, yr, study, freqsource, module, timeout, exp,
                        extra, storepath, dirMO, dirDA, dirHF):
    """
    :param outname:
    :param yr:
    :param study:
    :param freqsource:
    :param module:
    :param timeout:
    :param exp:
    :param extra:
    :param storepath:
    :param dirMO:
    :param dirDA:
    :param dirHF:
    :return:
    """
    # dic_struc = {'ATM': {'MO': ('_1M_histmth.nc', '_mon'),
    #                      'DA': ('_1M_histmth.nc', '_mon'),
    #                      'HF': ('_HF_histh1f.nc', '_1hr')}}
    f_tail = '_IPSL_' + study + '_' + yr + '0101_' + yr + '1231' + '.nc'
    bpath  = storepath + exp

    if module == 'ATM':
        basedir = (bpath + '/ATM/Output/' + freqsource + '/' +
                   exp + '_' + yr + '0101_' + yr + '1231')
        if freqsource == 'MO':
            input_f = basedir + '_1M_histmth.nc'
            outfile = dirMO + outname + '/' + outname + '_mon' + f_tail
        if freqsource == 'DA':
            input_f = basedir + '_1D_histday.nc'
            outfile = dirDA + outname + '/' + outname + '_day' + f_tail
        if freqsource == 'HF':
            if timeout == '1h':
                input_f = basedir + '_HF_histh1f.nc'
                outfile = dirHF + outname + '/' + outname + '_1hr' + f_tail
            if timeout == '6h':
                input_f = basedir + '_HF_histh6f.nc'
                outfile = dirHF + outname + '/' + outname + '_6hr' + f_tail
            if timeout == '3h':
                input_f = basedir + '_3H_remotesens.nc'
                outfile = dirHF + outname + '/' + outname + '_3hr' + f_tail

    if module == 'CHM':
        if freqsource == 'MO':
            input_f = (bpath + '/CHM/Output/MO/' + exp + '_' + yr + '0101_' +
                       yr + '1231' + '_1M_inca_' + extra)
            outfile = dirMO + outname + '/' + outname + '_mon' + f_tail
        if freqsource == 'DA':
            input_f = (bpath + '/CHM/Output/DA/' + exp + '_' + yr + '0101_' +
                       yr + '1231' + '_1D_inca_' + extra)
            outfile = dirDA + outname + '/' + outname + '_day' + f_tail
        if freqsource == 'HF':
            input_f = (bpath + '/CHM/Output/DA/' + exp + '_' + yr + '0101_' +
                       yr + '1231' + '_1D_inca_' + extra)
            if timeout == '1h':
                outfile = dirHF + outname + '/' + outname + '_1h' + f_tail
            if timeout == '6h':
                outfile = dirHF + outname + '/' + outname + '_6h' + f_tail

    if module == 'SRF':
        input_f = (bpath + '/SRF/Output/MO/' + exp + '_' + yr + '0101_' +
                   yr + '1231' + '_1M_sechiba_history.nc')
        if freqsource == 'MO':
            outfile = dirMO + outname + '/' + outname + '_mon' + f_tail

    return input_f, outfile

def solve_time(outfile, outfilen, varname='', out=''):
    """
    :param outfile:
    :param outfilen:
    :param varname:
    :param out:
    :return:
    """
    # Rename the IPSL time/pressure dimensions and variables to CF-style names
    # and drop auxiliary time variables, using the NCO tools.
    commandnco = ('ncrename -O -d time_counter,time -v time_counter,time ' +
                  outfile + ' ' + outfilen + out)
    os.system(commandnco)
    commandnco = ('ncrename -O -v .time_counter_bnds,time_bnds ' +
                  outfile + ' ' + outfilen + out)
    os.system(commandnco)
    commandnco = ('ncrename -O -d .presnivs,pres -v .presnivs,pres ' +
                  outfile + ' ' + outfilen + out)
    os.system(commandnco)
    commandnco = ('ncatted -O -a standard_name,pres,o,c,air_pressure ' +
                  outfile + ' ' + outfilen + out)
    os.system(commandnco)
    commandnco = ('ncatted -O -a bounds,time,o,c,time_bnds ' +
                  outfile + ' ' + outfilen + out)
    os.system(commandnco)
    commandnco = 'ncks -O -x -v time_centered ' + outfile + ' ' + outfilen + out
    os.system(commandnco)
    commandnco = 'ncks -O -x -v time_instant ' + outfile + ' ' + outfilen + out
    os.system(commandnco)
    if varname != '':
        commandnco = ('ncatted -O -a coordinates,' + varname + ',d,, ' +
                      outfile + ' ' + outfilen + out)
        os.system(commandnco)
    return

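# solve_time is typically chained right after a cdo extraction, rewriting the
# file in place and sending the NCO messages to a scratch log (the paths and
# variable name here are illustrative only):
#
#   solve_time('out.nc', 'out.nc', varname='od550aer',
#              out=' &>> delfiles/del_od550aer')
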
def get_var(mysettings, year, varname, freqsource, module, timeout, outname,
            expcase, study, extra='', index=1, finfo=None):
    """
    :param mysettings:
    :param year:
    :param varname: variable name in the source file, e.g. vmro3
    :param freqsource: DA, HF, MO for ATM; for CHM everything is DA
    :param module: ATM, CHM, etc.
    :param timeout:
    :param outname:
    :param expcase:
    :param study:
    :param extra:
    :param index:
    :param finfo:
    :return:
    """
    dirMO, dirDA, dirHF = directory_structure(study, mysettings, finfo=finfo)
    cwd  = mysettings['outdir']['path']
    subf = mysettings['outdir']['subfolder']
    storepath = mysettings['storepath']

    status = _get_var_checkdir(outname, freqsource, dirMO, dirDA, dirHF)
    input_f, outfile = _get_var_checkfiles(outname, year, study, freqsource,
                                           module, timeout, expcase, extra,
                                           storepath, dirMO, dirDA, dirHF)

    str1 = (' Extracting (' + str(index).rjust(4) + '): ' + varname.ljust(20) +
            ' and save as ' + outname.ljust(20) + ' .... for year ' + year +
            ' ... [' + freqsource + ' ' + module + ']')
    myprint(str1, finfo=finfo)

    commandcdo = ('cdo --silent selvar,' + varname + ' ' + input_f + ' ' + outfile +
                  ' &> delfiles/del' + varname + freqsource + module)
    os.system(commandcdo)

    solve_time(outfile, outfile, varname=varname,
               out=' &>> delfiles/del' + varname + freqsource + module)

    if outname != varname:
        commandcdo = ('cdo --silent -O chname,' + varname + ',' + outname + ' ' +
                      outfile + ' ' + outfile + 'x')
        os.system(commandcdo)
        os.rename(outfile + 'x', outfile)
    return

def check_units(varname_list, vars_dataset):
    """
    :param varname_list:
    :param vars_dataset:
    :return:
    """
    lunits = []
    for varn in varname_list:
        lunits.append(vars_dataset[varn].attrs['units'])
    return lunits

def scale_variable(vardataset, d_scale_vars, outname, varname, outfile, finfo=None):
    """
    :param vardataset:
    :param d_scale_vars:
    :param outname:
    :param varname:
    :param outfile:
    :param finfo:
    :return:
    """
    var_initial = d_scale_vars[outname]['var_initial']

    if 'scale_factor' in d_scale_vars[outname].keys():
        varscaled = vardataset[var_initial] * d_scale_vars[outname]['scale_factor']
    else:
        varscaled = vardataset[var_initial]

    # Check if a global unit factor and new units are indicated
    if 'new_units' in d_scale_vars[outname].keys():
        units_factor = d_scale_vars[outname]['new_units']['units_factor']
        units_name   = d_scale_vars[outname]['new_units']['units_name']
        myprint(' We will change units from ' + vardataset[varname].attrs['units'] +
                ' to ' + units_name, finfo=finfo)
        myprint(' We use a unit conversion factor of ' + str(units_factor), finfo=finfo)
    else:
        units_factor = 1.0
        # Keep the units of the source variable (the names varname_sur and
        # varname_alt used here before belong to new_variable_altsur and are
        # not defined in this function).
        units_name = vardataset[var_initial].attrs['units']

    varfinal = varscaled * units_factor
    varfinal.name = outname
    varfinal.attrs['units'] = units_name
    varfinal.to_netcdf(outfile, unlimited_dims='time_counter')
    varfinal.close()
    return

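# Illustrative sketch of an 'etc/scale_variables.yaml' entry, inferred from
# the keys read above ('var_initial', 'scale_factor', 'new_units'); the
# variable names and numbers are hypothetical:
#
#   emidust:
#     var_initial: flx_dust
#     scale_factor: 1.0
#     new_units:
#       units_factor: 1.0e-3
#       units_name: kg m-2 s-1
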
def new_variable_altsur(vardataset, d_new_vars, outname, varname, outfile, finfo=None):
    """
    :param vardataset:
    :param d_new_vars:
    :param outname:
    :param varname:
    :param outfile:
    :param finfo:
    :return:
    """
    varname_sur = d_new_vars[outname]['var_surface']
    varname_alt = d_new_vars[outname]['var_altitud']
    factors_sur = [str(y) for y in d_new_vars[outname]['fac_surface']]
    factors_alt = [str(y) for y in d_new_vars[outname]['fac_altitud']]

    lunits = check_units(varname_sur + varname_alt, vardataset)
    if len(set(lunits)) == 1:
        myprint(' internal consistency in units is ... ok ... ' + lunits[0],
                finfo=finfo)
    else:
        myprint(str(lunits) + str(varname_sur) + str(varname_alt), finfo=finfo)
        exit()

    # Aggregate values over vertical coordinates
    for ivar, var_alt in enumerate(varname_alt):
        if ivar == 0:
            var_sum_alt = vardataset[var_alt].sum(dim='presnivs')
        else:
            var_sum_alt = (var_sum_alt +
                           vardataset[var_alt].sum(dim='presnivs') *
                           d_new_vars[outname]['fac_altitud'][ivar])

    # Aggregate values on surface
    for ivar, var_sur in enumerate(varname_sur):
        if 'presnivs' in vardataset[var_sur].dims:
            myprint(' selecting the pressure ... ' +
                    str(vardataset['presnivs'].values[0]) + ' ' +
                    str(vardataset['presnivs'].attrs['units']), finfo=finfo)
            if ivar == 0:
                var_sum_sur = vardataset[var_sur].isel(presnivs=0)
            else:
                var_sum_sur = (var_sum_sur +
                               vardataset[var_sur].isel(presnivs=0) *
                               d_new_vars[outname]['fac_surface'][ivar])
        else:
            if ivar == 0:
                var_sum_sur = vardataset[var_sur]
            else:
                var_sum_sur = (var_sum_sur +
                               vardataset[var_sur] *
                               d_new_vars[outname]['fac_surface'][ivar])

    # Check if a global unit factor and new units are indicated
    if 'new_units' in d_new_vars[outname].keys():
        units_factor = d_new_vars[outname]['new_units']['units_factor']
        units_name   = d_new_vars[outname]['new_units']['units_name']
        myprint(' We will change units from ' +
                vardataset[varname_sur[0]].attrs['units'] + ' to ' + units_name,
                finfo=finfo)
        myprint(' We use a unit conversion factor of ' + str(units_factor),
                finfo=finfo)
    else:
        units_factor = 1.0
        try:
            units_name = vardataset[varname_sur[0]].attrs['units']
        except IndexError:
            units_name = vardataset[varname_alt[0]].attrs['units']

    # Aggregate all together:
    if len(varname_sur) >= 1 and len(varname_alt) >= 1:
        varfinal = (var_sum_alt + var_sum_sur) * units_factor
    if len(varname_alt) == 0:
        varfinal = var_sum_sur * units_factor
    if len(varname_sur) == 0:
        varfinal = var_sum_alt * units_factor

    varfinal.name = outname
    varfinal.attrs['units'] = units_name
    varfinal.to_netcdf(outfile, unlimited_dims='time_counter')
    varfinal.close()
    return

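# Illustrative sketch of an 'etc/new_variables.yaml' entry, inferred from the
# keys read above (surface and altitude contributions, each with its own list
# of factors); all variable names and factors here are hypothetical:
#
#   loadss:
#     var_surface: [sconcss]
#     fac_surface: [1.0]
#     var_altitud: [loadcoarse, loadfine]
#     fac_altitud: [1.0, 1.0]
#     new_units:
#       units_factor: 1.0
#       units_name: kg m-2
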
def new_var(mysettings, year, varname, freqsource, module, timeout, outname,
            expcase, study, extra='', index=1, finfo=None):
    """
    :param mysettings:
    :param year:
    :param varname: variable name in the source file, e.g. vmro3
    :param freqsource: DA, HF, MO for ATM; for CHM everything is DA
    :param module: ATM, CHM, etc.
    :param timeout:
    :param outname:
    :param expcase:
    :param study:
    :param extra:
    :param index:
    :param finfo:
    :return:
    """
    dirMO, dirDA, dirHF = directory_structure(study, mysettings)
    cwd  = mysettings['outdir']['path']
    subf = mysettings['outdir']['subfolder']
    storepath = mysettings['storepath']

    status = _get_var_checkdir(outname, freqsource, dirMO, dirDA, dirHF)
    input_f, outfile = _get_var_checkfiles(outname, year, study, freqsource,
                                           module, timeout, expcase, extra,
                                           storepath, dirMO, dirDA, dirHF)

    # We assume that all variables are in the same file.
    vardataset = xr.open_dataset(input_f, chunks={'time_counter': 10})

    f_new_vars = open("etc/new_variables.yaml", 'r')
    d_new_vars = yaml.safe_load(f_new_vars)

    f_scale_vars = open("etc/scale_variables.yaml", 'r')
    d_scale_vars = yaml.safe_load(f_scale_vars)

    if outname in d_scale_vars.keys():
        str1 = (' Scaling (' + str(index).rjust(4) + '): ' + varname.ljust(20) +
                ' and save as ' + outname.ljust(20) + ' .... for year ' + year +
                ' ... [' + freqsource + ' ' + module + ']\n')
        myprint(str1, finfo=finfo)
        scale_variable(vardataset, d_scale_vars, outname, varname, outfile, finfo)

    if outname in d_new_vars.keys():
        varname_sur = d_new_vars[outname]['var_surface']
        varname_alt = d_new_vars[outname]['var_altitud']
        factors_sur = [str(y) for y in d_new_vars[outname]['fac_surface']]
        factors_alt = [str(y) for y in d_new_vars[outname]['fac_altitud']]
        str1 = (' Creating (' + str(index).rjust(4) + '): ' + varname.ljust(20) +
                ' and save as ' + outname.ljust(20) + ' .... for year ' + year +
                ' ... [' + freqsource + ' ' + module + ']\n')
        str2 = (' using files ... ' + ', '.join(varname_sur) +
                '\n ... with factors ' + ', '.join(factors_sur) +
                '\n and files ... ' + ', '.join(varname_alt) +
                '\n ... with factors ' + ', '.join(factors_alt))
        myprint(str1 + str2, finfo=finfo)
        new_variable_altsur(vardataset, d_new_vars, outname, varname, outfile, finfo)

    solve_time(outfile, outfile, varname=varname,
               out=' &>> delfiles/del' + varname + freqsource + module)
    return

def lev_var(mysettings, year, varname, freqsource, module, timeout, outname,
            expcase, study, extra='', lev='top', index=1, finfo=None):
    """
    :param mysettings:
    :param year:
    :param varname: variable name in the source file, e.g. vmro3
    :param freqsource: DA, HF, MO for ATM; for CHM everything is DA
    :param module: ATM, CHM, etc.
    :param timeout:
    :param outname:
    :param expcase:
    :param study:
    :param extra:
    :param lev:
    :param index:
    :param finfo:
    :return:
    """
    dirMO, dirDA, dirHF = directory_structure(study, mysettings, finfo=finfo)
    cwd  = mysettings['outdir']['path']
    subf = mysettings['outdir']['subfolder']
    storepath = mysettings['storepath']

    status = _get_var_checkdir(outname, freqsource, dirMO, dirDA, dirHF)
    input_f, outfile = _get_var_checkfiles(outname, year, study, freqsource,
                                           module, timeout, expcase, extra,
                                           storepath, dirMO, dirDA, dirHF)

    # We assume that all variables are in the same file.
    vardataset = xr.open_dataset(input_f)

    # Hard-coded diagnostics mapping (see item 1 of the TODO list above).
    if outname in ['rlut']:
        varname_lev = ['rlu']
    if outname in ['rlutcs']:
        varname_lev = ['rlucs']

    str1 = (' Creating (' + str(index).rjust(4) + '): ' + varname.ljust(20) +
            ' and save as ' + outname.ljust(20) + ' .... for year ' + year +
            ' ... [' + freqsource + ' ' + module + ']')
    str2 = (' using files ... ' + ','.join(varname_lev))
    myprint(str1 + '\n' + str2, finfo=finfo)

    if lev == 'top':
        presval = -1
    elif lev == 'surf':
        presval = 0
    else:
        print('Problem in lev_var')
        exit()

    var_lev = vardataset[varname_lev[0]].isel(presnivs=presval)
    myprint(' selecting the pressure ... ' +
            str(vardataset['presnivs'].values[presval]), finfo=finfo)

    var_lev.name = outname
    var_lev.to_netcdf(outfile, unlimited_dims='time_counter')

    solve_time(outfile, outfile, varname=varname,
               out=' &>> delfiles/del' + varname + freqsource + module)
    return

def read_process_file(mysettings, process_file, year, expcase, study, finfo=None):
    """
    :param mysettings:
    :param process_file:
    :param year:
    :param expcase:
    :param study:
    :param finfo:
    :return:
    """
    import pandas as pd
    pd.set_option('display.width', 120)

    df = pd.read_table(process_file, skiprows=0, header=0, delim_whitespace=True)

    two_times = df.duplicated(subset='varname-out')
    if True in two_times.values:
        myprint('\n\n\n ============= CHECK your diagnostics file ' + process_file +
                '\n ============= it has two repeated diagnostics final varnames!!\n\n\n',
                finfo=finfo)
        exit()

    list_vars = []
    for index, row in df.iterrows():
        gc.collect()
        list_vars.append(row['varname-out'])
        if mysettings[study]['preprocess'] == True:
            if row['method'] == 'new_var':
                new_var(mysettings, year, row['varname-source'], row['freq-source'],
                        row['module-IPSL'], row['freq-specific'], row['varname-out'],
                        expcase, study, extra=row['extra-info-source'], index=index,
                        finfo=finfo)
            if row['method'] == 'get_var':
                get_var(mysettings, year, row['varname-source'], row['freq-source'],
                        row['module-IPSL'], row['freq-specific'], row['varname-out'],
                        expcase, study, extra=row['extra-info-source'], index=index,
                        finfo=finfo)
            if row['method'] == 'lev_var':
                lev_var(mysettings, year, row['varname-source'], row['freq-source'],
                        row['module-IPSL'], row['freq-specific'], row['varname-out'],
                        expcase, study, extra=row['extra-info-source'], index=index,
                        finfo=finfo)
    return list_vars

def process_files(mysettings, year, expcase, study, proc_file, finfo=None):
    """
    :param mysettings:
    :param year:
    :param expcase:
    :param study:
    :param proc_file:
    :param finfo:
    :return:
    """
    if str(proc_file) != 'none':
        myprint(' ... approx. number of variables: ' +
                str(sum(1 for line in open(proc_file)) - 1), finfo=finfo)
        processed_vars = read_process_file(mysettings, proc_file, year, expcase,
                                           study, finfo=finfo)
    else:
        myprint(' ... No processing ... [process-file==None]', finfo=finfo)
        processed_vars = []
    return processed_vars
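

# End-to-end usage sketch (hypothetical settings, study and experiment names):
# extract one year of diagnostics for one study, then post-process the
# resulting monthly tree. This only illustrates how the public functions of
# this module chain together; the settings keys mirror those read above.
#
#   mysettings = yaml.safe_load(open('etc/settings.yaml'))
#   dirMO, dirDA, dirHF = directory_structure('mystudy', mysettings, create=True)
#   lvars = process_files(mysettings, '2000', 'myexp', 'mystudy',
#                         mysettings['mystudy']['process']['mon'])
#   test_info = post_processing(mysettings, dirMO, lvars, ['2000'], 'year',
#                               'mystudy', mysettings['mystudy']['expID'], 'mon')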