#!/usr/bin/env python

'''
    Module containing functions all workflow setups require
'''

import glob
import os
import random
import re
import socket
import stat
import subprocess
import sys
from datetime import datetime, timedelta

try:
    from distutils.spawn import find_executable
except ImportError:
    # distutils was removed in Python 3.12; shutil.which is the replacement
    from shutil import which as find_executable

import numpy as np

import rocoto

try:
    _STRING_TYPES = basestring  # Python 2: covers both str and unicode
except NameError:
    _STRING_TYPES = str

# Environment variables that config_parser converts to datetime objects
DATE_ENV_VARS = ['CDATE', 'SDATE', 'EDATE']

# Batch scheduler used on each supported machine
SCHEDULER_MAP = {'HERA': 'slurm',
                 'THEIA': 'slurm',
                 'WCOSS': 'lsf',
                 'WCOSS_DELL_P3': 'lsf',
                 'WCOSS_C': 'lsfcray'}


class UnknownMachineError(Exception):
    ''' Raised by get_scheduler for a machine missing from SCHEDULER_MAP '''
    pass


class UnknownConfigError(Exception):
    ''' Raised by find_config when the requested config file is not listed '''
    pass


class ShellScriptException(Exception):
    '''
        Raised by get_shell_env when the environment dump cannot be located
        in the shell output.
        :param scripts: the scripts that were being sourced
        :param errors: description of what went wrong
    '''

    def __init__(self, scripts, errors):
        self.scripts = scripts
        self.errors = errors
        super(ShellScriptException, self).__init__(
            str(errors) +
            ': error processing ' +
            (' '.join(scripts)))


def get_shell_env(scripts):
    '''
        Source the given scripts in a shell and return the resulting
        environment.
        :param scripts: paths of the shell scripts to source, in order
        :type scripts: list
        :return: environment variable names mapped to their string values
        :rtype: dict
        :raises ShellScriptException: if the marker preceding the environment
                                      dump is not found in the shell output
    '''
    # NOTE: script paths are interpolated into the command line unquoted,
    # so they must come from a trusted source.
    runme = ''.join(['source %s ; ' % (s,) for s in scripts])
    # A random marker separates anything the scripts print from the dump
    magic = '--- ENVIRONMENT BEGIN %d ---' % random.randint(0, 64**5)
    runme += '/bin/echo -n "%s" ; /usr/bin/env -0' % (magic,)
    with open('/dev/null', 'wb+') as null:
        env = subprocess.Popen(runme, shell=True, stdin=null.fileno(),
                               stdout=subprocess.PIPE)
        out, _ = env.communicate()
    if not isinstance(out, str):
        # Python 3 hands back bytes; Python 2 already gives str
        out = out.decode('utf-8', 'replace')
    begin = out.find(magic)
    if begin < 0:
        raise ShellScriptException(scripts, 'Cannot find magic string; '
                                   'at least one script failed: ' + repr(out))
    env_vars = dict()
    for entry in out[begin + len(magic):].split('\x00'):
        name, equal, value = entry.partition('=')
        if not equal:
            # `env -0` terminates every entry with NUL, so the last field is
            # empty; it is not a variable and must not be recorded.
            continue
        env_vars[name] = value
    return env_vars


def get_script_env(scripts):
    '''
        Return only the variables that sourcing the scripts adds to the
        environment. Variables already present in the default shell
        environment are left out, even if the scripts change their value.
        :param scripts: paths of the shell scripts to source
        :type scripts: list
        :return: variables defined by the scripts
        :rtype: dict
    '''
    default_env = get_shell_env([])
    and_script_env = get_shell_env(scripts)
    vars_just_in_script = set(and_script_env) - set(default_env)
    return {v: and_script_env[v] for v in vars_just_in_script}


def cast_or_not(type, value):
    '''
        Convert value with the callable `type`; if the conversion raises
        ValueError, return value unchanged.
    '''
    try:
        return type(value)
    except ValueError:
        return value


def get_configs(expdir):
    '''
        Given an experiment directory containing config files,
        return a list of configs minus the ones ending with ".default"
    '''
    return [config for config in glob.glob('%s/config.*' % expdir)
            if not config.endswith('.default')]


