#! /usr/bin/env python ##@namespace scripts.exhwrf_post # Runs the Unified Post Processor on the output of the # exhwrf_forecst script. Note that this script is restartable: if it # fails, and you call it again, it will pick up where it left off. To # force a reprocessing of the entire post-processing system, call # exhwrf_unpost first. import os, produtil.cd, produtil.sigsafety, cProfile, logging, time,math import produtil.datastore, produtil.setup, produtil.log from produtil.log import jlogger from produtil.cd import NamedDir from produtil.ecflow import set_ecflow_meter from produtil.datastore import COMPLETED, FAILED, UNSTARTED from hwrf.numerics import to_datetime_rel, to_fraction def done(task): """!Is this task done? @param task a produtil.datastore.Task @returns True if the task's state is equal to produtil.datastore.COMPLETED or produtil.datastore.FAILED. Will return False for any other status.""" state=task.getstate() return state == COMPLETED or state==FAILED def set_meter_for(name,tasks,check=False): if name=='model': rtime=tasks.last_completed_time( streams=['history','auxhist2','auxhist3'],check=check) else: rtime=tasks.last_complete() if rtime is None: return dt=rtime-tasks.conf.cycle fhr=int(math.ceil(to_fraction(dt)/3600)) # forecast hour as int set_ecflow_meter(name,fhr,tasks.log()) def post(): """!Runs the post-processor""" produtil.setup.setup() jlogger.info('starting post') import hwrf_expt hwrf_expt.init_module() run_copier=hwrf_expt.conf.getbool('config','post_runs_wrfcopier',False) run_satpost=hwrf_expt.conf.getbool('config','run_satpost',True) # Make sure we check all tasks to see if they're posted: hwrf_expt.nonsatpost.state=UNSTARTED hwrf_expt.satpost.state=UNSTARTED if run_copier: hwrf_expt.wrfcopier.state=UNSTARTED logger=logging.getLogger('exhwrf_post') # Change to a temp directory to run the post: with NamedDir(hwrf_expt.WORKhwrf,logger=logger) as t: #hwrf_expt.ds.dump() # dump entire database state to stdout alldone=False while not alldone: before=int(time.time()) # Set meters before doing anything since it may have been # a while since we last updated them: set_meter_for('pressure',hwrf_expt.nonsatpost) if run_satpost: set_meter_for('satellite',hwrf_expt.satpost) # For this one update, we will recheck all wrf products to # allow them to get ahead of the post: set_meter_for('model',hwrf_expt.runwrf,check=True) # Copy native outputs to COM: if run_copier: if not done(hwrf_expt.wrfcopier): hwrf_expt.wrfcopier.runpart() # Run post-processing: if not done(hwrf_expt.nonsatpost): hwrf_expt.nonsatpost.runpart() set_meter_for('pressure',hwrf_expt.nonsatpost) set_meter_for('model',hwrf_expt.runwrf) if not done(hwrf_expt.nonsatpost): hwrf_expt.nonsatpost.runpart() set_meter_for('pressure',hwrf_expt.nonsatpost) set_meter_for('model',hwrf_expt.runwrf,check=True) if run_satpost and not done(hwrf_expt.satpost): hwrf_expt.satpost.runpart() set_meter_for('satellite',hwrf_expt.satpost) set_meter_for('model',hwrf_expt.runwrf) if not done(hwrf_expt.nonsatpost): hwrf_expt.nonsatpost.runpart() set_meter_for('pressure',hwrf_expt.nonsatpost) set_meter_for('model',hwrf_expt.runwrf) if run_satpost: # Set sat post meter before sleeping set_meter_for('satellite',hwrf_expt.satpost) set_meter_for('model',hwrf_expt.runwrf,check=True) alldone = ( done(hwrf_expt.satpost) or not run_satpost ) \ and done(hwrf_expt.nonsatpost) \ and ( not run_copier or done(hwrf_expt.wrfcopier) ) after=int(time.time()) took=after-before threshold=5 sleeptime=20 if took < threshold: logger.info( 'Post loop iteration took only %d seconds, which is ' 'less than the threshold of %d seconds. Will sleep ' '%d seconds.'%(took,threshold,sleeptime)) time.sleep(sleeptime) else: logger.info('Post loop iteration took %d seconds, ' 'which is above the threshold of %d. ' 'Sleeping only one second.'%(took,threshold)) time.sleep(1) # avoid thrash loop in case of logic error logger.info('Done sleeping.') jlogger.info('completed post') if __name__=='__main__': post() #cProfile.run('post()') # use instead of post() to get profiling info