#! /usr/bin/env python

##@namespace ush.hwrf_datastore_setrun
# A utility script for modifying the product and task availability recorded in
# an HWRF sqlite3 database file. This module is an extension of hwrf_datastore
# with the added capability to mark tasks as having been run. Its specific
# intent is to modify the hwrf database so that the forecast task can be run
# from a clean hwrf database created by the launcher task, under the
# assumption that you have a valid intercom directory.
# That is: run launcher, have intercom, run forecast.
#
# This script can perform one of several operations on an sqlite3 file
# generated by the HWRF system.
#
# **CALLING THIS SCRIPT: Usage**
#
# @code{.sh}
#  hwrf_datastore_setrun.py [CONFhwrf=] OP [ARG] [/path/to/file1.sqlite3 [file2.sqlite3 [...]]]
# @endcode
#
# * CONFhwrf --- Optional. The command line MUST use the format 'CONFhwrf='
#                followed by an HWRF conf file, e.g. /path/to/storm1.conf.
#                This can also be set as an environment variable; the command
#                line value overrides an environment variable setting. Use the
#                command line form when running this script from automation
#                (e.g. from cron); otherwise the script prompts the user for
#                confirmation.
#
# * fileN.sqlite3 --- Optional HWRF sqlite3 database files. If no database
#                     files are given, then the hwrf database file defined in
#                     CONFhwrf is used.
#
# Valid operations (OP) are:
#  * DUMP --- dump the entire database to stdout
#  * SET2RUN_WRF --- Sets up the database in order to run the wrf forecast
#                    task. It checks and marks the tasks and input required to
#                    run the wrf forecast as completed and their products as
#                    available. It also unruns the forecast in case it has
#                    already been run.
#  * SET_RUN --- Checks and marks the specified task as having been run (see below)
#  * UNFAIL --- mark all failed or running tasks as unstarted
#  * UNRUN --- mark all tasks as unstarted
#  * UNRUN_ONE taskname --- unrun the specified task (see below)
#
# SET_RUN and UNRUN_ONE take an argument, and that argument is mandatory: the
# task to mark as "run" or "unrun". The argument is the task id, which is
# everything in the id after the %s:: in the output of a call to
# hwrf_datastore_setrun.py DUMP.
#
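# For example, if DUMP shows a task whose id is **task**::gfsinit/geogrid,
# that task can be marked as run with SET_RUN. The task name and database
# path below are illustrative:
#
# @code{.sh}
#  ./hwrf_datastore_setrun.py DUMP /path/to/storm1.sqlite3
#  ./hwrf_datastore_setrun.py SET_RUN gfsinit/geogrid /path/to/storm1.sqlite3
# @endcode
#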
# **ABOUT THIS SCRIPT:**
#
# This script is used in support of running the forecast task without running
# the initialization tasks. DTC supports several atmospheric scientists who
# only want to run and experiment with the forecast model. They do not use
# Rocoto for automation and instead run with DTC-provided wrapper scripts.
#
# To support this, DTC provides the intercom directory from an end-to-end HWRF
# run. After running the launcher wrapper, this script is then run, which
# allows the wrappers to run just the forecast model using the initialization
# information in the given intercom directory.
#
# Previously, re-running the forecast model with wrappers required one run
# through all tasks, and you were tied to the data location defined in the
# datastore by that run. This script provides convenience, a shortcut, and
# independence from the data output location.
#
# **HOW TO USE THIS SCRIPT:**
#
# 1) Using this script assumes you are familiar with setting up and running
#    HWRF with wrappers as described in the DTC HWRF users guide:
#
#    http://www.dtcenter.org/HurrWRF/users/docs/index.php
#
# 2) The following locations are described in the DTC HWRF users guide and
#    defined in hwrf_basic.conf. In the steps below, substitute them with the
#    paths where they are located on your system:
#
#    * The HWRF installation location.
#    * The working directory for all jobs.
#    * The location of output files for use by the next HWRF cycle.
#
# 3) Edit the following variables based on the DTC-provided "intercom"
#    directory:
#
#    * Edit wrappers/global_vars.sh (under the HWRF installation location):
#      * START_TIME=
#      * SID=
#
#    * Edit parm/hwrf_basic.conf (under the HWRF installation location):
#      * forecast_length=
#      * run_ocean=
#
# 4) Run the launcher wrapper:
#
# @code
#  cd wrappers        # under the HWRF installation location
#  ./launcher_wrapper
# @endcode
#
# 5) Place your "intercom" directory under the working directory that was
#    generated by the launcher wrapper in the previous step, i.e. directly
#    under that working directory as "intercom".
#
# 6) Run the hwrf_datastore_setrun.py script with the SET2RUN_WRF argument,
#    as follows:
#
# @code
#  bash
#  . storm1.holdvars.txt       # found under the working directory
#  export PYTHONPATH=$USHhwrf
#  cd $USHhwrf
#  ./hwrf_datastore_setrun.py SET2RUN_WRF
# @endcode
#
# 7) You are now ready to run the forecast model as described in the DTC users
#    guide: http://www.dtcenter.org/HurrWRF/users/docs/index.php
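#
# If you are running these steps from automation (e.g. from cron) rather than
# interactively, pass CONFhwrf on the command line so this script does not
# prompt for confirmation. The conf path below is illustrative:
#
# @code{.sh}
#  ./hwrf_datastore_setrun.py CONFhwrf=/path/to/storm1.conf SET2RUN_WRF
# @endcode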

import os
import sys
import logging

from hwrf_datastore import *
from produtil.datastore import PARTIAL
import produtil.setup, produtil.log, produtil.cluster
import hwrf_expt

# Constants:

##@var AVAILABLE
# Constant used to indicate a task's product is available.
AVAILABLE=1
"""Constant used to indicate a task's product is available."""

##@var NOT_AVAILABLE
# Constant used to indicate a task's product is NOT available.
NOT_AVAILABLE=0
"""Constant used to indicate a task's product is NOT available."""

# Globals:

##@var logger
# Logging domain used for this script.
logger=None

# Developer Note:
# The 'tasks' are obtained from hwrf_expt, but the products for each task are
# currently retrieved from the database, since no way has been found yet to
# get all of the input products (both the wrf inputs and the POM inputs) from
# hwrf_expt.

def run_post(ds):
    """!Marks the tasks and products the hwrf post task requires to run as
    completed, and all of its products as available, if they are available on
    disk at the location specified by the datastore.
    @param ds the produtil.datastore.Datastore to modify."""
    # Not needed ... just run exhwrf_unpost.py
    pass

def set2run_wrf(ds):
    """!Marks the tasks and products the hwrf forecast task requires to run as
    completed, and all of their products as available, if they are available
    on disk at the location specified by the datastore.
    @param ds the produtil.datastore.Datastore to modify."""
    conf = hwrf_expt.conf
    runwrf = hwrf_expt.runwrf
    ocean_flag=conf.getbool('config','run_ocean')
    ocean=conf.getstr('config','ocean_model','POM')

    # This is needed in case the runwrf task has been marked as COMPLETED,
    # which does not occur until the products task is completed. If it is
    # marked as COMPLETED you will not be able to run the forecast task,
    # regardless of setting the required input tasks below. For an initial
    # database it will not have been set, but the "unrun" state is what we
    # want, so it makes sense to go ahead and unrun.
    runwrf.state=UNSTARTED
    runwrf.unrun()
    hwrf_expt.wrfcopier.unrun()

    # Below sets up all the tasks that provide the input files the forecast
    # checks for and requires to run.
    #
    # runwrf.inputs is a dictionary of lists of all the inputs EXCEPT POM
    # inputs:
    #   key is the input type typename, e.g. 'geogrid'
    #   value is the input source, e.g. an hwrf.fcsttask.Geog2WRF object
    # e.g. (input-source object reprs omitted):
    #   print(runwrf.inputs)
    #   defaultdict(..., {'fort65': [...],
    #                     'wrfinput': [...],
    #                     'wrfanl-storm1outer': [...],
    #                     'wrfbdy': [...],
    #                     'geogrid': [...],
    #                     'metgrid': [...],
    #                     'wrfanl-storm1inner': [...]})

    # Add the base wrf tasks.
    wrf_tasks=list()
    for k,v in runwrf.inputs.items():
        for wrf_input in v:
            taskname=wrf_input.src.taskname
            if taskname not in wrf_tasks:
                wrf_tasks.append(taskname)

    # TODO TEST: need to test whether this works with HYCOM.
    # Appends the coupling taskname if an ocean model is being used.
    if ocean_flag:
        if ocean == 'HYCOM':
            wrf_tasks.append(hwrf_expt.hycominit.taskname)
        elif ocean == 'POM':
            wrf_tasks.append(hwrf_expt.pominit.taskname)

    # List of tasks that need to be completed for the wrf forecast to run.
    logger.info('The WRF tasks: %s'%(wrf_tasks))
    for task in wrf_tasks:
        set_task_as_having_run(ds, task)

def set_task_as_having_run(ds,did):
    """!Marks a specific task's products as available if they are on disk at
    the location specified in the datastore. The task in the datastore is then
    updated to completed if all of its products are on disk, or partial if
    only some of them are.
    @param ds the produtil.datastore.Datastore to modify.
    @param did the taskname"""
    set_task_and_products(ds,did)

def set_task_and_products(ds,did):
    """!Sets a specific task's products as available if they are on disk at
    the location specified in the datastore. The task in the datastore is then
    updated to completed if all of its products are on disk, or partial if
    only some of them are.
    @param ds the produtil.datastore.Datastore to modify.
    @param did the taskname"""
    # e.g.
    #   did='gfsinit/geogrid'
    #   taskid='**task**::gfsinit/geogrid'
    #   productid_pattern='gfsinit/geogrid::%'
    taskid="%s::%s"%(TASK_CATEGORY,str(did))
    productid_pattern="%s::%s"%(str(did),str('%'))

    # Get the (id, location) pairs of the products for this task.
    with ds.transaction() as t:
        rows=t.query('SELECT id, location FROM products WHERE id LIKE "'
                     + productid_pattern + '"')

    logger.info('Checking taskid: %s'%taskid)
    # NOTE: the second argument to mutate() is a tuple of values substituted
    # into the ? placeholders.
    with ds.transaction() as t:
        for prodid,product in rows:
            if os.path.exists(product):
                # Mark only this product (match its own id, not the pattern),
                # so that a mix of available and unavailable products can be
                # detected below.
                t.mutate("UPDATE products SET available=? WHERE id=?",
                         (AVAILABLE,prodid))
                logger.info('Marking product available: %s'%product)
            else:
                logger.info('Product not on disk: %s'%product)

    with ds.transaction() as t:
        availables=t.query('SELECT available FROM products WHERE id LIKE "'
                           + productid_pattern + '"')
        products_available=[available[0] for available in availables]
        if AVAILABLE in products_available and NOT_AVAILABLE in products_available:
            t.mutate("UPDATE products SET available=? WHERE id=?",
                     (PARTIAL, taskid))
            logger.info('Marking task: %s as PARTIAL: %s'%(taskid,PARTIAL))
        elif AVAILABLE in products_available and NOT_AVAILABLE not in products_available:
            t.mutate("UPDATE products SET available=? WHERE id=?",
                     (COMPLETED, taskid))
            logger.info('Marking task: %s as COMPLETED: %s'%(taskid,COMPLETED))
        else:
            logger.info('No products are available on disk for task: %s'%(taskid))
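# For reference, the availability written by set_task_and_products() can be
# inspected directly with Python's sqlite3 module. The sketch below is only
# illustrative (the database path and task name are hypothetical) and relies
# only on the id, available and location columns already queried above:
#
# @code
#  import sqlite3
#  con = sqlite3.connect('/path/to/storm1.sqlite3')
#  for pid, avail, loc in con.execute(
#          "SELECT id, available, location FROM products WHERE id LIKE ?",
#          ('gfsinit/geogrid::%',)):
#      print(pid, avail, loc)
#  con.close()
# @endcode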

def usage(args):
    """!Sends a program usage message to stderr and exits with status 2.
    @param args the command-line arguments that were provided."""
    print('''\n FORMAT: hwrf_datastore_setrun.py [CONFhwrf=] OP [ARG] [/path/to/file1.sqlite3 [/path/to/file2.sqlite3 [...]]]
  where OP is one of:
    DUMP        - dump entire database to stdout
    SET2RUN_WRF - Sets up the database in order to run the wrf forecast task.
                  It checks and marks the tasks and input required to run the
                  wrf forecast as completed and their products as available.
                  It also unruns the forecast in case it has already been run.
    SET_RUN     - Checks and marks the task as having been run (see below)
    UNFAIL      - mark all failed or running tasks as unstarted
    UNRUN       - mark all tasks as unstarted
    UNRUN_ONE   - see below

  SET_RUN and UNRUN_ONE take an argument, and that argument is mandatory: the
  task to mark as "run" or "unrun". The argument is the task id, which is
  everything in the id after the %s:: in the output of a hwrf_datastore.py DUMP.

  CONFhwrf can be set as an environment variable. The command line setting
  takes precedence over an existing environment variable. The command line
  MUST use the format 'CONFhwrf=' followed by an HWRF conf file,
  e.g. path/to/your/storm1.conf

  fileN.sqlite3 database files are optional. If no database files are given,
  then the hwrf database file defined in CONFhwrf will be used.
'''%(TASK_CATEGORY,), file=sys.stderr)
    print(args, file=sys.stderr)
    sys.exit(2)
#    for arg in args:
#        print>>sys.stderr,arg
#    sys.exit(2)

def check_required_envs(confhwrf_arg):
    """!Checks for the required CONFhwrf environment variable. If CONFhwrf is
    provided on the command line it will override or set the existing
    environment variable. If the required conditions are not met, exits with
    status 2.
    @param confhwrf_arg the path/filename used to set CONFhwrf"""
    try:
        assert(os.environ['CONFhwrf'])
        logger.info('Existing CONFhwrf= %s' % os.environ['CONFhwrf'])
        if confhwrf_arg:
            os.environ['CONFhwrf'] = confhwrf_arg
            logger.info('Changing CONFhwrf to: %s' % os.environ['CONFhwrf'])
        # This else exists as a safety check: it prompts the user to confirm
        # that the database defined in the file pointed to by the CONFhwrf
        # environment variable should be modified. It is meant to prevent an
        # inadvertent modification of an hwrf workflow database. If you pass
        # CONFhwrf on the command line, then you are not prompted.
        else:
            yes=input("\n OK modifying the hwrf database defined in the above CONFhwrf file ? (yes or no): \n")
            if not (yes.lower() == 'y' or yes.lower() == 'yes'):
                logger.info("Exiting script. You do not want to modify your existing hwrf database. \n")
                sys.exit(0)
    except KeyError:
        if confhwrf_arg:
            os.environ['CONFhwrf'] = confhwrf_arg
            logger.info('Setting CONFhwrf: %s' % os.environ['CONFhwrf'])
        else:
            logger.info('You must define the CONFhwrf environment variable or pass it in on the command line.')
            sys.exit(2)

    if not os.path.exists(os.environ['CONFhwrf']):
        logger.info('CONFhwrf file DOES NOT exist, EXITING: %s' % os.environ['CONFhwrf'])
        sys.exit(2)

def setup(confhwrf_arg):
    """!Sets up logging and initializes the experiment."""
    global logger
    global datastore
    produtil.setup.setup()
    logger=logging.getLogger('hwrf_datastore_setrun')
    logger.setLevel(logging.INFO)
    # No handlers yet, so logs only to stdout.
    # Call check before hwrf_expt.init_module.
    check_required_envs(confhwrf_arg)

    hwrf_expt.init_module()
    conf=hwrf_expt.conf

    logdir=os.path.join(conf.getloc('CDSCRUB'),
                        conf.getstr('config','SUBEXPT'),
                        'log')
    logname='hwrf_datastore_setrun_' + conf.getstr('config','STID') + \
            '_' + conf.getstr('config','YMDH') + '.log'
    loghere=os.path.join(logdir,logname)
    if not os.path.exists(logdir):
        try:
            os.mkdir(logdir)
        except (IOError,OSError) as e:
            print('%s: cannot make directory: %s'%(logdir,repr(e)),
                  file=sys.stderr)
            sys.exit(2)

    formatter = logging.Formatter('%(asctime)s: %(name)s: %(levelname)s: %(message)s')
    handler=logging.FileHandler(loghere)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    datastore=conf.getloc('datastore')

def main():
    """!Main program. Parses arguments and calls other functions in this
    program to do the real work."""
    if len(sys.argv)<2:
        usage(sys.argv)

    opargs=[]
    readonly=False
    confhwrf_arg=None

    # ie. CONFhwrf=/pan2/.../pytmp/hwrf_trunk/com/2012102806/18L/storm1.conf
    if sys.argv[1].split('=')[0].upper().strip()=='CONFHWRF':
        first_database_arg=3
        confhwrf_arg=sys.argv[1].split('=')[1].strip()
        if sys.argv[2].upper()=='DUMP':
            op=dump
        elif sys.argv[2].upper()=='SET2RUN_WRF':
            op=set2run_wrf
        elif sys.argv[2].upper()=='SET_RUN':
            op=set_task_as_having_run
            opargs=[sys.argv[3]]
            first_database_arg=4
        elif sys.argv[2].upper()=='UNFAIL':
            op=unfail
        elif sys.argv[2].upper()=='UNRUN':
            op=unrun
        elif sys.argv[2].upper()=='UNRUN_ONE':
            op=unrun_one
            opargs=[sys.argv[3]]
            first_database_arg=4
        else:
            usage('Unrecognized datastore operation %s'%(sys.argv[2],))
    else:
        first_database_arg=2
        if sys.argv[1].upper()=='DUMP':
            op=dump
        elif sys.argv[1].upper()=='SET2RUN_WRF':
            op=set2run_wrf
        elif sys.argv[1].upper()=='SET_RUN':
            op=set_task_as_having_run
            opargs=[sys.argv[2]]
            first_database_arg=3
        elif sys.argv[1].upper()=='UNFAIL':
            op=unfail
        elif sys.argv[1].upper()=='UNRUN':
            op=unrun
        elif sys.argv[1].upper()=='UNRUN_ONE':
            op=unrun_one
            opargs=[sys.argv[2]]
            first_database_arg=3
        else:
            usage('Unrecognized datastore operation %s'%(sys.argv[1],))

    if len(sys.argv)