def find_config(config_name, configs):
    '''
        Return the entry of configs whose basename equals config_name.
        :raises UnknownConfigError: if no entry matches
    '''
    for config in configs:
        if config_name == os.path.basename(config):
            return config

    # Report the configs that were searched, not the name that was missing
    raise UnknownConfigError("%s does not exist (known: %s), ABORT!" % (
        config_name, repr(configs)))


def source_configs(configs, tasks):
    '''
        Given list of config files, source them
        and return a dictionary for each task
        Every task depends on config.base
        :param configs: paths of the available config files
        :param tasks: task names to build an environment for
        :return: dictionary keyed by 'base' and by each task name
        :rtype: dict
        :raises UnknownConfigError: if a required config file is not listed
    '''
    base_config = find_config('config.base', configs)

    dict_configs = {}

    # Return config.base as well
    dict_configs['base'] = config_parser([base_config])

    # Source the list of input tasks
    for task in tasks:

        # Some ensemble tasks reuse the config of a related task
        if task in ['eobs', 'eomg']:
            names = ['config.anal', 'config.eobs']
        elif task in ['eupd']:
            names = ['config.anal', 'config.eupd']
        elif task in ['efcs']:
            names = ['config.fcst', 'config.efcs']
        else:
            names = ['config.%s' % task]

        files = [base_config] + [find_config(name, configs) for name in names]

        print('sourcing config.%s' % task)
        dict_configs[task] = config_parser(files)

    return dict_configs


def config_parser(files):
    '''
        Given the name of config file, key-value pair of all variables in the
        config file is returned as a dictionary
        :param files: config file or list of config files
        :type files: list or str or unicode
        :return: Key value pairs representing the environment variables
                 defined in the script.
        :rtype: dict
    '''
    if isinstance(files, _STRING_TYPES):
        files = [files]
    varbles = dict()
    for key, value in get_script_env(files).items():
        if key in DATE_ENV_VARS:  # likely a date, convert to datetime
            varbles[key] = datetime.strptime(value, '%Y%m%d%H')
        elif '.' in value:  # Likely a number and that too a float
            varbles[key] = cast_or_not(float, value)
        else:  # Still could be a number, may be an integer
            varbles[key] = cast_or_not(int, value)

    return varbles


def check_slurm(print_message=False):
    '''
        Report whether Slurm appears to be available, judged solely by
        whether `srun` is found on the PATH.
        :param print_message: when True, print which scheduler was picked and
                              warn if the rocotorun found on the PATH looks
                              older than version 1.3.0
        :return: True if srun was found, otherwise False
        :rtype: bool
    '''
    # Seriously?
    # What happens if srun IS in your path but the scheduler is something else?
    if not find_executable('srun'):
        return False

    if print_message:
        print('Info: Using Slurm as scheduler because srun was found in your path')
        # The version is guessed from the digits in the rocotorun install
        # path. Skip the check when rocotorun is missing or its path holds
        # no digits, rather than crashing on an advisory message.
        rocotorun = find_executable('rocotorun')
        rocoto_version = re.sub(r'\D', '', rocotorun) if rocotorun else ''
        if rocoto_version and int(rocoto_version) < 130:
            print('WARNING: XML workflow is being made to use Slurm because it was set in your')
            print('environment and the correct version of Rocoto is not loaded.')
            print('Make sure to use Rocoto 1.3.0rc2 or newer (example: module load rocoto/1.3.0rc2).')

    return True


def detectMachine():
    '''
        Identify the machine from directories that exist on this host.
        :return: one of 'HERA', 'THEIA', 'WCOSS_C', 'WCOSS_DELL_P3'
        :rtype: str
        :raises NotImplementedError: if none of the known paths exist
    '''
    machines = ['THEIA', 'HERA', 'WCOSS_C', 'WCOSS_DELL_P3']

    if os.path.exists('/scratch1/NCEPDEV'):
        return 'HERA'
    elif os.path.exists('/scratch3/NCEPDEV'):
        return 'THEIA'
    elif os.path.exists('/gpfs') and os.path.exists('/etc/SuSE-release'):
        return 'WCOSS_C'
    elif os.path.exists('/gpfs/dell2'):
        return 'WCOSS_DELL_P3'
    else:
        print('workflow is currently only supported on: %s' % ' '.join(machines))
        raise NotImplementedError('Cannot auto-detect platform, ABORT!')


