#! /usr/bin/env python3 """!Base class of tasks run by HAFS. This module contains the HAFSTask class, a subclass of produtil.datastore.Task intended to be the base class of any HAFS task. It provides logging services, easy access to a config file, access to tcvitals, and extends the hafs.config.HAFSConfig string interpolation to include vitals information. It also provides a standard way of setting and querying the work and output directories of a task, and whether the task should scrub its output.""" import re, os import tcutil.numerics, tcutil.storminfo import produtil.log from produtil.datastore import Task ##@var __all__ # Symbols exported by "from hafs.hafstask import *" __all__=['HAFSTask'] ##@var UNSPECIFIED # Special constant used to detect unspecified arguments to functions. # This allows None to be sent. UNSPECIFIED=object() class HAFSTask(Task): """!The base class of tasks run by the HAFS system. This class represents a task to be run by the HAFS system. It can be configured by an hafs.config.HAFSConfig object. Internal state information is stored in an produtil.datastore.Datastore. Each task has its own workdir, outdir and scrub flag, as well as its own vitals information. Execution of a task is done by calling run(), and the products generated by this task can be iterated with products()""" def __init__(self,dstore,conf,section,taskname=None,workdir=None, outdir=None,storminfo=UNSPECIFIED,taskvars=UNSPECIFIED, **kwargs): """!Creates an HAFSTask @param dstore passed to Datum: the Datastore object for this Task @param conf the conf object for this task @param section the conf section for this task @param taskname Optional: the taskname in the datastore. Default: the section name @param workdir directory in which this task should run. Any value set in the database will override this value. @param outdir directory where output should be copied. This argument must not be changed throughout the lifetime of the HAFS datstore database file. @param storminfo the storm vitals information for the storm this task is running. @param taskvars additonal variables for string expansion, sent to the taskvars arguments of hafs.config.HAFSConfig member functions. @param kwargs passed to the parent class constructor.""" if taskname is None: taskname=section conf.register_hafs_task(taskname) self.__taskvars=dict() if taskvars is not UNSPECIFIED: for k,v in taskvars.items(): self.tvset(k,v) self._conf=conf self._section=str(section) self._storminfo=storminfo if taskname is not None and not isinstance(taskname,str): raise TypeError('The taskname must be None or a basestring ' 'subclass') if not isinstance(section,str): raise TypeError('The section be a basestring subclass') if workdir is None: workdir=self.confstr('workdir','') if workdir is None or workdir=='': workdir=os.path.join(self.getdir('WORKhafs'),taskname) if outdir is None: outdir=self.confstr('workdir','') if outdir is None or outdir=='': outdir=os.path.join(self.getdir('intercom'),taskname) with dstore.transaction(): super(HAFSTask,self).__init__(dstore,taskname=taskname, logger=conf.log(taskname),**kwargs) mworkdir=self.meta('workdir','') moutdir=self.meta('outdir','') if mworkdir: workdir=mworkdir else: self['workdir']=workdir if moutdir: outdir=moutdir else: self['outdir']=outdir if storminfo is UNSPECIFIED: if hasattr(conf,'syndat'): self.storminfo=conf.syndat elif storminfo is not None: if isinstance(storminfo,str): self.storminfo=tcutil.storminfo.StormInfo() self.storminfo.update(conf.items('config')) self.storminfo.parse_vitals(storminfo) elif isinstance(storminfo,tcutil.storminfo.StormInfo): self.storminfo=storminfo.copy() else: raise TypeError("The storminfo argument to HAFSTask() must " 'be None, a string or ' 'tcutil.storminfo.StormInfo') ##@var storminfo # The tcutil.storminfo.StormInfo describing the vitals information # for the storm processed by this HAFSTask. # Intent is to be absolutely certain this is the fakestorm # and that the fake storm id is uppercase. @property def isfakestorm(self): """Is this the fake storm of a multistorm HAFS run?""" isfakestorm = False if self._conf.getbool('config','run_multistorm',False): fakestormid=self._conf.getstr('config','fakestormid', 'nofakeid') if fakestormid != 'nofakeid': if fakestormid == self.storminfo.stormid3: isfakestorm = True elif fakestormid.upper() == self.storminfo.stormid3.upper(): raise hafs.exceptions.HAFSConfigInsane( "Multistorm run: isfakestorm property can not be " "set since case mismatch between fakestormid: %s " "and stormid3: %s"%(fakestormid,self.storminfo.stormid3)) else: raise hafs.exceptions.HAFSConfigInsane( "Multistorm run: isfakestorm property can not be set since No " "'fakestormid' in config section of storm %s." "Check if you defined a list of multistorm ids." %(self.storminfo.stormid3)) return isfakestorm @property def ismultistorm(self): """Is this a multistorm HAFS run?""" ismultistorm = False if self._conf.getbool('config','run_multistorm',False): ismultistorm = True return ismultistorm def get_workdir(self): """!Returns the directory the class should work in, as set by the "workdir" metadata value.""" workdir=self.meta('workdir','') if not workdir: workdir=os.path.join(self.getdir('WORKhafs'),self.taskname) assert(workdir!='/') return workdir def set_workdir(self,val): """!Sets the directory the class should work in. This sets the "workdir" metadata value. @param val the new work directory""" self['workdir']=str(val) ##@var workdir # The directory in which this task should be run workdir=property(get_workdir,set_workdir,None, """!The directory in which this task should be run.""") def get_outdir(self): """!Gets the directory that should receive output data. This is in the "outdir" metadata value.""" outdir=self.meta('outdir','') if not outdir: outdir=os.path.join(self.getdir('intercom'),self.taskname) assert(outdir!='/') return outdir def set_outdir(self,val): """!Sets the directory that should receive output data. Sets the "outdir" metadata value. @param val the new output directory""" self['outdir']=str(val) ##@var outdir # The directory in which this task should deliver its final output. # Note that changing this will NOT update products already in the # database. outdir=property(get_outdir,set_outdir,None, """!The directory to which this task should deliver its final output.""") @property def realtime(self): """!Is this job a real-time forecast job? Is this job running an actual, real-time forecast for an event happening now? This is different than self.conf.realtime: it allows this job's config section to locally override the value.""" return self.confbool('realtime',True) @property def redirect(self): """!Should subprograms' outputs be redirected to separate files?""" return self.confbool('redirect',False) @property def scrub(self): """!Should temporary files be deleted as soon as they are not needed?""" return self.confbool('scrub',True) def tvset(self,opt,val): """!Sets a taskvar option's value. Sets an object-local (taskvar) value for option "opt" to value "val". This will override config settings from the HAFSConfig object. These are sent into the taskvars= parameter to the various HAFSConfig member functions (hence the "tv" in "tvset"). @param opt the name of the taskvar @param val the string value of the option""" sopt=str(opt) if sopt[0:2]=='__': raise hafs.exceptions.InvalidConfigOptName( '%s: invalid option name. Cannot begin with __'%(sopt,)) self.__taskvars[sopt]=val def tvdel(self,opt): """!Deletes an object-local value set by tvset. @param opt the name of the taskvar to delete""" sopt=str(opt) del self.__taskvars[sopt] def tvget(self,opt): """!Gets a taskvar's value Returns the value of an object-local (taskvar) option set by tvset. @param opt the taskvar whose value should be returned""" sopt=str(opt) return self.__taskvars[sopt] def tvhave(self,opt=UNSPECIFIED): """!Is a taskvar set? If an option is specified, determines if the given option has an object-local (taskvar) value. If no option is specified, returns True if ANY object-local values (taskvars) exist for any options. @param opt Optional: the name of the taskvar being checked.""" if opt is UNSPECIFIED: return len(self.__taskvars)>0 sopt=str(opt) return sopt in self.__taskvars @property def taskvars(self): """!The dict of object-local values used for string substitution.""" return self.__taskvars def confint(self,opt,default=None,badtypeok=False,section=None, morevars=None): """!Alias for self.conf.getint for section self.section. @param opt the option name @param section Optional: the section. Default: self.section. @param default if specified and not None, then the default is returned if an option has no value or the section does not exist @param badtypeok is True, and the conversion fails, and a default is specified, the default will be returned. @param morevars dict of more variables for string expansion""" if(section is None): section=self._section return self._conf.getint(section,opt,default,badtypeok, morevars=morevars,taskvars=self.__taskvars) def confstr(self,opt,default=None,badtypeok=False,section=None, morevars=None): """!Alias for self.conf.getstr for section self.section. @param opt the option name @param section Optional: the section. Default: self.section @param default if specified and not None, then the default is returned if an option has no value or the section does not exist @param badtypeok is True, and the conversion fails, and a default is specified, the default will be returned. @param morevars dict of more variables for string expansion""" if(section is None): section=self._section return self._conf.getstr(section,opt,default,badtypeok, morevars=morevars,taskvars=self.__taskvars) def conffloat(self,opt,default=None,badtypeok=False,section=None, morevars=None): """!Alias for self.conf.getfloat for section self.section. @param opt the option name @param section Optional: the section. Default: self.section @param default if specified and not None, then the default is returned if an option has no value or the section does not exist @param badtypeok is True, and the conversion fails, and a default is specified, the default will be returned. @param morevars dict of more variables for string expansion""" if(section is None): section=self._section return self._conf.getfloat(section,opt,default,badtypeok, morevars=morevars,taskvars=self.__taskvars) def confbool(self,opt,default=None,badtypeok=False,section=None, morevars=None): """!Alias for self.conf.getbool for section self.section. @param opt the option name @param section Optional: the section. Default: self.section @param default if specified and not None, then the default is returned if an option has no value or the section does not exist @param badtypeok is True, and the conversion fails, and a default is specified, the default will be returned. @param morevars dict of more variables for string expansion""" if(section is None): section=self._section return self._conf.getbool(section,opt,default,badtypeok, morevars=morevars,taskvars=self.__taskvars) def confget(self,opt,default=None,badtypeok=False,section=None, morevars=None): """!Alias for self.conf.get for section self.section. @param opt the option name @param section Optional: the section. Default: self.section @param default if specified and not None, then the default is returned if an option has no value or the section does not exist @param badtypeok is True, and the conversion fails, and a default is specified, the default will be returned. @param morevars dict of more variables for string expansion""" if(section is None): section=self._section return self._conf.get(section,opt,default,badtypeok, morevars=morevars,taskvars=self.__taskvars) def confitems(self,section=None,morevars=None): """!Alias for self.conf.items for section self.section. @param section Optional: the section. Default: self.section. @param morevars variables for string substitution""" if(section is None): section=self._section return self._conf.items(section,morevars=morevars,taskvars=self.__taskvars) def confstrinterp(self,string,section=None,**kwargs): """!Alias for self.icstr for backward compatibility @param string the string to expand @param section Optional: the section in which to expand it. Default: self.section. @param kwargs: more arguments for string substitution""" return self.icstr(string,section=section,**kwargs) def conftimestrinterp(self,string,ftime,atime=None,section=None, **kwargs): """!Alias for self.timestr for backward comaptibility @param string the string to expand @param ftime: the forecast time @param atime: Optional: the analysis time. Default: self.conf.cycle @param section Optional: the section in which to expand it. Default: self.section. @param kwargs: more arguments for string substitution""" return self.timestr(string,ftime,atime=atime,section=section, taskvars=self.__taskvars,**kwargs) def confraw(self,opt,default=None,section=None): """!Get a raw configuration value before string expansion. Returns the raw, uninterpolated value for the specified option, raising an exception if that option is unset. Will not search other sections, and will not search the taskvars, unlike other conf accessors. @param opt the option of interest @param section Optional: the section. Default: self.section @param default Optional: value to return if nothing is found.""" if section is None: section=self.section return self._conf.getraw(section,opt,default) def icstr(self,string,section=None,**kwargs): """!Expands a string in the given conf section. Given a string, expand it as if it was a value in the specified conf section. Makes this objects tcvitals, if any, available via the "vit" variable while interpolating strings. @param string the string to expand @param section Optional: the section in which to expand it. Default: self.section. @param kwargs: more arguments for string substitution""" if(section is None): section=self._section if self.storminfo and 'vit' not in kwargs: kwargs['vit']=self.storminfo.__dict__ return self._conf.strinterp(section,string,__taskvars=self.__taskvars, **kwargs) def timestr(self,string,ftime=None,atime=None,section=None,**kwargs): """!Expands a string in the given conf section, including time vars Expands a string in the given conf section (default: self.section), and includes forecast and analysis time (default: conf.cycle) information in the variables that can be expanded. The mandatory ftime argument is the forecast time which will be used to expand values such as fHH, fYMDH, etc. The optional atime will be used to expand aHH, aYMDH, etc., and the two will be used together for forecast minus analysis fields like fahr. See hafs.config.timestrinterp for details As with self.icstr, this class's vitals are available via the "vit" variable while interpolating strings. @param string the string to expand @param ftime: the forecast time @param atime: Optional: the analysis time. Default: self.conf.cycle @param section Optional: the section in which to expand it. Default: self.section. @param kwargs: more arguments for string substitution""" if(section is None): section=self._section if self.storminfo and 'vit' not in kwargs: kwargs['vit']=self.storminfo.__dict__ if 'taskvars' in kwargs: return self._conf.timestrinterp(section,string,ftime,atime,**kwargs) else: return self._conf.timestrinterp(section,string,ftime,atime, __taskvars=self.__taskvars,**kwargs) def getdir(self,opt,default=None,morevars=None): """!Alias for hafs.config.HAFSConfig.get() for the "dir" section. @param opt the option name @param default Optional: default value if nothing is found. @param morevars Optional: more variables for string substitution""" return self._conf.get('dir',opt,default,morevars=morevars, taskvars=self.__taskvars) def getexe(self,opt,default=None,morevars=None): """!Alias for hafs.config.HAFSConfig.get() for the "exe" section. @param opt the option name @param default Optional: default value if nothing is found. @param morevars Optional: more variables for string substitution""" return self._conf.get('exe',opt,default,morevars=morevars, taskvars=self.__taskvars) def getconf(self): """!Returns this HAFSTask's hafs.config.HAFSConfig object.""" return self._conf ##@var conf # This HAFSTask's hafs.config.HAFSConfig object conf=property(getconf,None,None, """!The HAFSConfig for this HAFSTask (read-only)""") def getsection(self): """!Returns this HAFSTask's section name in the HAFSConfig.""" return self._section ##@var section # The confsection in self.section for this HAFSTask (read-only) section=property(getsection,None,None, """!The confsection in self.section for this HAFSTask (read-only)""") def log(self,subdom=None): """!Obtain a logging domain. Creates or returns a logging.Logger. If subdom is None or unspecified, returns a cached logger for this task's logging domain. Otherwise, returns a logger for the specified subdomain of this task's logging domain. @param subdom Optional: the desired logging domain""" if subdom is None: return self._logger return self._conf.log(self.taskname+'.'+str(subdom)) def inputiter(self): """!Iterates over all inputs required by this task. Iterates over dict-like objects suitable for input to hafs.input.InputSource.get. Each object contains the following keywords: * dataset: string name of the dataset (gfs, gdas1, gefs, enkf, etc.) * item: string name of the object (ie.: gfs_sf, gfs_sfcanl, bufr) * atime: self.conf.cycle * ftime: only present when relevant: the forecast time, in a format accepted by to_datetime_rel * enkfmem: only present when relevant: the ENKF member ID * obstype: only present when relevant: the bufr data type. Other keywords may be present if needed. They will be passed on by hafs.input.InputSource for string replacement.""" return yield {} # ensures this is an iterator