"""!Contains Bufrprep, which converts data tanks to bufr files, and otherwise preprocesses the prepbufr and bufr files.""" ##@var __all__ # List of symbols to export by "from hwrf.bufrprep import *" __all__ = ['Bufrprep'] import collections, datetime, numpy, os, shutil, tarfile, string, re import contextlib import hwrf.hwrftask, hwrf.exceptions, hwrf.namelist import produtil.fileop, produtil.cd, produtil.cluster, produtil.log import produtil.config from produtil.ecflow import set_ecflow_event from produtil.cd import NamedDir from produtil.fileop import make_symlink, deliver_file, isnonempty, \ fortlink from produtil.run import run, batchexe from hwrf.hwrftask import HWRFTask from hwrf.numerics import to_datetime, to_datetime_rel from hwrf.exceptions import GSIInputError from contextlib import closing #---- class FormatSonde(object): """ DESCRIPTION: This is the base-class method to format TEMP-DROP messages (observations) in accordance with the expectations of the tempdrop_sonde executable. INPUT VARIABLES: * opts_obj; a Python object containing the user command line options. """ def __init__(self,atime,logger,datapath): """ DESCRIPTION: Creates a new FormatSonde object. """ self.cycle=atime.strftime('%Y%m%d%H') self.datapath=datapath self.logger=logger self.dateobj=datetime.datetime.strptime(self.cycle,'%Y%m%d%H') self.srchstrs=['UZNT','UZPN','UZPA'] self.flag_list=['CCA'] self.max_offset_seconds=int(2*84600) self.tempdrop_list=list() def check_timestamp(self,file_timestamp,timestamps): """ DESCRIPTION: This method checks the validity of the timestamp used to assign filenames; if the difference between the date in the timestamp and the date of the forecast cycle is greater than 1-day, the filename timestamp is reset; in addition, if the timestamp assigned to the filename does not make sense, it is assigned to one that does. INPUT VARIABLES: * fts; a Python integer containing the timestamp (assuming the UNIX format %Y%m%d%H%M) to be assigned to the file name. 
* timestamps; a Python list of timestamps for which the current processing is occurring. OUTPUT VARIABLES: * fts; a Python integer containing the (updated) timestamp (assuming the UNIX format %Y%m%d%H%M) to be assigned to the file name. """ fts=file_timestamp try: timestamp_obj=datetime.datetime.strptime(file_timestamp,'%Y%m%d%H%M') offset_seconds=numpy.abs((self.dateobj-timestamp_obj).seconds) if offset_seconds>self.max_offset_seconds: yyyymmdd=sorted(timestamps)[0][0:8] hhmm=file_timestamp[-4::] fts='%s%s'%(yyyymmdd,hhmm) except ValueError: hhmm=file_timestamp[-4::] for timestamp in timestamps: yyyymmdd=timestamp[0:8] fts='%s%s'%(yyyymmdd,hhmm) try: timestamp_obj=datetime.datetime.strptime(fts,'%Y%m%d%H%M') break except: pass return fts def collect_sondes(self,data): """ DESCRIPTION: This method loops through each key within an input Python dictionary containing all observations (data) and returns character strings denote the locations of observations to be formatted. INPUT VARIABLES: * data; a Python dictionary containing key and value pairs for all observations within the respective files to be processed. OUTPUT VARIABLES: * outinfostrs; a Python dictionary containing key (timestamp) and value (TEMP-DROP sonde observation headers) pairs. 
""" infostrs=dict() for key in sorted(data.keys()): infostrs[key]=list() for srchstr in self.srchstrs: for item in data[key]: if srchstr in item: infostrs[key].append(item.strip()) kwargs={'infostrs':infostrs[key],'data':data[key]} infostrs[key]=self.get_obsinfo(infostrs[key],data[key]) flag_infostrs=list() for item in list(infostrs[key].keys()): for flag_item in self.flag_list: if flag_item in item: flag_infostrs.append(item) msg=('Found the following flagged message headers for %s:\n %s\n'%\ (key,flag_infostrs)) self.logger.info(msg=msg) rmvinfostrs=list() for key in sorted(data.keys()): infostr=infostrs[key] for item in flag_infostrs: for flag_item in self.flag_list: if flag_item in item: string=item.replace(flag_item,'').rstrip() for ifs in list(infostr.keys()): if string==ifs: msn1=infostr[ifs]['mission'] obs1=infostr[ifs]['obid'] msn2=infostr[string]['mission'] obs2=infostr[string]['obid'] if (msn1==msn2) and (obs1==obs2): rmvinfostrs.append(string) msg=('Removing the following unique message header(s):\n %s\n'%set(rmvinfostrs)) self.logger.info(msg=msg) outinfostrs=list() for key in sorted(data.keys()): outstrs=list() for item in list(infostrs[key].keys()): if item not in rmvinfostrs: outstrs.append(item) outinfostrs.append(item) infostrs[key]=outstrs msg=('The following %d TEMP-DROP sonde message headers will be processed:\n%s\n'%\ (len(outinfostrs),outinfostrs)) self.logger.info(msg=msg) return outinfostrs def createfilelist(self): """ DESCRIPTION: This method creates a formatted list of files in accordance with the expectations of the tempdrop_sonde executable. 
""" workpath=os.getcwd() filenames=os.listdir(workpath) with open(os.path.join(workpath,'filelist.tempdrop'),'w') as f: for item in filenames: if '.mod' in item: f.write('"%s"\n'%os.path.join(workpath,item)) self.tempdrop_list.append(os.path.join(workpath,item)) def find_sondefiles(self): """ DESCRIPTION: This method collects relevant sonde files to be processed; the relevancy is determined relative to the forecast cycle timestamp. """ offset_seconds=10800 time_dict={'year':{'cstrt':0,'cstp':4},'month':{'cstrt':4,'cstp':6},\ 'day':{'cstrt':6,'cstp':8},'hour':{'cstrt':8,'cstp':10},\ 'minute':{'cstrt':10,'cstp':12},'second':{'cstrt':12,'cstp':14}} datetime_kwargs=dict() time_key_list=['year','month','day','hour','minute','second'] cycle=datetime.datetime.strftime(self.dateobj,'%Y%m%d%H') for item in time_key_list: cstrt=time_dict[item]['cstrt'] cstp=time_dict[item]['cstp'] value=cycle[cstrt:cstp] if len(value)>0: datetime_kwargs[item]=int(value) else: datetime_kwargs[item]=0 timestamp=datetime.datetime(**datetime_kwargs) dtime=datetime.timedelta(seconds=offset_seconds) in_list=[(timestamp-dtime).strftime('%Y%m%d'),(timestamp+dtime).\ strftime('%Y%m%d')] timestamps=sorted(set(in_list)) filenames=os.listdir(self.datapath) filedict=dict() msg=('Cycle %s; searching for the following timestamps: %s'%(cycle,\ timestamps)) self.logger.info(msg=msg) self.tempdrop_list=list() for item in filenames: for timestamp in timestamps: if timestamp in item: filename=os.path.join(self.datapath,item) filedict[filename]=timestamp msg=('Found file %s for processing.'%filename) self.logger.info(msg=msg) break # filedict=collections.OrderedDict(sorted(filedict.items())) return filedict def formatsondes(self): """ DESCRIPTION: This method formats TEMP-DROP messages (observations) in accordance with the expectations of the tempdrop_sonde executable. 
""" srchstrs=['REL','SPG','SPL'] excldstrs=['62626','REL','SPG','SPL'] for infile in self.tempdrop_list: if os.path.exists(infile): with open(infile,'rb') as inf: data=inf.read().decode("utf-8", "ignore") outfile=('%s.mod'%infile) datan=list() data.replace('\r','') data=data.split('\n') data=[_f for _f in data if _f] for item in data: item=self.stripmeta(instr=item) datan.append(item) data=datan with open(outfile,'w') as outf: for item in data: if any(s in item for s in excldstrs): pass else: outf.write('%s\n'%item) outdata=list() for (i,item) in enumerate(data): for srchstr in srchstrs: if srchstr in item: try: nstr=data[i]+data[i+1] nstr=self.stripmeta(instr=nstr) indx=nstr.index(srchstr) sstr=nstr[indx:indx+23] sstr=self.stripmeta(instr=sstr) outf.write('%s\n'%sstr) except IndexError: pass def get_obsinfo(self,infostrs,data): """ DESCRIPTION: This method collects the observation and aircraft flight identifications and returns a Python dictionary (obsdict) corresponding to the observation header string. INPUT VARIABLES: * infostrs; a Python list of observation header strings. * data; a Python list of all observations collected by driver level of script. OUTPUT VARIABLES: * obsdict; a Python dictionary containing the observation header string and the corresponding observation and aircraft flight identifications. """ obsdict=dict() whitelist=['OB'] for infostr in infostrs: obsdict[infostr]=dict() lnidx=0 for (i,item) in enumerate(data): if infostr in item: lnidx=i break for (i,item) in enumerate(data[lnidx::]): if 'OB' in item: obsitem=item.split().index('OB') obsdict[infostr]['obid']=item.split()[obsitem+1] obsdict[infostr]['mission']=item.split()[1] break return obsdict def read_sondefiles(self,filedict): """ DESCRIPTION: This method collects all observations contained within the files to be processed (filedict keys) and returns a Python dictionary containing key (timestamp) and value (observations) pairs. 
INPUT VARIABLES: * filedict; a Python dictionary containing key (timestamp) and values (file paths) pairs. OUTPUT VARIABLES: * data; a Python dictionary containing key (timestamp) and values (observations collected from the respective input file) pairs. """ data=dict() for infile in list(filedict.keys()): msg=('Processing file %s.'%infile) self.logger.info(msg=msg) year=filedict[infile][0:4] month=filedict[infile][4:6] day=filedict[infile][6:8] lnidx=0 with open(infile,'rb') as f: infdata=f.read().decode("utf-8", "ignore") data[filedict[infile]]=infdata.split('\n') return data def sondedump(self): """ DESCRIPTION: This method collects all GPS dropsonde files and prepares them for further processing as follows: (1) Finds all relevant files containing observations; this script assumes that the observation files (in the user specified data path) are prefixed with a timestamp (assuming the UNIX format) as %Y%m%d. (2) Reads all relevant sonde files and compiles a Python list of observations within the respective files.' (3) Finds all TEMP-DROP sonde observations; this is done by identifying header strings throughout the respective file(s). (4) Loops though all collect TEMP-DROP sonde observation headers and prepares individual files for all identified TEMP-DROP observations. """ # Collect sonde files relevant for the current cycle. filedict=self.find_sondefiles() # Collect sonde file observations. data=self.read_sondefiles(filedict=filedict) # Collect all relevant sonde observations. infostrs=self.collect_sondes(data=data) # Create concatenated list of sonde observations. infodata=dict() for timestamp in sorted(data.keys()): infodata[timestamp]=list() for item in data[timestamp]: item=self.stripmeta(instr=item) infodata[timestamp].append(item) # Loop through all timestamps and prepare individual files for # each observation. 
for infostr in set(infostrs): infostr=self.stripmeta(instr=infostr) mission_id=infostr.split()[1] timestr=infostr.split()[2] timestamps=sorted(data.keys()) for timestamp in timestamps: year=timestamp[0:4] month=timestamp[4:6] timestrday=timestr[0:2] fts=('%s%s%s'%(year,month,timestr)) kwargs={'file_timestamp':fts,'timestamps':timestamps} value=self.check_timestamp(**kwargs) if value is not None: fts=value idxs=[idx for idx, e in enumerate(infodata[timestamp]) if infostr==e] for idx in idxs: lnidx=idx outfile=('%s.%s'%(fts,mission_id)) i=1 while os.path.isfile(outfile): if os.path.isfile(outfile): outfile=('%s.%s.%s'%(fts,mission_id,i)) i=i+1 tdkwargs={'outfile':outfile,'infostr':infostr,'data':\ infodata[timestamp],'lnidx':lnidx} self.write_tempdrop(**tdkwargs) self.tempdrop_list.append(outfile) def stripmeta(self,instr): """ DESCRIPTION: This method stripts meta-characters and carriage returns from an input string. INPUT VARIABLES: * instr; a Python string possibly containing meta-characters. OUTPUT VARIABLES: * outstr; a Python string stripped of meta-characters and carriage returns. """ for c in (string.ascii_lowercase+string.ascii_uppercase): chkstr='^%s'%c outstr=instr.replace(chkstr,'') instr=outstr outstr=outstr.replace('\r','') return outstr def write_tempdrop(self,outfile,infostr,data,lnidx): """ DESCRIPTION: This method parses a list of strings and writes a given TEMP-DROP message to a user specified file. INPUT VARIABLES: * outfile; a Python string specifying the path to the output file to contain the TEMP-DROP message. * infostr; a Python string specifying the TEMP-DROP message header (e.g., UZNT10 KNHC 250600). * infdata; a Python list of data strings to be parsed using the infostr information. 
""" strtmsg=lnidx for (i,item) in enumerate(data[strtmsg::],1): if ';' in item: lnidx=i break if not item.strip(): lnidx=i break endmsg=strtmsg+lnidx msg=('Writing TEMP-DROP message to %s; data block: [%s,%s]'%\ (outfile,strtmsg,endmsg)) self.logger.info(msg=msg) with open(outfile,'wt+') as f: for item in data[strtmsg:endmsg]: item=self.stripmeta(instr=item) f.write('%s\n'%item) def run(self): """ DESCRIPTION: This method performs the following tasks: (1) Collects observations from external files (possibly) containing TEMP-DROP messages. (2) Formats (any) TEMP-DROP messages in accordance with the expectations of the tempdrop_sonde executable. (3) Creates a formatted list of TEMP-DROP message files (e.g., observations) to be processed (and in accordance with the expectations of the) tempdrop_sonde executable. """ self.sondedump() self.formatsondes() self.createfilelist() return self.tempdrop_list #---- class Bufrprep(HWRFTask): """!This is a HWRF task that preprocesses observations in data tanks to create bufr files suitable for input to hwrf.gsi classes. 
It also does other preprocessing of bufr and prepbufr files.""" def __init__(self,dstore,conf,section,taskname=None,atime=None, **kwargs): """!Bufrprep constructor @param dstore passed to Datum: the Datastore object for this Task @param conf the conf object for this task (passed to HWRFTask) @param section the conf section for this task (passed to HWRFTask) @param taskname Optional: the taskname for this product in the datastore @param atime the analsysis time @param kwargs Additional keyword arguments are passed to the hwrf.hwrftask.HWRFTask.__init__ constructor""" super(Bufrprep,self).__init__(dstore,conf,section,taskname=taskname, **kwargs) taskname=self.taskname if atime is None: atime=conf.cycle self._atime=to_datetime(atime) self._dirname=self.workdir self._stormid='999' # Get the DataCatalog for our data grabbers: self._in_catalog=None incat_name=None if 'in_catalog' in kwargs: ink=kwargs['in_catalog'] if isinstance(ink,hwrf.input.DataCatalog): self._in_catalog=ink elif isinstance(ink,str): incat_name=ink elif ink is None: pass else: raise TypeError( 'In hwrf.bufrprep.Bufrprep.__init__, in_catalog must be None, ' 'a basestring or a DataCatalog. You provided an object ' 'of type %s, with value %s.' 
%(type(ink).__name__,repr(ink))) if self._in_catalog is None: if incat_name is None: incat_name=self.confstr('catalog') self._in_catalog=hwrf.input.DataCatalog( self.conf,incat_name,self._atime) self.conf2json=produtil.config.Conf2JSON() self.endmsg=0 def getstormid(self): """!The storm ID.""" logger=self.log() atime=self._atime.strftime('%Y%m%d%H') year=int(atime[0:4]) basin=self.storminfo.pubbasin2 if year <= 2013: self.stormid='%02d' % (self.storminfo.stnum) elif basin.upper()=='AL': self._stormid='%s%02d' % ('1',self.storminfo.stnum) elif basin.upper()=='EP': self._stormid='%s%02d' % ('2',self.storminfo.stnum) elif basin.upper()=='CP': self._stormid='%s%02d' % ('3',self.storminfo.stnum) else: self._stormid='999' logger.info('get input storm id %s'%(self._stormid)) return self._stormid def grab_bufr(self,atime=None,morevars=None): """!Links or copies all needed bufr files to the local directory. @param atime the analysis time @param morevars passed to hwrf.config.HWRFConfig.get() and similar routines to define string replacements""" olist=self.confstr('obstypes') touched=set() for osection in olist.split(','): trim=osection.strip() if len(trim)>0 and not trim in touched: self.grab_obstype_section(trim,atime=atime,morevars=morevars) def grab_obstype_section(self,section,atime=None,morevars=None): """!Copies or links observations specified in the obstype sections to the current working directory. The section listed in self.section should contain an obstype option, whose value is a comma separated list of section names. This method reads every section in that list. For each section, the option keys are the local directory filenames expected by GSI, while the values are the data type part of the operational filename (ie.: the satwind in gfs.t12z.tm00.satwind.bufr_d). 
There are a few special keys: * dataset - the name of the dataset for hwrf.input purposes * item - the name of the item for hwrf.input purposes * type - the type of observation: satellite, or anything else. At present, only "satellite" has any special meaning. If the type is "satellite" then the entire section will be skipped if sat_da=False in this task's config section. Once the section is parsed, the files are all linked to this directory. @param section Name of the section to read. @param atime Analysis time. @param morevars A dict for additional string replacements in the hwrf.config.HWRFConfig.get() family of functions.""" logger=self.log() if not isinstance(section,str): section=str(section) if atime is None: atime=self._atime else: atime=to_datetime_rel(atime,self._atime) dataset=self.conf.get(section,'dataset') item=self.conf.get(section,'item') otype=self.conf.get(section,'type').lower() logger.warning('process obs section %s with dataset=%s item=%s ' 'type=%s'%(section,dataset,item,otype)) obstypes=list() items=self.conf.items(section) otdict=dict( [ (v,k) for k,v in items ] ) namer=lambda f,t: otdict[t] for localname,obstype in items: if localname in ['dataset','item','type']: continue obstypes.append(obstype) for obstype in obstypes: logger.warning('Find obstype=%s in dataset=%s item=%s' %(obstype,dataset,item)) if not isinstance(obstype,str): raise TypeError( 'In bufrprep.Bufrprep.link_bufr, the obstypes parameter must ' 'be an iterable container of basestrings. One of the ' 'elements was a %s (value %s) instead.' 
%(type(obstype).__name__,repr(obstype))) there=self._in_catalog.locate(dataset,item,atime=atime, logger=logger,obstype=obstype) if there is None or there=='': msg='%s: Could not find a location for this obstype.'\ %(obstype,) logger.warning(msg) elif produtil.fileop.isnonempty(there): bn=os.path.basename(there) on=namer(bn,obstype) make_symlink(there,on,logger=logger,force=True) else: msg='%s: Observation file is empty or non-existant: %s'\ %(obstype,there) logger.warning(msg) def grab_prepbufr(self,atime=None,**kwargs): """!Links or copies the prepbufr file to the local directory. @param atime The analysis time. @param kwargs More string substitution variables for the hwrf.config.HWRFConfig family of functions.""" if atime is None: atime=self._atime else: atime=to_datetime_rel(atime,self._atime) logger=self.log() bufr_dataset=self.confstr('bufr_dataset') prepbufr_item=self.confstr('prepbufr_item') there=self._in_catalog.locate(bufr_dataset,prepbufr_item, atime=atime,logger=logger,**kwargs) if there is None or there=='': msg='Could not find the prepbufr file (item=%s dataset=%s)' \ %(repr(prepbufr_item),repr(bufr_dataset)) logger.warning(msg) raise GSIInputError(msg) elif not produtil.fileop.isnonempty(there): msg=there+': is non-existant or empty' logger.error(msg) raise GSIInputError(msg) deliver_file(there,'prepbufr.ALL',keep=True,logger=logger) def tdrdump(self,atime=None,morevars=None): """!Gets the dumped TDR data from the pure shell jhwrf_bufrdump job. @param atime The analysis time. 
        @param morevars More string substitution variables for the
        hwrf.config.HWRFConfig.get() family of functions."""
        if atime is None:
            atime=self._atime
        else:
            atime=to_datetime_rel(atime,self._atime)
        logger=self.log()
        input_catalog=self.conf.get('config','fcst_catalog')
        #dcomenv=os.path.join(os.environ.get('DCOMROOT','/dcom'),'prod')
        dcomenv=os.environ.get('DCOMROOT','/dcom')
        dcom=self.conf.get(input_catalog,'dcom',dcomenv)
        if os.path.isdir(dcom):
            # TDR lives in bufr tank b006/xx070; check both yesterday's and
            # today's tanks since the window may straddle midnight.
            btime=to_datetime_rel(-24*3600,atime)
            tank1=os.path.join(dcom,btime.strftime("%Y%m%d"),'b006/xx070')
            tank2=os.path.join(dcom,atime.strftime("%Y%m%d"),'b006/xx070')
            logger.info('tank1: %s, tank2: %s'%(tank1,tank2))
            if isnonempty(tank1) or isnonempty(tank2):
                # Run the obsproc dumpjb utility with a +/-3.00 hour window
                # for the tldplr (tail Doppler radar) data type.
                run(batchexe('%s/dumpjb'%(os.environ.get('USHobsproc_dump','/you/forgot/to/set/USHobsproc_dump'),)) \
                    [atime.strftime("%Y%m%d%H"), '3.00', 'tldplr'] \
                    .env(TANK=dcom, \
                         DATA=self._dirname))
                tdrbufr='tldplr.ibm'
                if isnonempty(tdrbufr):
                    produtil.fileop.deliver_file(
                        tdrbufr,'tldplrbufr',keep=True,logger=logger)
            else:
                logger.info('tank1: %s, tank2: %s not exist'%(tank1,tank2))
        elif produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge',
                                          'dogwood', 'cactus' ):
            # On WCOSS a missing dcom is a configuration error, not an
            # expected condition.
            raise GSIInputError('dcom is not set in the system.conf [dir] section - aborting')
        else:
            logger.info('%s does not exist on %s. This is not wcoss. TDR Bufr'
                        'dump can only be run on WCOSS in real-time mode'%(
                    dcom,produtil.cluster.name()))

    def hdobdump(self,atime=None,morevars=None):
        """!Gets the dumped HDOB data from the pure shell jhwrf_bufrdump job.
        @param atime The analysis time.
        @param morevars More string substitution variables for the
        hwrf.config.HWRFConfig.get() family of functions."""
        if atime is None:
            atime=self._atime
        else:
            atime=to_datetime_rel(atime,self._atime)
        logger=self.log()
        input_catalog=self.conf.get('config','fcst_catalog')
        #dcomenv=os.path.join(os.environ.get('DCOMROOT','/dcom'),'prod')
        dcomenv=os.environ.get('DCOMROOT','/dcom')
        dcom=self.conf.get(input_catalog,'dcom',dcomenv)
        if os.path.isdir(dcom):
            # HDOB lives in bufr tank b004/xx015; same two-day tank check
            # as tdrdump() above.
            btime=to_datetime_rel(-24*3600,atime)
            tank1=os.path.join(dcom,btime.strftime("%Y%m%d"),'b004/xx015')
            tank2=os.path.join(dcom,atime.strftime("%Y%m%d"),'b004/xx015')
            logger.info('tank1: %s, tank2: %s'%(tank1,tank2))
            if isnonempty(tank1) or isnonempty(tank2):
                run(batchexe('%s/dumpjb'%(os.environ.get('USHobsproc_dump','/you/forgot/to/set/USHobsproc_dump'),)) \
                    [atime.strftime("%Y%m%d%H"), '3.00', 'hdob'] \
                    .env(TANK=dcom, \
                         DATA=self._dirname))
                hdobbufr='hdob.ibm'
                if isnonempty(hdobbufr):
                    produtil.fileop.deliver_file(
                        hdobbufr,'hdobbufr',keep=True,logger=logger)
            else:
                logger.info('tank1: %s, tank2: %s not exist'%(tank1,tank2))
        elif produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge',
                                          'dogwood', 'cactus' ):
            raise GSIInputError('dcom is not set in the system.conf [dir] section - aborting')
        else:
            logger.info('%s does not exist on %s. This is not wcoss. HDOB Bufr'
                        'dump can only be run on WCOSS in real-time mode'%(
                    dcom,produtil.cluster.name()))

    def nexraddump(self,atime=None,morevars=None):
        """!Gets the dumped nexrad data
        @param atime The analysis time.
        @param morevars More string substitution variables for the
        hwrf.config.HWRFConfig.get() family of functions."""
        if atime is None:
            atime=self._atime
        else:
            atime=to_datetime_rel(atime,self._atime)
        logger=self.log()
        input_catalog=self.conf.get('config','fcst_catalog')
        #dcomenv=os.path.join(os.environ.get('DCOMROOT','/dcom'),'prod')
        dcomenv=os.environ.get('DCOMROOT','/dcom')
        dcom=self.conf.get(input_catalog,'dcom',dcomenv)
        # NEXRAD tanks are hourly
        # Initialize radial wind and reflectivity dumps to SKIP all hourly tanks
        skipnexrad={'SKIP_006010':'YES','SKIP_006011':'YES','SKIP_006012':'YES',
                    'SKIP_006013':'YES','SKIP_006014':'YES','SKIP_006015':'YES',
                    'SKIP_006016':'YES','SKIP_006017':'YES','SKIP_006018':'YES',
                    'SKIP_006019':'YES','SKIP_006020':'YES','SKIP_006021':'YES',
                    'SKIP_006022':'YES','SKIP_006023':'YES','SKIP_006024':'YES',
                    'SKIP_006025':'YES','SKIP_006026':'YES','SKIP_006027':'YES',
                    'SKIP_006028':'YES','SKIP_006029':'YES','SKIP_006030':'YES',
                    'SKIP_006031':'YES','SKIP_006032':'YES','SKIP_006033':'YES',
                    'SKIP_006040':'YES','SKIP_006041':'YES','SKIP_006042':'YES',
                    'SKIP_006043':'YES','SKIP_006044':'YES','SKIP_006045':'YES',
                    'SKIP_006046':'YES','SKIP_006047':'YES','SKIP_006048':'YES',
                    'SKIP_006049':'YES','SKIP_006050':'YES','SKIP_006051':'YES',
                    'SKIP_006052':'YES','SKIP_006053':'YES','SKIP_006054':'YES',
                    'SKIP_006055':'YES','SKIP_006056':'YES','SKIP_006057':'YES',
                    'SKIP_006058':'YES','SKIP_006059':'YES','SKIP_006060':'YES',
                    'SKIP_006061':'YES','SKIP_006062':'YES','SKIP_006063':'YES'}
        # Unset the needed tanks for the corresponding cycles
        # (base subtypes offset by the cycle hour select which hourly
        # tanks of radial wind and reflectivity to actually dump).
        subtypes_tmp=[9, 10, 11, 39, 40, 41]
        subtypes_str=[]
        for subtype in subtypes_tmp:
            subtypeHH=subtype+int(atime.strftime("%H"))
            if subtypeHH in [9, 39]:
                # wrap the hour-9/39 subtype into the next day's range
                subtypeHH=subtypeHH+24
            subtypes_str.append('SKIP_006'+str(subtypeHH).zfill(3))
        for key in list(skipnexrad):
            if key in subtypes_str:
                skipnexrad.pop(key)
        # Dump globally since all reports over CONUS (and geographical filtering is
        # computationally expensive)
        LALO=0
        # Wider time window at the synoptic cycles.
        if atime.strftime("%H") in ['00', '06', '12', '18']:
            DTIM_earliest_nexrad='-0.75'
            DTIM_latest_nexrad='+1.50'
        else:
            DTIM_earliest_nexrad='-0.50'
            DTIM_latest_nexrad='+0.50'
        if os.path.isdir(dcom):
            run(batchexe('%s/dumpjb'%(os.environ.get('USHobsproc_dump','/you/forgot/to/set/USHobsproc_dump'),)) \
                [atime.strftime("%Y%m%d%H"), '0.5', 'nexrad'] \
                .env(TANK=dcom, \
                     LALO=LALO, \
                     DTIM_earliest_nexrad=DTIM_earliest_nexrad, \
                     DTIM_latest_nexrad=DTIM_latest_nexrad, \
                     DATA=self._dirname, **skipnexrad))
            nexradbufr='nexrad.ibm'
            if isnonempty(nexradbufr):
                produtil.fileop.deliver_file(
                    nexradbufr,'l2rwbufr',keep=True,logger=logger)
        elif produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge',
                                          'dogwood', 'cactus' ):
            raise GSIInputError('dcom is not set in the system.conf [dir] section - aborting')
        else:
            logger.info('%s does not exist on %s. This is not wcoss. NEXRAD Bufr'
                        'dump can only be run on WCOSS in real-time mode'%(
                    dcom,produtil.cluster.name()))

    def sondedump(self,atime=None,**kwargs):
        """!Locates all GPS dropsonde files for real-time experiments.
        @param atime The analysis time.
        @param kwargs; currently does nothing, but for future usage a
        key/value pair for 'offset_seconds' can be included to define
        the +/- temporal window to collect observations; currently this
        defaults to 10800 seconds which is the width of the GSI data
        assimilation window.
""" if atime is None: atime=self._atime else: atime=to_datetime_rel(atime,self._atime) logger=self.log() input_catalog=self.conf.get('config','fcst_catalog') dcomenv=os.environ.get('DCOMROOT','/dcom') dcom=self.conf.get(input_catalog,'dcom',dcomenv) if os.path.isdir(dcom): #datapath=os.path.join(dcom,'ldmdata/obs/upperair/recon') datapath=os.path.join(dcom,'ldmdata/obs/upperair/sonde') logger.info('datapath: %s'%datapath) if isnonempty(datapath): fs_kwargs={'atime':atime,'logger':logger,'datapath':datapath} formatsonde=FormatSonde(**fs_kwargs) tempdrop_list=formatsonde.run() cycle=datetime.datetime.strftime(atime,'%Y%m%d%H') filename=('dropsonde.%s.tar'%cycle) tempdrop_list_mod=list() workpath=os.getcwd() filenames=os.listdir(workpath) for item in filenames: if '.mod' in item: tempdrop_list_mod.append(item) if tempdrop_list_mod: with closing(tarfile.open(filename,'w')) as tar: # for item in tempdrop_list: for item in tempdrop_list_mod: tar.add(item,arcname=os.path.basename(item),\ recursive=False) # os.remove(item) else: logger.info('datapath: %s not exist'%datapath) elif produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge', 'dogwood', 'cactus' ): raise GSIInputError('dcom is not set in the system.conf [dir] section - aborting') else: logger.info('%s does not exist on %s. This is not wcoss. ' 'sondedump can only be run on WCOSS in real-time mode'%( dcom,produtil.cluster.name())) def prep_prepbufr(self,option): """!pre-process prepbufr data Options: * option 0: make no change * option 1: remove some inner-core data * option 2: flag/unflag mass and dropsonde u, v data * option 3: unflag dropsonde data, then reflag u/v within R64 * option 4: add the AOML/HRD TEMPDROP sondes, with computed drift """ logger=self.log() fortlink({ 21:"./prepbufr.ALL", 51:"./prepbufr"}) if option <= 0 or option > 4: logger.info('invalid option, skip prep_prepbufr') elif option == 1: fprog = 'hwrf_rem_prepbufr_typ_in_circle' prog = self.getexe(fprog) RRADC=self.conffloat('RRADC',0.) 
cmd = produtil.run.exe(prog).env(RLATC=self.storminfo.lat, \ RLONC=self.storminfo.lon, \ RRADC=RRADC) if self.redirect: cmd = cmd >= log produtil.run.checkrun(cmd,logger=logger) elif option == 2: self.write_vitals() fprog = 'hwrf_change_prepbufr_qm_in_circle' prog = self.getexe(fprog) RRADC=self.conffloat('RRADC',0.) RBLDC=self.conffloat('RBLDC',0.) cmd = produtil.run.exe(prog).env(RRADC=RRADC, \ RBLDC=RBLDC) if self.redirect: cmd = cmd >= log produtil.run.checkrun(cmd,logger=logger) elif option == 3: #first unflag all dropsonde data fprog = 'hwrf_change_prepbufr_qm_typ' prog = self.getexe(fprog) cmd = produtil.run.exe(prog) if self.redirect: cmd = cmd >= log produtil.run.checkrun(cmd,logger=logger) if self.storminfo.wmax > self.conffloat('uvunflag_vmax',0.0): #need to move the output of hwrf_change_prepbufr_qm_type #to the input file expected for hwrf_change_prepbufr_qm_in_circle produtil.fileop.deliver_file("./prepbufr.ALL","./prebufr.ALL.orig",keep=False) produtil.fileop.deliver_file("./prepbufr","./prepbufr.orig",keep=False) produtil.fileop.deliver_file("./prepbufr.orig","./prepbufr.ALL") #then reflag data within R64 self.write_vitals() fprog = 'hwrf_change_prepbufr_qm_in_circle' prog = self.getexe(fprog) RRADC=self.conffloat('RRADC',0.) RBLDC=self.conffloat('RBLDC',0.) cmd = produtil.run.exe(prog).env(RRADC=RRADC, \ RBLDC=RBLDC) if self.redirect: cmd = cmd >= log produtil.run.checkrun(cmd,logger=logger) elif option == 4: # for now self.prepbufr_tempdrop() def prepbufr_tempdrop(self): """!Pre-process AOML/HRD TEMPDROP observations and append them to the existing prepbufr.ALL file. 
""" logger=self.log() atime = self._atime realtime=bool(self.realtime) if (realtime and produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge', 'dogwood', 'cactus' )): cycle = atime.strftime('%Y%m%d%H') tarfilename = os.path.join(self._dirname,'dropsonde.%s.tar'%cycle) else: section = 'tempdrop_obstype' item = self.conf.get(section,'item') dataset = 'tempdropso' tarfilename = self._in_catalog.locate(dataset,item,atime=atime,\ logger=logger) logger.info('tempdrop tarfile: %s'%tarfilename) cwd = os.getcwd() tempdropdir = os.path.join(cwd,'tempdrop') os.mkdir(tempdropdir) os.chdir(tempdropdir) try: tar = tarfile.open(tarfilename) tar.extractall() tar.close() sonde_files=os.listdir(tempdropdir) filelist=list() for item in sonde_files: filelist.append(os.path.join(tempdropdir,item)) sonde_filelist = os.path.join(tempdropdir,'filelist.tempdrop') with open(sonde_filelist,'w') as f: for item in filelist: f.write('"%s"\n'%item) fprog = 'hwrf_tempdrop_sonde' prog = self.getexe(fprog) invars = dict() invars.update(TEMPDROP_SONDE_DATAPATH=os.path.join(tempdropdir,''),\ SONDE_FILELIST=sonde_filelist) nml_file = self.confstr('tempdrop_sonde_nml_file') nml_section = 'share' nl_filename = os.path.join(tempdropdir,'tempdrop-sonde.input') ni = hwrf.namelist.NamelistInserter(self.conf,nml_section) with open(nml_file,'rt') as nf: with open(nl_filename,'wt') as of: of.write(ni.parse(nf,logger=logger,source=nml_file,\ raise_all=True,atime=atime,**invars)) cmd = produtil.run.exe(prog) if self.redirect: cmd = cmd >= log produtil.run.checkrun(cmd,logger=logger) tempdropfiles = os.listdir(tempdropdir) obs_filepath = os.path.join(tempdropdir,'filelist.obstobufr') with open(obs_filepath,'w') as f: for item in tempdropfiles: if '.drft' in item: f.write('"%s"\n'%os.path.join(tempdropdir,item)) # Create JSON formatted-file for the hwrf_obstobufr software. 
            bufr_info_conf=self.conf.getstr('bufrprep','bufr_info_file')
            msg=('Using BUFR encoding information from file %s.'%bufr_info_conf)
            logger.info(msg)
            bufr_info_json=os.path.join(tempdropdir,'hsa.json')
            msg=('Writing BUFR encoding information to file %s for executable.'%\
                bufr_info_json)
            logger.info(msg)
            # Convert the conf-format BUFR table description to JSON.
            kwargs={'conf_file':bufr_info_conf,'json_file':bufr_info_json}
            self.conf2json.run(**kwargs)
            bufr_filepath=os.path.join(tempdropdir,'prepbufr.hsa')
            # NOTE(review): this msg is built but never logged -- probably a
            # missing logger.info(msg); confirm upstream.
            msg=('BUFR file containing TEMP-DROP observations is %s.'%bufr_filepath)
            fprog = 'hwrf_obstobufr'
            prog = self.getexe(fprog)
            dateobj = datetime.datetime.strptime(str(atime),'%Y-%m-%d %H:%M:%S')
            analdate = datetime.datetime.strftime(dateobj,'%Y-%m-%d_%H:%M:%S')
            invars = dict()
            invars.update(ANALDATE=analdate,\
                DATAPATH=os.path.join(tempdropdir,''),\
                BUFR_FILEPATH=bufr_filepath,\
                BUFR_TBLPATH=self.confstr('prepbufr_table_file'),\
                OBS_FILEPATH=obs_filepath,\
                OBSERR_FILEPATH=self.confstr('obs_error_file'))
            # Namelist for the obs-to-bufr converter, reusing the inserter
            # "ni" created earlier in this method.
            nml_file = self.confstr('obs_to_bufr_nml_file')
            nl_filename = os.path.join(tempdropdir,'obs-to-bufr.input')
            with open(nml_file,'rt') as nf:
                with open(nl_filename,'wt') as of:
                    of.write(ni.parse(nf,logger=logger,source=nml_file,raise_all=\
                        True,atime=atime,**invars))
            cmd = produtil.run.exe(prog)
            # NOTE(review): "log" is undefined here as well (see above).
            if self.redirect: cmd = cmd >= log
            produtil.run.checkrun(cmd,logger=logger)
            os.chdir(cwd)
            # Concatenate the original prepbufr with the new HSA bufr
            # messages and deliver the result as ./prepbufr.
            prepbufr_all = os.path.join(self._dirname,'prepbufr.ALL')
            prepbufr_hsa = os.path.join(tempdropdir,'prepbufr.hsa')
            prepbufr_update = os.path.join(self._dirname,'prepbufr.update')
            prepbufr_file = os.path.join(self._dirname,'prepbufr')
            bufr_filelist = [prepbufr_all,prepbufr_hsa]
            with open(prepbufr_update,'wb') as wfd:
                for item in bufr_filelist:
                    with open(item,'rb') as fd:
                        shutil.copyfileobj(fd,wfd)
            produtil.fileop.deliver_file(prepbufr_update,prepbufr_file,\
                keep=False)
        except tarfile.ReadError as e:
            # Tarball unreadable: fall back to the unmodified prepbufr.
            msg="Error opening tarfile: %s with error info: %s"%(tarfilename,e)
            logger.warning(msg)
            os.chdir(cwd)
            prepbufr_all = os.path.join(self._dirname,'prepbufr.ALL')
            prepbufr_file = os.path.join(self._dirname,'prepbufr')
            msg="Pass along the original bufr file: %s to %s"%(prepbufr_all,prepbufr_file)
            logger.info(msg)
            produtil.fileop.deliver_file(prepbufr_all,prepbufr_file,\
                keep=True)
        except:
            # NOTE(review): deliberate best-effort catch-all so a TEMPDROP
            # failure never kills the job, but a bare except also traps
            # KeyboardInterrupt/SystemExit -- consider "except Exception:".
            msg="prepbufr_tempdrop did not work, skip this step."
            logger.warning(msg)
            os.chdir(cwd)
            prepbufr_all = os.path.join(self._dirname,'prepbufr.ALL')
            prepbufr_file = os.path.join(self._dirname,'prepbufr')
            msg="Pass along the original bufr file: %s to %s"%(prepbufr_all,prepbufr_file)
            logger.info(msg)
            produtil.fileop.deliver_file(prepbufr_all,prepbufr_file,\
                keep=True)

    def write_vitals(self,filename='tcvitals',logger=None):
        """!Writes the tcvitals (from self.storminfo) to the specified file.
        @param filename Name of the file to write
        @param logger A logging.Logger for log messages"""
        if logger is None: logger=self.log()
        logger.info('Writing tcvitals to %s'%(repr(filename),))
        with open(filename,'wt') as f:
            f.write(self.storminfo.as_tcvitals()+"\n")
        assert(os.path.exists(filename))

    def run(self):
        """!Runs and delivers the results."""
        logger=self.log()
        try:
            logger.info('Run bufrprep in directory %s'%(self._dirname,))
            if os.path.exists(self._dirname):
                logger.info('Delete old data in %s'%(self._dirname,))
                shutil.rmtree(self._dirname)
            with NamedDir(self._dirname,keep=not self.scrub):
                """dump and process TDR, HDOB, and GPS dropsonde data"""
                # On WCOSS in real time, dump directly from the data tanks;
                # otherwise pull pre-dumped bufr files from the catalog.
                realtime=bool(self.realtime)
                if (realtime and produtil.cluster.name() in ( 'tide', 'gyre',
                        'luna', 'surge', 'dogwood', 'cactus' )):
                    self.tdrdump()
                    self.hdobdump()
                    self.nexraddump()
                    try:
                        self.sondedump()
                    except:
                        # NOTE(review): bare except is deliberate best-effort
                        # (sonde data is optional), but also traps
                        # KeyboardInterrupt -- consider "except Exception:".
                        logger.warning('sondedump did not work properly. Skipped.')
                        pass
                else:
                    self.grab_bufr()
                if os.path.isfile('tldplrbufr'):
                    self.getstormid()
                    logger.info('storm id %s'%(self._stormid))
                    self.readtdrstmid()
                    self.readtdrtime()
                    self.set_tdrstatus()
                else:
                    logger.info('Skip read tdr bufr.')
                """process prepbufr data"""
                prepbufrprep=self.confint('prepbufrprep',0)
                if prepbufrprep > 0:
                    self.grab_prepbufr()
                    self.prep_prepbufr(prepbufrprep)
                self.deliver_products()
        except Exception as e:
            logger.critical('bufrprep failed: '+str(e),exc_info=True)
            raise

    def readtdrstmid(self):
        """!Runs the hwrf_readtdrstmid program."""
        self.log().info('readtdrstmid')
        logger=self.log()
        fprog = 'hwrf_readtdrstmid'
        prog  = self.getexe(fprog)
        log = '%s/logs/%s_%s.log' %(
            self._dirname, self.__class__.__name__, fprog)
        # The storm id is fed to the program on standard input.
        cmd = produtil.run.exe(prog) << self._stormid
        if self.redirect: cmd = cmd >= log
        produtil.run.checkrun(cmd,logger=logger)

    def readtdrtime(self):
        """!Runs the hwrf_readtdrtime program."""
        self.log().info('readtdrtime')
        logger=self.log()
        fprog = 'hwrf_readtdrtime'
        prog  = self.getexe(fprog)
        log = '%s/logs/%s_%s.log' %(
            self._dirname, self.__class__.__name__, fprog)
        cmd = produtil.run.exe(prog)
        if self.redirect: cmd = cmd >= log
        produtil.run.checkrun(cmd,logger=logger)

    def set_tdrstatus(self):
        """!Create a TDR status file in com directory"""
        self.log().info('set_tdrstatus')
        logger=self.log()
        # stmid.dat is non-empty when the TDR dump matches this storm;
        # tdrflag is non-empty when the dump is only a small edge dump.
        stmidout=os.path.join(self._dirname,'stmid.dat')
        timeout=os.path.join(self._dirname,'tdrflag')
        rightstorm=isnonempty(stmidout)
        smalledgedump=isnonempty(timeout)
        tdrflagfile=os.path.join(self.conf.getdir('com'),self.icstr('{stormlabel}.tdr'))
        if rightstorm and not smalledgedump:
            with open(tdrflagfile,'wt') as f:
                f.write('ASSIMILATE_TDR=YES'+"\n")
            logger.info('tdr bufr is available for this storm, ASSIMILATE_TDR=YES')
        elif not rightstorm:
            logger.info('tdr bufr is not for this storm, ASSIMILATE_TDR=NO')
        else:
            logger.info('this tdr bufr file is a small edge dump, ASSIMILATE_TDR=NO')

    def deliver_products(self,atime=None,**kwargs):
        """!Delivers output products to
