#!/usr/bin/env python
'''
PROGRAM:
Create the ROCOTO workflow for a forecast only experiment given the configuration of the GFS parallel
AUTHOR:
Rahul.Mahajan
rahul.mahajan@noaa.gov
FILE DEPENDENCIES:
1. config files for the parallel; e.g. config.base, config.fcst[.gfs], etc.
Without this dependency, the script will fail
OUTPUT:
1. PSLOT.xml: XML workflow
2. PSLOT.crontab: crontab for ROCOTO run command
'''
import os
import sys
import re
import numpy as np
from datetime import datetime
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import rocoto
import workflow_utils as wfu
# Ordered list of all tasks in the forecast-only suite.  Each entry must have
# a matching config.<task> file in EXPDIR (sourced via wfu.source_configs) and
# a resource entity emitted by get_resources().
taskplan = ['getic', 'waveinit', 'waveprep', 'fcst', 'post', 'wavepostsbs', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavegempak', 'waveawipsbulls', 'waveawipsgridded', 'postsnd', 'gempak', 'awips', 'vrfy', 'metp', 'arch']
def main():
    '''
    Parse the command line, source the experiment configs, and write the
    ROCOTO XML workflow plus the crontab for a forecast-only experiment.

    Exits with status 1 when --expdir disagrees with EXPDIR in config.base.
    '''
    parser = ArgumentParser(description='Setup XML workflow and CRONTAB for a forecast only experiment.', formatter_class=ArgumentDefaultsHelpFormatter)
    # Use os.getcwd() rather than os.environ['PWD']: PWD is shell-maintained,
    # may be unset (KeyError while building the parser) or stale.
    parser.add_argument('--expdir', help='full path to experiment directory containing config files', type=str, required=False, default=os.getcwd())
    parser.add_argument('--cdump', help='cycle to run forecasts', type=str, choices=['gdas', 'gfs'], default='gfs', required=False)
    args = parser.parse_args()

    configs = wfu.get_configs(args.expdir)

    _base = wfu.config_parser([wfu.find_config('config.base', configs)])

    # Refuse to run against a different experiment directory than the one
    # config.base declares; samefile resolves symlinks before comparing.
    if not os.path.samefile(args.expdir, _base['EXPDIR']):
        print('MISMATCH in experiment directories!')
        print(f'''config.base: EXPDIR = {repr(_base['EXPDIR'])}''')
        print(f'input arg: --expdir = {repr(args.expdir)}')
        sys.exit(1)

    dict_configs = wfu.source_configs(configs, taskplan)

    dict_configs['base']['CDUMP'] = args.cdump

    # npe_node_max is the same for all tasks, so just use the one from fcst
    dict_configs['base']['npe_node_max'] = dict_configs['fcst']['npe_node_max']

    # First create workflow XML
    create_xml(dict_configs)

    # Next create the crontab
    wfu.create_crontab(dict_configs['base'])
def get_preamble():
    '''
    Generate preamble for XML

    Returns the leading text of the XML document as a single string.
    NOTE(review): the preamble payload appears stripped in this copy of the
    file; only the two newlines remain.
    '''
    parts = ['\n', '\n']
    return ''.join(parts)
def get_definitions(base):
    '''
    Create entities related to the experiment

    Args:
        base: dict of settings sourced from config.base.

    Returns:
        str: concatenated XML entity-definition lines.

    Exits:
        sys.exit(1) when base['INTERVAL'] is None.

    NOTE(review): the entity payloads appear stripped in this copy; the
    appended strings carry only their whitespace.
    '''
    # Look the machine up explicitly instead of base.get('machine', wfu.detectMachine()):
    # dict.get always evaluates its default, so the original probed the host
    # even when config.base already named the machine.
    machine = base['machine'] if 'machine' in base else wfu.detectMachine()
    scheduler = wfu.get_scheduler(machine)
    hpssarch = base.get('HPSSARCH', 'NO').upper()

    strings = []

    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')
    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')

    # The cycling interval must be resolved (see create_xml) before the
    # workflow <cycledef> can be written.
    if base['INTERVAL'] is None:
        print('cycle INTERVAL cannot be None')
        sys.exit(1)
    strings.append(f'''\t\n''')

    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')
    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')
    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')
    strings.append(f'''\t\n''')

    # Scheduler/machine-specific entities
    if scheduler in ['slurm'] and machine in ['ORION']:
        strings.append(f'''\t\n''')
    if scheduler in ['slurm']:
        strings.append(f'''\t\n''')
    strings.append(f'\t\n')
    strings.append('\n')
    strings.append('\t\n')
    strings.append(f'''\t\n''')
    strings.append('\n')
    strings.append('\t\n')
    strings.append('\t\n')
    strings.append('\t\n')
    strings.append('\t\n')
    strings.append('\n')

    return ''.join(strings)
def get_resources(dict_configs, cdump='gdas'):
    '''
    Create resource entities

    Args:
        dict_configs: dict of per-task config dicts (keyed by task name,
                      plus 'base').
        cdump: cycle type the resources are generated for.

    Returns:
        str: concatenated XML resource-entity lines, one group per task
        in taskplan.
    '''
    strings = []

    strings.append('\t\n')
    strings.append('\n')

    base = dict_configs['base']
    # Conditional lookup instead of base.get('machine', wfu.detectMachine()):
    # dict.get always evaluates its default, probing the host even when the
    # config already supplies the machine.
    machine = base['machine'] if 'machine' in base else wfu.detectMachine()
    reservation = base.get('RESERVATION', 'NONE').upper()
    scheduler = wfu.get_scheduler(machine)

    # NOTE(review): these DO_* flags are not referenced below in this copy
    # of the function — confirm whether they are still needed.
    do_wave = base.get('DO_WAVE', 'NO').upper()
    do_bufrsnd = base.get('DO_BUFRSND', 'NO').upper()
    do_gempak = base.get('DO_GEMPAK', 'NO').upper()
    do_awips = base.get('DO_AWIPS', 'NO').upper()
    do_metp = base.get('DO_METP', 'NO').upper()

    for task in taskplan:

        cfg = dict_configs[task]

        wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, reservation, cdump=cdump)

        taskstr = f'{task.upper()}_{cdump.upper()}'

        strings.append(f'\t\n')
        # getic/arch are service/transfer jobs and get different queue
        # handling under slurm.
        if scheduler in ['slurm'] and machine in ['ORION'] and task not in ['getic', 'arch']:
            strings.append(f'\t\n')
        if scheduler in ['slurm'] and task in ['getic', 'arch']:
            strings.append(f'\t\n')
        strings.append(f'\t\n')
        strings.append(f'\t\n')
        # Only emit a MEMORY entity when the scheduler reported one;
        # create_xml later strips unmatched &MEMORY_...; references.
        if len(memstr) != 0:
            strings.append(f'\t\n')
        strings.append(f'\t\n')

    strings.append('\n')
    strings.append('\t\n')

    return ''.join(strings)
