"""!Utilities for ensemble-based data assimilation. This module contains utilities for doing various forms of ensemble-based data assimilation. It manages a two-dimension ensemble-time array of Task objects that know how to run each analysis cycle for each member. This module also provides a way of presenting the previous forecast cycle's ensda objects such that this cycle can use them. This is all built on top of the classes in the hwrf.fcsttask module.""" ##@var __all__ # The symbols exported by "from hwrf.ensda import *" __all__=['DAEnsemble','FromPriorCycle','FromGFSENKF', 'write_ensda_flag_file','read_ensda_flag_file', 'CycleTDRCheck','AlwaysRunENSDA','enada_pre_object_for', 'GFSEnKFInputCheck'] import os, subprocess, shutil import produtil.datastore, produtil.cd, produtil.fileop, produtil.log import hwrf.hwrftask, hwrf.numerics, hwrf.prep, hwrf.fcsttask import hwrf.input, hwrf.regrib, hwrf.tracker, hwrf.exceptions from produtil.log import jlogger from produtil.datastore import COMPLETED,UpstreamFile,FileProduct from produtil.fileop import isnonempty, wait_for_files, make_symlink, deliver_file from hwrf.numerics import to_datetime_rel, to_timedelta, TimeArray, to_datetime, within_dt_epsilon from hwrf.post import PostManyWRF from produtil.cd import NamedDir from hwrf.wrf import WRFDomain from produtil.run import alias, exe, mpirun, mpi, checkrun, batchexe from hwrf.regrib import clatlon from hwrf.enkf import get_enkfstatus from produtil.ecflow import set_ecflow_event from produtil.run import run, batchexe class DAEnsemble(hwrf.hwrftask.HWRFTask): """!Represents a two-dimensional ensemble-vs-time array of hwrf.hwrftask.HWRFTask objects.""" def __init__(self,dstore,conf,section,anlintime=None, taskname=None,**kwargs): """!Constructor for DAEnsemble @param dstore the produtil.datastore.Datastore database to use @param conf the hwrf.config.HWRFConfig for configuration info @param section the section to use in conf @param anlintime the analysis input time @param 
        taskname the name of this task in the database
        @param kwargs passed to the superclass constructor"""
        super(DAEnsemble,self).__init__(dstore,conf,section,taskname,
                                        **kwargs)
        if anlintime is None:
            anlintime=conf.cycle
        anlintime=to_datetime_rel(anlintime,conf.cycle)
        self.__anlintime=anlintime
        self.__memberids=set()

        # cycling_interval is in hours in the config file; convert to seconds.
        cycling_interval=conf.getfloat('config','cycling_interval')*3600.0
        self.__tstep=to_timedelta(self.confstr('da_cycle',
                                               cycling_interval))
        # A DA cycle shorter than 30 minutes is assumed to be a
        # misconfiguration:
        assert(self.__tstep>to_timedelta(1800))
        endtime=to_datetime_rel(cycling_interval,conf.cycle)

        # Two-dimensional member-vs-time structure: one dict (member
        # id => task) per analysis input time.
        self.__members=TimeArray(conf.cycle,endtime-self.__tstep,
                                 self.__tstep,dict)
        self.__anlouttime=self.__members.lasttime

    @property
    def anlintime(self):
        """!The time at the beginning of the first ensemble step."""
        return self.__anlintime

    @property
    def anlouttime(self):
        """!The time at the end of the last ensemble step."""
        return self.__anlouttime

    @property
    def nmembers(self):
        """!The number of members of the ensemble."""
        return len(self.__memberids)

    @property
    def nsteps(self):
        """!The number of ensemble DA time steps."""
        return len(self.__members)

    @property
    def anlintimes(self):
        """!Iterates over all ensemble analysis input times."""
        for t in self.__members.times():
            yield t

    @property
    def anlouttimes(self):
        """!Iterates over all ensemble analysis output times.

        Output times are the input times shifted by one step, plus the
        final output time; hence the first input time is skipped and
        the last output time is appended."""
        first=True
        for t in self.__members.times():
            if first:
                first=False
            else:
                yield t
        yield self.__anlouttime

    def member_ids(self):
        """!Iterates over all member ids."""
        for memberid in self.__memberids:
            yield memberid

    def set_member(self,atime,enkfmem,task):
        """!Sets the HWRFTask to use for one cycle of one member.

        Tells member enkfmem to use the specified task to produce
        output whose input analysis time is atime.
        @param atime the analysis time, a datetime.datetime
        @param enkfmem the enkf member id
        @param task the HWRFTask to use"""
        self.__memberids.add(enkfmem)
        self.__members[atime][enkfmem]=task

    def members_at_time(self,atime):
        """!Iterate over members for a specified analysis input time.

        Iterates over all members at the specified anlintime,
        yielding (id,member) tuples.
        @param atime the analysis time, a datetime.datetime"""
        time=to_datetime_rel(atime,self.__anlintime)
        for (enkfmem,memstep) in self.__members[time].items():
            yield (enkfmem,memstep)

    def members_at_anlouttime(self):
        """!Iterate over members at the final analysis output time.

        Iterates over all members at the final analysis output time,
        yielding (id,member) tuples."""
        for (e,m) in self.members_at_time(self.__members.lasttime):
            yield e,m

    def steps_for_member(self,enkfmem):
        """!Iterate over analysis cycles for a specific member.

        Iterates over (time,EnsembleDAMemberStep) pairs for the
        specified member."""
        for (t,a) in self.__members.items():
            yield t,a[enkfmem]

    def member(self,atime,enkfmem):
        """!Get the analysis cycle for the specified member and time.

        Returns the da cycle for member enkfmem at analysis input time
        atime.
        @param atime the analysis time, a datetime.datetime
        @param enkfmem the ensemble id"""
        return self.__members[atime][enkfmem]

    def inputiter(self):
        """!Iterate over all needed input data, for hwrf.input.

        Calls inputiter for all steps of all ENKF members.  This is
        for use by the hwrf.input to figure out what input data is
        required for the DA ensemble."""
        for t,e in self.__members.items():
            for m in e.values():
                for d in m.inputiter():
                    yield d

    def dump(self):
        """!Print detailed diagnostics.

        Sends detailed diagnostics about all members to the print()
        statement.  This is intended for debugging only."""
        for (time,stuff) in self.__members.items():
            for (enkfid,memberstep) in stuff.items():
                print("self.__members[%s][%s]=%s"%(
                    repr(time),repr(enkfid),repr(memberstep)))
        t=self.__members.lasttime
        print('last time t is %s'%(repr(t),))
        for (enkfid,memberstep) in self.members_at_time(t):
            print('self.__members[t][%s]=%s'%(
                repr(enkfid),repr(memberstep)))

########################################################################
class FromPriorCycle(hwrf.hwrftask.HWRFTask):
    """!Represents an ensemble member from the previous forecast cycle.

    This is used to generate UpstreamFile objects for the previous
    cycle's ensemble of FromGFSENKF simulations."""
    def __init__(self,dstore,conf,section,domains,enkfmem,anlouttime,
                 **kwargs):
        """!FromPriorCycle constructor

        Makes a new FromPriorCycle object.
        @param dstore the produtil.datastore.Datastore database object
        @param conf the hwrf.config.HWRFConfig with configuration info
        @param section the section to use within conf
        @param domains the list or tuple of hwrf.wrf.WRFDomain objects
          to obtain from the prior cycle, in order of grid ID.
        @param enkfmem the enkf member ID
        @param anlouttime the output time at the end of the analysis
        @param kwargs passed to the superclass constructor"""
        super(FromPriorCycle,self).__init__(dstore,conf,section,**kwargs)
        self.__domains=[ d for d in domains ]
        self.__enkfmem=int(enkfmem)
        self.__anlouttime=hwrf.numerics.to_datetime(anlouttime)

    def products(self,domains=None,**kwargs):
        """!Iterates over all products

        Iterates over products produced by the prior forecast cycle's
        analysis cycle.
        @param domains if provided and non-None, only products from
          these domains are yielded
        @param kwargs ignored """
        if False: yield None # ensures this is an iterator
        if domains is None: domains=self.__domains
        assert(domains)
        for domain in domains:
            assert(domain in self.__domains)
            if domain in self.__domains:
                # The MOAD's analysis is a wrfinput file; nests use
                # wrfanl files.
                if domain.is_moad():
                    yield self.get_wrfinput(domain,self.__anlouttime)
                else:
                    yield self.get_wrfanl(domain,self.__anlouttime)

    def get_wrfinput(self,domain,atime):
        """!Return the product for the specified domain and analysis time.

        Returns the product for the wrfinput_d01 file for the
        specified domain and time.  This is simply a wrapper around
        get_product(domain,atime)
        @param domain the domain of interest
        @param atime the analysis time as a datetime.datetime"""
        return self.get_product(domain,atime)

    def get_wrfanl(self,domain,atime):
        """!Returns the wrfanl file's product."""
        return self.get_product(domain,atime)

    def get_product(self,domain,atime):
        """!Returns a product for the specified domain and time.

        Creates a new produtil.datastore.UpstreamFile for the wrfinput
        or wrfanl file for the given domain, at the given analysis
        time.  The analysis time must be the anlouttime.
        @return a newly created UpstreamFile for the file of interest,
          or None if the time or domain does not match this object's
        @param domain the domain of interest
        @param atime the analysis time"""
        logger=self.log()
        atime=hwrf.numerics.to_datetime(atime)
        if not atime==self.__anlouttime:
            logger.info('Wrong atime: %s vs %s'
                        %(str(atime),str(self.__anlouttime)))
            return None
        if not domain in self.__domains:
            logger.info('Invalid domain: %s not in %s'%(
                str(domain), ', '.join([str(x) for x in self.__domains])))
            return None
        loc=self.confstrinterp(
            '{oldcom}/{oldvit[stormnamelc]}{oldvit[stormid3lc]}.{oldvit[YMDH]}.'
            'ensda_{enkfid:03d}.wrfinput_d{domid:02d}',enkfid=self.__enkfmem,
            domid=int(domain.get_grid_id()))
        logger.info('Domain %s atime %s enkfmem %s loc %s'%(
            str(domain),str(atime),repr(self.__enkfmem),repr(loc)))
        uf=UpstreamFile(self.dstore,category=self.taskname,
                        prodname=os.path.basename(loc),
                        location=loc)
        uf.check()
        return uf

    def get_track(self,atime):
        """!Returns an UpstreamFile for the prior cycle's track file.

        @param atime the analysis time; must equal the anlouttime or
          None is returned"""
        logger=self.log()
        atime=hwrf.numerics.to_datetime(atime)
        if not atime==self.__anlouttime:
            logger.info('Wrong atime: %s vs %s'
                        %(str(atime),str(self.__anlouttime)))
            return None
        enkfid='%03d'%int(self.__enkfmem)
        loc=self.confstrinterp(
            '{oldcom}/{oldvit[stormnamelc]}{oldvit[stormid3lc]}.{oldvit[YMDH]}.'
            'trak.hwrf.atcfunix.mem'+enkfid)
        logger.info('atime %s enkfmem %s loc %s'%(
            str(atime),repr(self.__enkfmem),repr(loc)))
        uf=UpstreamFile(self.dstore,category=self.taskname,
                        prodname=os.path.basename(loc),
                        location=loc)
        uf.check()
        return uf