the intercom directory. @param atime the analysis time @param kwargs Sent to hwrf.input.DataCatalog.parse()""" if atime is None: atime=self._atime else: atime=to_datetime_rel(atime,self._atime) logger=self.log() produtil.fileop.makedirs(self.outdir) if self.confint('prepbufrprep',0) > 0: prepbufr_item=self.confstr('prepbufr_item') there=self._in_catalog.parse(prepbufr_item, atime=atime,logger=logger,**kwargs) it=os.path.join(self.outdir,there) produtil.fileop.deliver_file( 'prepbufr',it,keep=False,logger=logger) if self.confint('prepbufrprep',0) == 4: if (bool(self.realtime) and produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge', 'dogwood', 'cactus' )): tarfilename='dropsonde.%s.tar'%atime.strftime('%Y%m%d%H') sondetarfile=os.path.join(self._dirname,tarfilename) if isnonempty(sondetarfile): it=os.path.join(self.outdir,tarfilename) produtil.fileop.deliver_file( sondetarfile,it,keep=True,logger=logger) sondetarprod=self.icstr('{com}/{out_prefix}.'+'dropsonde.tar') produtil.fileop.deliver_file( sondetarfile,sondetarprod,keep=True,logger=logger) rightstorm=isnonempty(os.path.join(self._dirname,'stmid.dat')) smalledgedump=isnonempty(os.path.join(self._dirname,'tdrflag')) if rightstorm: if not smalledgedump: item=self.conf.get('tdr_new_obstype','item') there=self._in_catalog.parse(item,atime=atime, logger=logger,obstype='tldplr') it=os.path.join(self.outdir,there) produtil.fileop.deliver_file( 'tldplrbufr',it,keep=True,logger=logger) if (bool(self.realtime) and produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge', 'dogwood', 'cactus' )): tdrprod=self.icstr('{com}/{stormlabel}.'+there) produtil.fileop.deliver_file( 'tldplrbufr',tdrprod,keep=False,logger=logger) set_ecflow_event('FoundTDR') if (bool(self.realtime) and produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge', 'dogwood', 'cactus' )): hdobbufr=os.path.join(self._dirname,'hdobbufr') if isnonempty(hdobbufr): item=self.conf.get('hdob_obstype','item') 
there=self._in_catalog.parse(item,atime=atime, logger=logger,obstype='hdob') it=os.path.join(self.outdir,there) produtil.fileop.deliver_file( 'hdobbufr',it,keep=True,logger=logger) hdobprod=self.icstr('{com}/{stormlabel}.'+there) produtil.fileop.deliver_file( 'hdobbufr',hdobprod,keep=False,logger=logger) #set_ecflow_event('FoundHDOB') if (bool(self.realtime) and produtil.cluster.name() in ( 'tide', 'gyre', 'luna', 'surge', 'dogwood', 'cactus' )): nexradbufr=os.path.join(self._dirname,'l2rwbufr') if isnonempty(nexradbufr): item=self.conf.get('nexrad_obstype','item') there=self._in_catalog.parse(item,atime=atime, logger=logger,obstype='l2rw') it=os.path.join(self.outdir,there) produtil.fileop.deliver_file( 'l2rwbufr',it,keep=True,logger=logger) nexradprod=self.icstr('{com}/{out_prefix}.'+there) produtil.fileop.deliver_file( 'l2rwbufr',nexradprod,keep=False,logger=logger) #set_ecflow_event('FoundNEXRAD')