#! /usr/bin/env python3 ################################################################################ # Script Name: hafs_format_sonde.py # Authors: NECP/EMC Hurricane Project Team and UFS Hurricane Application Team # Abstract: # This script deals with and formats TEMP-DROP message (observations). # History: # 10/16/2020: This script was adopted from a version developed by Henry R. # Winterbottom for HWRF. ################################################################################ # # Author: Henry R. Winterbottom # Email: Henry.Winterbottom@noaa.gov # This file is part of obs-preproc. # obs-preproc is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation, either version 3 of # the License, or (at your option) any later version. # obs-preproc is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # You should have received a copy of the GNU General Public License # along with obs-preproc. If not, see # . #---- import argparse import collections import datetime import logging import numpy import os import string import sys import tarfile import contextlib from contextlib import closing #---- __author__ = "Henry R. Winterbottom" __copyright__ = "2019 Henry R. Winterbottom, NOAA/NCEP/EMC" __version__ = "1.0.0" __maintainer__ = "Henry R. Winterbottom" __email__ = "henry.winterbottom@noaa.gov" __status__ = "Development" #---- class FormatSonde(object): """ DESCRIPTION: This is the base-class object to format TEMP-DROP messages (observations) in accordance with the expectations of the tempdrop_sonde executable. INPUT VARIABLES: * opts_obj; a Python object containing the user command line options. """ def __init__(self,opts_obj): """ DESCRIPTION: Creates a new FormatSonde object. """ self.opts_obj=opts_obj opts_list=['cycle','datapath'] for item in opts_list: value=getattr(self.opts_obj,item) setattr(self,item,value) self.logger=FormatSondeLog() self.dateobj=datetime.datetime.strptime(self.cycle,'%Y-%m-%d_%H:%M:%S') self.srchstrs=['UZNT','UZPN','UZPA'] self.flag_list=['CCA'] self.max_offset_seconds=int(2*84600) def check_timestamp(self,file_timestamp,timestamps): """ DESCRIPTION: This method checks the validity of the timestamp used to assign filenames; if the difference between the date in the timestamp and the date of the forecast cycle is greater than 1-day, the filename timestamp is reset; in addition, if the timestamp assigned to the filename does not make sense, it is assigned to one that does. INPUT VARIABLES: * fts; a Python integer containing the timestamp (assuming the UNIX format %Y%m%d%H%M) to be assigned to the file name. * timestamps; a Python list of timestamps for which the current processing is occurring. OUTPUT VARIABLES: * fts; a Python integer containing the (updated) timestamp (assuming the UNIX format %Y%m%d%H%M) to be assigned to the file name. """ fts=file_timestamp try: timestamp_obj=datetime.datetime.strptime(file_timestamp,'%Y%m%d%H%M') offset_seconds=numpy.abs((self.dateobj-timestamp_obj).seconds) if offset_seconds>self.max_offset_seconds: yyyymmdd=sorted(timestamps)[0][0:8] hhmm=file_timestamp[-4::] fts='%s%s'%(yyyymmdd,hhmm) except ValueError: hhmm=file_timestamp[-4::] for timestamp in timestamps: yyyymmdd=timestamp[0:8] fts='%s%s'%(yyyymmdd,hhmm) try: timestamp_obj=datetime.datetime.strptime(fts,'%Y%m%d%H%M') break except: print('INFO: continue to the next timestamp.') pass return fts def collect_sondes(self,data): """ DESCRIPTION: This method loops through each key within an input Python dictionary containing all observations (data) and returns character strings denote the locations of observations to be formatted. INPUT VARIABLES: * data; a Python dictionary containing key and value pairs for all observations within the respective files to be processed. OUTPUT VARIABLES: * outinfostrs; a Python dictionary containing key (timestamp) and value (TEMP-DROP sonde observation headers) pairs. """ infostrs=dict() for key in sorted(data.keys()): infostrs[key]=list() for srchstr in self.srchstrs: for item in data[key]: if srchstr in item: infostrs[key].append(item.strip()) kwargs={'infostrs':infostrs[key],'data':data[key]} infostrs[key]=self.get_obsinfo(infostrs[key],data[key]) flag_infostrs=list() for item in list(infostrs[key].keys()): for flag_item in self.flag_list: if flag_item in item: flag_infostrs.append(item) msg=('Found the following flagged message headers for %s:\n %s\n'%\ (key,flag_infostrs)) self.logger.info(msg=msg) rmvinfostrs=list() for key in sorted(data.keys()): infostr=infostrs[key] for item in flag_infostrs: for flag_item in self.flag_list: if flag_item in item: string=item.replace(flag_item,'').rstrip() for ifs in list(infostr.keys()): if string==ifs: msn1=infostr[ifs]['mission'] obs1=infostr[ifs]['obid'] msn2=infostr[string]['mission'] obs2=infostr[string]['obid'] if (msn1==msn2) and (obs1==obs2): rmvinfostrs.append(string) msg=('Removing the following unique message header(s):\n %s\n'%\ set(rmvinfostrs)) self.logger.info(msg=msg) outinfostrs=list() for key in sorted(data.keys()): outstrs=list() for item in list(infostrs[key].keys()): if item not in rmvinfostrs: outstrs.append(item) outinfostrs.append(item) infostrs[key]=outstrs msg=('The following %d TEMP-DROP sonde message headers will be processed:\n%s\n'%\ (len(outinfostrs),outinfostrs)) self.logger.info(msg=msg) return outinfostrs def createfilelist(self): """ DESCRIPTION: This method creates a formatted list of files in accordance with the expectations of the tempdrop_sonde executable. """ workpath=os.getcwd() filenames=os.listdir(workpath) with open(os.path.join(workpath,'filelist.list'),'w') as f: for item in filenames: if '.mod' in item: f.write('"%s"\n'%os.path.join(workpath,item)) def find_sondefiles(self): """ DESCRIPTION: This method collects relevant sonde files to be processed; the relevancy is determined relative to the forecast cycle timestamp. """ offset_seconds=10800 time_dict={'year':{'cstrt':0,'cstp':4},'month':{'cstrt':4,'cstp':6},\ 'day':{'cstrt':6,'cstp':8},'hour':{'cstrt':8,'cstp':10},\ 'minute':{'cstrt':10,'cstp':12},'second':{'cstrt':12,'cstp':14}} datetime_kwargs=dict() time_key_list=['year','month','day','hour','minute','second'] cycle=datetime.datetime.strftime(self.dateobj,'%Y%m%d%H') for item in time_key_list: cstrt=time_dict[item]['cstrt'] cstp=time_dict[item]['cstp'] value=cycle[cstrt:cstp] if len(value)>0: datetime_kwargs[item]=int(value) else: datetime_kwargs[item]=0 timestamp=datetime.datetime(**datetime_kwargs) dtime=datetime.timedelta(seconds=offset_seconds) in_list=[(timestamp-dtime).strftime('%Y%m%d'),(timestamp+dtime).strftime('%Y%m%d')] timestamps=sorted(set(in_list)) filenames=os.listdir(self.datapath) filedict=dict() msg=('Cycle %s; searching for the following timestamps: %s'%(cycle,\ timestamps)) self.logger.info(msg=msg) self.tempdrop_list=list() for item in filenames: for timestamp in timestamps: if timestamp in item: filename=os.path.join(self.datapath,item) filedict[filename]=timestamp msg=('Found file %s for processing.'%filename) self.logger.info(msg=msg) break filedict=collections.OrderedDict(sorted(filedict.items())) return filedict def formatsondes(self): """ DESCRIPTION: This method formats TEMP-DROP messages (observations) in accordance with the expectations of the tempdrop_sonde executable. """ srchstrs=['REL','SPG','SPL'] excldstrs=['62626','REL','SPG','SPL'] for infile in self.tempdrop_list: if os.path.exists(infile): with open(infile,'rb') as inf: data=inf.read().decode("utf-8", "ignore") outfile=('%s.mod'%infile) datan=list() data.replace('\r','') data=data.split('\n') data=[_f for _f in data if _f] for item in data: item=self.stripmeta(instr=item) datan.append(item) data=datan with open(outfile,'w') as outf: for item in data: if any(s in item for s in excldstrs): pass else: outf.write('%s\n'%item) outdata=list() for (i,item) in enumerate(data): for srchstr in srchstrs: if srchstr in item: try: nstr=data[i]+data[i+1] nstr=self.stripmeta(instr=nstr) indx=nstr.index(srchstr) sstr=nstr[indx:indx+23] sstr=self.stripmeta(instr=sstr) outf.write('%s\n'%sstr) except IndexError: print('INFO: continue next srchstr') pass def get_obsinfo(self,infostrs,data): """ DESCRIPTION: This method collects the observation and aircraft flight identifications and returns a Python dictionary (obsdict) corresponding to the observation header string. INPUT VARIABLES: * infostrs; a Python list of observation header strings. * data; a Python list of all observations collected by driver level of script. OUTPUT VARIABLES: * obsdict; a Python dictionary containing the observation header string and the corresponding observation and aircraft flight identifications. """ obsdict=dict() whitelist=['OB'] for infostr in infostrs: obsdict[infostr]=dict() lnidx=0 for (i,item) in enumerate(data): if infostr in item: lnidx=i break for (i,item) in enumerate(data[lnidx::]): if 'OB' in item: obsitem=item.split().index('OB') obsdict[infostr]['obid']=item.split()[obsitem+1] obsdict[infostr]['mission']=item.split()[1] break return obsdict def read_sondefiles(self,filedict): """ DESCRIPTION: This method collects all observations contained within the files to be processed (filedict keys) and returns a Python dictionary containing key (timestamp) and value (observations) pairs. INPUT VARIABLES: * filedict; a Python dictionary containing key (timestamp) and values (file paths) pairs. OUTPUT VARIABLES: * data; a Python dictionary containing key (timestamp) and values (observations collected from the respective input file) pairs. """ data=dict() for infile in list(filedict.keys()): msg=('Processing file %s.'%infile) self.logger.info(msg=msg) year=filedict[infile][0:4] month=filedict[infile][4:6] day=filedict[infile][6:8] lnidx=0 #with open(infile,'rb') as f: with open(infile,'rb') as f: infdata=f.read().decode("utf-8", "ignore") data[filedict[infile]]=infdata.split('\n') return data def sondedump(self): """ DESCRIPTION: This method collects all GPS dropsonde files and prepares them for further processing as follows: (1) Finds all relevant files containing observations; this script assumes that the observation files (in the user specified data path) are prefixed with a timestamp (assuming the UNIX format) as %Y%m%d. (2) Reads all relevant sonde files and compiles a Python list of observations within the respective files.' (3) Finds all TEMP-DROP sonde observations; this is done by identifying header strings throughout the respective file(s). (4) Loops though all collect TEMP-DROP sonde observation headers and prepares individual files for all identified TEMP-DROP observations. """ # Collect sonde files relevant for the current cycle. filedict=self.find_sondefiles() # Collect sonde file observations. data=self.read_sondefiles(filedict=filedict) # Collect all relevant sonde observations. infostrs=self.collect_sondes(data=data) # Create concatenated list of sonde observations. infodata=dict() for timestamp in sorted(data.keys()): infodata[timestamp]=list() for item in data[timestamp]: item=self.stripmeta(instr=item) infodata[timestamp].append(item) # Loop through all timestamps and prepare individual files for # each observation. for infostr in set(infostrs): infostr=self.stripmeta(instr=infostr) mission_id=infostr.split()[1] timestr=infostr.split()[2] timestamps=sorted(data.keys()) for timestamp in timestamps: year=timestamp[0:4] month=timestamp[4:6] timestrday=timestr[0:2] fts=('%s%s%s'%(year,month,timestr)) kwargs={'file_timestamp':fts,'timestamps':timestamps} value=self.check_timestamp(**kwargs) if value is not None: fts=value idxs=[idx for idx, e in enumerate(infodata[timestamp]) if infostr==e] for idx in idxs: lnidx=idx outfile=('%s.%s'%(fts,mission_id)) i=1 while os.path.isfile(outfile): if os.path.isfile(outfile): outfile=('%s.%s.%s'%(fts,mission_id,i)) i=i+1 tdkwargs={'outfile':outfile,'infostr':infostr,'data':\ infodata[timestamp],'lnidx':lnidx} self.write_tempdrop(**tdkwargs) self.tempdrop_list.append(outfile) def stripmeta(self,instr): """ DESCRIPTION: This method stripts meta-characters and carriage returns from an input string. INPUT VARIABLES: * instr; a Python string possibly containing meta-characters. OUTPUT VARIABLES: * outstr; a Python string stripped of meta-characters and carriage returns. """ for c in (string.ascii_lowercase+string.ascii_uppercase): chkstr='^%s'%c outstr=instr.replace(chkstr,'') instr=outstr outstr=outstr.replace('\r','') return outstr def write_tempdrop(self,outfile,infostr,data,lnidx): """ DESCRIPTION: This method parses a list of strings and writes a given TEMP-DROP message to a user specified file. INPUT VARIABLES: * outfile; a Python string specifying the path to the output file to contain the TEMP-DROP message. * infostr; a Python string specifying the TEMP-DROP message header (e.g., UZNT10 KNHC 250600). * infdata; a Python list of data strings to be parsed using the infostr information. """ strtmsg=lnidx for (i,item) in enumerate(data[strtmsg::],1): if ';' in item: lnidx=i break if not item.strip(): lnidx=i break endmsg=strtmsg+lnidx msg=('Writing TEMP-DROP message to %s; data block: [%s,%s]' %\ (outfile,strtmsg,endmsg)) self.logger.info(msg=msg) with open(outfile,'wt+') as f: for item in data[strtmsg:endmsg]: item=self.stripmeta(instr=item) f.write('%s\n'%item) def build_tarball(self): """ DESCRIPTION: This method performs the following tasks: (1) Write all newly created TEMP-DROP formatted files (if any) in a tarball called 'dropsonde..tar'; for example, 'dropsonde.2017091106.tar'. """ cycle=datetime.datetime.strftime(self.dateobj,'%Y%m%d%H') filename=('dropsonde.%s.tar'%cycle) tempdrop_list_mod=list() workpath=os.getcwd() filenames=os.listdir(workpath) for item in filenames: if '.mod' in item: tempdrop_list_mod.append(item) if tempdrop_list_mod: with closing(tarfile.open(filename,'w')) as tar: for item in tempdrop_list_mod: if os.path.isfile(item): tar.add(item,arcname=os.path.basename(item),\ recursive=False) print(item) #os.remove(item) def run(self): """ DESCRIPTION: This method performs the following tasks: (1) Collects observations from external files (possibly) containing TEMP-DROP messages. (2) Formats (any) TEMP-DROP messages in accordance with the expectations of the tempdrop_sonde executable. (3) Creates a formatted list of TEMP-DROP message files (e.g., observations) to be processed (and in accordance with the expectations of the) tempdrop_sonde executable. (4) Build a tarball for the newly created TEMP-DROP formatted files """ self.sondedump() self.formatsondes() self.createfilelist() self.build_tarball() #---- class FormatSondeError(Exception): """ DESCRIPTION: This is the base-class for all module raised exceptions. INPUT VARIABLES: * msg; a Python string to accompany the raised exception. """ def __init__(self,msg): """ DESCRIPTION: Creates a new FormatSondeError object. """ super(FormatSondeError,self).__init__(msg) #---- class FormatSondeLog(object): """ DESCRIPTION: This is the base-class object for all Log instances. """ def __init__(self): """ DESCRIPTION: Creates a new Log object. """ self.exception=FormatSondeError def info(self,msg): """ DESCRIPTION: This method writes a message to the base-class Python logger via the INFO level. INPUT VARIABLES: * msg; a Python string containing the user specified logger message. """ self.log=self.setup(info=True) self.log.info(msg) def setup(self,info=False): """ DESCRIPTION: This method defines the Python logging object. OUTPUT VARIABLES: * log; a Python object containing the user specifed/define Python logging object. """ if info: format='%(levelname)s :: %(asctime)s : %(message)s' if not info: format='%(levelname)s :: %(asctime)s : %(pathname)s (%(lineno)s)'\ '; %(message)s' datefmt='%Y-%m-%d %H:%M:%S' log=logging log.basicConfig(stream=sys.stdout,level=logging.INFO,format=format,\ datefmt=datefmt) return log #---- class FormatSondeOptions(object): """ DESCRIPTION: This is the base-class object used to collect command line arguments provided by the user. """ def __init__(self): """ DESCRIPTION: Creates a new FormatSondeOptions object. """ self.parser=argparse.ArgumentParser() self.parser.add_argument('-c','--cycle',help='The forecast cycle timestamp; '\ 'formatted as (assuming UNIX convention) %Y-%m-%d_%H:%M:%S.',default=None) self.parser.add_argument('-d','--datapath',help='The path to the sonde files '\ 'containing TEMP-DROP observations.',default=None) self.opts_obj=lambda:None def run(self): """ DESCRIPTION: This method collects the user-specified command-line arguments; the available command line arguments are as follows: -c; The forecast cycle timestamp; formatted as (assuming UNIX convention) %Y-%m-%d_%H:%M:%S. -d; The path to the sonde files containing TEMP-DROP observations. OUTPUT VARIABLES: * opts_obj; a Python object containing the user command line options. """ opts_obj=self.opts_obj args_list=['cycle','datapath'] args=self.parser.parse_args() for item in args_list: value=getattr(args,item) if value is None: msg=('The argument %s cannot be NoneType. Aborting!!!'%item) raise FormatSondeError(msg=msg) else: setattr(opts_obj,item,value) return opts_obj #---- def main(): """ DESCRIPTION: This is the driver-level method to invoke the tasks within this script. """ options=FormatSondeOptions() opts_obj=options.run() formatsonde=FormatSonde(opts_obj=opts_obj) formatsonde.run() #---- if __name__=='__main__': main()