########################################################################
class FromGFSENKF(hwrf.hwrftask.HWRFTask):
    """! Forecast ensemble member based on the GFS ENKF.

    Runs one member of an ensemble DA forecast ensemble, using a
    member of the GFS ENKF ensemble as initial and boundary
    conditions.  Some data from the earlier deterministic forecast
    jobs is reused to simplify the process."""
    def __init__(self,dstore,conf,section,detinit,enkfmem,sim,
                 taskname=None,track=None,hwrfenkf=None,
                 relocate=None,**kwargs):
        """!Constructor for FromGFSENKF

        @param dstore the produtil.datastore.Datastore database to use
        @param conf the hwrf.config.HWRFConfig that provides
          configuration data
        @param section the section in conf to use
        @param detinit the deterministic initialization, an
          hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI object.
@param enkfmem the ensemble member id @param sim the hwrf.wrf.WRFSimulation object @param taskname the name of the task within the database @param hwrfenkf the hwrf.enkf.ENKF object @param kwargs passed to the parent class constructor""" super(FromGFSENKF,self).__init__(dstore,conf,section,taskname, **kwargs) assert(isinstance(sim,hwrf.wrf.WRFSimulation)) if track is None: raise TypeError( 'You must explicitly specify the track arguments ' 'to FromGFSENKF.__init__ and they must be boolean ' 'values, not None.') self.enkfmem=enkfmem self.track=bool(track) self.hwrfenkf=hwrfenkf self.relocate=relocate self.runenkf=get_enkfstatus(conf,'enkf_d02',logger=self.log()) self.make_wrf(detinit,sim) self.make_init(detinit) self.make_fcst(detinit) if self.track: self._make_track() ##@var enkfmem # the enkf member id ##@var fcst # the forecast task, an hwrf.fcsttask.AnalysisCycle ##@var geogrid # the geogrid task from the deterministic initialization ##@var metgrid # the metgrid task from the deterministic initialization ##@var prep # the hwrf.prep.PrepHybrid that processes the GFS ENKF forecast # and analysis spectral data ##@var realinit # the hwrf.fcsttask.RealNMM that generates wrfinput_d01 and # wrfbdy_d01 files for input to the fcst and wrfanl ##@var wrfanl # the hwrf.fcsttask.WRFAnl that generates input wrfanl_d* files # for input to the fcst @property def anlintime(self): """!The analysis input time.""" return self.__wrf.simstart() def make_wrf(self,detinit,sim): """!Create the wrf() and fcst This function, called from the constructor, creates the hwrf.wrf.WRFSimulation and hwrf.fcsttask.AnalysisCycle used for this forecast. @param detinit the deterministic model initialization, an hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI. 
@param sim the hwrf.wrf.WRFSimulation passed to the constructor.""" self.fcst=hwrf.fcsttask.AnalysisCycle( self.dstore,self.conf,self.confstr('fcsttask'),sim.copy(), taskname=self.taskname+'.fcst',outdir=self.outdir+'/fcst', workdir=self.workdir+'/fcst',keeprun=True) self.__wrf=sim # WCOSS workaround: if self.confbool('io_netcdf',True,section='ensda_runwrf'): self.fcst.sim.set_active_io_form_to(2) def make_fcst(self,detinit): """!Adds input sources to the forecast object. Adds metgrid, geogrid, wrfinput, wrfbdy, wrfanl, and coupler fort.65 input to the fcst member variable. @param delinit the deterministic model initialization, an hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI.""" """!Adds input sources to the forecast object. Adds metgrid, geogrid, wrfinput, wrfbdy, wrfanl, and coupler fort.65 input to the fcst member variable. @param delinit the deterministic model initialization, an hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI.""" self.fcst\ .add_geogrid(self.geogrid) \ .add_metgrid(self.metgrid) \ .add_fort65(self.realinit) \ .add_wrfbdy(self.realinit) if not self.runenkf: self.fcst.add_wrfinput(self.realinit) else: self.fcst.add_input('wrfinput', hwrf.fcsttask.WRFInput2WRF(self.enkfmerge)) for domain in self.sim: if domain.is_moad(): continue if not self.runenkf: self.fcst.add_wrfanl(self.wrfanl,domain) else: name='wrfanl-%s'%(domain.name,) self.fcst.add_input(name, hwrf.fcsttask.WRFAnl2WRF(self.enkfmerge,domain)) self.fcst.sim.add_output('history',start=0,step=3600*6,end=6*3600) @property def sim(self): """!The wrf simulation made by make_wrf()""" return self.__wrf def inputiter(self): """!Passes control to the hwrf.prep.PrepHybrid.inputiter(). Iterates over the prep member's inputiter, a hwrf.prep.PrepHybrid.inputiter(). 
Yields all input data information needed by the hwrf.input module to pull input data.""" inoptional=self.confbool('ensda_fallback',False,section='config') for d in self.prep.inputiter(optional=inoptional): yield d def make_init(self,detinit): """!Creates initialization tasks. Called from the constructor. Creates the initialization tasks, prep, realinit and wrfanl; and links to the deterministic init geogrid and metgrid tasks. This is called by the constructor, to create the needed inputs to the fcst member. @param detinit the deterministic model initialization, an hwrf.init.HWRFInit or hwrf.init.InitBeforeGSI.""" self.geogrid=detinit.geogrid #self.realbdy=detinit.realinit self.metgrid=detinit.metgrid wrf=self.sim moad=wrf.get_moad() geodat=self.geogrid.geodat(moad) realsection=self.confstr('realinit') prepsection=self.confstr('prep_hybrid') mergesection=self.confstr('enkf_merge') self.prep=hwrf.prep.PrepHybrid( self.dstore,self.conf,prepsection,self.sim,geodat, atime=self.anlintime,taskname=self.taskname+'.prep', workdir=self.workdir+'/prep',outdir=self.outdir+'/prep') self.realinit=hwrf.fcsttask.RealNMM( self.dstore,self.conf,realsection,self.sim, taskname=self.taskname+'.real',keeprun=False, outdir=self.outdir+'/real',workdir=self.workdir+'/real', is_ensda_realnmm=True) \ .add_geogrid(self.geogrid)\ .add_metgrid(self.metgrid)\ .add_prep_hybrid(self.prep) # WCOSS workaround: if self.confbool('io_netcdf',True,section='ensda_runwrf'): self.realinit.sim.set_active_io_form_to(2) self.prep.tvset('enkfmem',self.enkfmem) # We use a WRFGhost instead of a WRFAnl since WRFGhost # disables the history stream. 
self.wrfanl=hwrf.fcsttask.WRFGhost( self.dstore,self.conf,realsection,self.sim,False, atime=self.anlintime,taskname=self.taskname+'.wrfanl', workdir=self.workdir+'/wrfanl',outdir=self.outdir+'/wrfanl')\ .add_geogrid(self.geogrid) \ .add_metgrid(self.metgrid) \ .add_fort65(self.realinit) \ .add_wrfinput(self.realinit) \ .add_wrfbdy(self.realinit) if self.hwrfenkf is not None: self.enkfmerge=hwrf.ensda.MergeGFSHWRFEnKF( self.dstore,self.conf,mergesection,self.sim,self.enkfmem, self.realinit,self.wrfanl,self.hwrfenkf,self.relocate, atime=self.anlintime,taskname=self.taskname+'.merge', workdir=self.workdir+'/merge',outdir=self.outdir+'/merge') def _make_track(self): """Makes the gribber and tracker member variables.""" ensdadoms=[ d for d in self.fcst.sim ] d2=ensdadoms[-1] postdomains=[ d2 ] self.post=PostManyWRF(self.fcst,postdomains,self.conf, self.confstr('post'),6*3600,needcrtm=False, streams=['history'],taskname=self.taskname+'.post', outdir=self.outdir+'/post',workdir=self.workdir+'/post') # Regridding stuff: grid2=hwrf.regrib.igrb1(self.post,domain=d2) trksub=hwrf.regrib.GRIBSubsetter( 'part',hwrf.tracker.tracker_subset,None) domloc=hwrf.regrib.FixedLocation(lat=self.conf['config','domlat'], lon=self.conf['config','domlon']) stormloc=hwrf.regrib.FixedLocation(lat=self.storminfo.lat, lon=self.storminfo.lon) basin=self.storminfo.pubbasin2 if ((basin.upper()=='AL' or basin.upper()=='EP') \ and domloc.ewcenter<360.0): domloc.ewcenter+=360.0 r=hwrf.regrib.RegribMany(copygb=alias(exe(self.conf.getexe('copygb'))), wgrib=alias(exe(self.conf.getexe('wgrib')))) r.add('d2',clatlon(grid2,res=[0.045,0.045],size=[30.,30.], scan=136,n=[667,667])) r.add('trkgrid',grid2*r.grid('d2')) r.add('hwrftrk',hwrf.tracker.vinttave(r.GRIB('trkgrid')/trksub)) basedir=self.outdir+'/regribber' enkf000='%03d'%int(self.enkfmem) r.to_intercom('{out_prefix}.hwrftrk.grbf{fahr:02d}.mem'+enkf000,'hwrftrk') self.gribber=hwrf.gribtask.GRIBTask( self.dstore,self.conf,self.confstr('regribber'),r, 
start=self.anlintime,step=6*3600,end=6*3600, taskname=self.taskname+'.regribber', atime=self.anlintime,workdir=self.workdir+'/regribber') self.tracker=hwrf.tracker.TrackerTask( self.dstore,self.conf,self.confstr('tracker'), taskname=self.taskname+'tracker',start=self.anlintime, step=6*3600,end=6*3600,outdir=self.outdir+'/tracker', workdir=self.workdir+'/tracker') self.tracker.add_moving_grid(self.storminfo,self.gribber,'hwrftrk') self.tracker.send_atcfunix( 'track0','{com}/{out_prefix}.trak.hwrf.atcfunix.mem'+enkf000) def products(self,**kwargs): """!Iterates over all forecast products. Passes control to hwrf.fcsttask.AnalysisCycle.products() to iterate over all products that match the specified arguments. @param kwargs passed to hwrf.fcsttask.AnalysisCycle.products()""" for p in self.fcst.products(**kwargs): yield p def run(self): """!Runs the initialization and forecast for this ensemble member. Runs the prep, realinit, wrfanl and fcst member tasks, using input from the GFS ENKF, and the deterministic initialization.""" where=self.workdir logger=self.log() logger.info('Run ensemble member in %s'%(where,)) with produtil.cd.NamedDir(where,keep=False,keep_on_error=True, logger=logger): self.prep.run() self.realinit.run() self.wrfanl.run() if self.runenkf: self.enkfmerge.run() self.fcst.run() if self.track: with self.dstore.transaction() as t: self.post.unrun() self.gribber.unrun() self.post.run() self.gribber.run() if self.gribber.is_completed(): self.tracker.run() else: msg='Error regridding inputs to tracker. See earlier log messages for details.' 
logger.error(msg) raise hwrf.exceptions.GribberError(msg) self.state=COMPLETED def run_track(self): """!Runs tracker for the initialization and forecast for this ensemble member.""" where=self.workdir logger=self.log() logger.info('Run ensemble member in %s'%(where,)) with produtil.cd.NamedDir(where,keep=False,keep_on_error=True, logger=logger): with self.dstore.transaction() as t: self.post.unrun() self.gribber.unrun() self.post.run() self.gribber.run() self.tracker.run() self.state=COMPLETED ######################################################################## class ForHWRFEnKF(hwrf.hwrftask.HWRFTask): """!Represents an ensemble member from the previous forecast cycle. Runs EnKF pre-processing (gsihx) for one member , using a member from previous cycle ensemble forecast with or without vortex relocation""" def __init__(self,dstore,conf,section,ensdadom,ensdasim,enkfmem,atime, prior_ensda,relocate,taskname=None,**kwargs): """!FromHWRFEnKF constructor Makes a new FromHWRFEnKF object. @param dstore the produtil.datastore.Datastore database object @param conf the hwrf.config.HWRFConfig with configuration info @param section the section to use within conf @param ensdadom the ensda domain object @param ensdasim the WRFSimulation object @param enkfmem the enkf member ID @param atime the output time at the end of the analysis @prior_ensda the priorcycle ensda object @param kwargs passed to the superclass constructor""" super(ForHWRFEnKF,self).__init__(dstore,conf,section,taskname,**kwargs) self.domain=ensdadom self.sim=ensdasim self.enkfmem=int(enkfmem) self.atime=hwrf.numerics.to_datetime(atime) self.prior_ensda=prior_ensda self.relocate=bool(relocate) if self.relocate: self._make_relocate(self.relocate,'ENKF') self._make_gsihx() assert(prior_ensda is not None) def _make_relocate_kwargs(self,track,modin,dest_dir,priorcycle): """Makes a dict containing the keyword arguments to send in to the constructor for the hwrf.relocate task(s). 
        modin - the modin argument to the constructor
        dest_dir - the directory in which to run the relocate"""
        ensdadoms=[ d for d in self.sim ]
        kwargs=dict(
            sim=self.sim,domains=ensdadoms,
            modin=modin,dest_dir=dest_dir,
            workdir=self.workdir+'/relocate',
            outdir=self.outdir+'/relocate',
            location=self.outdir+'/relocate')
        if priorcycle is not None: kwargs.update(ensda=priorcycle)
        if track:
            # Use the prior cycle's track as the parent track:
            kwargs.update(parentTrack=priorcycle.get_track(self.atime),
                          trackName='track0')
        return kwargs

    def _make_relocate(self,track,modin):
        """Makes the relocation, rstage1, rstage2 and rstage3 member
        variables.

        track - the track argument to the constructor
        modin - the modin argument to the constructor"""
        dest_dir=os.path.join(self.workdir,'relocate')
        kwargs=self._make_relocate_kwargs(track,modin,dest_dir,
                                          self.prior_ensda)
        def t(s): return '%s.%s'%(self.taskname,s)
        self.relocation=hwrf.relocate.Relocation(self.dstore,self.conf,
            self.confstr('relocate'),
            taskname_pattern=t('relocate.stage')+'%d',
            **kwargs)
        self.rstage1=self.relocation.rstage1
        self.rstage3=self.relocation.rstage3

    def run_relocate(self):
        """Runs the relocate jobs, if present."""
        if 'rstage1' in self.__dict__:
            self.rstage1.delete_temp()
            self.rstage1.run()
        if 'rstage3' in self.__dict__:
            self.rstage3.run()
            if self.rstage1.scrub and self.rstage3.scrub:
                self.rstage3.delete_temp()

    def _make_gsihx(self):
        """Makes the gsihx member variables."""
        kwargs=dict(
            atime=self.atime,
            workdir=self.workdir+'/gsihx',
            outdir=self.outdir+'/gsihx')
        # The GSI observation operator input is either the relocated
        # vortex or the prior cycle's raw analysis:
        if self.relocate:
            wrf_in_prod=self.rstage3.get_wrfout(domain=self.domain)
        else:
            wrf_in_prod=self.prior_ensda.get_wrfanl(domain=self.domain,
                                                    atime=self.atime)
        self.gsihx=hwrf.gsi.GSIHx(self.dstore,self.conf,
            self.confstr('gsihx'),self.domain,wrf_in_prod,self.sim,
            self.enkfmem,taskname=self.taskname+'.gsihx',
            **kwargs)

    def run_gsihx(self):
        """Runs the gsihx job."""
        self.gsihx.run()

########################################################################
class MergeGFSHWRFEnKF(hwrf.hwrftask.HWRFTask):
""" Merge HWRF EnKF analysis to GDAS EnKF analysis for one member""" def __init__(self,dstore,conf,section,ensdasim,enkfmem,realinit, wrfanl,hwrfenkf,relocate,atime,taskname=None,**kwargs): """!MergeGFSHWRFEnKF constructor Makes a new FromHWRFEnKF object. @param dstore the produtil.datastore.Datastore database object @param conf the hwrf.config.HWRFConfig with configuration info @param section the section to use within conf @param ensdasim the WRFSimulation object @param atime the output time at the end of the analysis @realinit the hwrf.fcsttask.RealNMM object @wrfanl the hwrf.fcsttask.WRFGhost object @hwrfenkf the hwrf.enkf.ENKF object @param kwargs passed to the superclass constructor""" super(MergeGFSHWRFEnKF,self).__init__(dstore,conf,section,taskname,**kwargs) self.sim=ensdasim self.domains=[ensdasim[domain] for domain in ensdasim] self.enkfmem=int(enkfmem) self.atime=hwrf.numerics.to_datetime(atime) self.dt_epsilon=ensdasim.bdyepsilon() assert(realinit is not None) assert(wrfanl is not None) assert(hwrfenkf is not None) self.realinit=realinit self.wrfanl=wrfanl self.enkf=hwrfenkf self._wrfinput_d01=self.realinit.get_wrfinput() for domain in self.sim: if domain.is_moad(): continue self._wrfanl_d02=self.wrfanl.get_ghost(domain) self._enkfanl=self.enkf.get_enkfanl_member(self.enkfmem) if relocate is not None: self._input_storm_radius=relocate.get_storm_radius() self.make_products() def make_namelist(self,domain,filename='hwrf_interpolate.nml'): """!Creates the hwrf_interpolate namelist in the specified file. 
@param domain the domain to be interpolated @param filename the destination filename""" logger=self.log() invars=dict() if domain.is_moad(): invars.update(INTRP_TO_FILENAME='wrfinput_d01') nml_file=self.confstr('interpolate_nml_file') nml_section=self.confstr('interpolate_nml_section',self.section) atime=self.atime ni=hwrf.namelist.NamelistInserter(self.conf,nml_section) with open(nml_file,'rt') as nf: with open(filename,'wt') as of: of.write(ni.parse(nf,logger=logger,source=nml_file, raise_all=True,atime=atime,**invars)) def grab_input(self): """!Grab input files""" logger=self.log() p=self._enkfanl make_symlink(p.location,'hwrf_enkf_anl',logger=logger,force=True) p=self._wrfinput_d01 deliver_file(p.location,'wrfinput_d01',logger=logger,keep=True) p=self._wrfanl_d02 deliver_file(p.location,'wrfanl_d02',logger=logger,keep=True) def check_storm_radius(self): """!If no relocate was given, gets the storm radius file from a fix file. Also checks to see if the storm_radius file is present and non-empty, regardless of whether it came from the fix or relocate.""" logger=self.log() if self._input_storm_radius is None: storm_radius=os.path.join(self.getdir('FIXhwrf'), 'hwrf_storm_radius') logger.warning( 'Could not get storm_radius from the relocate jobs.') logger.warning( 'Will use the fix file $FIXhwrf/hwrf_storm_radius instead.') make_symlink(storm_radius,'storm_radius',force=True,logger=logger) else: p=self._input_storm_radius make_symlink(p.location,'storm_radius',force=True,logger=logger) if not isnonempty('storm_radius'): msg='storm_radius file is missing' logger.error(msg) raise StormRadiusError(msg) def write_vitals(self,filename='tcvitals.as',logger=None): """!Writes the tcvitals (from self.storminfo) to the specified file. 
@param filename the file to write @param logger a logging.Logger for log messages""" if logger is None: logger=self.log() logger.info('Writing tcvitals to %s'%(repr(filename),)) with open(filename,'wt') as f: f.write(self.storminfo.as_tcvitals()+"\n") assert(os.path.exists(filename)) def run_intrp(self,domain): """Runs interpolation to merge HWRF EnKF analysis to GDAS EnKF analysis""" logger=self.log() cmd = (mpirun(mpi(self.getexe('hwrf_interpolate')),allranks=True) \ < 'hwrf_interpolate.nml') cmd = cmd > 'stdout_interpolate' logger.info('Starting hwrf_interpolate for domain:%s' %(domain)) try: checkrun(cmd,logger=logger) except Exception as e: logger.critical('hwrf_interpolate failed') raise def make_products(self): """!prepares products for deliver_products() and products() Generates produtil.datastore.FileProduct objects for the files that should be delivered in deliver_products() and listed by products().""" self._prod_wrfinput=FileProduct(self.dstore,'wrfinput_d01', self.taskname,location=os.path.join(self.outdir,'wrfinput_d01')) self._prod_wrfanl_d02=FileProduct(self.dstore,'wrfanl_d02', self.taskname,location=os.path.join(self.outdir,'wrfanl_d02')) def deliver_products(self): """!delivers products that were identified by make_products()""" produtil.fileop.makedirs(os.path.dirname( self._prod_wrfinput.location)) self._prod_wrfinput.deliver(frominfo='wrfinput_d01',keep=False) self._prod_wrfanl_d02.deliver(frominfo='wrfanl_d02',keep=False) def products(self): """!iterates over all products Iterates over all of this Task's products (produtil.datastore.FileProduct objects) created by make_products()""" yield self._prod_wrfinput yield self._prod_wrfanl_d02 def run(self): """!Merge EnKF analysis and deliver the results""" logger=self.log() dirname=self.workdir logger.info('Run hwrf_interpolate in directory %s'%(dirname,)) if os.path.exists(dirname): logger.info('Delete old data in %s'%(dirname,)) shutil.rmtree(dirname) with NamedDir(dirname,keep=not self.scrub): 
self.grab_input() self.check_storm_radius() self.write_vitals('tcvitals.as') for domain in self.sim: self.make_namelist(domain) self.run_intrp(domain) self.deliver_products() def wrfanl_at_time(self,atime,domain): """!Returns the wrfanl output product for the specified domain and time or None if no such data is available. @param atime the time of interest @param domain the domain of interest""" return self.get_wrfanl(domain) def get_wrfanl(self,domain): """!Returns the wrfanl product for the specified domain or None if no such data is available. @param domain the domain of interest""" if domain is None: return None if not domain in self.sim: return None if domain.__dict__['name']==self.domains[1].__dict__['name']: return self._prod_wrfanl_d02 def wrfinput_at_time(self,atime,domain): """!Returns a Product object for the wrfinput output file for the specified domain if the atime matches this object's self.sim.simstart() @param atime the time of interest @param domain the domain of interest""" if domain and domain.__dict__['name']!=self.sim.get_moad().__dict__['name']: return None if atime is not None and \ not within_dt_epsilon(atime,self.sim.simstart(), self.dt_epsilon): self.log().info( 'wrfinput_at_time: atime=%s is not near my time %s' %(atime.strftime('%Y%m%d%H'),domain.strftime('%Y%m%d%H'))) return None return self.get_wrfinput(domain) def get_wrfinput(self,domain=None): """!Returns a Product object for the wrfinput output file. If a domain is specified, and is not the correct MOAD, then None is returned. @param domain the domain of interest""" if domain is not None and domain.__dict__['name']!=self.domains[0].__dict__['name']: return None return self._prod_wrfinput ####################################################################### def write_ensda_flag_file(flag_file,run_mode,flag_name): """!Writes the stormX.run_ensda* flag file. Writs the storm*.run_ensda flag file for this cycle. 
The purpose of the file is to tell the workflow and scripting layers whether the ensemble needs to be run, whether it is a cycled ensemble, and whether the relocation need to be run for the ensemble The file will contain lines: RUN_ENSDA=YES or RUN_ENSDA=NO RUN_CYCLED_ENSDA=YES or RUN_CYCLED_ENSDA=NO RUN_ENSDA_RELOCATE=YES or RUN_ENSDA_RELOCATE=NO Will also log a message to the jlogfile at INFO level telling which was written. @param flag_file the full path to the flag file @param run_mode True or False: should the ENSDA or relocation job be run? @param flag_name the name of the flag 'RUN_ENSDA' or 'RUN_ENSDA_RELOCATE'""" log_name=flag_name.strip('RUN_') if run_mode: with open(flag_file,'at') as f: f.write(flag_name+'=YES\n') else: # None, False, anything else: with open(flag_file,'at') as f: f.write(flag_name+'=NO\n') def read_ensda_flag_file(flag_file,whichflag): """!Reads the stormX.run_ensda or stormX.run_ensda_relocate* flag file This function is used by the scripting and workflow layers to determine if the data assimilation ensemble should be run, if cycled data assimilation ensemble should be run, and if relocation should be run for the ensemble. Reads the storm*.run_ensda* flag file line by line, searching for a single line RUN_ENSDA=YES/NO or RUN_CYCLED_ENSDA=YES/NO or RUN_ENSDA_RELOCATE=YES/NO. Returns True for YES or False for No, based on the last such line seen. Returns None otherwise.""" run_ensda=False if isnonempty(flag_file): with open(flag_file,'rt') as f: for line in f: if line.find(whichflag+'=YES')>=0: run_ensda=True return run_ensda ######################################################################## class Storm1Ensda(hwrf.hwrftask.HWRFTask): """!Tells HWRF to run the ENSDA for storm 1, but not any other storms.""" def __init__(self,dstore,conf,section,**kwargs): """!Storm1Ensda constructor. Create a new Storm1Ensda which will instruct HWRF to run ensda only for a specific storm priority, by default storm 1. 
This is intended only for testing purposes. @param dstore the produtil.datastore.Datastore database object @param conf the hwrf.config.HWRFConfig with configuration info @param section the section to use within conf @param kwargs passed to the superclass constructor """ super(Storm1Ensda,self).__init__(dstore,conf,section,**kwargs) self.__ensda_flag_file=self.confstr('tdr_flag_file') self.__ensda_flag_file2=self.confstr('tdr_flag_file2') def should_run_ensda(self): """!Should ENSDA be run? Returns true if the storm is priority 1, and false otherwise.""" storm_num=self.conf.get('config','storm_num') storm_num=int(storm_num) return storm_num==1 def write_flag_file(self,run_ensda): """!Write the ensda flag file. Calls hwrf.ensda.write_ensda_flag_file to write the flag file. @param run_ensda True means the ensemble should be run, False if it should not be run.""" write_ensda_flag_file(self.__ensda_flag_file,run_ensda) write_ensda_flag_file(self.__ensda_flag_file2,run_ensda) #ensda_ecflow_comms(self.conf,self.log(),run_ensda) if run_ensda: set_ecflow_event('Ensda',self.log()) def run(self): """!creates the storm*.run_ensda file Creates the storm1.run_ensda flag file with RUN_ENSDA=YES for storm 1, and RUN_ENSDA=NO otherwise.""" self.write_flag_file(self.should_run_ensda()) ######################################################################## class GFSEnKFInputCheck(hwrf.hwrftask.HWRFTask): """!Determines if GFS EnKF analysis data are available. This class checks to see if GFS EnKF analysis files are available. This is the condition used to decide whether to run the DA ensemble in HWRF.""" def __init__(self,dstore,conf,section,cycle_rel,**kwargs): """!GFSEnKFInputCheck constructor. Create a new GFSEnKFInputCheck which will look for GFS EnKF analysis files for specified cycle. 
@param dstore the produtil.datastore.Datastore database object @param conf the hwrf.config.HWRFConfig with configuration info @param section the section to use within conf @param kwargs passed to the superclass constructor""" super(GFSEnKFInputCheck,self).__init__(dstore,conf,section,**kwargs) self.atime=to_datetime(self.conf.cycle) self.ftime=to_datetime_rel(cycle_rel,self.conf.cycle) incat_name=self.confstr('catalog') self.inputs=hwrf.input.DataCatalog( self.conf,incat_name,self.atime) self.enkf_dataset=self.confstr('dataset','enkf') self.enkf_anl_item=self.confstr('anl_item','enkf_siganl') self.enkf_item=self.confstr('item','enkf_sfg') def run(self): """!Check if GFS EnKF analysis data are available Checks the on-disk input data to see if the GFS EnKF data are available.""" logger=self.log() ens_size=self.confint('hwrf_da_ens','ensda_size',40) gfsenkfthere=True for imem in range(40): imemb=imem+1 there=self.inputs.locate(self.enkf_dataset,self.enkf_anl_item, atime=self.atime,logger=logger, enkfmem=imemb,ftime=self.ftime) if not isnonempty(there): logger.warning(there+': does not exist') gfsenkfthere=False break there=self.inputs.locate(self.enkf_dataset,self.enkf_item, atime=self.atime,logger=logger, enkfmem=imemb,ftime=self.ftime) if not isnonempty(there): logger.warning(there+': does not exist') gfsenkfthere=False break return gfsenkfthere ######################################################################## class CycleTDRCheck(hwrf.hwrftask.HWRFTask): """!Determines if Tail Doppler Radar (TDR) data is available. This class checks to see if a specified cycle has Tail Doppler Radar data available. This is the condition used to decide whether to run the DA ensemble in the 2015 Operational HWRF.""" def __init__(self,dstore,conf,section,cycle_rel,**kwargs): """!CycleTDRCheck constructor. Create a new CycleTDRCheck which will look for TDR for the specified cycle. The cycle_rel is anything accepted by to_datetime_rel's second argument. 
@param dstore the produtil.datastore.Datastore database object @param conf the hwrf.config.HWRFConfig with configuration info @param section the section to use within conf @param cycle_rel specifies the cycle. This must be a number of hours relative to the current cycle (conf.cycle) analysis time. For example, -6*3600 would be the prior cycle and 48*3600 would be two days from now. @param kwargs passed to the superclass constructor""" super(CycleTDRCheck,self).__init__(dstore,conf,section,**kwargs) incat_name=self.confstr('catalog') self.__ensda_flag_file=self.confstr('tdr_flag_file') self.__ensda_flag_file2=self.confstr('tdr_flag_file2') self.__run_ensda=None self.tgtcycle=to_datetime_rel(cycle_rel,conf.cycle) self.__in_catalog=hwrf.input.DataCatalog( self.conf,incat_name,self.tgtcycle) self.dataset=self.confstr('dataset','tdr') self.item=self.confstr('item','gdas1_bufr') self.obstype=self.confstr('obstype','tldplr') self.__tdrdict=dict(self.taskvars,dataset=self.dataset,item=self.item, obstype=self.obstype,atime=self.tgtcycle,ftime=self.tgtcycle, optional=True) self._dirname=self.workdir self.ensda_opt=self.confint('ensda_opt',0) yyyy=self.tgtcycle.strftime('%Y%m%d%H')[0:4] basin=self.storminfo.pubbasin2 if int(yyyy) <= 2013: self.stormid='%02d' % (self.storminfo.stnum) elif basin.upper()=='AL': self.stormid='%s%02d' % ('1',self.storminfo.stnum) elif basin.upper()=='EP': self.stormid='%s%02d' % ('2',self.storminfo.stnum) elif basin.upper()=='CP': self.stormid='%s%02d' % ('3',self.storminfo.stnum) else: self.stormid='999' ##@var tgtcycle # the cycle for whom TDR data is checked def should_run_ensda(self): """!Should ENSDA be run? If self.run was called in this process, returns the cached result of that. Otherwise, reads the run_ensda flag file from COM. 
Uses hwrf.ensda.read_ensda_flag_file()""" if self.__run_ensda is None: self.__run_ensda=read_ensda_flag_file(self.__ensda_flag_file, 'RUN_ENSDA') return self.__run_ensda def inputiter(self): """!Iterates over needed files from upstream workflows. This iterator's purpose is to provide "all files external to this workflow" that are needed by the task. In this case, of course, it is only the TDR bufr_d file. Hence, there is a single "yield" statement that requests the TDR.""" yield self.__tdrdict def run(self): """!creates the storm*.run_ensda file Creates the storm1.run_ensda flag file with RUN_ENSDA=YES if the TDR data is available, and RUN_ENSDA=NO otherwise.""" logger=self.log() try: logger.info('Run ensda_pre in directory %s'%(self.workdir,)) if os.path.exists(self.workdir): logger.info('Delete old data in %s'%(self.workdir,)) shutil.rmtree(self.workdir) with NamedDir(self.workdir,keep=not self.scrub): (run_ensda, run_ensda_cycled, switch_storm) = self._actually_run() except Exception as e: logger.warning('unhandled exception running ' 'self._actually_run() '+str(e),exc_info=True) raise finally: self.write_flag_file(run_ensda,run_ensda_cycled) def write_flag_file(self,run_ensda,run_cycled_ensda): """!Write the ensda flag file. Calls hwrf.ensda.write_ensda_flag_file to write the flag file. 
@param run_ensda True means the ensemble should be run, False if it should not be run.""" write_ensda_flag_file(self.__ensda_flag_file,run_ensda,'RUN_ENSDA') write_ensda_flag_file(self.__ensda_flag_file,run_cycled_ensda,'RUN_CYCLED_ENSDA') write_ensda_flag_file(self.__ensda_flag_file2,run_ensda,'RUN_ENSDA') write_ensda_flag_file(self.__ensda_flag_file2,run_cycled_ensda,'RUN_CYCLED_ENSDA') #ensda_ecflow_comms(self.conf,self.log(),run_ensda) if run_ensda: set_ecflow_event('Ensda',self.log()) def tdr_this_cycle(self): """!Check if TDR data is available for this cycle Checks the on-disk input data to see if the TDR data is available.""" logger=self.log() atime=to_datetime(self.conf.cycle) self._in_catalog=hwrf.input.DataCatalog( self.conf,self.confstr('catalog'),atime) item=self.conf.get('tdr_new_obstype','item') ds=os.path.join(self.getdir('intercom'),'bufrprep') it=self._in_catalog.parse(item,atime=atime, logger=logger,obstype='tldplr') there=os.path.join(ds,it) logger.info('TDR bufrprep could be at %s'%there) if isnonempty(there): logger.info('Find TDR data for this storm') return (True, True) else: ymdh=atime.strftime('%Y%m%d%H') if not self.realtime: self.thiscycletdrdict=dict(self.taskvars,dataset=self.dataset,item=self.item, obstype=self.obstype,atime=atime,ftime=atime, optional=True) there_it_is=self.__in_catalog.locate(**self.thiscycletdrdict) if there_it_is is None: logger.error( 'Configuration error: DataCatalog %s does not know how ' 'to find TDR data.' %(repr(ic),)) return (False, False) elif not isnonempty(there_it_is): logger.warning( '%s: %s Tail Doppler Radar bufr_d file is empty or ' 'non-existant.' %(there_it_is,self.tgtcycle.strftime('%Y%m%d%H'))) return(False, False) else: make_symlink(there_it_is,'tldplrbufr',force=True,logger=logger) self.readensdatrigger(self.stormid,ymdh) if isnonempty('runensda'): logger.info('Find TDR data for this storm.' 
'The data may be a trigger file or a small dump') return (True, True) else: logger.info('Find TDR data for a different storm') return (True, False) else: input_catalog=self.conf.get('config','fcst_catalog') dcomenv=os.path.join(os.environ.get('DCOMROOT','/dcom'),'prod') dcom=self.conf.get(input_catalog,'dcom',dcomenv) tdrpresent=False if os.path.isdir(dcom): btime=to_datetime_rel(-24*3600,atime) tank1=os.path.join(dcom,atime.strftime("%Y%m%d"),'b006/xx070') tank2=os.path.join(dcom,btime.strftime("%Y%m%d"),'b006/xx070') logger.info('Locations: tank1 at %s tank2 at %s'%(tank1,tank2)) if isnonempty(tank1) or isnonempty(tank2): run(batchexe('%s/dumpjb'%(os.environ.get('USHobsproc_dump','/you/forgot/to/set/USHobsproc_dump'),)) \ [atime.strftime("%Y%m%d%H"), '3.00', 'tldplr'] \ .env(TANK=dcom, \ DATA=self._dirname)) tdrbufr='tldplr.ibm' if isnonempty(tdrbufr): tdrpresent=True make_symlink(tdrbufr,'tldplrbufr',force=True,logger=logger) self.readensdatrigger(self.stormid,ymdh) else: tdrpresent=False else: tdrpresent=False else: logger.warning('%s does not exist. This is not wcoss.' 'real-time ensda trigger can only be run on wcoss'%dcom) return (False, False) if isnonempty('runensda'): return (tdrpresent, True) else: return (tdrpresent, False) def _actually_run(self): """!Search on disk for TDR or a trigger file. Do not call this routine directly; it is an internal implementation routine. This routine contains the code that actually determines if TDR is present or not. The "run" routine calls _actually_run in a try-finally block to ensure the run_ensda flag file is created no matter what. This function returns a tuple (tdrpresent, is4thisstorm)""" logger=self.log() input_catalog=self.conf.get('config','fcst_catalog') if self.realtime and not self.conf.has_section(input_catalog): logger.error('This is a realtime run. ' 'Forecast catalog section [%s] does not exist. 
                ' %(input_catalog,))
            raise hwrf.exceptions.UnexpectedFailureTest()
        # First look for TDR data valid at the current cycle:
        (tdrpresent_this_cycle, is4thisstorm_this_cycle)=self.tdr_this_cycle()
        if tdrpresent_this_cycle:
            tdrpresent=tdrpresent_this_cycle
            is4thisstorm=is4thisstorm_this_cycle
        else:
            # No TDR for this cycle; fall back to the trigger file for
            # the target cycle.  The search differs in real time on a
            # machine that has the dcom directory (wcoss).
            if self.realtime:
                dcomenv=os.path.join(os.environ.get(
                    'DCOMROOT','/you/forgot/to/set/DCOMROOT'),'prod')
                dcom=self.conf.get(input_catalog,'dcom',dcomenv)
                if os.path.isdir(dcom):
                    realtimeonwcoss=True
                else:
                    realtimeonwcoss=False
            else:
                realtimeonwcoss=False
            (tdrpresent, is4thisstorm)=self.read_trigger_file(realtimeonwcoss)
        if is4thisstorm:
            logger.info('There will be P3 TDR data for cycle %s!'
                        %self.tgtcycle.strftime('%Y%m%d%H'))
        switch_storm=False
        if self.ensda_opt > 0:
            # Cycled ENSDA mode: combine this cycle's TDR status with
            # the prior cycle's RUN_ENSDA/RUN_CYCLED_ENSDA flags.
            (run_ensda_prior, run_ensda_cycled_prior)=self.readpriorensda()
            if not run_ensda_cycled_prior:
                if is4thisstorm:
                    if not run_ensda_prior:
                        # First cycle with TDR: start cycling and alert.
                        run_ensda=True
                        run_ensda_cycled=False
                        logger.info('P3 TDR data found for this storm.'
                                    ' Enabling cycled ENSDA from cycle %s!'
                                    %self.conf.cycle.strftime('%Y%m%d%H'))
                        self.write_cycled_ensda_alert_file(1)
                    else:
                        run_ensda=True
                        run_ensda_cycled=True
                        logger.info('P3 TDR data found for this storm.'
                                    ' Will continue cycled ENSDA from cycle %s!'
                                    %self.conf.cycle.strftime('%Y%m%d%H'))
                else:
                    if not run_ensda_prior:
                        run_ensda=False
                        run_ensda_cycled=False
                        logger.info('No P3 TDR data found for this storm, '
                                    ' Will continue without ENSDA.')
                    else:
                        run_ensda=True
                        run_ensda_cycled=True
                        logger.info('No P3 TDR data found for this storm. '
                                    'ENSDA has been enabled in previouse cycle.'
                                    ' Will continue cycled ENSDA from cycle %s!'
                                    %self.conf.cycle.strftime('%Y%m%d%H'))
            else:
                # Cycling was already active in the prior cycle:
                if is4thisstorm:
                    run_ensda=True
                    run_ensda_cycled=True
                    logger.info('P3 TDR data found for this storm.'
                                ' Will continue cycled ENSDA.')
                elif tdrpresent:
                    # NOTE(review): "P3TDR" appears to be a marker file
                    # left in the work directory -- confirm its producer.
                    if isnonempty('P3TDR'):
                        run_ensda=False
                        run_ensda_cycled=False
                        switch_storm=True
                        logger.info('Find P3 TDR data for a different storm,'
                                    ' Will discontinue cycled ENSDA from cycle'
                                    ' %s!'%self.conf.cycle.strftime('%Y%m%d%H'))
                        self.write_cycled_ensda_alert_file(2)
                    else:
                        run_ensda=True
                        run_ensda_cycled=True
                        logger.info('Find G-IV TDR data. '
                                    ' Will continue cycled ENSDA for this'
                                    ' storm.')
                else:
                    run_ensda=True
                    run_ensda_cycled=True
                    logger.info('No TDR data found.'
                                ' Will continue cycled ENSDA.')
        else:
            # Non-cycled mode: run ENSDA only when this cycle's TDR
            # belongs to this storm.
            run_ensda=is4thisstorm
            run_ensda_cycled=False
            if is4thisstorm:
                logger.info('TDR data found for this storm. Enabling ENSDA.')
            else:
                logger.warning(
                    'Tail Doppler Radar bufr_d file is empty or '
                    'non-existant. Will continue without ENSDA.')
        return (run_ensda, run_ensda_cycled, switch_storm)
    def read_trigger_file(self,realtime):
        """!Read TDR trigger file for operational run

        Reads the TDR trigger file sent by the Aircraft Operations
        Center (AOC) before a NOAA P3 Orion flight.  This is used in
        NCEP Operations when running the operational HWRF, to
        determine if TDR is going to be available soon.
        @param realtime True when running in real time on a machine
          that has the dcom tanks available"""
        logger=self.log()
        atime=to_datetime(self.tgtcycle)
        ymdh=atime.strftime('%Y%m%d%H')
        tdrpresent=False
        if not realtime:
            ic=self.__in_catalog
            there_it_is=ic.locate(**self.__tdrdict)
            if there_it_is is None:
                logger.error(
                    'Configuration error: DataCatalog %s does not know how '
                    'to find TDR data. Will continue without ENSDA.'
                    %(repr(ic),))
                return (False, False)
            elif not isnonempty(there_it_is):
                logger.warning(
                    '%s: %s Tail Doppler Radar bufr_d file is empty or '
                    'non-existant.'
                    %(there_it_is,self.tgtcycle.strftime('%Y%m%d%H')))
                return (False, False)
            else:
                tdrpresent=True
                logger.info('%s: TDR data found for cycle %s!
%(there_it_is,self.tgtcycle.strftime('%Y%m%d%H'))) make_symlink(there_it_is,'tldplrbufr',force=True,logger=logger) self.readensdatrigger(self.stormid,ymdh) else: input_catalog=self.conf.get('config','fcst_catalog') dcomenv=os.path.join(os.environ.get('DCOMROOT','/dcom'),'prod') dcom=self.conf.get(input_catalog,'dcom',dcomenv) if os.path.isdir(dcom): btime=to_datetime_rel(-24*3600,atime) tank1=os.path.join(dcom,atime.strftime("%Y%m%d"),'b006/xx070') tank2=os.path.join(dcom,btime.strftime("%Y%m%d"),'b006/xx070') logger.info('Locations: tank1 at %s tank2 at %s'%(tank1,tank2)) numtry=self.confint('numofcheck',1) timeinv=self.confint('checksecinv',300) stime=timeinv/2 n=1 self.numtired=0 while n<=numtry: if isnonempty(tank1) or isnonempty(tank2): run(batchexe('%s/dumpjb'%(os.environ.get('USHobsproc_dump','/you/forgot/to/set/USHobsproc_dump'),)) \ [atime.strftime("%Y%m%d%H"), '3.00', 'tldplr'] \ .env(TANK=dcom, \ DATA=self._dirname)) tdrbufr='tldplr.ibm' if isnonempty(tdrbufr): tdrpresent=True make_symlink(tdrbufr,'tldplrbufr',force=True,logger=logger) self.readensdatrigger(self.stormid,ymdh) else: tdrpresent=False else: tdrpresent=False if n= log produtil.run.checkrun(cmd,logger=logger) def readpriorensda(self): """ !Read RUN_CYCLED_ENSDA from prior cycle storm1.run_ensda """ logger=self.log() run_ensda=False run_ensda_cycled=False if os.path.isdir(self.icstr('{oldcom}')): flag_file=self.icstr('{oldcom}/run_ensda.{oldvit[stnum]:02d}{oldvit[basin1lc]}.{oldvit[YMDH]}') waited=wait_for_files( flag_file,logger,maxwait=15,sleeptime=10, min_size=1,min_mtime_age=5) if not waited: msg='%s: did not exist or was too small after 150 seconds'%( flag_file) logger.warning(msg) else: try: with open(flag_file,'rt') as f: for line in f: if line.find('RUN_ENSDA=YES')>=0: run_ensda=True if line.find('RUN_CYCLED_ENSDA=YES')>=0: run_ensda_cycled=True except Exception as e: logger.warning('unhandled exception reading ' + flag_file+str(e),exc_info=True) raise finally: logger.info('previouse 
cycle RUN_ENSDA='+str(run_ensda)+ ' RUN_CYCLED_ENSDA='+str(run_ensda_cycled)) else: logger.info(self.icstr('{oldcom}')+' does not exist.') return (run_ensda, run_ensda_cycled) def write_cycled_ensda_alert_file(self,alert_mode): """!Writs the alert_cycled_ensda file for this cycle. The purpose of the file is to tell a cron job to alert HWRF team that cycled ensemble will be launched or shut down @param alert_mode the alert mode - 1: launch, 2: shut down""" if alert_mode == 1: alert_msg='Find HWRF ensemble trigger for this storm, will start' else: alert_msg='Find HWRF ensemble trigger for a different storm, will end' content=self.conf.timestrinterp('config',"""ATTN HWRF team: \n""" """\n"""+""" """+alert_msg+""" cycled HWRF ensemble hybrid DA HWRF storm slot {storm_num} Storm = {vit[basinname]} Tropical System {vit[stormname]} {vit[stormid3]} R.S.M.C. = {vit[center]} Storm slot = hwrf{storm_num} Fcst. cycle = {ayear}-{amonth}-{aday} {aHH} Sincerely, HWRF """) alert_file=self.confstrinterp('{com}/{out_prefix}.alert_cycled_ensda') with open(alert_file,'at') as f: f.write(content) produtil.log.jlogger.info('Write '+alert_file+' for this cycle') ######################################################################## class AlwaysRunENSDA(CycleTDRCheck): """!Used in place of CycleTDRCheck to force ENSDA to always run. This subclass of CycleTDRCheck instructs ENSDA to run whether TDR is available or not.""" def should_run_ensda(self): """!Returns True. Always returns True, indicating that ENSDA should always be run even if the world is imploding and pigs are raining from clouds of cotton candy. @returns True""" return True def _actually_run(self): """!Runs the TDR check and ignores its results. Calls the superclass _actually_run, so that the TDR check is done, and then returns True regardless of the TDR availability. Logs a warning about this to the produtil.log.jlogger. 
@returns True""" (run_ensda, run_ensda_cycled, switch_storm)=super(AlwaysRunENSDA,self)._actually_run() if not run_ensda: msg="OVERRIDE: Will run ENSDA anyway due to "\ "configuration settings." self.log().warning(msg) jlogger.warning(msg) (run_ensda, run_ensda_cycled)=self.readpriorensda() return (run_ensda, run_ensda_cycled, switch_storm) def readpriorensda(self): """ !Read RUN_CYCLED_ENSDA from prior cycle storm1.run_ensda """ logger=self.log() run_ensda=True run_ensda_cycled=False if self.ensda_opt > 0: if os.path.isdir(self.icstr('{oldcom}')): flag_file=self.icstr('{oldcom}/run_ensda.{oldvit[stnum]:02d}{oldvit[basin1lc]}.{oldvit[YMDH]}') waited=wait_for_files( flag_file,logger,maxwait=15,sleeptime=10, min_size=1,min_mtime_age=5) if not waited: msg='%s: did not exist or was too small after 150 seconds'%( flag_file) logger.warning(msg) else: try: with open(flag_file,'rt') as f: for line in f: if line.find('RUN_ENSDA=YES')>=0: run_ensda_cycled=True except Exception as e: logger.warning('unhandled exception reading ' + flag_file+str(e),exc_info=True) raise finally: logger.info('previouse cycle RUN_ENSDA=' +str(run_ensda_cycled)) else: logger.info(self.icstr('{oldcom}')+' does not exist.') return (run_ensda, run_ensda_cycled) ######################################################################## class CheckVmaxEnsda(CycleTDRCheck): """This subclass of CycleTDRCheck instructs ENSDA to run when Vmax exceeds certain threshold, whether TDR is available or not.""" def _actually_run(self): """!Runs the TDR check and ignores its results. Calls the superclass _actually_run, so that the TDR check is done, and then returns True regardless of the TDR availability. Logs a warning about this to the produtil.log.jlogger. 
@returns True""" (run_ensda, run_ensda_cycled, switch_storm)=super(CheckVmaxEnsda,self)._actually_run() vmaxthreshold=self.conffloat('run_ensda_wmax',17.0) if self.storminfo.wmax > vmaxthreshold and not switch_storm: run_ensda=True msg="Will run ENSDA due to Vmax "\ "exceeding threshold %s m/s" %(vmaxthreshold) self.log().warning(msg) jlogger.warning(msg) if self.ensda_opt > 0: run_ensda_cycled=True msg="Will run ENSDA due to Vmax "\ "exceeding threshold %s m/s" %(vmaxthreshold) self.log().warning(msg) jlogger.warning(msg) return (run_ensda, run_ensda_cycled, switch_storm) ######################################################################## def enada_pre_object_for(ds,conf,section,next_cycle): """!Generates a CycleTDRCheck or AlwaysRunENSDA based on configuration settings. Reads the [config] section ensda_when option to decide what TDR check class should be used. * ensda_when="tdr_next_cycle" - create a CycleTDRCheck * ensda_when="always" - create an AlwaysRunENSDA * ensda_when=anything else - raise an exception @param ds,conf,section,next_cycle - passed to the constructor""" ensda_when=conf.getstr('config','ensda_when','tdr_next_cycle').lower() if ensda_when=='tdr_next_cycle': return hwrf.ensda.CycleTDRCheck( ds,conf,'ensda_pre',next_cycle) elif ensda_when=='storm1': return hwrf.ensda.Storm1Ensda(ds,conf,'ensda_pre') elif ensda_when=='always': return hwrf.ensda.AlwaysRunENSDA( ds,conf,'ensda_pre',next_cycle) elif ensda_when=='tdr_and_wmax': return hwrf.ensda.CheckVmaxEnsda( ds,conf,'ensda_pre',next_cycle) else: raise ValueError('The ensda_when option must be set to tdr_next_cycle or always (case-insensitive) not %s.'%(repr(ensda_when),)) ######################################################################## class EnsRelocateCheck(hwrf.hwrftask.HWRFTask): """!Determines if relocation for ensemble should be run This class checks if TCvital Vmax is greater than 14 m/s and if the ensemble atcf have expected forecast hours (6 hour forecast track for 3D 
application).""" def __init__(self,dstore,conf,section,atime=None,ensda=None,**kwargs): """ EnsRelocateCheck constructor @param dstore the produtil.datastore.Datastore database to use @param conf the hwrf.config.HWRFConfig that provides configuration data @param section the section in conf to use @param atime the analysis time @param ensda the hwrf.ensda.DAEnsemble that provides ensemble atcf files""" super(EnsRelocateCheck,self).__init__(dstore,conf,section,**kwargs) self.__ens_rlct_flag_file=self.confstr('ens_rlct_flag_file') self.ensda=ensda if atime is None: atime=conf.cycle self.atime=to_datetime(atime) def check_ensda_atcf(self,atime,ensda): """!Check if the atcf of ensemble members have 6 hour forecast track @atime analysis time @param ensda the hwrf.ensda.DAEnsemble that provides the atcf files""" logger=self.log() logger.info('in check_ensda_atcf') plist=list() for (ens,member) in self.ensda.members_at_time(atime): iens=int(ens) prod=member.get_track(atime) size=os.path.getsize(prod.location) plist.append((size,prod.location)) plist.sort() run_ens_relocate=True for _, prod in plist: cmd='grep "HWRF, 006," %s' %prod f6hr=subprocess.getoutput(cmd) if not f6hr: logger.info('6hr forecast not in atcf') run_ens_relocate=False break return run_ens_relocate def run(self): """!create the storm*.run_ens_relocate file Creates the storm1.run_ens_relocate file with RUN_ENS_RELOCATE=YES, if TCvital vmax is greater than 14 m/s and ensemble atcf files have expected forecast hours""" logger=self.log() run_ens_relocate=self.confbool('run_ens_relocation',False,section='config') try: if self.storminfo.wmax < self.conffloat('intensitymin',14.0): logger.info('wmax is smaller than 14 m/s') run_ens_relocate=False if not self.check_ensda_atcf(self.atime,self.ensda): run_ens_relocate=False finally: self.write_flag_file(run_ens_relocate) def write_flag_file(self,run_ens_relocate): """!Write the ensemble relocate flag file. 
        Calls hwrf.ensda.write_ensda_flag_file to write the flag file.
        @param run_ens_relocate True means relocation for the ensemble
          should be run, False if it should not be run."""
        write_ensda_flag_file(self.__ens_rlct_flag_file,run_ens_relocate,
                              'RUN_ENSDA_RELOCATE')

########################################################################
def unset_ensdastatus(conf,flag_file,logger=None):
    """!Delete the ensda status flag file.

    Deletes the specified flag file, if it exists and is non-empty.
    If the logger is not specified, the "ensdastatus" subdomain of
    the conf default logging domain is used.
    @param conf the hwrf.config.HWRFConfig object
    @param flag_file full path to the flag file to delete
    @param logger Optional: a logging.Logger for logging."""
    if logger is None: logger=conf.log('ensdastatus')
    if isnonempty(flag_file):
        produtil.fileop.remove_file(flag_file,info=True,logger=logger)

def reset_ensda_flag_file(conf,flag_file,run_ensda,run_cycled_ensda,logger=None):
    """!reset ensda flag file due to job failure.

    The flag file is used to fall back to the run without ensda.
    @param conf the hwrf.config.HWRFConfig object
    @param flag_file name of flag file
    @param run_ensda True means run ensemble forecast
    @param run_cycled_ensda True means run cycle ensda
    @param logger Optional: a logging.Logger for logging."""
    if logger is None: logger=conf.log('ensdastatus')
    # Write a fresh flag file in WORKhwrf, then deliver it over both
    # copies in COM.  NOTE(review): write_ensda_flag_file() appends, so
    # if flag_tmp survives from a prior attempt its old lines remain --
    # confirm this is intended.
    flag_tmp=os.path.join(conf.getdir('WORKhwrf'),flag_file)
    write_ensda_flag_file(flag_tmp,run_ensda,'RUN_ENSDA')
    write_ensda_flag_file(flag_tmp,run_cycled_ensda,'RUN_CYCLED_ENSDA')
    there=conf.getstr('ensda_pre','tdr_flag_file')
    produtil.fileop.deliver_file(flag_tmp,there,logger=logger,keep=True)
    there=conf.getstr('ensda_pre','tdr_flag_file2')
    produtil.fileop.deliver_file(flag_tmp,there,logger=logger,keep=True)

def reset_ensda_relocate_flag_file(conf,flag_file,run_ens_relocate,logger=None):
    """!reset ensda relocate flag file due to job failure.

    The flag file is used to fall back to the run without ensda
    relocation.
    @param conf the hwrf.config.HWRFConfig object
    @param flag_file name of flag file
    @param run_ens_relocate True means run relocate for ensemble
    @param logger Optional: a logging.Logger for logging."""
    if logger is None: logger=conf.log('ensda_relocate_status')
    flag_tmp=os.path.join(conf.getdir('WORKhwrf'),flag_file)
    write_ensda_flag_file(flag_tmp,run_ens_relocate,'RUN_ENSDA_RELOCATE')
    there=conf.getstr('ensda_relocate_pre','ens_rlct_flag_file')
    produtil.fileop.deliver_file(flag_tmp,there,logger=logger,keep=True)