def get_postgroups(post, cdump='gdas'):
    '''
    Partition the post-processing forecast hours into NPOSTGRP groups.

    Args:
        post: post-task config dict (FHMIN/FHMAX/FHOUT, NPOSTGRP, and the
              GFS high-frequency keys when cdump == 'gfs').
        cdump: 'gdas' or 'gfs'.

    Returns:
        tuple of three space-separated strings: group ids (1-based,
        zero-padded), the last hour label of each group (dependency
        anchor), and the underscore-joined hour labels per group.
    '''
    fhmin = post['FHMIN']
    fhmax = post['FHMAX']
    fhout = post['FHOUT']

    # Assemble the full list of forecast hours for this cycle type
    if cdump in ['gdas']:
        hours = list(range(fhmin, fhmax + fhout, fhout))
    elif cdump in ['gfs']:
        # GFS: high-frequency output up to FHMAX_HF_GFS, then the regular
        # interval out to the largest per-cycle FHMAX.
        fhmax = np.max([post['FHMAX_GFS_00'], post['FHMAX_GFS_06'],
                        post['FHMAX_GFS_12'], post['FHMAX_GFS_18']])
        fhout = post['FHOUT_GFS']
        hf_hours = list(range(fhmin, post['FHMAX_HF_GFS'] + post['FHOUT_HF_GFS'], post['FHOUT_HF_GFS']))
        hours = hf_hours + list(range(hf_hours[-1] + fhout, fhmax + fhout, fhout))

    # Never create more groups than there are hours
    ngrps = min(post['NPOSTGRP'], len(hours))

    labels = [f'f{h:03d}' for h in hours]
    groups = [chunk.tolist() for chunk in np.array_split(labels, ngrps)]

    fhrgrp = ' '.join(f'{i:03d}' for i in range(1, ngrps + 1))
    fhrdep = ' '.join(grp[-1] for grp in groups)
    fhrlst = ' '.join('_'.join(grp) for grp in groups)

    return fhrgrp, fhrdep, fhrlst
