#!/usr/bin/env python ''' PROGRAM: Create the ROCOTO workflow for a forecast only experiment given the configuration of the GFS parallel AUTHOR: Rahul.Mahajan rahul.mahajan@noaa.gov FILE DEPENDENCIES: 1. config files for the parallel; e.g. config.base, config.fcst[.gfs], etc. Without this dependency, the script will fail OUTPUT: 1. PSLOT.xml: XML workflow 2. PSLOT.crontab: crontab for ROCOTO run command ''' import os import sys import re import numpy as np from datetime import datetime from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter import rocoto import workflow_utils as wfu taskplan = ['getic', 'fv3ic', 'fcst', 'post', 'vrfy', 'arch'] def main(): parser = ArgumentParser(description='Setup XML workflow and CRONTAB for a forecast only experiment.', formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--expdir',help='full path to experiment directory containing config files', type=str, required=False, default=os.environ['PWD']) parser.add_argument('--cdump',help='cycle to run forecasts', type=str, choices=['gdas', 'gfs'], default='gfs', required=False) args = parser.parse_args() configs = wfu.get_configs(args.expdir) _base = wfu.config_parser([wfu.find_config('config.base', configs)]) if not os.path.samefile(args.expdir,_base['EXPDIR']): print 'MISMATCH in experiment directories!' print 'config.base: EXPDIR = %s' % repr(_base['EXPDIR']) print 'input arg: --expdir = %s' % repr(args.expdir) sys.exit(1) dict_configs = wfu.source_configs(configs, taskplan) dict_configs['base']['CDUMP'] = args.cdump # First create workflow XML create_xml(dict_configs) # Next create the crontab wfu.create_crontab(dict_configs['base']) return def get_preamble(): ''' Generate preamble for XML ''' strings = [] strings.append('\n') strings.append('\n') return ''.join(strings) def get_definitions(base): ''' Create entities related to the experiment ''' machine = base.get('machine', wfu.detectMachine()) scheduler = wfu.get_scheduler(machine) strings = [] strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['PSLOT']) strings.append('\t\n' % base['CDUMP']) strings.append('\t\n' % base['CASE']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['SDATE'].strftime('%Y%m%d%H%M')) strings.append('\t\n' % base['EDATE'].strftime('%Y%m%d%H%M')) if base['INTERVAL'] is None: print 'cycle INTERVAL cannot be None' sys.exit(1) strings.append('\t\n' % base['INTERVAL']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['RUN_ENVIR']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['EXPDIR']) strings.append('\t\n' % base['ROTDIR']) strings.append('\t\n' % base['ICSDIR']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['HOMEgfs']) strings.append('\t\n' % base['BASE_JOB']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['ACCOUNT']) strings.append('\t\n' % base['QUEUE']) strings.append('\t\n' % base['QUEUE_ARCH']) if scheduler in ['slurm']: strings.append('\t\n' % base['QUEUE_ARCH']) strings.append('\t\n' % scheduler) strings.append('\n') strings.append('\t\n') strings.append('\t\n') strings.append('\n') strings.append('\t\n') strings.append('\t\n') strings.append('\t\n') strings.append('\t\n') strings.append('\n') return ''.join(strings) def get_resources(dict_configs, cdump='gdas'): ''' Create resource entities ''' strings = [] strings.append('\t\n') strings.append('\n') base = dict_configs['base'] machine = base.get('machine', wfu.detectMachine()) scheduler = wfu.get_scheduler(machine) for task in taskplan: cfg = dict_configs[task] wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, cdump=cdump) taskstr = '%s_%s' % (task.upper(), cdump.upper()) strings.append('\t\n' % (taskstr, queuestr)) if scheduler in ['slurm'] and task in ['getic', 'arch']: strings.append('\t\n' % taskstr ) strings.append('\t\n' % (taskstr, wtimestr)) strings.append('\t\n' % (taskstr, resstr)) if len(memstr) != 0: strings.append('\t\n' % (taskstr, memstr)) strings.append('\t\n' % (taskstr, natstr)) strings.append('\n') strings.append('\t\n') return ''.join(strings) def get_postgroups(post, cdump='gdas'): fhmin = post['FHMIN'] fhmax = post['FHMAX'] fhout = post['FHOUT'] # Get a list of all forecast hours if cdump in ['gdas']: fhrs = range(fhmin, fhmax+fhout, fhout) elif cdump in ['gfs']: fhmax = np.max([post['FHMAX_GFS_00'],post['FHMAX_GFS_06'],post['FHMAX_GFS_12'],post['FHMAX_GFS_18']]) fhout = post['FHOUT_GFS'] fhmax_hf = post['FHMAX_HF_GFS'] fhout_hf = post['FHOUT_HF_GFS'] fhrs_hf = range(fhmin, fhmax_hf+fhout_hf, fhout_hf) fhrs = fhrs_hf + range(fhrs_hf[-1]+fhout, fhmax+fhout, fhout) npostgrp = post['NPOSTGRP'] ngrps = npostgrp if len(fhrs) > npostgrp else len(fhrs) fhrs = ['f%03d' % f for f in fhrs] fhrs = np.array_split(fhrs, ngrps) fhrs = [f.tolist() for f in fhrs] fhrgrp = ' '.join(['%03d' % x for x in range(1, ngrps+1)]) fhrdep = ' '.join([f[-1] for f in fhrs]) fhrlst = ' '.join(['_'.join(f) for f in fhrs]) return fhrgrp, fhrdep, fhrlst def get_workflow(dict_configs, cdump='gdas'): ''' Create tasks for forecast only workflow ''' envars = [] envars.append(rocoto.create_envar(name='RUN_ENVIR', value='&RUN_ENVIR;')) envars.append(rocoto.create_envar(name='HOMEgfs', value='&HOMEgfs;')) envars.append(rocoto.create_envar(name='EXPDIR', value='&EXPDIR;')) envars.append(rocoto.create_envar(name='CDATE', value='@Y@m@d@H')) envars.append(rocoto.create_envar(name='CDUMP', value='&CDUMP;')) envars.append(rocoto.create_envar(name='PDY', value='@Y@m@d')) envars.append(rocoto.create_envar(name='cyc', value='@H')) tasks = [] # getics deps = [] data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/pgbanl.&CDUMP;.@Y@m@d@H' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/siganl.&CDUMP;.@Y@m@d@H' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/sfcanl.&CDUMP;.@Y@m@d@H' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) deps = rocoto.create_dependency(dep_condition='and', dep=deps) dependencies = rocoto.create_dependency(dep_condition='not', dep=deps) task = wfu.create_wf_task('getic', cdump=cdump, envar=envars, dependency=dependencies) tasks.append(task) tasks.append('\n') # chgres deps = [] data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/siganl.&CDUMP;.@Y@m@d@H' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/sfcanl.&CDUMP;.@Y@m@d@H' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) deps = [] data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/&CASE;/INPUT/gfs_data.tile6.nc' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/&CASE;/INPUT/sfc_data.tile6.nc' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) deps = rocoto.create_dependency(dep_condition='and', dep=deps) dependencies2 = rocoto.create_dependency(dep_condition='not', dep=deps) deps = [] deps.append(dependencies) deps.append(dependencies2) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) task = wfu.create_wf_task('fv3ic', cdump=cdump, envar=envars, dependency=dependencies) tasks.append(task) tasks.append('\n') # fcst deps = [] data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/&CASE;/INPUT/gfs_data.tile6.nc' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) data = '&ICSDIR;/@Y@m@d@H/&CDUMP;/&CASE;/INPUT/sfc_data.tile6.nc' dep_dict = {'type':'data', 'data':data} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) task = wfu.create_wf_task('fcst', cdump=cdump, envar=envars, dependency=dependencies) tasks.append(task) tasks.append('\n') # post deps = [] data = '&ROTDIR;/%s.@Y@m@d/@H/%s.t@Hz.log#dep#.nemsio' % (cdump, cdump) dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#') fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#') ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;') postenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR] varname1, varname2, varname3 = 'grp', 'dep', 'lst' varval1, varval2, varval3 = get_postgroups(dict_configs['post'], cdump=cdump) vardict = {varname2: varval2, varname3: varval3} task = wfu.create_wf_task('post', cdump=cdump, envar=postenvars, dependency=dependencies, metatask='post', varname=varname1, varval=varval1, vardict=vardict) tasks.append(task) tasks.append('\n') # vrfy deps = [] dep_dict = {'type':'metatask', 'name':'%spost' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('vrfy', cdump=cdump, envar=envars, dependency=dependencies) tasks.append(task) tasks.append('\n') # arch deps = [] dep_dict = {'type':'metatask', 'name':'%spost' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type':'task', 'name':'%svrfy' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type':'streq', 'left':'&ARCHIVE_TO_HPSS;', 'right':'YES'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) task = wfu.create_wf_task('arch', cdump=cdump, envar=envars, dependency=dependencies, final=True) tasks.append(task) tasks.append('\n') return ''.join(tasks) def get_workflow_body(dict_configs, cdump='gdas'): ''' Create the workflow body ''' strings = [] strings.append('\n') strings.append(']>\n') strings.append('\n') strings.append('\n') strings.append('\n') strings.append('\t&EXPDIR;/logs/@Y@m@d@H.log\n') strings.append('\n') strings.append('\t\n') strings.append('\t&SDATE; &EDATE; &INTERVAL;\n' % cdump) strings.append('\n') strings.append(get_workflow(dict_configs, cdump=cdump)) strings.append('\n') strings.append('\n') return ''.join(strings) def create_xml(dict_configs): ''' Given an experiment directory containing config files and XML directory containing XML templates, create the workflow XML ''' dict_configs['base']['INTERVAL'] = wfu.get_gfs_interval(dict_configs['base']['gfs_cyc']) base = dict_configs['base'] preamble = get_preamble() definitions = get_definitions(base) resources = get_resources(dict_configs, cdump=base['CDUMP']) workflow = get_workflow_body(dict_configs, cdump=base['CDUMP']) # Removes &MEMORY_JOB_DUMP post mortem from gdas tasks temp_workflow = '' memory_dict = [] for each_resource_string in re.split(r'(\s+)', resources): if 'MEMORY' in each_resource_string: memory_dict.append(each_resource_string) for each_line in re.split(r'(\s+)', workflow): if 'MEMORY' not in each_line: temp_workflow += each_line else: if any( substring in each_line for substring in memory_dict): temp_workflow += each_line workflow = temp_workflow # Start writing the XML file fh = open('%s/%s.xml' % (base['EXPDIR'], base['PSLOT']), 'w') fh.write(preamble) fh.write(definitions) fh.write(resources) fh.write(workflow) fh.close() return if __name__ == '__main__': main() sys.exit(0)