#!/usr/bin/env python
'''
PROGRAM:
Create the ROCOTO workflow given the configuration of the GFS parallel
AUTHOR:
Rahul.Mahajan
rahul.mahajan@noaa.gov
FILE DEPENDENCIES:
1. config files for the parallel; e.g. config.base, config.fcst[.gfs], etc.
Without these dependencies, the script will fail
OUTPUT:
1. PSLOT.xml: XML workflow
2. PSLOT.crontab: crontab for ROCOTO run command
'''
import os
import sys
import re
import numpy as np
from datetime import datetime, timedelta
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from collections import OrderedDict
import rocoto
import workflow_utils as wfu
def main():
parser = ArgumentParser(description='Setup XML workflow and CRONTAB for a GFS parallel.', formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('--expdir', help='full path to experiment directory containing config files', type=str, required=False, default=os.environ['PWD'])
args = parser.parse_args()
configs = wfu.get_configs(args.expdir)
_base = wfu.config_parser([wfu.find_config('config.base', configs)])
if not os.path.samefile(args.expdir, _base['EXPDIR']):
print('MISMATCH in experiment directories!')
print(f'config.base: EXPDIR = {repr(_base["EXPDIR"])}')
print(f'input arg: --expdir = {repr(args.expdir)}')
sys.exit(1)
gfs_steps = ['prep', 'anal', 'analdiag', 'analcalc', 'gldas', 'fcst', 'postsnd', 'post', 'vrfy', 'arch']
gfs_steps_gempak = ['gempak']
gfs_steps_awips = ['awips']
#hyb_steps = ['eobs', 'eomg', 'eupd', 'ecen', 'efcs', 'epos', 'earc']
metp_steps = ['metp']
wav_steps = ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt']
#Implement additional wave jobs at later date
wav_steps_gempak = ['wavegempak']
wav_steps_awips = ['waveawipsbulls', 'waveawipsgridded']
# From gfsv16b latest
# gfs_steps = ['prep', 'anal', 'gldas', 'fcst', 'postsnd', 'post', 'awips', 'gempak', 'vrfy', 'metp', 'arch']
hyb_steps = ['eobs', 'ediag', 'eomg', 'eupd', 'ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc']
steps = gfs_steps + hyb_steps if _base.get('DOHYBVAR', 'NO') == 'YES' else gfs_steps
steps = steps + metp_steps if _base.get('DO_METP', 'NO') == 'YES' else steps
steps = steps + gfs_steps_gempak if _base.get('DO_GEMPAK', 'NO') == 'YES' else steps
steps = steps + gfs_steps_awips if _base.get('DO_AWIPS', 'NO') == 'YES' else steps
steps = steps + wav_steps if _base.get('DO_WAVE', 'NO') == 'YES' else steps
steps = steps + wav_steps_gempak if _base.get('DO_GEMPAK', 'NO') == 'YES' else steps
steps = steps + wav_steps_awips if _base.get('DO_AWIPS', 'NO') == 'YES' else steps
steps = steps + ['wdqms'] if _base.get('DO_WDQMS', 'NO') == 'YES' else steps
steps = steps + ['sfcprep'] if _base.get('DO_SFCPREP', 'NO') == 'YES' else steps
dict_configs = wfu.source_configs(configs, steps)
# Check and set gfs_cyc specific variables
if dict_configs['base']['gfs_cyc'] != 0:
dict_configs['base'] = get_gfs_cyc_dates(dict_configs['base'])
# npe_node_max is the same for all tasks, so just use the one from fcst
dict_configs['base']['npe_node_max'] = dict_configs['fcst']['npe_node_max']
# First create workflow XML
create_xml(dict_configs)
# Next create the crontab
wfu.create_crontab(dict_configs['base'])
return
def get_gfs_cyc_dates(base):
'''
Generate GFS dates from experiment dates and gfs_cyc choice
'''
base_out = base.copy()
gfs_cyc = base['gfs_cyc']
sdate = base['SDATE']
edate = base['EDATE']
interval_gfs = wfu.get_gfs_interval(gfs_cyc)
# Set GFS cycling dates
hrdet = 0
if gfs_cyc == 1:
hrinc = 24 - sdate.hour
hrdet = edate.hour
elif gfs_cyc == 2:
if sdate.hour in [0, 12]:
hrinc = 12
elif sdate.hour in [6, 18]:
hrinc = 6
if edate.hour in [6, 18]:
hrdet = 6
elif gfs_cyc == 4:
hrinc = 6
sdate_gfs = sdate + timedelta(hours=hrinc)
edate_gfs = edate - timedelta(hours=hrdet)
if sdate_gfs > edate:
print('W A R N I N G!')
print('Starting date for GFS cycles is after Ending date of experiment')
print(f'SDATE = {sdate.strftime("%Y%m%d%H")}, EDATE = {edate.strftime("%Y%m%d%H")}')
print(f'SDATE_GFS = {sdate_gfs.strftime("%Y%m%d%H")}, EDATE_GFS = {edate_gfs.strftime("%Y%m%d%H")}')
gfs_cyc = 0
base_out['gfs_cyc'] = gfs_cyc
base_out['SDATE_GFS'] = sdate_gfs
base_out['EDATE_GFS'] = edate_gfs
base_out['INTERVAL_GFS'] = interval_gfs
fhmax_gfs = {}
for hh in ['00', '06', '12', '18']:
fhmax_gfs[hh] = base.get(f'FHMAX_GFS_{hh}', 'FHMAX_GFS_00')
base_out['FHMAX_GFS'] = fhmax_gfs
return base_out
def get_preamble():
'''
Generate preamble for XML
'''
strings = []
strings.append('\n')
strings.append('\n')
return ''.join(strings)
def get_definitions(base):
'''
Create entities related to the experiment
'''
machine = base.get('machine', wfu.detectMachine())
scheduler = wfu.get_scheduler(machine)
hpssarch = base.get('HPSSARCH', 'NO').upper()
strings = []
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
if base['gfs_cyc'] != 0:
strings.append(get_gfs_dates(base))
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
if scheduler in ['slurm'] and machine in ['ORION']:
strings.append(f'''\t\n''')
if scheduler in ['slurm']:
strings.append(f'''\t\n''')
strings.append(f'\t\n')
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append('\n')
strings.append('\t\n')
strings.append('\t\n')
strings.append('\t\n')
strings.append('\t\n')
strings.append('\n')
return ''.join(strings)
def get_gfs_dates(base):
'''
Generate GFS dates entities
'''
strings = []
strings.append('\n')
strings.append('\t\n')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
strings.append(f'''\t\n''')
return ''.join(strings)
def get_gdasgfs_resources(dict_configs, cdump='gdas'):
'''
Create GDAS or GFS resource entities
'''
base = dict_configs['base']
machine = base.get('machine', wfu.detectMachine())
scheduler = wfu.get_scheduler(machine)
do_bufrsnd = base.get('DO_BUFRSND', 'NO').upper()
do_gempak = base.get('DO_GEMPAK', 'NO').upper()
do_awips = base.get('DO_AWIPS', 'NO').upper()
do_metp = base.get('DO_METP', 'NO').upper()
do_gldas = base.get('DO_GLDAS', 'NO').upper()
do_wdqms = base.get('DO_WDQMS', 'NO').upper()
do_wave = base.get('DO_WAVE', 'NO').upper()
do_wave_cdump = base.get('WAVE_CDUMP', 'BOTH').upper()
do_sfcprep = base.get('DO_SFCPREP', 'NO').upper()
reservation = base.get('RESERVATION', 'NONE').upper()
if do_sfcprep in ['Y', 'YES']:
tasks = ['sfcprep', 'prep', 'anal', 'analcalc']
else:
tasks = ['prep', 'anal', 'analcalc']
if cdump in ['gdas']:
tasks += ['analdiag']
if do_wdqms in ['Y', 'YES']:
tasks += ['wdqms']
if cdump in ['gdas'] and do_gldas in ['Y', 'YES']:
tasks += ['gldas']
if cdump in ['gdas'] and do_wave in ['Y', 'YES'] and do_wave_cdump in ['GDAS', 'BOTH']:
#tasks += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostbndpnt', 'wavepostpnt', 'wavestat']
tasks += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt']
tasks += ['fcst', 'post', 'vrfy', 'arch']
if cdump in ['gfs'] and do_wave in ['Y', 'YES'] and do_wave_cdump in ['GFS', 'BOTH']:
#tasks += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostbndpnt', 'wavepostpnt', 'wavestat']
tasks += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt']
if cdump in ['gfs'] and do_bufrsnd in ['Y', 'YES']:
tasks += ['postsnd']
if do_gempak in ['Y', 'YES']:
tasks += ['gempak']
if cdump in ['gfs'] and do_wave in ['Y', 'YES'] and do_gempak in ['Y', 'YES']:
tasks += ['wavegempak']
if cdump in ['gfs'] and do_awips in ['Y', 'YES']:
tasks += ['awips']
if cdump in ['gfs'] and do_metp in ['Y', 'YES']:
tasks += ['metp']
if cdump in ['gfs'] and do_wave in ['Y', 'YES'] and do_awips in ['Y', 'YES']:
tasks += ['waveawipsbulls', 'waveawipsgridded']
dict_resources = OrderedDict()
for task in tasks:
cfg = dict_configs[task]
wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, reservation, cdump=cdump)
taskstr = f'{task.upper()}_{cdump.upper()}'
strings = []
strings.append(f'\t\n')
if scheduler in ['slurm'] and machine in ['ORION'] and task not in ['arch']:
strings.append(f'\t\n')
if scheduler in ['slurm'] and task in ['arch']:
strings.append(f'\t\n')
strings.append(f'\t\n')
strings.append(f'\t\n')
if len(memstr) != 0:
strings.append(f'\t\n')
strings.append(f'\t\n')
dict_resources[f'{cdump}{task}'] = ''.join(strings)
return dict_resources
def get_hyb_resources(dict_configs):
'''
Create hybrid resource entities
'''
base = dict_configs['base']
machine = base.get('machine', wfu.detectMachine())
scheduler = wfu.get_scheduler(machine)
lobsdiag_forenkf = base.get('lobsdiag_forenkf', '.false.').upper()
eupd_cyc= base.get('EUPD_CYC', 'gdas').upper()
reservation = base.get('RESERVATION', 'NONE').upper()
dict_resources = OrderedDict()
# These tasks can be run in either or both cycles
if lobsdiag_forenkf in ['.T.', '.TRUE.']:
tasks1 = ['eobs', 'ediag', 'eupd', 'echgres']
else:
tasks1 = ['eobs', 'eomg', 'eupd', 'echgres']
if eupd_cyc in ['BOTH']:
cdumps = ['gfs', 'gdas']
elif eupd_cyc in ['GFS']:
cdumps = ['gfs']
elif eupd_cyc in ['GDAS']:
cdumps = ['gdas']
for cdump in cdumps:
for task in tasks1:
cfg = dict_configs['eobs'] if task in ['eomg'] else dict_configs[task]
wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, reservation, cdump=cdump)
taskstr = f'{task.upper()}_{cdump.upper()}'
strings = []
strings.append(f'\t\n')
strings.append(f'\t\n')
strings.append(f'\t\n')
if len(memstr) != 0:
strings.append(f'\t\n')
strings.append(f'\t\n')
dict_resources[f'{cdump}{task}'] = ''.join(strings)
# These tasks are always run as part of the GDAS cycle
cdump = 'gdas'
tasks2 = ['ecen', 'esfc', 'efcs', 'epos', 'earc']
for task in tasks2:
cfg = dict_configs[task]
wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, reservation, cdump=cdump)
taskstr = f'{task.upper()}_{cdump.upper()}'
strings = []
strings.append(f'\t\n')
if scheduler in ['slurm'] and machine in ['ORION'] and task not in ['earc']:
strings.append(f'\t\n')
if scheduler in ['slurm'] and task in ['earc']:
strings.append(f'\t\n')
strings.append(f'\t\n')
strings.append(f'\t\n')
if len(memstr) != 0:
strings.append(f'\t\n')
strings.append(f'\t\n')
dict_resources[f'{cdump}{task}'] = ''.join(strings)
return dict_resources
def get_gdasgfs_tasks(dict_configs, cdump='gdas'):
'''
Create GDAS or GFS tasks
'''
envars = []
if wfu.get_scheduler(wfu.detectMachine()) in ['slurm']:
envars.append(rocoto.create_envar(name='SLURM_SET', value='YES'))
envars.append(rocoto.create_envar(name='RUN_ENVIR', value='&RUN_ENVIR;'))
envars.append(rocoto.create_envar(name='HOMEgfs', value='&HOMEgfs;'))
envars.append(rocoto.create_envar(name='EXPDIR', value='&EXPDIR;'))
envars.append(rocoto.create_envar(name='CDATE', value='@Y@m@d@H'))
envars.append(rocoto.create_envar(name='CDUMP', value=f'{cdump}'))
envars.append(rocoto.create_envar(name='PDY', value='@Y@m@d'))
envars.append(rocoto.create_envar(name='cyc', value='@H'))
envars.append(rocoto.create_envar(name='GDATE', value='@Y@m@d@H'))
envars.append(rocoto.create_envar(name='GDUMP', value='gdas'))
envars.append(rocoto.create_envar(name='gPDY', value='@Y@m@d'))
envars.append(rocoto.create_envar(name='gcyc', value='@H'))
base = dict_configs['base']
gfs_cyc = base.get('gfs_cyc', 0)
gldas_cyc = base.get('gldas_cyc', 0)
dohybvar = base.get('DOHYBVAR', 'NO').upper()
eupd_cyc = base.get('EUPD_CYC', 'gdas').upper()
do_bufrsnd = base.get('DO_BUFRSND', 'NO').upper()
do_gempak = base.get('DO_GEMPAK', 'NO').upper()
do_awips = base.get('DO_AWIPS', 'NO').upper()
do_metp = base.get('DO_METP', 'NO').upper()
do_gldas = base.get('DO_GLDAS', 'NO').upper()
do_wdqms = base.get('DO_WDQMS', 'NO').upper()
do_wave = base.get('DO_WAVE', 'NO').upper()
do_wave_cdump = base.get('WAVE_CDUMP', 'BOTH').upper()
do_sfcprep = base.get('DO_SFCPREP', 'NO').upper()
dumpsuffix = base.get('DUMP_SUFFIX', '')
gridsuffix = base.get('SUFFIX', '')
dict_tasks = OrderedDict()
# sfcprep (emcsfc_sfc_prep)
if do_sfcprep in ['Y', 'YES']:
deps = []
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&DMPDIR;/{cdump}{dumpsuffix}.@Y@m@d/@H/atmos/{cdump}.t@Hz.snow.usaf.grib2'
dep_dict = {'type': 'data', 'data': data}
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('sfcprep', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}sfcprep'] = task
# prep
deps = []
if do_sfcprep in ['Y', 'YES']:
dep_dict = {'type': 'task', 'name': f'{cdump}sfcprep'}
else:
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}post', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&ROTDIR;/gdas.@Y@m@d/@H/atmos/gdas.t@Hz.atmf009{gridsuffix}'
dep_dict = {'type': 'data', 'data': data, 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&DMPDIR;/{cdump}{dumpsuffix}.@Y@m@d/@H/atmos/{cdump}.t@Hz.updated.status.tm00.bufr_d'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
gfs_enkf = True if eupd_cyc in ['BOTH', 'GFS'] and dohybvar in ['Y', 'YES'] else False
if gfs_enkf and cdump in ['gfs']:
if gfs_cyc == 4:
task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies)
else:
task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies, cycledef='gdas')
else:
task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}prep'] = task
# wave tasks in gdas or gfs or both
if do_wave_cdump in ['BOTH']:
cdumps = ['gfs', 'gdas']
elif do_wave_cdump in ['GFS']:
cdumps = ['gfs']
elif do_wave_cdump in ['GDAS']:
cdumps = ['gdas']
# waveinit
if do_wave in ['Y', 'YES'] and cdump in cdumps:
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}prep'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='or', dep=deps)
task = wfu.create_wf_task('waveinit', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}waveinit'] = task
# waveprep
if do_wave in ['Y', 'YES'] and cdump in cdumps:
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}waveinit'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('waveprep', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}waveprep'] = task
# anal
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}prep'}
deps.append(rocoto.add_dependency(dep_dict))
if dohybvar in ['y', 'Y', 'yes', 'YES']:
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
else:
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('anal', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}anal'] = task
# analcalc
deps1 = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loginc.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
deps.append(rocoto.add_dependency(dep_dict))
if dohybvar in ['y', 'Y', 'yes', 'YES'] and cdump == 'gdas':
dep_dict = {'type': 'task', 'name': f'{"gdas"}echgres', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
else:
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('analcalc', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}analcalc'] = task
# analdiag
if cdump in ['gdas']:
deps1 = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loginc.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)
deps2 = []
deps2 = dependencies1
dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
deps2.append(rocoto.add_dependency(dep_dict))
dependencies2 = rocoto.create_dependency(dep_condition='and', dep=deps2)
task = wfu.create_wf_task('analdiag', cdump=cdump, envar=envars, dependency=dependencies2)
dict_tasks[f'{cdump}analdiag'] = task
# wdqms
if cdump in ['gdas'] and do_wdqms in ['Y', 'YES']:
deps1 = []
dep_dict = {'type': 'task', 'name': f'{cdump}analdiag'}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='and', dep=deps1)
task = wfu.create_wf_task('wdqms', cdump=cdump, envar=envars, dependency=dependencies1)
dict_tasks[f'{cdump}wdqms'] = task
# gldas
if cdump in ['gdas'] and do_gldas in ['Y', 'YES']:
deps1 = []
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='and', dep=deps1)
task = wfu.create_wf_task('gldas', cdump=cdump, envar=envars, dependency=dependencies1)
dict_tasks[f'{cdump}gldas'] = task
# fcst
deps1 = []
if cdump in ['gdas']:
deps3 = []
if do_gldas in ['Y', 'YES']:
dep_dict = {'type': 'task', 'name': f'{cdump}gldas'}
deps3.append(rocoto.add_dependency(dep_dict))
else:
dep_dict = {'type': 'task', 'name': f'{cdump}analcalc'}
deps3.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
deps3.append(rocoto.add_dependency(dep_dict))
dependencies3 = rocoto.create_dependency(dep_condition='and', dep=deps3)
deps1 = dependencies3
dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)
elif cdump in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{cdump}anal'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep=deps1)
if do_wave in ['Y', 'YES'] and cdump in cdumps:
deps2 = []
deps2 = dependencies1
dep_dict = {'type': 'task', 'name': f'{cdump}waveprep'}
deps2.append(rocoto.add_dependency(dep_dict))
dependencies2 = rocoto.create_dependency(dep_condition='and', dep=deps2)
task = wfu.create_wf_task('fcst', cdump=cdump, envar=envars, dependency=dependencies2)
else:
task = wfu.create_wf_task('fcst', cdump=cdump, envar=envars, dependency=dependencies1)
dict_tasks[f'{cdump}fcst'] = task
# post
deps = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.log#dep#.txt'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='or', dep=deps)
fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#')
fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#')
ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;')
postenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR]
varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = get_postgroups(dict_configs['post'], cdump=cdump)
vardict = {varname2: varval2, varname3: varval3}
task = wfu.create_wf_task('post', cdump=cdump, envar=postenvars, dependency=dependencies,
metatask='post', varname=varname1, varval=varval1, vardict=vardict)
dict_tasks[f'{cdump}post'] = task
# wavepostsbs
if do_wave in ['Y', 'YES'] and cdump in cdumps:
deps = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/wave/rundata/{cdump}wave.out_grd.gnh_10m.@Y@m@d.@H0000'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/wave/rundata/{cdump}wave.out_grd.aoc_9km.@Y@m@d.@H0000'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/wave/rundata/{cdump}wave.out_grd.gsh_15m.@Y@m@d.@H0000'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('wavepostsbs', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}wavepostsbs'] = task
# wavepostbndpnt
if do_wave in ['Y', 'YES'] and cdump in ['gfs']:
deps = []
dep_dict = {'type':'task', 'name':f'{cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('wavepostbndpnt', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}wavepostbndpnt'] = task
# wavepostbndpntbll
if do_wave in ['Y', 'YES'] and cdump in ['gfs']:
deps = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.logf180.txt'
dep_dict = {'type': 'data', 'data': data}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type':'task', 'name':f'{cdump}wavepostbndpnt'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('wavepostbndpntbll', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}wavepostbndpntbll'] = task
# wavepostpnt
if do_wave in ['Y', 'YES'] and cdump in ['gdas']:
deps = []
dep_dict = {'type':'task', 'name':f'{cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('wavepostpnt', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}wavepostpnt'] = task
if do_wave in ['Y', 'YES'] and cdump in ['gfs']:
deps = []
dep_dict = {'type':'task', 'name':f'{cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type':'task', 'name':f'{cdump}wavepostbndpntbll'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('wavepostpnt', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}wavepostpnt'] = task
# wavegempak
if do_wave in ['Y', 'YES'] and do_gempak in ['Y', 'YES'] and cdump in ['gfs']:
deps = []
dep_dict = {'type':'task', 'name':f'{cdump}wavepostsbs'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('wavegempak', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}wavegempak'] = task
# waveawipsgridded
if do_wave in ['Y', 'YES'] and do_awips in ['Y', 'YES'] and cdump in ['gfs']:
deps = []
dep_dict = {'type':'task', 'name':f'{cdump}wavepostsbs'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('waveawipsgridded', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}waveawipsgridded'] = task
# waveawipsbulls
if do_wave in ['Y', 'YES'] and do_awips in ['Y', 'YES'] and cdump in ['gfs']:
deps = []
dep_dict = {'type':'task', 'name':f'{cdump}wavepostsbs'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type':'task', 'name':f'{cdump}wavepostpnt'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('waveawipsbulls', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}waveawipsbulls'] = task
# wavestat
#if do_wave in ['Y', 'YES'] and cdump in cdumps:
# deps = []
# dep_dict = {'type':'task', 'name':'%swavepost' % cdump}
# deps.append(rocoto.add_dependency(dep_dict))
# dependencies = rocoto.create_dependency(dep=deps)
# task = wfu.create_wf_task('wavestat', cdump=cdump, envar=envars, dependency=dependencies)
# dict_tasks['%swavestat' % cdump] = task
# vrfy
deps = []
dep_dict = {'type': 'metatask', 'name': f'{cdump}post'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('vrfy', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}vrfy'] = task
# metp
if cdump in ['gfs'] and do_metp in ['Y', 'YES']:
deps = []
dep_dict = {'type':'metatask', 'name':f'{cdump}post'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
sdate_gfs = rocoto.create_envar(name='SDATE_GFS', value='&SDATE_GFS;')
metpcase = rocoto.create_envar(name='METPCASE', value='#metpcase#')
metpenvars = envars + [sdate_gfs] + [metpcase]
varname1 = 'metpcase'
varval1 = 'g2g1 g2o1 pcp1'
task = wfu.create_wf_task('metp', cdump=cdump, envar=metpenvars, dependency=dependencies,
metatask='metp', varname=varname1, varval=varval1)
dict_tasks[f'{cdump}metp'] = task
#postsnd
if cdump in ['gfs'] and do_bufrsnd in ['Y', 'YES']:
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}fcst'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('postsnd', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}postsnd'] = task
# awips
if cdump in ['gfs'] and do_awips in ['Y', 'YES']:
deps = []
dep_dict = {'type': 'metatask', 'name': f'{cdump}post'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#')
fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#')
ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;')
awipsenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR]
varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = get_awipsgroups(dict_configs['awips'], cdump=cdump)
vardict = {varname2: varval2, varname3: varval3}
task = wfu.create_wf_task('awips', cdump=cdump, envar=awipsenvars, dependency=dependencies,
metatask='awips', varname=varname1, varval=varval1, vardict=vardict)
dict_tasks[f'{cdump}awips'] = task
# gempak
if do_gempak in ['Y', 'YES']:
deps = []
dep_dict = {'type': 'metatask', 'name': f'{cdump}post'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;')
gempakenvars = envars + [ROTDIR]
task = wfu.create_wf_task('gempak', cdump=cdump, envar=gempakenvars, dependency=dependencies)
dict_tasks[f'{cdump}gempak'] = task
# arch
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}vrfy'}
deps.append(rocoto.add_dependency(dep_dict))
if do_wave in ['Y', 'YES']:
dep_dict = {'type': 'task', 'name': f'{cdump}wavepostsbs'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}wavepostpnt'}
deps.append(rocoto.add_dependency(dep_dict))
if cdump in ['gfs']:
dep_dict = {'type': 'task', 'name': f'{cdump}wavepostbndpnt'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('arch', cdump=cdump, envar=envars, dependency=dependencies)
dict_tasks[f'{cdump}arch'] = task
return dict_tasks
def get_hyb_tasks(dict_configs, cycledef='enkf'):
'''
Create Hybrid tasks
'''
# Determine groups based on ensemble size and grouping
base = dict_configs['base']
nens = base['NMEM_ENKF']
lobsdiag_forenkf = base.get('lobsdiag_forenkf', '.false.').upper()
eupd_cyc = base.get('EUPD_CYC', 'gdas').upper()
eobs = dict_configs['eobs']
nens_eomg = eobs['NMEM_EOMGGRP']
neomg_grps = nens / nens_eomg
EOMGGROUPS = ' '.join([f'{x:02d}' for x in range(1, int(neomg_grps) + 1)])
efcs = dict_configs['efcs']
nens_efcs = efcs['NMEM_EFCSGRP']
nefcs_grps = nens / nens_efcs
EFCSGROUPS = ' '.join([f'{x:02d}' for x in range(1, int(nefcs_grps) + 1)])
earc = dict_configs['earc']
nens_earc = earc['NMEM_EARCGRP']
nearc_grps = nens / nens_earc
EARCGROUPS = ' '.join([f'{x:02d}' for x in range(0, int(nearc_grps) + 1)])
envars = []
if wfu.get_scheduler(wfu.detectMachine()) in ['slurm']:
envars.append(rocoto.create_envar(name='SLURM_SET', value='YES'))
envars.append(rocoto.create_envar(name='RUN_ENVIR', value='&RUN_ENVIR;'))
envars.append(rocoto.create_envar(name='HOMEgfs', value='&HOMEgfs;'))
envars.append(rocoto.create_envar(name='EXPDIR', value='&EXPDIR;'))
envars.append(rocoto.create_envar(name='CDATE', value='@Y@m@d@H'))
#envars.append(rocoto.create_envar(name='CDUMP', value=f'{cdump}'))
envars.append(rocoto.create_envar(name='PDY', value='@Y@m@d'))
envars.append(rocoto.create_envar(name='cyc', value='@H'))
ensgrp = rocoto.create_envar(name='ENSGRP', value='#grp#')
dict_tasks = OrderedDict()
if eupd_cyc in ['BOTH']:
cdumps = ['gfs', 'gdas']
elif eupd_cyc in ['GFS']:
cdumps = ['gfs']
elif eupd_cyc in ['GDAS']:
cdumps = ['gdas']
for cdump in cdumps:
envar_cdump = rocoto.create_envar(name='CDUMP', value=f'{cdump}')
envars1 = envars + [envar_cdump]
# eobs
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}prep'}
deps.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{"gdas"}epmn', 'offset': '-06:00:00'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
task = wfu.create_wf_task('eobs', cdump=cdump, envar=envars1, dependency=dependencies, cycledef=cycledef)
dict_tasks[f'{cdump}eobs'] = task
# eomn, eomg
if lobsdiag_forenkf in ['.F.', '.FALSE.']:
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}eobs'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
eomgenvars= envars1 + [ensgrp]
task = wfu.create_wf_task('eomg', cdump=cdump, envar=eomgenvars, dependency=dependencies,
metatask='eomn', varname='grp', varval=EOMGGROUPS, cycledef=cycledef)
dict_tasks[f'{cdump}eomn'] = task
# ediag
else:
deps = []
dep_dict = {'type': 'task', 'name': f'{cdump}eobs'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('ediag', cdump=cdump, envar=envars1, dependency=dependencies, cycledef=cycledef)
dict_tasks[f'{cdump}ediag'] = task
# eupd
deps = []
if lobsdiag_forenkf in ['.F.', '.FALSE.']:
dep_dict = {'type': 'metatask', 'name': f'{cdump}eomn'}
else:
dep_dict = {'type': 'task', 'name': f'{cdump}ediag'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
task = wfu.create_wf_task('eupd', cdump=cdump, envar=envars1, dependency=dependencies, cycledef=cycledef)
dict_tasks[f'{cdump}eupd'] = task
# All hybrid tasks beyond this point are always executed in the GDAS cycle
cdump = 'gdas'
envar_cdump = rocoto.create_envar(name='CDUMP', value=f'{cdump}')
envars1 = envars + [envar_cdump]
cdump_eupd = 'gfs' if eupd_cyc in ['GFS'] else 'gdas'
# ecmn, ecen
deps1 = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loganl.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}analcalc'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)
deps2 = []
deps2 = dependencies1
dep_dict = {'type': 'task', 'name': f'{cdump_eupd}eupd'}
deps2.append(rocoto.add_dependency(dep_dict))
dependencies2 = rocoto.create_dependency(dep_condition='and', dep=deps2)
fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#')
fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#')
ecenenvars = envars1 + [fhrgrp] + [fhrlst]
varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = get_ecengroups(dict_configs, dict_configs['ecen'], cdump=cdump)
vardict = {varname2: varval2, varname3: varval3}
task = wfu.create_wf_task('ecen', cdump=cdump, envar=ecenenvars, dependency=dependencies2,
metatask='ecmn', varname=varname1, varval=varval1, vardict=vardict)
dict_tasks[f'{cdump}ecmn'] = task
# esfc
deps1 = []
data = f'&ROTDIR;/{cdump}.@Y@m@d/@H/atmos/{cdump}.t@Hz.loganl.txt'
dep_dict = {'type': 'data', 'data': data}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}analcalc'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='or', dep=deps1)
deps2 = []
deps2 = dependencies1
dep_dict = {'type': 'task', 'name': f'{cdump_eupd}eupd'}
deps2.append(rocoto.add_dependency(dep_dict))
dependencies2 = rocoto.create_dependency(dep_condition='and', dep=deps2)
task = wfu.create_wf_task('esfc', cdump=cdump, envar=envars1, dependency=dependencies2, cycledef=cycledef)
dict_tasks[f'{cdump}esfc'] = task
# efmn, efcs
deps1 = []
dep_dict = {'type': 'metatask', 'name': f'{cdump}ecmn'}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'task', 'name': f'{cdump}esfc'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='and', dep=deps1)
deps2 = []
deps2 = dependencies1
dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'}
deps2.append(rocoto.add_dependency(dep_dict))
dependencies2 = rocoto.create_dependency(dep_condition='or', dep=deps2)
efcsenvars = envars1 + [ensgrp]
task = wfu.create_wf_task('efcs', cdump=cdump, envar=efcsenvars, dependency=dependencies2,
metatask='efmn', varname='grp', varval=EFCSGROUPS, cycledef=cycledef)
dict_tasks[f'{cdump}efmn'] = task
# echgres
deps1 = []
dep_dict = {'type': 'task', 'name': f'{cdump}fcst'}
deps1.append(rocoto.add_dependency(dep_dict))
dep_dict = {'type': 'metatask', 'name': f'{cdump}efmn'}
deps1.append(rocoto.add_dependency(dep_dict))
dependencies1 = rocoto.create_dependency(dep_condition='and', dep=deps1)
task = wfu.create_wf_task('echgres', cdump=cdump, envar=envars1, dependency=dependencies1, cycledef=cycledef)
dict_tasks[f'{cdump}echgres'] = task
# epmn, epos
deps = []
dep_dict = {'type': 'metatask', 'name': f'{cdump}efmn'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#')
fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#')
eposenvars = envars1 + [fhrgrp] + [fhrlst]
varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = get_eposgroups(dict_configs['epos'], cdump=cdump)
vardict = {varname2: varval2, varname3: varval3}
task = wfu.create_wf_task('epos', cdump=cdump, envar=eposenvars, dependency=dependencies,
metatask='epmn', varname=varname1, varval=varval1, vardict=vardict)
dict_tasks[f'{cdump}epmn'] = task
# eamn, earc
deps = []
dep_dict = {'type': 'metatask', 'name': f'{cdump}epmn'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
earcenvars = envars1 + [ensgrp]
task = wfu.create_wf_task('earc', cdump=cdump, envar=earcenvars, dependency=dependencies,
metatask='eamn', varname='grp', varval=EARCGROUPS, cycledef=cycledef)
dict_tasks[f'{cdump}eamn'] = task
return dict_tasks
def get_workflow_header(base):
'''
Create the workflow header block
'''
strings = []
strings.append('\n')
strings.append(']>\n')
strings.append('\n')
strings.append('\n')
strings.append('\n')
strings.append('\t&EXPDIR;/logs/@Y@m@d@H.log\n')
strings.append('\n')
strings.append('\t\n')
strings.append('\t&SDATE; &SDATE; 06:00:00\n')
strings.append('\t&SDATE; &EDATE; 06:00:00\n')
strings.append('\t&SDATE; &EDATE; 06:00:00\n')
if base['gfs_cyc'] != 0:
strings.append('\t&SDATE_GFS; &EDATE_GFS; &INTERVAL_GFS;\n')
strings.append('\n')
return ''.join(strings)
def get_workflow_footer():
'''
Generate workflow footer
'''
strings = []
strings.append('\n\n')
return ''.join(strings)
def get_postgroups(post, cdump='gdas'):
fhmin = post['FHMIN']
fhmax = post['FHMAX']
fhout = post['FHOUT']
# Get a list of all forecast hours
if cdump in ['gdas']:
fhrs = range(fhmin, fhmax+fhout, fhout)
elif cdump in ['gfs']:
fhmax = np.max([post['FHMAX_GFS_00'],post['FHMAX_GFS_06'],post['FHMAX_GFS_12'],post['FHMAX_GFS_18']])
fhout = post['FHOUT_GFS']
fhmax_hf = post['FHMAX_HF_GFS']
fhout_hf = post['FHOUT_HF_GFS']
fhrs_hf = range(fhmin, fhmax_hf+fhout_hf, fhout_hf)
fhrs = list(fhrs_hf) + list(range(fhrs_hf[-1]+fhout, fhmax+fhout, fhout))
npostgrp = post['NPOSTGRP']
ngrps = npostgrp if len(fhrs) > npostgrp else len(fhrs)
fhrs = [f'f{f:03d}' for f in fhrs]
fhrs = np.array_split(fhrs, ngrps)
fhrs = [f.tolist() for f in fhrs]
fhrgrp = ' '.join([f'{x:03d}' for x in range(0, ngrps+1)])
fhrdep = ' '.join(['anl'] + [f[-1] for f in fhrs])
fhrlst = ' '.join(['anl'] + ['_'.join(f) for f in fhrs])
return fhrgrp, fhrdep, fhrlst
def get_awipsgroups(awips, cdump='gdas'):
fhmin = awips['FHMIN']
fhmax = awips['FHMAX']
fhout = awips['FHOUT']
# Get a list of all forecast hours
if cdump in ['gdas']:
fhrs = range(fhmin, fhmax+fhout, fhout)
elif cdump in ['gfs']:
fhmax = np.max([awips['FHMAX_GFS_00'],awips['FHMAX_GFS_06'],awips['FHMAX_GFS_12'],awips['FHMAX_GFS_18']])
fhout = awips['FHOUT_GFS']
fhmax_hf = awips['FHMAX_HF_GFS']
fhout_hf = awips['FHOUT_HF_GFS']
if fhmax > 240:
fhmax = 240
if fhmax_hf > 240:
fhmax_hf = 240
fhrs_hf = range(fhmin, fhmax_hf+fhout_hf, fhout_hf)
fhrs = list(fhrs_hf) + list(range(fhrs_hf[-1]+fhout, fhmax+fhout, fhout))
nawipsgrp = awips['NAWIPSGRP']
ngrps = nawipsgrp if len(fhrs) > nawipsgrp else len(fhrs)
fhrs = [f'f{f:03d}' for f in fhrs]
fhrs = np.array_split(fhrs, ngrps)
fhrs = [f.tolist() for f in fhrs]
fhrgrp = ' '.join([f'{x:03d}' for x in range(0, ngrps)])
fhrdep = ' '.join([f[-1] for f in fhrs])
fhrlst = ' '.join(['_'.join(f) for f in fhrs])
return fhrgrp, fhrdep, fhrlst
def get_ecengroups(dict_configs, ecen, cdump='gdas'):
base = dict_configs['base']
if base.get('DOIAU_ENKF', 'NO') == 'YES' :
fhrs = list(base.get('IAUFHRS','6').split(','))
ifhrs = [f'f00{ff}' for ff in fhrs]
ifhrs0 = ifhrs[0]
nfhrs = len(fhrs)
ifhrs = [f'f00{ff}' for ff in fhrs]
ifhrs0 = ifhrs[0]
nfhrs = len(fhrs)
necengrp = ecen['NECENGRP']
ngrps = necengrp if len(fhrs) > necengrp else len(fhrs)
ifhrs = np.array_split(ifhrs, ngrps)
fhrgrp = ' '.join([f'{x:03d}' for x in range(0, ngrps)])
fhrdep = ' '.join([f[-1] for f in ifhrs])
fhrlst = ' '.join(['_'.join(f) for f in ifhrs])
else:
fhrgrp='000'
fhrdep='f006'
fhrlst='f006'
return fhrgrp, fhrdep, fhrlst
def get_eposgroups(epos, cdump='gdas'):
fhmin = epos['FHMIN_ENKF']
fhmax = epos['FHMAX_ENKF']
fhout = epos['FHOUT_ENKF']
fhrs = range(fhmin, fhmax+fhout, fhout)
neposgrp = epos['NEPOSGRP']
ngrps = neposgrp if len(fhrs) > neposgrp else len(fhrs)
fhrs = [f'f{f:03d}' for f in fhrs]
fhrs = np.array_split(fhrs, ngrps)
fhrs = [f.tolist() for f in fhrs]
fhrgrp = ' '.join([f'{x:03d}' for x in range(0, ngrps)])
fhrdep = ' '.join([f[-1] for f in fhrs])
fhrlst = ' '.join(['_'.join(f) for f in fhrs])
return fhrgrp, fhrdep, fhrlst
def dict_to_strings(dict_in):
strings = []
for key in dict_in.keys():
strings.append(dict_in[key])
strings.append('\n')
return ''.join(strings)
def create_xml(dict_configs):
'''
Given an dictionary of sourced config files,
create the workflow XML
'''
from builtins import any as b_any
#from __builtin__ import any as b_any
base = dict_configs['base']
dohybvar = base.get('DOHYBVAR', 'NO').upper()
gfs_cyc = base.get('gfs_cyc', 0)
eupd_cyc = base.get('EUPD_CYC', 'gdas').upper()
# Start collecting workflow pieces
preamble = get_preamble()
definitions = get_definitions(base)
workflow_header = get_workflow_header(base)
workflow_footer = get_workflow_footer()
# Get GDAS related entities, resources, workflow
dict_gdas_resources = get_gdasgfs_resources(dict_configs)
dict_gdas_tasks = get_gdasgfs_tasks(dict_configs)
# Get hybrid related entities, resources, workflow
if dohybvar in ['Y', 'YES']:
dict_hyb_resources = get_hyb_resources(dict_configs)
dict_hyb_tasks = get_hyb_tasks(dict_configs)
# Removes &MEMORY_JOB_DUMP post mortem from hyb tasks
hyp_tasks = {'gdaseobs':'gdaseobs',
'gdasediag':'gdasediag',
'gdaseomg':'gdaseomn',
'gdaseupd':'gdaseupd',
'gdasecen':'gdasecmn',
'gdasesfc':'gdasesfc',
'gdasefcs':'gdasefmn',
'gdasepos':'gdasepmn',
'gdasearc':'gdaseamn',
'gdasechgres':'gdasechgres'}
for each_task, each_resource_string in dict_hyb_resources.items():
#print(each_task,hyp_tasks[each_task])
#print(dict_hyb_tasks[hyp_tasks[each_task]])
if 'MEMORY' not in each_resource_string:
if each_task in dict_hyb_tasks:
temp_task_string = []
for each_line in re.split(r'(\s+)', dict_hyb_tasks[each_task]):
if 'memory' not in each_line:
temp_task_string.append(each_line)
dict_hyb_tasks[each_task] = ''.join(temp_task_string)
if hyp_tasks[each_task] in dict_hyb_tasks:
temp_task_string = []
for each_line in re.split(r'(\s+)', dict_hyb_tasks[hyp_tasks[each_task]]):
if 'memory' not in each_line:
temp_task_string.append(each_line)
dict_hyb_tasks[hyp_tasks[each_task]] = ''.join(temp_task_string)
# Get GFS cycle related entities, resources, workflow
dict_gfs_resources = get_gdasgfs_resources(dict_configs, cdump='gfs')
dict_gfs_tasks = get_gdasgfs_tasks(dict_configs, cdump='gfs')
# Removes &MEMORY_JOB_DUMP post mortem from gdas tasks
for each_task, each_resource_string in dict_gdas_resources.items():
if each_task not in dict_gdas_tasks:
continue
if 'MEMORY' not in each_resource_string:
temp_task_string = []
for each_line in re.split(r'(\s+)', dict_gdas_tasks[each_task]):
if 'memory' not in each_line:
temp_task_string.append(each_line)
dict_gdas_tasks[each_task] = ''.join(temp_task_string)
# Removes &MEMORY_JOB_DUMP post mortem from gfs tasks
for each_task, each_resource_string in dict_gfs_resources.items():
if each_task not in dict_gfs_tasks:
continue
if 'MEMORY' not in each_resource_string:
temp_task_string = []
for each_line in re.split(r'(\s+)', dict_gfs_tasks[each_task]):
if 'memory' not in each_line:
temp_task_string.append(each_line)
dict_gfs_tasks[each_task] = ''.join(temp_task_string)
# Put together the XML file
xmlfile = []
xmlfile.append(preamble)
xmlfile.append(definitions)
xmlfile.append(dict_to_strings(dict_gdas_resources))
if dohybvar in ['Y', 'YES']:
xmlfile.append(dict_to_strings(dict_hyb_resources))
if gfs_cyc != 0:
xmlfile.append(dict_to_strings(dict_gfs_resources))
elif gfs_cyc == 0 and dohybvar in ['Y', 'YES'] and eupd_cyc in ['BOTH', 'GFS']:
xmlfile.append(dict_gfs_resources['gfsprep'])
xmlfile.append(workflow_header)
xmlfile.append(dict_to_strings(dict_gdas_tasks))
if dohybvar in ['Y', 'YES']:
xmlfile.append(dict_to_strings(dict_hyb_tasks))
if gfs_cyc != 0:
xmlfile.append(dict_to_strings(dict_gfs_tasks))
elif gfs_cyc == 0 and dohybvar in ['Y', 'YES'] and eupd_cyc in ['BOTH', 'GFS']:
xmlfile.append(dict_gfs_tasks['gfsprep'])
xmlfile.append('\n')
xmlfile.append(wfu.create_firstcyc_task())
xmlfile.append(workflow_footer)
# Write the XML file
fh = open(f'{base["EXPDIR"]}/{base["PSLOT"]}.xml', 'w')
fh.write(''.join(xmlfile))
fh.close()
return
if __name__ == '__main__':
main()
sys.exit(0)