def get_scheduler(machine):
    '''
        Return the scheduler name listed for machine in SCHEDULER_MAP.
        :raises UnknownMachineError: if machine is not in SCHEDULER_MAP
    '''
    try:
        return SCHEDULER_MAP[machine]
    except KeyError:
        raise UnknownMachineError('Unknown machine: %s, ABORT!' % machine)


def create_wf_task(task, cdump='gdas', cycledef=None, envar=None, dependency=None,
                   metatask=None, varname=None, varval=None, vardict=None,
                   final=False):
    '''
        Assemble the settings for one workflow task and hand them to rocoto.
        :param task: task name; also selects the job script and the
                     per-task XML entities
        :param cdump: prefix for the task name and suffix of the entities
        :param cycledef: cycle definition name, defaults to cdump
        :param envar: passed through to rocoto as 'envar'
        :param dependency: passed through to rocoto as 'dependency'
        :param metatask: when given, a metatask is created instead of a task
        :param varname: metatask variable name (used only with metatask)
        :param varval: metatask variable values (used only with metatask)
        :param vardict: passed through as 'vardict' (used only with metatask)
        :param final: passed through to rocoto as 'final'
        :return: output of rocoto.create_task or rocoto.create_metatask,
                 joined into a single string
        :rtype: str
    '''
    if metatask is None:
        taskstr = '%s' % task
    else:
        taskstr = '%s#%s#' % (task, varname)
        metataskstr = '%s%s' % (cdump, metatask)
        metatask_dict = {'metataskname': metataskstr,
                         'varname': '%s' % varname,
                         'varval': '%s' % varval,
                         'vardict': vardict}

    taskstr = '%s%s' % (cdump, taskstr)
    cycledefstr = cdump if cycledef is None else cycledef

    # Suffix shared by all per-task, per-cdump entity names
    entity = '%s_%s' % (task.upper(), cdump.upper())

    task_dict = {'taskname': '%s' % taskstr,
                 'cycledef': '%s' % cycledefstr,
                 'maxtries': '&MAXTRIES;',
                 'command': '&JOBS_DIR;/%s.sh' % task,
                 'jobname': '&PSLOT;_%s_@H' % taskstr,
                 'account': '&ACCOUNT;',
                 'queue': '&QUEUE_%s;' % entity,
                 'walltime': '&WALLTIME_%s;' % entity,
                 'native': '&NATIVE_%s;' % entity,
                 'memory': '&MEMORY_%s;' % entity,
                 'resources': '&RESOURCES_%s;' % entity,
                 'log': '&ROTDIR;/logs/@Y@m@d@H/%s.log' % taskstr,
                 'envar': envar,
                 'dependency': dependency,
                 'final': final}

    # These tasks get an explicit partition when the scheduler is Slurm
    if task in ['getic', 'arch', 'earc'] and get_scheduler(detectMachine()) in ['slurm']:
        task_dict['partition'] = '&PARTITION_%s;' % entity

    if metatask is None:
        task = rocoto.create_task(task_dict)
    else:
        task = rocoto.create_metatask(task_dict, metatask_dict)
    task = ''.join(task)

    return task


