"""!Runs the EnKF data assimilation on the HWRF system.""" ##@var __all__ # The list of symbols exported by "from hwrf.enkf import *" __all__ = ['ENKF'] import os, shutil import produtil.fileop, produtil.datastore, produtil.cd, produtil.run import hwrf.hwrftask, hwrf.wrf, hwrf.numerics, hwrf.namelist import datetime import produtil.rusage from produtil.rusage import setrlimit, rusage, getrlimit from produtil.datastore import COMPLETED from produtil.cd import NamedDir from produtil.fileop import make_symlink, deliver_file, isnonempty, \ make_symlinks_in from produtil.run import mpirun, mpi, openmp, checkrun from hwrf.numerics import to_datetime,to_datetime_rel, to_fraction, \ to_timedelta from hwrf.exceptions import ENKFInputError,StormRadiusError class ENKF(hwrf.hwrftask.HWRFTask): """Base class of the ENKF.""" def __init__(self,ds,conf,section,domain,sim,ensda, gsi_meanhx,gsi_enshx,gdas_merge,meandomain, relocate,**kwargs): super(ENKF,self).__init__(ds,conf,section,**kwargs) self.logger=self.log() self._sim=sim self._domain=domain self._gsi=hwrf.gsi.GSIBase self._atime=to_datetime(conf.cycle) self._wrf_out_ensmean=gsi_meanhx.get_wrf_ensmean(domain) self._ensda=ensda self._gsi_meanhx=gsi_meanhx self._gsi_enshx=gsi_enshx self._gdas_merge=gdas_merge self._meandomain=meandomain if relocate is not None: self._input_storm_radius=relocate.get_storm_radius() else: self._input_storm_radius=None self.da_ensemble_size=self.conf.getint('hwrf_da_ens','ensda_size',40) # Get the DataCatalog for our data grabbers: self._in_catalog=None incat_name=None if 'in_catalog' in kwargs: ink=kwargs['in_catalog'] if isinstance(ink,hwrf.input.DataCatalog): self._in_catalog=ink elif isinstance(ink,str): incat_name=ink elif ink is None: pass else: raise TypeError( 'In hwrf.enkf.ENKFBase.__init__, in_catalog must be None, ' 'a basestring or a DataCatalog. You provided an object ' 'of type %s, with value %s.' %(type(ink).__name__,repr(ink))) if self._in_catalog is None: if incat_name is None: incat_name=self.confstr('catalog') self._in_catalog=hwrf.input.DataCatalog( self.conf,incat_name,self._atime) self._prods=self._make_products() def _make_products(self): """!Generates FileProduct objects for all outputs.""" logger=self.log() with self.dstore.transaction() as t: prods=dict() for (ens,member) in self._gsi_enshx.members_at_time(self._atime): logger.info('ens,member = %s,%s'%(repr(ens),repr(member))) iens=int(ens) pname='analysis.mem%03d'%iens prod=produtil.datastore.FileProduct( self.dstore,prodname=pname,category=self.taskname) prod.location=os.path.join(self.outdir,pname) prods[pname]=prod return prods def get_enkfanl_member(self,member): """Reture EnKF analysis of one ensemble member""" pname='analysis.mem%03d'%member return self._prods[pname] def run_enkf_exe(self): """Running EnKF excutable""" logger=self.log() prog=self.conf.getexe('enkf') cmd = (mpirun(mpi(self.conf.getexe('enkf')),allranks=True) < 'enkf.nml') cmd = cmd > 'stdout' logger.warning(repr(cmd)) self.postmsg('Starting EnKF') sleeptime=self.conffloat('sleeptime',30.0) try: getrlimit(logger=logger) with rusage(logger=logger): checkrun(cmd,sleeptime=sleeptime) except Exception as e: logger.critical('EnKF failed',exc_info=True) raise self.postmsg('EnKF succeeded') def should_run_ens_relocate(self): """!Should ensemble relocation be run? Reads the RUN_ENSDA_RELOCATE from the run_ensda flag file in COM.""" flag_file=self.conf.getstr('ensda_relocate_pre','ens_rlct_flag_file', '{com}/{stormlabel}.run_ensda_relocate') run_ens_relocate=False with open(flag_file,'rt') as f: for line in f: if line.find('RUN_ENSDA_RELOCATE=YES')>=0: run_ens_relocate=True return run_ens_relocate def grab_wrf_out_member(self,ensda): """!Copies ensemble forecast wrf out files to this directory. @param ensda the hwrf.ensda.DAEnsemble that provides the files""" logger=self.log() logger.info('in grab_wrf_enkf') nameme=dict() plist=list() did=None for (ens,member) in ensda.members_at_time(self._atime): logger.info('ens,member = %s,%s'%(repr(ens),repr(member))) iens=int(ens) domain=self._domain logger.info('ens,member,domain = %s,%s,%s' %(repr(ens),repr(member),repr(domain))) did=int(domain.get_grid_id()) if self.should_run_ens_relocate(): prod=member.rstage3.get_wrfout(domain=domain) else: prod=member.get_wrfanl(domain=domain,atime=self._atime) if prod is None: logger.info('No product for domain %s.'%(str(domain),)) continue logger.info('domain %s prod %s'%(str(domain),prod.did)) nameme[prod]=['firstguess.mem%03d'%iens, 'analysis.mem%03d'%iens] plist.append(prod) assert(did is not None) def renamer(p,l): return nameme[p][1] def copy(fromfile,tofile,copier): deliver_file(fromfile,tofile,logger=logger,keep=True,copier=copier) def link(fromfile,tofile): produtil.fileop.make_symlink(fromfile,tofile,logger=logger) workpool=None def actor(p,n,l): fromfile=p.location l.info('Copy %s to %s'%(fromfile,n)) if produtil.fileop.netcdfver(fromfile)=='HDF5': l.info('%s: file is HDF5. I will assume your GSI was built with support for compressed NetCDF4'%(fromfile,)) else: l.info('%s: file is NetCDF3.'%(fromfile,)) copier=None workpool.add_work(copy,[fromfile,n,copier]) maxtime=self.confint('maxwait',240) if not self.realtime: maxtime=30 with produtil.workpool.WorkPool(4) as wp: workpool=wp count=produtil.datastore.wait_for_products( plist,logger,renamer,actor,maxtime=maxtime) workpool.barrier() wanted=len(plist) if count= 2020: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L106') elif int(self.confstr('GFSVER').strip('PROD')) >= 2017: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L85') else: logger.warning('Unsupported number of vertical levels: %d ' 'Using anavinfo_hwrf_L75. Continuing...'%(nz,)) anavinfo=s('{FIXgsi}/anavinfo_hwrf_L75') else: logger.warning('Unsupported number of vertical levels: %d ' 'Using anavinfo_hwrf_L75. Continuing...'%(nz,)) anavinfo=s('{FIXgsi}/anavinfo_hwrf_L75') else: if nz == 61: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L60') elif nz == 75: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L74') elif nz == 43: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L42') else: logger.warning('Unsupported number of vertical levels: %d ' 'Using anavinfo_hwrf_L60. Continuing...'%(nz,)) anavinfo=s('{FIXgsi}/anavinfo_hwrf_L60') else: if nz == 61: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L60_nooz') elif nz == 75: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L74_nooz') elif nz == 43: anavinfo=s('{FIXgsi}/anavinfo_hwrf_L42_nooz') else: logger.warning('Unsupported number of vertical levels: %d ' 'Using anavinfo_hwrf_L60_nooz. Continuing...'%(nz,)) anavinfo=s('{FIXgsi}/anavinfo_hwrf_L60_nooz') ozinfo=s('{FIXgsi}/global_ozinfo.txt') convinfo=s('{FIXgsi}/hwrf_convinfo.txt') satinfo=s('{FIXgsi}/hwrf_satinfo.txt') scaninfo=s('{FIXgsi}/global_scaninfo.txt') lnsf(anavinfo, './anavinfo') lnsf(ozinfo, './ozinfo') lnsf(convinfo, './convinfo') lnsf(satinfo, './satinfo') lnsf(scaninfo, './scaninfo') def make_ensemble_namelist(self,filename='wrf-ensemble.input',mean=True): """!Creates the hwrf_ensemble namelist in the specified file. @param filename the destination filename""" logger=self.log() invars=dict() if mean: invars.update(IS_ENSEMBLE_MEAN=True, IS_ENSEMBLE_RECENTER=False, ENSEMBLE_MEMBER_FILENAME_LIST='wrf_ens_list', ENSEMBLE_MEAN_FILENAME='enkf_anl_mean', ENSEMBLE_RECENTER_FILENAME='NOT USED', ENSEMBLE_RECENTER_INTERP_FILENAME='NOT USED') else: invars.update(IS_ENSEMBLE_MEAN=False, IS_ENSEMBLE_RECENTER=True, ENSEMBLE_MEMBER_FILENAME_LIST='wrf_ens_list', ENSEMBLE_MEAN_FILENAME='NOT USED', ENSEMBLE_RECENTER_FILENAME='NOT USED', ENSEMBLE_RECENTER_INTERP_FILENAME='enkf_anl_mean') nml_file=self.confstr('ensemble_nml_file') nml_section=self.confstr('ensemble_nml_section',self.section) atime=self._atime ni=hwrf.namelist.NamelistInserter(self.conf,nml_section) with open(nml_file,'rt') as nf: with open(filename,'wt') as of: of.write(ni.parse(nf,logger=logger,source=nml_file, raise_all=True,atime=atime,**invars)) def make_intrp_namelist(self,filename='hwrf_interpolate.nml'): """!Creates the hwrf_interpolate namelist in the specified file. @param domain the domain to be interpolated @param filename the destination filename""" logger=self.log() invars=dict() nml_file=self.confstr('interpolate_nml_file') nml_section=self.confstr('interpolate_nml_section',self.section) atime=self._atime ni=hwrf.namelist.NamelistInserter(self.conf,nml_section) with open(nml_file,'rt') as nf: with open(filename,'wt') as of: of.write(ni.parse(nf,logger=logger,source=nml_file, raise_all=True,atime=atime,**invars)) def grab_hybrid_analysis(self,filename='wrfghost_d02'): """!Copies the GSI hybrid analysis to the specified filename @param filename the file to receive the data""" logger=self.log() def namer(p,logger,*args): return filename def actor(p,name,logger,*args): make_symlink(p.location,name,logger=logger,force=True) prod=self._gdas_merge.get_wrfghost(self._meandomain) produtil.datastore.wait_for_products(prod, logger,namer,actor) def check_storm_radius(self): """!If no relocate was given, gets the storm radius file from a fix file. Also checks to see if the storm_radius file is present and non-empty, regardless of whether it came from the fix or relocate.""" logger=self.log() if self._input_storm_radius is None: storm_radius=os.path.join(self.getdir('FIXhwrf'), 'hwrf_storm_radius') logger.warning( 'Could not get storm_radius from the relocate jobs.') logger.warning( 'Will use the fix file $FIXhwrf/hwrf_storm_radius instead.') make_symlink(storm_radius,'storm_radius',force=True,logger=logger) else: p=self._input_storm_radius make_symlink(p.location,'storm_radius',force=True,logger=logger) if not isnonempty('storm_radius'): msg='storm_radius file is missing' logger.error(msg) raise StormRadiusError(msg) def recenter(self): """!Run hwrf_ensemble to recenter EnKF analysis to GSI hybrid analysis""" logger=self.log() def ensemble_filelist(pl): with open('wrf_ens_list','wt') as fl: for p in pl.keys(): fl.write(p+'\n') if fl is not None: fl.close() cmd = (mpirun(mpi(self.getexe('hwrf_ensemble')),allranks=True)\ < 'wrf-ensemble.input') cmd = cmd > 'stdout_hwrf_ensemble_mean' logger.info('Starting hwrf_ensemble to compute ensemble mean') self.make_ensemble_namelist(mean=True) ensemble_filelist(self._prods) try: checkrun(cmd,logger=logger) except Exception as e: logger.critical('hwrf_ensemble failed') raise self.make_intrp_namelist() cmd = (mpirun(mpi(self.getexe('hwrf_interpolate')),allranks=True) \ < 'hwrf_interpolate.nml') cmd = cmd > 'stdout_interpolate' logger.info('Starting hwrf_interpolate') try: checkrun(cmd,logger=logger) except Exception as e: logger.critical('hwrf_interpolate failed') raise cmd = (mpirun(mpi(self.getexe('hwrf_ensemble')),allranks=True)\ < 'wrf-ensemble.input') cmd = cmd > 'stdout_hwrf_ensemble_recenter' self.make_ensemble_namelist(mean=False) ensemble_filelist(self._prods) try: checkrun(cmd,logger=logger) except Exception as e: logger.critical('hwrf_ensemble failed') raise def deliver_products(self): """!Delivers products.""" logger=self.log() produtil.fileop.makedirs(self.outdir) for p in self._prods.values(): loc=p.location bloc=os.path.basename(loc) if os.path.exists(bloc): logger.warning('%s: deliver product from ./%s'%(p.did,bloc)) p.deliver(frominfo=bloc,keep=False,logger=logger) else: logger.warning('%s: ./%s does not exist. Cannot deliver.' %(p.did,bloc)) diagpre=self.confstr('diagpre',self.conf.getdir('com')) stdoutanl=diagpre+'.stdout.anl' produtil.fileop.deliver_file( 'stdout',stdoutanl,keep=False,logger=logger) def run(self): """!Runs the EnKF and delivers the results. Executes the EnKF in a temporary scrub directory, deleting it afterwards if self.scrub is False. Follows this overall pattern: 1. Make temporary area and cd there 2. Calls make_enkf_namelist() to generate the namelist 3. Grab fix file and other input files 4. Calls run_enkf_exe() to run the actual EnKF program 5. Calls deliver_products() to copy files to COM""" logger=self.log() dirname=self.workdir logger.info('Run EnKF in directory %s'%(dirname,)) if os.path.exists(dirname): logger.info('Delete old data in %s'%(dirname,)) shutil.rmtree(dirname) with NamedDir(dirname,keep=not self.scrub): self.make_enkf_namelist() self.grab_fix_parm() self.grab_diag(self._gsi_enshx,self._gsi_meanhx) self.grab_wrf_out_member(self._ensda) self.grab_ensemble_mean() self.run_enkf_exe() self.grab_hybrid_analysis() self.check_storm_radius() self.recenter() self.deliver_products() self.state=COMPLETED def unset_enkfstatus(conf,logger=None): """!Delete the enkf status file. Deletes all EnKF status files, whose paths are determined from the given config object. If the logger is not specified, the enkfstatus subdomain of the conf default logging domain is used. @param conf the hwrf.config.HWRFConfig object @param logger Optional: a logging.Logger for logging.""" if logger is None: logger=conf.log('enkfstatus') for onetwo in ( 'enkfstatus', 'enkfstatus2' ): enkfstat=conf.get('enkf',onetwo) enkfstatfile=os.path.join(conf.getdir('com'),enkfstat) if isnonempty(enkfstatfile): produtil.fileop.remove_file(enkfstatfile,info=True,logger=logger) def set_init_enkfstatus(conf,logger=None): """!Sets the EnKF status to be No. @param conf the hwrf.config.HWRFConfig object @param logger Optional: the logging.Logger for log messages.""" if logger is None: logger=conf.log('set default enkf status') enkfflagd02="ToBeDecided" for onetwo in ( 'enkfstatus', 'enkfstatus2' ): enkfstat=conf.get('enkf',onetwo) enkfstatfile=os.path.join(conf.getdir('com'),enkfstat) logger.info('Setting run_enkf_d02=%s in enkf status file %s'%( enkfflagd02,enkfstatfile)) with open(enkfstatfile,'wt') as f: f.write('run_enkf_d02=%s\n'%(enkfflagd02)) def set_enkfstatus(conf,logger=None): """!Sets the EnKF status files. Set run_enkf=YES (true) or =NO (false) depending on the configuration. If the logger is not specified, the enkfstatus subdomain of the conf default logging domain is used. @param conf the hwrf.config.HWRFConfig object @param logger Optional: the logging.Logger for log messages.""" if logger is None: logger=conf.log('set enkf status') run_enkf=conf.getbool('config','run_ensemble_da') and conf.getint('config','ensda_opt')>0 enkfflagd02="YES" if run_enkf else "NO" for onetwo in ( 'enkfstatus', 'enkfstatus2' ): enkfstat=conf.get('enkf',onetwo) enkfstatfile=os.path.join(conf.getdir('com'),enkfstat) logger.info('Setting run_enkf_d02=%s in enkf status file %s'%( enkfflagd02,enkfstatfile)) with open(enkfstatfile,'wt') as f: f.write('run_enkf_d02=%s\n'%(enkfflagd02)) def get_enkfstatus(conf,domain,logger=None): """!Checks the enkf status for a specific domain. Checks the first ENKF status file, scanning for information about the specified domain. If the file does not exist or cannot be opened or read, then False is returned. Otherwise, the file is scanned for run_enkf=YES/NO (case insensitive). The last of those run_enkf lines is used: NO=return False, YES=return True. @param conf the hwrf.config.HWRFConfig object with configuration info @param domain either "enkf_d02" or "enkf_d03", the domain of interest @param logger Optional: the logging.Logger for log messages""" if domain!='enkf_d02': raise ValueError('In get_enkfstatus, domain must be enkf_d02 ') if logger is None: logger=conf.log('get enkf status') enkfstat=conf.get('enkf','enkfstatus') enkfstatfile=os.path.join(conf.getdir('com'),enkfstat) run_enkf=None logger.info('%s: scan enkf status file for run_%s=YES or NO'%( enkfstatfile,domain)) if isnonempty(enkfstatfile): try: with open(enkfstatfile,'rt') as f: for line in f: if line.find('run_%s=YES'%(domain))>=0: run_enkf=True logger.info( 'enkf status file says: run_%s=YES'%(domain)) else: run_enkf=False logger.warning( 'enkf status file says %s: '%(line)) except EnvironmentError as e: logger.error( 'Error checking enkf status file: %s' %(str(e),),exc_info=True) except Exception as ee: logger.error( 'Unhandled exception while checking enkf status file: %s' %(str(ee),),exc_info=True) raise if run_enkf is None: logger.warning('Could not scan enkf status file for run_%s=YES' 'or NO. Assuming run_%s=NO.'%(domain,domain)) run_enkf=False elif run_enkf: logger.info( 'run_%s=YES: enkf status file says enkf succeeded.'%(domain)) else: logger.warning( 'enkf status file says %s.'%(line)) return run_enkf