def get_workflow(dict_configs, cdump='gdas'):
    '''
    Create tasks for forecast only workflow

    Builds the XML <task>/<metatask> definitions for every enabled task in
    taskplan, wiring data- and task-dependencies between them, and returns
    them concatenated into one string.
    '''

    # Environment variables exported to every task's job script; rocoto
    # @-tokens (@Y@m@d@H etc.) are expanded per-cycle by the workflow manager.
    envars = []
    envars.append(rocoto.create_envar(name='RUN_ENVIR', value='&RUN_ENVIR;'))
    envars.append(rocoto.create_envar(name='HOMEgfs', value='&HOMEgfs;'))
    envars.append(rocoto.create_envar(name='EXPDIR', value='&EXPDIR;'))
    envars.append(rocoto.create_envar(name='CDATE', value='@Y@m@d@H'))
    envars.append(rocoto.create_envar(name='CDUMP', value='&CDUMP;'))
    envars.append(rocoto.create_envar(name='PDY', value='@Y@m@d'))
    envars.append(rocoto.create_envar(name='cyc', value='@H'))
    # NOTE(review): GDATE/gPDY/gcyc use the same @-tokens as the current
    # cycle (no cycle offset applied) — confirm this is intended.
    envars.append(rocoto.create_envar(name='GDATE', value='@Y@m@d@H'))
    envars.append(rocoto.create_envar(name='GDUMP', value='gdas'))
    envars.append(rocoto.create_envar(name='gPDY', value='@Y@m@d'))
    envars.append(rocoto.create_envar(name='gcyc', value='@H'))

    # Experiment-level switches controlling which tasks are emitted
    base = dict_configs['base']
    machine = base.get('machine', wfu.detectMachine())
    hpssarch = base.get('HPSSARCH', 'NO').upper()
    do_wave = base.get('DO_WAVE', 'NO').upper()
    do_wave_cdump = base.get('WAVE_CDUMP', 'BOTH').upper()
    do_bufrsnd = base.get('DO_BUFRSND', 'NO').upper()
    do_gempak = base.get('DO_GEMPAK', 'NO').upper()
    do_awips = base.get('DO_AWIPS', 'NO').upper()
    do_vrfy = base.get('DO_VRFY', 'YES').upper()
    do_metp = base.get('DO_METP', 'NO').upper()

    tasks = []

    # getic: pull initial conditions from HPSS.  The 'nor' condition
    # presumably makes the task run only when NEITHER IC file already
    # exists on disk — confirm against rocoto.create_dependency semantics.
    if hpssarch in ['YES']:
        deps = []
        data = '&ROTDIR;/&CDUMP;.@Y@m@d/@H/atmos/INPUT/sfc_data.tile6.nc'
        dep_dict = {'type':'data', 'data':data}
        deps.append(rocoto.add_dependency(dep_dict))
        data = '&ROTDIR;/&CDUMP;.@Y@m@d/@H/atmos/RESTART/@Y@m@d.@H0000.sfcanl_data.tile6.nc'
        dep_dict = {'type':'data', 'data':data}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep_condition='nor', dep=deps)
        task = wfu.create_wf_task('getic', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # waveinit: wave model initialization (no upstream dependency)
    if do_wave in ['Y', 'YES'] and do_wave_cdump in ['GFS', 'BOTH']:
        task = wfu.create_wf_task('waveinit', cdump=cdump, envar=envars)
        tasks.append(task)
        tasks.append('\n')

    # waveprep: runs after waveinit
    if do_wave in ['Y', 'YES'] and do_wave_cdump in ['GFS', 'BOTH']:
        deps = []
        dep_dict = {'type': 'task', 'name': f'{cdump}waveinit'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
        task = wfu.create_wf_task('waveprep', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # fcst: requires either IC file on disk ('or'); when waves are on, the
    # data dependency is combined with the waveprep task via an outer 'and'.
    deps = []
    data = '&ROTDIR;/&CDUMP;.@Y@m@d/@H/atmos/INPUT/sfc_data.tile6.nc'
    dep_dict = {'type':'data', 'data':data}
    deps.append(rocoto.add_dependency(dep_dict))
    data = '&ROTDIR;/&CDUMP;.@Y@m@d/@H/atmos/RESTART/@Y@m@d.@H0000.sfcanl_data.tile6.nc'
    dep_dict = {'type':'data', 'data':data}
    deps.append(rocoto.add_dependency(dep_dict))
    dependencies = rocoto.create_dependency(dep_condition='or', dep=deps)

    if do_wave in ['Y', 'YES'] and do_wave_cdump in ['GFS', 'BOTH']:
        deps = []
        dep_dict = {'type': 'task', 'name': f'{cdump}waveprep'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies2 = rocoto.create_dependency(dep_condition='and', dep=deps)

    deps = []
    deps.append(dependencies)
    if do_wave in ['Y', 'YES'] and do_wave_cdump in ['GFS', 'BOTH']:
        deps.append(dependencies2)
    dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
    task = wfu.create_wf_task('fcst', cdump=cdump, envar=envars, dependency=dependencies)
    tasks.append(task)
    tasks.append('\n')

    # post: metatask over forecast-hour groups; #dep#/#grp#/#lst# tokens are
    # substituted per group from the vardict built by get_postgroups.
    deps = []
    data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.log#dep#.txt'
    dep_dict = {'type': 'data', 'data': data}
    deps.append(rocoto.add_dependency(dep_dict))
    dependencies = rocoto.create_dependency(dep=deps)
    fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#')
    fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#')
    ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;')
    postenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR]
    varname1, varname2, varname3 = 'grp', 'dep', 'lst'
    varval1, varval2, varval3 = get_postgroups(dict_configs['post'], cdump=cdump)
    vardict = {varname2: varval2, varname3: varval3}
    task = wfu.create_wf_task('post', cdump=cdump, envar=postenvars, dependency=dependencies,
    metatask='post', varname=varname1, varval=varval1, vardict=vardict)
    tasks.append(task)
    tasks.append('\n')

    # wavepostsbs: waits for the three wave grid output files of the cycle
    if do_wave in ['Y', 'YES'] and do_wave_cdump in ['GFS', 'BOTH']:
        deps = []
        data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/wave/rundata/{cdump}wave.out_grd.gnh_10m.@Y@m@d.@H0000'
        dep_dict = {'type': 'data', 'data': data}
        deps.append(rocoto.add_dependency(dep_dict))
        data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/wave/rundata/{cdump}wave.out_grd.aoc_9km.@Y@m@d.@H0000'
        dep_dict = {'type': 'data', 'data': data}
        deps.append(rocoto.add_dependency(dep_dict))
        data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/wave/rundata/{cdump}wave.out_grd.gsh_15m.@Y@m@d.@H0000'
        dep_dict = {'type': 'data', 'data': data}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
        task = wfu.create_wf_task('wavepostsbs', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # wavepostbndpnt: boundary-point post, after the forecast completes
    if do_wave in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'task', 'name':f'{cdump}fcst'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        task = wfu.create_wf_task('wavepostbndpnt', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # wavepostbndpntbll: triggered by the f180 forecast log file
    # (hour 180 is hard-coded here).
    if do_wave in ['Y', 'YES']:
        deps = []
        data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.logf180.txt'
        dep_dict = {'type': 'data', 'data': data}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        task = wfu.create_wf_task('wavepostbndpntbll', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # wavepostpnt: needs both the forecast and the bulletin task
    if do_wave in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'task', 'name':f'{cdump}fcst'}
        deps.append(rocoto.add_dependency(dep_dict))
        dep_dict = {'type':'task', 'name':f'{cdump}wavepostbndpntbll'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
        task = wfu.create_wf_task('wavepostpnt', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # wavegempak: GEMPAK conversion of wave products
    if do_wave in ['Y', 'YES'] and do_gempak in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'task', 'name':f'{cdump}wavepostsbs'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        task = wfu.create_wf_task('wavegempak', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # waveawipsbulls: AWIPS wave bulletins, after side-by-side and point post
    if do_wave in ['Y', 'YES'] and do_awips in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'task', 'name':f'{cdump}wavepostsbs'}
        deps.append(rocoto.add_dependency(dep_dict))
        dep_dict = {'type':'task', 'name':f'{cdump}wavepostpnt'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
        task = wfu.create_wf_task('waveawipsbulls', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # waveawipsgridded: AWIPS gridded wave products
    if do_wave in ['Y', 'YES'] and do_awips in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'task', 'name':f'{cdump}wavepostsbs'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        task = wfu.create_wf_task('waveawipsgridded', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # postsnd: BUFR sounding products, after the forecast
    if do_bufrsnd in ['Y', 'YES']:
        deps = []
        dep_dict = {'type': 'task', 'name': f'{cdump}fcst'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        task = wfu.create_wf_task('postsnd', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # awips: metatask over AWIPS groups, after the whole post metatask
    if do_awips in ['Y', 'YES']:
        deps = []
        dep_dict = {'type': 'metatask', 'name': f'{cdump}post'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#')
        fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#')
        ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;')
        awipsenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR]
        varname1, varname2, varname3 = 'grp', 'dep', 'lst'
        varval1, varval2, varval3 = get_awipsgroups(dict_configs['awips'], cdump=cdump)
        vardict = {varname2: varval2, varname3: varval3}
        task = wfu.create_wf_task('awips', cdump=cdump, envar=awipsenvars, dependency=dependencies,
        metatask='awips', varname=varname1, varval=varval1, vardict=vardict)
        tasks.append(task)
        tasks.append('\n')

    # gempak: after the post metatask
    if do_gempak in ['Y', 'YES']:
        deps = []
        dep_dict = {'type': 'metatask', 'name': f'{cdump}post'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;')
        gempakenvars = envars + [ROTDIR]
        task = wfu.create_wf_task('gempak', cdump=cdump, envar=gempakenvars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # vrfy: verification, after the post metatask
    if do_vrfy in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'metatask', 'name':f'{cdump}post'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep=deps)
        task = wfu.create_wf_task('vrfy', cdump=cdump, envar=envars, dependency=dependencies)
        tasks.append(task)
        tasks.append('\n')

    # metp: MET verification metatask, one task per METPCASE value
    if do_metp in ['Y', 'YES']:
        deps = []
        dep_dict = {'type':'metatask', 'name':f'{cdump}post'}
        deps.append(rocoto.add_dependency(dep_dict))
        dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
        sdate_gfs = rocoto.create_envar(name='SDATE_GFS', value='&SDATE;')
        metpcase = rocoto.create_envar(name='METPCASE', value='#metpcase#')
        metpenvars = envars + [sdate_gfs] + [metpcase]
        varname1 = 'metpcase'
        varval1 = 'g2g1 g2o1 pcp1'
        task = wfu.create_wf_task('metp', cdump=cdump, envar=metpenvars, dependency=dependencies,
        metatask='metp', varname=varname1, varval=varval1)
        tasks.append(task)
        tasks.append('\n')

    # arch: final task; gated on post (and vrfy / wave post tasks when
    # enabled) plus the ARCHIVE_TO_HPSS string-equality check.
    deps = []
    dep_dict = {'type':'metatask', 'name':f'{cdump}post'}
    deps.append(rocoto.add_dependency(dep_dict))
    if do_vrfy in ['Y', 'YES']:
        dep_dict = {'type':'task', 'name':f'{cdump}vrfy'}
        deps.append(rocoto.add_dependency(dep_dict))
    dep_dict = {'type':'streq', 'left':'&ARCHIVE_TO_HPSS;', 'right':f'{hpssarch}'}
    deps.append(rocoto.add_dependency(dep_dict))
    if do_wave in ['Y', 'YES']:
        dep_dict = {'type': 'task', 'name': f'{cdump}wavepostsbs'}
        deps.append(rocoto.add_dependency(dep_dict))
        dep_dict = {'type': 'task', 'name': f'{cdump}wavepostpnt'}
        deps.append(rocoto.add_dependency(dep_dict))
        dep_dict = {'type': 'task', 'name': f'{cdump}wavepostbndpnt'}
        deps.append(rocoto.add_dependency(dep_dict))
    dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
    task = wfu.create_wf_task('arch', cdump=cdump, envar=envars, dependency=dependencies, final=True)
    tasks.append(task)
    tasks.append('\n')

    return ''.join(tasks)
def get_awipsgroups(awips, cdump='gfs'):
    '''
    Partition the AWIPS forecast hours into NAWIPSGRP groups.

    Args:
        awips: awips-task config dict (FHMIN/FHMAX/FHOUT, NAWIPSGRP, and
               the GFS high-frequency keys when cdump == 'gfs').
        cdump: 'gdas' or 'gfs'.

    Returns:
        tuple of three space-separated strings: group ids (0-based,
        zero-padded), the last hour label of each group, and the
        underscore-joined hour labels per group.
    '''
    fhmin = awips['FHMIN']
    fhmax = awips['FHMAX']
    fhout = awips['FHOUT']

    # Assemble the full list of forecast hours for this cycle type
    if cdump in ['gdas']:
        hours = range(fhmin, fhmax + fhout, fhout)
    elif cdump in ['gfs']:
        fhmax = np.max([awips['FHMAX_GFS_00'], awips['FHMAX_GFS_06'],
                        awips['FHMAX_GFS_12'], awips['FHMAX_GFS_18']])
        fhout = awips['FHOUT_GFS']
        fhmax_hf = awips['FHMAX_HF_GFS']
        fhout_hf = awips['FHOUT_HF_GFS']
        # AWIPS products stop at 240 hours regardless of the forecast length
        fhmax = min(fhmax, 240)
        fhmax_hf = min(fhmax_hf, 240)
        hf_hours = range(fhmin, fhmax_hf + fhout_hf, fhout_hf)
        hours = list(hf_hours) + list(range(hf_hours[-1] + fhout, fhmax + fhout, fhout))

    # Never create more groups than there are hours
    ngrps = min(awips['NAWIPSGRP'], len(hours))

    labels = [f'f{h:03d}' for h in hours]
    groups = [chunk.tolist() for chunk in np.array_split(labels, ngrps)]

    fhrgrp = ' '.join(f'{i:03d}' for i in range(ngrps))
    fhrdep = ' '.join(grp[-1] for grp in groups)
    fhrlst = ' '.join('_'.join(grp) for grp in groups)

    return fhrgrp, fhrdep, fhrlst
def get_workflow_body(dict_configs, cdump='gdas'):
    '''
    Create the workflow body

    Wraps the task definitions from get_workflow() in the surrounding
    workflow text (log line, cycledef line) and returns one string.
    '''
    pieces = [
        '\n',
        ']>\n',
        '\n',
        '\n',
        '\n',
        '\t&EXPDIR;/logs/@Y@m@d@H.log\n',
        '\n',
        '\t\n',
        '\t&SDATE; &EDATE; &INTERVAL;\n',
        '\n',
        get_workflow(dict_configs, cdump=cdump),
        '\n',
        '\n',
    ]
    return ''.join(pieces)
def create_xml(dict_configs):
    '''
    Given an experiment directory containing config files and
    XML directory containing XML templates, create the workflow XML

    Writes EXPDIR/PSLOT.xml assembled from the preamble, entity
    definitions, resource entities, and the workflow body.
    '''
    # INTERVAL must be set before get_definitions() builds the cycle entities
    dict_configs['base']['INTERVAL'] = wfu.get_gfs_interval(dict_configs['base']['gfs_cyc'])
    base = dict_configs['base']

    preamble = get_preamble()
    definitions = get_definitions(base)
    resources = get_resources(dict_configs, cdump=base['CDUMP'])
    workflow = get_workflow_body(dict_configs, cdump=base['CDUMP'])

    # Removes &MEMORY_JOB_DUMP post mortem from gdas tasks: collect every
    # MEMORY entity actually defined in the resources section, then drop any
    # MEMORY reference in the workflow that has no matching definition.
    memory_entities = []  # renamed from memory_dict: it is a list, not a dict
    for each_resource_string in re.split(r'(\s+)', resources):
        if 'MEMORY' in each_resource_string:
            memory_entities.append(each_resource_string)

    # Build via list + join instead of repeated '+=' (avoids quadratic
    # string concatenation on large workflows)
    kept_lines = []
    for each_line in re.split(r'(\s+)', workflow):
        if 'MEMORY' not in each_line:
            kept_lines.append(each_line)
        elif any(substring in each_line for substring in memory_entities):
            kept_lines.append(each_line)
    workflow = ''.join(kept_lines)

    # Context manager guarantees the file is closed even if a write raises
    # (the original leaked the handle on error)
    with open(f'{base["EXPDIR"]}/{base["PSLOT"]}.xml', 'w') as fh:
        fh.write(preamble)
        fh.write(definitions)
        fh.write(resources)
        fh.write(workflow)
# Script entry point: build the XML and crontab, then exit successfully.
if __name__ == '__main__':
    main()
    sys.exit(0)