def create_firstcyc_task(cdump='gdas'):
    '''
        This task is needed to run to finalize the first half cycle
        :param cdump: selects the ARCH entities used for the job settings
        :return: output of rocoto.create_task joined into a single string
        :rtype: str
    '''
    task = 'firstcyc'
    taskstr = '%s' % task

    deps = []
    data = '&EXPDIR;/logs/@Y@m@d@H.log'
    dep_dict = {'type': 'data', 'data': data, 'offset': '24:00:00'}
    deps.append(rocoto.add_dependency(dep_dict))
    dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'}
    deps.append(rocoto.add_dependency(dep_dict))
    dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

    # The original literal listed 'queue' twice; only the later,
    # cdump-specific value ever took effect, so that is the one kept.
    task_dict = {'taskname': '%s' % taskstr,
                 'cycledef': 'first',
                 'maxtries': '&MAXTRIES;',
                 'final': True,
                 'command': 'sleep 1',
                 'jobname': '&PSLOT;_%s_@H' % taskstr,
                 'account': '&ACCOUNT;',
                 'queue': '&QUEUE_ARCH_%s;' % cdump.upper(),
                 'walltime': '&WALLTIME_ARCH_%s;' % cdump.upper(),
                 'native': '&NATIVE_ARCH_%s;' % cdump.upper(),
                 'resources': '&RESOURCES_ARCH_%s;' % cdump.upper(),
                 'log': '&ROTDIR;/logs/@Y@m@d@H/%s.log' % taskstr,
                 'dependency': dependencies}

    if get_scheduler(detectMachine()) in ['slurm']:
        task_dict['queue'] = '&QUEUE;'
        task_dict['partition'] = '&PARTITION_ARCH;'

    task = rocoto.create_task(task_dict)

    return ''.join(task)


def get_gfs_interval(gfs_cyc):
    '''
        return interval in hours based on gfs_cyc
        :param gfs_cyc: one of 0, 1, 2 or 4
        :return: 'HH:MM:SS' interval string, or None when gfs_cyc is 0
        :raises ValueError: for any other gfs_cyc
    '''
    intervals = {0: None,
                 1: '24:00:00',
                 2: '12:00:00',
                 4: '06:00:00'}

    try:
        return intervals[gfs_cyc]
    except (KeyError, TypeError):
        # Previously an unsupported value surfaced as an UnboundLocalError
        raise ValueError('Invalid gfs_cyc = %r, expected one of 0, 1, 2, 4' % (gfs_cyc,))


def _cdump_value(cfg, key, cdump):
    ''' Return cfg[key + '_gfs'] for the gfs cdump when present, else cfg[key] '''
    gfs_key = '%s_gfs' % key
    if cdump in ['gfs'] and gfs_key in cfg:
        return cfg[gfs_key]
    return cfg[key]


def get_resources(machine, cfg, task, cdump='gdas'):
    '''
        Build the job resource strings for a task.
        :param machine: machine name, must be a key of SCHEDULER_MAP
        :param cfg: settings for the task; read for the keys wtime_<task>,
                    npe_<task>, npe_node_<task>, optionally memory_<task>,
                    nth_<task> on WCOSS_DELL_P3 and HERA, and the *_gfs
                    variants when cdump is 'gfs'
        :type cfg: dict
        :param task: task name; 'eomg' reads the 'eobs' settings for
                     everything except the wall time
        :param cdump: 'gfs' enables the *_gfs overrides
        :return: (wtimestr, resstr, queuestr, memstr, natstr)
        :rtype: tuple
        :raises UnknownMachineError: if machine is not in SCHEDULER_MAP
        :raises KeyError: if a required setting is missing from cfg
    '''
    scheduler = get_scheduler(machine)

    wtimestr = _cdump_value(cfg, 'wtime_%s' % task, cdump)

    ltask = 'eobs' if task in ['eomg'] else task

    memory = cfg.get('memory_%s' % ltask, None)

    # The override is tested under the same key that is then read. The
    # original tested the `task` key but read the `ltask` key, which
    # disagree for eomg.
    tasks = _cdump_value(cfg, 'npe_%s' % ltask, cdump)
    ppn = _cdump_value(cfg, 'npe_node_%s' % ltask, cdump)

    if machine in ['WCOSS_DELL_P3', 'HERA']:
        threads = cfg['nth_%s' % ltask]

    # Builtin int/float replace np.int/np.float, which were plain aliases of
    # the builtins and no longer exist in NumPy >= 1.24.
    nodes = int(np.ceil(float(tasks) / float(ppn)))

    memstr = '' if memory is None else str(memory)
    natstr = ''

    if scheduler in ['slurm']:
        natstr = '--export=NONE'

    if machine in ['THEIA', 'HERA', 'WCOSS_C', 'WCOSS_DELL_P3']:

        if machine in ['HERA']:
            resstr = '%d:ppn=%d:tpp=%d' % (nodes, ppn, threads)
        else:
            resstr = '%d:ppn=%d' % (nodes, ppn)

        if machine in ['WCOSS_DELL_P3']:
            natstr = "-R 'affinity[core(%d)]'" % (threads)
            if task in ['arch', 'earc', 'getic']:
                natstr = "-R 'affinity[core(1)]'"

    elif machine in ['WCOSS']:
        resstr = '%d' % tasks

    if task in ['arch', 'earc', 'getic']:
        queuestr = '&QUEUE;' if scheduler in ['slurm'] else '&QUEUE_ARCH;'
    else:
        queuestr = '&QUEUE;'

    return wtimestr, resstr, queuestr, memstr, natstr


