#!/usr/bin/env python ''' PROGRAM: Create the ROCOTO workflow given the configuration of the GFS parallel AUTHOR: Rahul.Mahajan rahul.mahajan@noaa.gov FILE DEPENDENCIES: 1. config files for the parallel; e.g. config.base, config.fcst[.gfs], etc. Without these dependencies, the script will fail OUTPUT: 1. PSLOT.xml: XML workflow 2. PSLOT.crontab: crontab for ROCOTO run command ''' import os import sys import re import numpy as np from datetime import datetime, timedelta from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from collections import OrderedDict import rocoto import workflow_utils as wfu def main(): parser = ArgumentParser(description='Setup XML workflow and CRONTAB for a GFS parallel.', formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('--expdir', help='full path to experiment directory containing config files', type=str, required=False, default=os.environ['PWD']) args = parser.parse_args() configs = wfu.get_configs(args.expdir) _base = wfu.config_parser([wfu.find_config('config.base', configs)]) if not os.path.samefile(args.expdir, _base['EXPDIR']): print 'MISMATCH in experiment directories!' print 'config.base: EXPDIR = %s' % repr(_base['EXPDIR']) print 'input arg: --expdir = %s' % repr(args.expdir) sys.exit(1) gfs_steps = ['prep', 'anal', 'fcst', 'postsnd', 'post', 'awips', 'gempak', 'vrfy', 'arch'] hyb_steps = ['eobs', 'eomg', 'eupd', 'ecen', 'efcs', 'epos', 'earc'] steps = gfs_steps + hyb_steps if _base.get('DOHYBVAR', 'NO') == 'YES' else gfs_steps dict_configs = wfu.source_configs(configs, steps) # Check and set gfs_cyc specific variables if dict_configs['base']['gfs_cyc'] != 0: dict_configs['base'] = get_gfs_cyc_dates(dict_configs['base']) # First create workflow XML create_xml(dict_configs) # Next create the crontab wfu.create_crontab(dict_configs['base']) return def get_gfs_cyc_dates(base): ''' Generate GFS dates from experiment dates and gfs_cyc choice ''' base_out = base.copy() gfs_cyc = base['gfs_cyc'] sdate = base['SDATE'] edate = base['EDATE'] interval_gfs = wfu.get_gfs_interval(gfs_cyc) # Set GFS cycling dates hrdet = 0 if gfs_cyc == 1: hrinc = 24 - sdate.hour hrdet = edate.hour elif gfs_cyc == 2: if sdate.hour in [0, 12]: hrinc = 12 elif sdate.hour in [6, 18]: hrinc = 6 if edate.hour in [6, 18]: hrdet = 6 elif gfs_cyc == 4: hrinc = 6 sdate_gfs = sdate + timedelta(hours=hrinc) edate_gfs = edate - timedelta(hours=hrdet) if sdate_gfs > edate: print 'W A R N I N G!' print 'Starting date for GFS cycles is after Ending date of experiment' print 'SDATE = %s, EDATE = %s' % (sdate.strftime('%Y%m%d%H'), edate.strftime('%Y%m%d%H')) print 'SDATE_GFS = %s, EDATE_GFS = %s' % (sdate_gfs.strftime('%Y%m%d%H'), edate_gfs.strftime('%Y%m%d%H')) gfs_cyc = 0 base_out['gfs_cyc'] = gfs_cyc base_out['SDATE_GFS'] = sdate_gfs base_out['EDATE_GFS'] = edate_gfs base_out['INTERVAL_GFS'] = interval_gfs fhmax_gfs = {} for hh in ['00', '06', '12', '18']: fhmax_gfs[hh] = base.get('FHMAX_GFS_%s' % hh, 'FHMAX_GFS_00') base_out['FHMAX_GFS'] = fhmax_gfs return base_out def get_preamble(): ''' Generate preamble for XML ''' strings = [] strings.append('\n') strings.append('\n') return ''.join(strings) def get_definitions(base): ''' Create entities related to the experiment ''' machine = base.get('machine', wfu.detectMachine()) scheduler = wfu.get_scheduler(machine) strings = [] strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['PSLOT']) strings.append('\t\n' % base['SDATE'].strftime('%Y%m%d%H%M')) strings.append('\t\n' % base['EDATE'].strftime('%Y%m%d%H%M')) if base['gfs_cyc'] != 0: strings.append(get_gfs_dates(base)) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['RUN_ENVIR']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['EXPDIR']) strings.append('\t\n' % base['ROTDIR']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['HOMEgfs']) strings.append('\t\n' % base['BASE_JOB']) strings.append('\t\n' % base['DMPDIR']) strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['ACCOUNT']) strings.append('\t\n' % base['QUEUE']) strings.append('\t\n' % base['QUEUE_ARCH']) if scheduler in ['slurm']: strings.append('\t\n' % base['QUEUE_ARCH']) strings.append('\t\n' % scheduler) strings.append('\n') strings.append('\t\n') strings.append('\t\n') strings.append('\n') strings.append('\t\n') strings.append('\t\n') strings.append('\t\n') strings.append('\t\n') strings.append('\n') return ''.join(strings) def get_gfs_dates(base): ''' Generate GFS dates entities ''' strings = [] strings.append('\n') strings.append('\t\n') strings.append('\t\n' % base['SDATE_GFS'].strftime('%Y%m%d%H%M')) strings.append('\t\n' % base['EDATE_GFS'].strftime('%Y%m%d%H%M')) strings.append('\t\n' % base['INTERVAL_GFS']) return ''.join(strings) def get_gdasgfs_resources(dict_configs, cdump='gdas'): ''' Create GDAS or GFS resource entities ''' base = dict_configs['base'] machine = base.get('machine', wfu.detectMachine()) scheduler = wfu.get_scheduler(machine) do_bufrsnd = base.get('DO_BUFRSND', 'NO').upper() do_gempak = base.get('DO_GEMPAK', 'NO').upper() do_awips = base.get('DO_AWIPS', 'NO').upper() tasks = ['prep', 'anal', 'fcst', 'post', 'vrfy', 'arch'] if cdump in ['gfs'] and do_bufrsnd in ['Y', 'YES']: tasks += ['postsnd'] if cdump in ['gfs'] and do_gempak in ['Y', 'YES']: tasks += ['gempak'] if cdump in ['gfs'] and do_awips in ['Y', 'YES']: tasks += ['awips'] dict_resources = OrderedDict() for task in tasks: cfg = dict_configs[task] wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, cdump=cdump) taskstr = '%s_%s' % (task.upper(), cdump.upper()) strings = [] strings.append('\t\n' % (taskstr, queuestr)) if scheduler in ['slurm'] and task in ['arch']: strings.append('\t\n' % taskstr ) strings.append('\t\n' % (taskstr, wtimestr)) strings.append('\t\n' % (taskstr, resstr)) if len(memstr) != 0: strings.append('\t\n' % (taskstr, memstr)) strings.append('\t\n' % (taskstr, natstr)) dict_resources['%s%s' % (cdump, task)] = ''.join(strings) return dict_resources def get_hyb_resources(dict_configs): ''' Create hybrid resource entities ''' base = dict_configs['base'] machine = base.get('machine', wfu.detectMachine()) scheduler = wfu.get_scheduler(machine) lobsdiag_forenkf = base.get('lobsdiag_forenkf', '.false.').upper() eupd_cyc= base.get('EUPD_CYC', 'gdas').upper() dict_resources = OrderedDict() # These tasks can be run in either or both cycles tasks1 = ['eobs', 'eomg', 'eupd'] if lobsdiag_forenkf in ['.T.', '.TRUE.']: tasks.remove('eomg') if eupd_cyc in ['BOTH']: cdumps = ['gfs', 'gdas'] elif eupd_cyc in ['GFS']: cdumps = ['gfs'] elif eupd_cyc in ['GDAS']: cdumps = ['gdas'] for cdump in cdumps: for task in tasks1: cfg = dict_configs['eobs'] if task in ['eomg'] else dict_configs[task] wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, cdump=cdump) taskstr = '%s_%s' % (task.upper(), cdump.upper()) strings = [] strings.append('\t\n' % (taskstr, queuestr)) strings.append('\t\n' % (taskstr, wtimestr)) strings.append('\t\n' % (taskstr, resstr)) if len(memstr) != 0: strings.append('\t\n' % (taskstr, memstr)) strings.append('\t\n' % (taskstr, natstr)) dict_resources['%s%s' % (cdump, task)] = ''.join(strings) # These tasks are always run as part of the GDAS cycle cdump = 'gdas' tasks2 = ['ecen', 'efcs', 'epos', 'earc'] for task in tasks2: cfg = dict_configs[task] wtimestr, resstr, queuestr, memstr, natstr = wfu.get_resources(machine, cfg, task, cdump=cdump) taskstr = '%s_%s' % (task.upper(), cdump.upper()) strings = [] strings.append('\t\n' % (taskstr, queuestr)) if scheduler in ['slurm'] and task in ['earc']: strings.append('\t\n' % taskstr ) strings.append('\t\n' % (taskstr, wtimestr)) strings.append('\t\n' % (taskstr, resstr)) if len(memstr) != 0: strings.append('\t\n' % (taskstr, memstr)) strings.append('\t\n' % (taskstr, natstr)) dict_resources['%s%s' % (cdump, task)] = ''.join(strings) return dict_resources def get_gdasgfs_tasks(dict_configs, cdump='gdas'): ''' Create GDAS or GFS tasks ''' envars = [] if wfu.get_scheduler(wfu.detectMachine()) in ['slurm']: envars.append(rocoto.create_envar(name='SLURM_SET', value='YES')) envars.append(rocoto.create_envar(name='RUN_ENVIR', value='&RUN_ENVIR;')) envars.append(rocoto.create_envar(name='HOMEgfs', value='&HOMEgfs;')) envars.append(rocoto.create_envar(name='EXPDIR', value='&EXPDIR;')) envars.append(rocoto.create_envar(name='CDATE', value='@Y@m@d@H')) envars.append(rocoto.create_envar(name='CDUMP', value='%s' % cdump)) envars.append(rocoto.create_envar(name='PDY', value='@Y@m@d')) envars.append(rocoto.create_envar(name='cyc', value='@H')) base = dict_configs['base'] gfs_cyc = base.get('gfs_cyc', 0) dohybvar = base.get('DOHYBVAR', 'NO').upper() eupd_cyc = base.get('EUPD_CYC', 'gdas').upper() do_bufrsnd = base.get('DO_BUFRSND', 'NO').upper() do_gempak = base.get('DO_GEMPAK', 'NO').upper() do_awips = base.get('DO_AWIPS', 'NO').upper() dumpsuffix = base.get('DUMP_SUFFIX', '') dict_tasks = OrderedDict() # prep deps = [] dep_dict = {'type': 'metatask', 'name': '%spost' % 'gdas', 'offset': '-06:00:00'} deps.append(rocoto.add_dependency(dep_dict)) data = '&ROTDIR;/gdas.@Y@m@d/@H/gdas.t@Hz.atmf009.nemsio' dep_dict = {'type': 'data', 'data': data, 'offset': '-06:00:00'} deps.append(rocoto.add_dependency(dep_dict)) data = '&DMPDIR;/%s%s.@Y@m@d/@H/%s.t@Hz.updated.status.tm00.bufr_d' % (cdump, dumpsuffix, cdump) dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) gfs_enkf = True if eupd_cyc in ['BOTH', 'GFS'] and dohybvar in ['Y', 'YES'] else False if gfs_enkf and cdump in ['gfs']: if gfs_cyc == 4: task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies) else: task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies, cycledef='gdas') else: task = wfu.create_wf_task('prep', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%sprep' % cdump] = task # anal deps = [] dep_dict = {'type': 'task', 'name': '%sprep' % cdump} deps.append(rocoto.add_dependency(dep_dict)) if dohybvar in ['y', 'Y', 'yes', 'YES']: dep_dict = {'type': 'metatask', 'name': '%sepmn' % 'gdas', 'offset': '-06:00:00'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) else: dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('anal', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%sanal' % cdump] = task # fcst deps = [] dep_dict = {'type': 'task', 'name': '%sanal' % cdump} deps.append(rocoto.add_dependency(dep_dict)) if cdump in ['gdas']: dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) elif cdump in ['gfs']: dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('fcst', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%sfcst' % cdump] = task # post deps = [] data = '&ROTDIR;/%s.@Y@m@d/@H/%s.t@Hz.log#dep#.nemsio' % (cdump, cdump) dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': '%sfcst' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#') fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#') ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;') postenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR] varname1, varname2, varname3 = 'grp', 'dep', 'lst' varval1, varval2, varval3 = get_postgroups(dict_configs['post'], cdump=cdump) vardict = {varname2: varval2, varname3: varval3} task = wfu.create_wf_task('post', cdump=cdump, envar=postenvars, dependency=dependencies, metatask='post', varname=varname1, varval=varval1, vardict=vardict) dict_tasks['%spost' % cdump] = task # vrfy deps = [] dep_dict = {'type': 'metatask', 'name': '%spost' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('vrfy', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%svrfy' % cdump] = task if cdump in ['gfs'] and do_bufrsnd in ['Y', 'YES']: #postsnd deps = [] dep_dict = {'type': 'task', 'name': '%sfcst' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('postsnd', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%spostsnd' % cdump] = task if cdump in ['gfs'] and do_awips in ['Y', 'YES']: # awips deps = [] data = '&ROTDIR;/%s.@Y@m@d/@H/%s.t@Hz.sfluxgrb#dep#.grib2.idx' % (cdump, cdump) dep_dict = {'type': 'data', 'data': data} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': '%spost' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#') fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#') ROTDIR = rocoto.create_envar(name='ROTDIR', value='&ROTDIR;') awipsenvars = envars + [fhrgrp] + [fhrlst] + [ROTDIR] varname1, varname2, varname3 = 'grp', 'dep', 'lst' varval1, varval2, varval3 = get_awipsgroups(dict_configs['awips'], cdump=cdump) vardict = {varname2: varval2, varname3: varval3} task = wfu.create_wf_task('awips', cdump=cdump, envar=awipsenvars, dependency=dependencies, metatask='awips', varname=varname1, varval=varval1, vardict=vardict) dict_tasks['%sawips' % cdump] = task if cdump in ['gfs'] and do_gempak in ['Y', 'YES']: # gempak deps = [] dep_dict = {'type': 'metatask', 'name': '%spost' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('gempak', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%sgempak' % cdump] = task # arch deps = [] dep_dict = {'type': 'task', 'name': '%svrfy' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'streq', 'left': '&ARCHIVE_TO_HPSS;', 'right': 'YES'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) task = wfu.create_wf_task('arch', cdump=cdump, envar=envars, dependency=dependencies) dict_tasks['%sarch' % cdump] = task return dict_tasks def get_hyb_tasks(dict_configs, cycledef='enkf'): ''' Create Hybrid tasks ''' # Determine groups based on ensemble size and grouping base = dict_configs['base'] nens = base['NMEM_ENKF'] lobsdiag_forenkf = base.get('lobsdiag_forenkf', '.false.').upper() eupd_cyc = base.get('EUPD_CYC', 'gdas').upper() eobs = dict_configs['eobs'] nens_eomg = eobs['NMEM_EOMGGRP'] neomg_grps = nens / nens_eomg EOMGGROUPS = ' '.join(['%02d' % x for x in range(1, neomg_grps + 1)]) efcs = dict_configs['efcs'] nens_efcs = efcs['NMEM_EFCSGRP'] nefcs_grps = nens / nens_efcs EFCSGROUPS = ' '.join(['%02d' % x for x in range(1, nefcs_grps + 1)]) earc = dict_configs['earc'] nens_earc = earc['NMEM_EARCGRP'] nearc_grps = nens / nens_earc EARCGROUPS = ' '.join(['%02d' % x for x in range(0, nearc_grps + 1)]) envars = [] if wfu.get_scheduler(wfu.detectMachine()) in ['slurm']: envars.append(rocoto.create_envar(name='SLURM_SET', value='YES')) envars.append(rocoto.create_envar(name='RUN_ENVIR', value='&RUN_ENVIR;')) envars.append(rocoto.create_envar(name='HOMEgfs', value='&HOMEgfs;')) envars.append(rocoto.create_envar(name='EXPDIR', value='&EXPDIR;')) envars.append(rocoto.create_envar(name='CDATE', value='@Y@m@d@H')) #envars.append(rocoto.create_envar(name='CDUMP', value='%s' % cdump)) envars.append(rocoto.create_envar(name='PDY', value='@Y@m@d')) envars.append(rocoto.create_envar(name='cyc', value='@H')) ensgrp = rocoto.create_envar(name='ENSGRP', value='#grp#') dict_tasks = OrderedDict() if eupd_cyc in ['BOTH']: cdumps = ['gfs', 'gdas'] elif eupd_cyc in ['GFS']: cdumps = ['gfs'] elif eupd_cyc in ['GDAS']: cdumps = ['gdas'] for cdump in cdumps: envar_cdump = rocoto.create_envar(name='CDUMP', value='%s' % cdump) envars1 = envars + [envar_cdump] # eobs deps = [] dep_dict = {'type': 'task', 'name': '%sprep' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'metatask', 'name': '%sepmn' % 'gdas', 'offset': '-06:00:00'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) task = wfu.create_wf_task('eobs', cdump=cdump, envar=envars1, dependency=dependencies, cycledef=cycledef) dict_tasks['%seobs' % cdump] = task # eomn, eomg if lobsdiag_forenkf in ['.F.', '.FALSE.']: deps = [] dep_dict = {'type': 'task', 'name': '%seobs' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) eomgenvars= envars1 + [ensgrp] task = wfu.create_wf_task('eomg', cdump=cdump, envar=eomgenvars, dependency=dependencies, metatask='eomn', varname='grp', varval=EOMGGROUPS, cycledef=cycledef) dict_tasks['%seomn' % cdump] = task # eupd deps = [] if lobsdiag_forenkf in ['.F.', '.FALSE.']: dep_dict = {'type': 'metatask', 'name': '%seomn' % cdump} else: dep_dict = {'type': 'task', 'name': '%seobs' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) task = wfu.create_wf_task('eupd', cdump=cdump, envar=envars1, dependency=dependencies, cycledef=cycledef) dict_tasks['%seupd' % cdump] = task # All hybrid tasks beyond this point are always executed in the GDAS cycle cdump = 'gdas' envar_cdump = rocoto.create_envar(name='CDUMP', value='%s' % cdump) envars1 = envars + [envar_cdump] cdump_eupd = 'gfs' if eupd_cyc in ['GFS'] else 'gdas' # ecen deps = [] dep_dict = {'type': 'task', 'name': '%sanal' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'task', 'name': '%seupd' % cdump_eupd} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) task = wfu.create_wf_task('ecen', cdump=cdump, envar=envars1, dependency=dependencies, cycledef=cycledef) dict_tasks['%secen' % cdump] = task # efmn, efcs deps = [] dep_dict = {'type': 'task', 'name': '%secen' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=deps) efcsenvars = envars1 + [ensgrp] task = wfu.create_wf_task('efcs', cdump=cdump, envar=efcsenvars, dependency=dependencies, metatask='efmn', varname='grp', varval=EFCSGROUPS, cycledef=cycledef) dict_tasks['%sefmn' % cdump] = task # epmn, epos deps = [] dep_dict = {'type': 'metatask', 'name': '%sefmn' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) fhrgrp = rocoto.create_envar(name='FHRGRP', value='#grp#') fhrlst = rocoto.create_envar(name='FHRLST', value='#lst#') eposenvars = envars1 + [fhrgrp] + [fhrlst] varname1, varname2, varname3 = 'grp', 'dep', 'lst' varval1, varval2, varval3 = get_eposgroups(dict_configs['epos'], cdump=cdump) vardict = {varname2: varval2, varname3: varval3} task = wfu.create_wf_task('epos', cdump=cdump, envar=eposenvars, dependency=dependencies, metatask='epmn', varname=varname1, varval=varval1, vardict=vardict) dict_tasks['%sepmn' % cdump] = task # eamn, earc deps = [] dep_dict = {'type': 'metatask', 'name': '%sepmn' % cdump} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) earcenvars = envars1 + [ensgrp] task = wfu.create_wf_task('earc', cdump=cdump, envar=earcenvars, dependency=dependencies, metatask='eamn', varname='grp', varval=EARCGROUPS, cycledef=cycledef) dict_tasks['%seamn' % cdump] = task return dict_tasks def get_workflow_header(base): ''' Create the workflow header block ''' strings = [] strings.append('\n') strings.append(']>\n') strings.append('\n') strings.append('\n') strings.append('\n') strings.append('\t&EXPDIR;/logs/@Y@m@d@H.log\n') strings.append('\n') strings.append('\t\n') strings.append('\t&SDATE; &SDATE; 06:00:00\n') strings.append('\t&SDATE; &EDATE; 06:00:00\n') strings.append('\t&SDATE; &EDATE; 06:00:00\n') if base['gfs_cyc'] != 0: strings.append('\t&SDATE_GFS; &EDATE_GFS; &INTERVAL_GFS;\n') strings.append('\n') return ''.join(strings) def get_workflow_footer(): ''' Generate workflow footer ''' strings = [] strings.append('\n\n') return ''.join(strings) def get_postgroups(post, cdump='gdas'): fhmin = post['FHMIN'] fhmax = post['FHMAX'] fhout = post['FHOUT'] # Get a list of all forecast hours if cdump in ['gdas']: fhrs = range(fhmin, fhmax+fhout, fhout) elif cdump in ['gfs']: fhmax = np.max([post['FHMAX_GFS_00'],post['FHMAX_GFS_06'],post['FHMAX_GFS_12'],post['FHMAX_GFS_18']]) fhout = post['FHOUT_GFS'] fhmax_hf = post['FHMAX_HF_GFS'] fhout_hf = post['FHOUT_HF_GFS'] fhrs_hf = range(fhmin, fhmax_hf+fhout_hf, fhout_hf) fhrs = fhrs_hf + range(fhrs_hf[-1]+fhout, fhmax+fhout, fhout) npostgrp = post['NPOSTGRP'] ngrps = npostgrp if len(fhrs) > npostgrp else len(fhrs) fhrs = ['f%03d' % f for f in fhrs] fhrs = np.array_split(fhrs, ngrps) fhrs = [f.tolist() for f in fhrs] fhrgrp = ' '.join(['%03d' % x for x in range(0, ngrps+1)]) fhrdep = ' '.join(['f000'] + [f[-1] for f in fhrs]) fhrlst = ' '.join(['anl'] + ['_'.join(f) for f in fhrs]) return fhrgrp, fhrdep, fhrlst def get_awipsgroups(awips, cdump='gdas'): fhmin = awips['FHMIN'] fhmax = awips['FHMAX'] fhout = awips['FHOUT'] # Get a list of all forecast hours if cdump in ['gdas']: fhrs = range(fhmin, fhmax+fhout, fhout) elif cdump in ['gfs']: fhmax = np.max([awips['FHMAX_GFS_00'],awips['FHMAX_GFS_06'],awips['FHMAX_GFS_12'],awips['FHMAX_GFS_18']]) fhout = awips['FHOUT_GFS'] fhmax_hf = awips['FHMAX_HF_GFS'] fhout_hf = awips['FHOUT_HF_GFS'] if fhmax > 240: fhmax = 240 if fhmax_hf > 240: fhmax_hf = 240 fhrs_hf = range(fhmin, fhmax_hf+fhout_hf, fhout_hf) fhrs = fhrs_hf + range(fhrs_hf[-1]+fhout, fhmax+fhout, fhout) nawipsgrp = awips['NAWIPSGRP'] ngrps = nawipsgrp if len(fhrs) > nawipsgrp else len(fhrs) fhrs = ['f%03d' % f for f in fhrs] fhrs = np.array_split(fhrs, ngrps) fhrs = [f.tolist() for f in fhrs] fhrgrp = ' '.join(['%03d' % x for x in range(0, ngrps)]) fhrdep = ' '.join([f[-1] for f in fhrs]) fhrlst = ' '.join(['_'.join(f) for f in fhrs]) return fhrgrp, fhrdep, fhrlst def get_eposgroups(epos, cdump='gdas'): fhmin = epos['FHMIN_ENKF'] fhmax = epos['FHMAX_ENKF'] fhout = epos['FHOUT_ENKF'] fhrs = range(fhmin, fhmax+fhout, fhout) neposgrp = epos['NEPOSGRP'] ngrps = neposgrp if len(fhrs) > neposgrp else len(fhrs) fhrs = ['f%03d' % f for f in fhrs] fhrs = np.array_split(fhrs, ngrps) fhrs = [f.tolist() for f in fhrs] fhrgrp = ' '.join(['%03d' % x for x in range(0, ngrps)]) fhrdep = ' '.join([f[-1] for f in fhrs]) fhrlst = ' '.join(['_'.join(f) for f in fhrs]) return fhrgrp, fhrdep, fhrlst def dict_to_strings(dict_in): strings = [] for key in dict_in.keys(): strings.append(dict_in[key]) strings.append('\n') return ''.join(strings) def create_xml(dict_configs): ''' Given an dictionary of sourced config files, create the workflow XML ''' from __builtin__ import any as b_any base = dict_configs['base'] dohybvar = base.get('DOHYBVAR', 'NO').upper() gfs_cyc = base.get('gfs_cyc', 0) eupd_cyc = base.get('EUPD_CYC', 'gdas').upper() # Start collecting workflow pieces preamble = get_preamble() definitions = get_definitions(base) workflow_header = get_workflow_header(base) workflow_footer = get_workflow_footer() # Get GDAS related entities, resources, workflow dict_gdas_resources = get_gdasgfs_resources(dict_configs) dict_gdas_tasks = get_gdasgfs_tasks(dict_configs) # Get hybrid related entities, resources, workflow if dohybvar in ['Y', 'YES']: dict_hyb_resources = get_hyb_resources(dict_configs) dict_hyb_tasks = get_hyb_tasks(dict_configs) # Removes &MEMORY_JOB_DUMP post mortem from hyb tasks hyp_tasks = {'gdaseobs':'gdaseobs', 'gdaseomg':'gdaseomn', 'gdaseupd':'gdaseupd','gdasecen':'gdasecen','gdasefcs':'gdasefmn','gdasepos':'gdasepmn','gdasearc':'gdaseamn'} for each_task, each_resource_string in dict_hyb_resources.iteritems(): #print each_task,hyp_tasks[each_task] #print dict_hyb_tasks[hyp_tasks[each_task]] if 'MEMORY' not in each_resource_string: if each_task in dict_hyb_tasks: temp_task_string = [] for each_line in re.split(r'(\s+)', dict_hyb_tasks[each_task]): if 'memory' not in each_line: temp_task_string.append(each_line) dict_hyb_tasks[each_task] = ''.join(temp_task_string) if hyp_tasks[each_task] in dict_hyb_tasks: temp_task_string = [] for each_line in re.split(r'(\s+)', dict_hyb_tasks[hyp_tasks[each_task]]): if 'memory' not in each_line: temp_task_string.append(each_line) dict_hyb_tasks[hyp_tasks[each_task]] = ''.join(temp_task_string) # Get GFS cycle related entities, resources, workflow dict_gfs_resources = get_gdasgfs_resources(dict_configs, cdump='gfs') dict_gfs_tasks = get_gdasgfs_tasks(dict_configs, cdump='gfs') # Removes &MEMORY_JOB_DUMP post mortem from gdas tasks for each_task, each_resource_string in dict_gdas_resources.iteritems(): if each_task not in dict_gdas_tasks: continue if 'MEMORY' not in each_resource_string: temp_task_string = [] for each_line in re.split(r'(\s+)', dict_gdas_tasks[each_task]): if 'memory' not in each_line: temp_task_string.append(each_line) dict_gdas_tasks[each_task] = ''.join(temp_task_string) # Removes &MEMORY_JOB_DUMP post mortem from gfs tasks for each_task, each_resource_string in dict_gfs_resources.iteritems(): if each_task not in dict_gfs_tasks: continue if 'MEMORY' not in each_resource_string: temp_task_string = [] for each_line in re.split(r'(\s+)', dict_gfs_tasks[each_task]): if 'memory' not in each_line: temp_task_string.append(each_line) dict_gfs_tasks[each_task] = ''.join(temp_task_string) # Put together the XML file xmlfile = [] xmlfile.append(preamble) xmlfile.append(definitions) xmlfile.append(dict_to_strings(dict_gdas_resources)) if dohybvar in ['Y', 'YES']: xmlfile.append(dict_to_strings(dict_hyb_resources)) if gfs_cyc != 0: xmlfile.append(dict_to_strings(dict_gfs_resources)) elif gfs_cyc == 0 and dohybvar in ['Y', 'YES'] and eupd_cyc in ['BOTH', 'GFS']: xmlfile.append(dict_gfs_resources['gfsprep']) xmlfile.append(workflow_header) xmlfile.append(dict_to_strings(dict_gdas_tasks)) if dohybvar in ['Y', 'YES']: xmlfile.append(dict_to_strings(dict_hyb_tasks)) if gfs_cyc != 0: xmlfile.append(dict_to_strings(dict_gfs_tasks)) elif gfs_cyc == 0 and dohybvar in ['Y', 'YES'] and eupd_cyc in ['BOTH', 'GFS']: xmlfile.append(dict_gfs_tasks['gfsprep']) xmlfile.append('\n') xmlfile.append(wfu.create_firstcyc_task()) xmlfile.append(workflow_footer) # Write the XML file fh = open('%s/%s.xml' % (base['EXPDIR'], base['PSLOT']), 'w') fh.write(''.join(xmlfile)) fh.close() return if __name__ == '__main__': main() sys.exit(0)