def create_crontab(base, cronint=5):
    '''
        Create crontab to execute rocotorun every cronint (5) minutes
        :param base: settings providing 'EXPDIR', 'PSLOT' and 'machine'
        :type base: dict
        :param cronint: interval in minutes between rocotorun invocations
        :type cronint: int
        :return: None; the entry is written to <EXPDIR>/<PSLOT>.crontab
    '''

    # No point creating a crontab if rocotorun is not available.
    rocotoruncmd = find_executable('rocotorun')
    if rocotoruncmd is None:
        print('Failed to find rocotorun, crontab will not be created')
        return

# Leaving the code for a wrapper around crontab file if needed again later
#    if check_slurm() and base['machine'] in ['THEIA']:
#
#        cronintstr = '*/%d * * * *' % cronint
#        rocotorunstr = '%s -d %s/%s.db -w %s/%s.xml' % (rocotoruncmd, base['EXPDIR'], base['PSLOT'], base['EXPDIR'], base['PSLOT'])
#
#        wrapper_strings = []
#        wrapper_strings.append('#!/bin/env tcsh\n')
#        wrapper_strings.append('\n')
#        wrapper_strings.append('module load slurm\n')
#        wrapper_strings.append('module load rocoto/1.3.0-RC4\n')
#        wrapper_strings.append('\n')
#        wrapper_strings.append(rocotorunstr)
#
#        hostname = 'tfe02'
#        script_file = os.path.join(base['EXPDIR'], '%s.sh' % base['PSLOT'])
#
#        fh = open(script_file, 'w')
#        fh.write(''.join(wrapper_strings))
#        os.chmod(script_file,stat.S_IRWXU|stat.S_IRWXG|stat.S_IRWXO)
#        fh.close()
#
#        rocotorunstr = 'ssh %s %s/%s.sh' % (socket.gethostname(), base['EXPDIR'], base['PSLOT'])
#
#    else:

    rocotorunstr = '%s -d %s/%s.db -w %s/%s.xml' % (rocotoruncmd, base['EXPDIR'], base['PSLOT'], base['EXPDIR'], base['PSLOT'])
    cronintstr = '*/%d * * * *' % cronint

    # On WCOSS, rocoto module needs to be loaded everytime cron runs
    if base['machine'] in ['WCOSS']:
        # The load commands end with ';' so they and rocotorun share a single
        # balanced subshell. The original ended them with ')', which left a
        # stray closing parenthesis in the generated command.
        rocotoloadstr = '. /usrx/local/Modules/default/init/sh; module use -a /usrx/local/emc_rocoto/modulefiles; module load rocoto/20170119-master;'
        rocotorunstr = '(%s %s)' % (rocotoloadstr, rocotorunstr)

    # cron mails job output to MAILTO; leave it empty when REPLYTO is unset
    REPLYTO = os.environ.get('REPLYTO', '')

    strings = []

    strings.append('\n')
    strings.append('#################### %s ####################\n' % base['PSLOT'])
    strings.append('MAILTO="%s"\n' % REPLYTO)
    strings.append('%s %s\n' % (cronintstr, rocotorunstr))
    strings.append('#################################################################\n')
    strings.append('\n')

    # The context manager closes the file even if the write fails
    with open(os.path.join(base['EXPDIR'], '%s.crontab' % base['PSLOT']), 'w') as fh:
        fh.write(''.join(strings))

    return