#!/usr/bin/env python

'''
    Module containing functions all workflow setups require
'''

import random
import re
import os, sys, stat
import socket
import glob
import subprocess
import numpy as np
try:
    from distutils.spawn import find_executable
except ImportError:  # distutils was removed in Python 3.12+; shutil.which is the documented replacement
    from shutil import which as find_executable
from datetime import datetime, timedelta

import rocoto

# Environment variables that hold dates in YYYYMMDDHH form; parsed to datetime by config_parser()
DATE_ENV_VARS = ['CDATE', 'SDATE', 'EDATE']

# Map of supported machine names to their batch scheduler
SCHEDULER_MAP = {'HERA': 'slurm',
                 'ORION': 'slurm',
                 'WCOSS': 'lsf',
                 'WCOSS_DELL_P3': 'lsf',
                 'WCOSS_C': 'lsfcray',
                 'WCOSS2': 'pbspro'}


class UnknownMachineError(Exception):
    '''Raised when a machine has no entry in SCHEDULER_MAP or cannot be detected.'''
    pass


class UnknownConfigError(Exception):
    '''Raised when a requested config file is not present in the experiment directory.'''
    pass


class ShellScriptException(Exception):
    '''Raised when sourcing one or more shell scripts fails.'''

    def __init__(self, scripts, errors):
        self.scripts = scripts
        self.errors = errors
        super(ShellScriptException, self).__init__(
            str(errors) + ': error processing' + (' '.join(scripts)))


def get_shell_env(scripts):
    '''
    Source the given shell scripts in order inside a subshell and return the
    resulting environment as a {name: value} dict.

    A random "magic" marker is echoed before dumping the environment so that
    any output the scripts themselves produce can be discarded; if the marker
    is not found, at least one script failed and ShellScriptException is raised.
    '''
    vars = dict()
    runme = ''.join([f'source {s} ; ' for s in scripts])
    magic = f'--- ENVIRONMENT BEGIN {random.randint(0,64**5)} ---'
    # env -0 NUL-delimits entries so multi-line variable values survive parsing
    runme += f'/bin/echo -n "{magic}" ; /usr/bin/env -0'
    with open('/dev/null', 'w') as null:
        env = subprocess.Popen(runme, shell=True, stdin=null.fileno(),
                               stdout=subprocess.PIPE)
        (out, err) = env.communicate()
    out = out.decode()
    begin = out.find(magic)
    if begin < 0:
        raise ShellScriptException(scripts, 'Cannot find magic string; '
                                   'at least one script failed: ' + repr(out))
    for entry in out[begin + len(magic):].split('\x00'):
        if not entry:
            continue  # env -0 output ends with a NUL, yielding an empty tail entry
        iequal = entry.find('=')
        vars[entry[0:iequal]] = entry[iequal + 1:]
    return vars


def get_script_env(scripts):
    '''
    Return only the environment variables that the given scripts define,
    i.e. the environment after sourcing them minus the pre-existing shell
    environment.
    '''
    default_env = get_shell_env([])
    and_script_env = get_shell_env(scripts)
    vars_just_in_script = set(and_script_env) - set(default_env)
    union_env = dict(default_env)
    union_env.update(and_script_env)
    return dict([(v, union_env[v]) for v in vars_just_in_script])


def cast_or_not(type, value):
    '''
    Attempt to cast value with the given callable (e.g. int, float);
    return the original value unchanged if the cast raises ValueError.
    '''
    try:
        return type(value)
    except ValueError:
        return value


def get_configs(expdir):
    '''
    Given an experiment directory containing config files,
    return a list of configs minus the ones ending with ".default"
    '''
    result = list()
    for config in glob.glob(f'{expdir}/config.*'):
        if not config.endswith('.default'):
            result.append(config)
    return result


def find_config(config_name, configs):
    '''
    Return the full path from configs whose basename matches config_name;
    raise UnknownConfigError if no match exists.
    '''
    for config in configs:
        if config_name == os.path.basename(config):
            return config
    # BUGFIX: report the list of known configs, not the name that was searched for
    raise UnknownConfigError(f'{config_name} does not exist (known: {repr(configs)}), ABORT!')


def source_configs(configs, tasks):
    '''
    Given list of config files, source them
    and return a dictionary for each task
    Every task depends on config.base
    '''

    dict_configs = {}

    # Return config.base as well
    dict_configs['base'] = config_parser([find_config('config.base', configs)])

    # Source the list of input tasks
    for task in tasks:

        files = []
        files.append(find_config('config.base', configs))
        # Some ensemble tasks share another task's config in addition to their own
        if task in ['eobs', 'eomg']:
            files.append(find_config('config.anal', configs))
            files.append(find_config('config.eobs', configs))
        elif task in ['eupd']:
            files.append(find_config('config.anal', configs))
            files.append(find_config('config.eupd', configs))
        elif task in ['efcs']:
            files.append(find_config('config.fcst', configs))
            files.append(find_config('config.efcs', configs))
        else:
            files.append(find_config(f'config.{task}', configs))

        print(f'sourcing config.{task}')
        dict_configs[task] = config_parser(files)

    return dict_configs


def config_parser(files):
    """
    Given the name of config file, key-value pair of all variables in the config file is returned as a dictionary
    :param files: config file or list of config files
    :type files: list or str or unicode
    :return: Key value pairs representing the environment variables defined
            in the script.
    :rtype: dict
    """
    if isinstance(files, (str, bytes)):
        files = [files]
    varbles = dict()
    for key, value in get_script_env(files).items():
        if key in DATE_ENV_VARS:  # likely a date, convert to datetime
            varbles[key] = datetime.strptime(value, '%Y%m%d%H')
        elif '.' in value:  # Likely a number and that too a float
            varbles[key] = cast_or_not(float, value)
        else:  # Still could be a number, may be an integer
            varbles[key] = cast_or_not(int, value)
    return varbles


def detectMachine():
    '''
    Auto-detect the machine this is running on by probing for
    machine-specific filesystem paths; raise NotImplementedError
    on an unrecognized platform.
    '''
    machines = ['HERA', 'ORION', 'WCOSS_C', 'WCOSS_DELL_P3', 'WCOSS2']

    if os.path.exists('/scratch1/NCEPDEV'):
        return 'HERA'
    elif os.path.exists('/work/noaa'):
        return 'ORION'
    elif os.path.exists('/gpfs') and os.path.exists('/etc/SuSE-release'):
        return 'WCOSS_C'
    elif os.path.exists('/gpfs/dell2'):
        return 'WCOSS_DELL_P3'
    elif os.path.exists('/lfs/h2'):
        return 'WCOSS2'
    else:
        print(f'workflow is currently only supported on: {machines}')
        raise NotImplementedError('Cannot auto-detect platform, ABORT!')


def get_scheduler(machine):
    '''Return the scheduler name for the given machine; raise UnknownMachineError if unmapped.'''
    try:
        return SCHEDULER_MAP[machine]
    except KeyError:
        raise UnknownMachineError(f'Unknown machine: {machine}, ABORT!')


def create_wf_task(task, cdump='gdas', cycledef=None, envar=None, dependency=None,
                   metatask=None, varname=None, varval=None, vardict=None,
                   final=False):
    '''
    Build the rocoto XML for a single workflow task (or metatask when
    metatask/varname/varval are given) and return it as a string.
    '''
    if metatask is None:
        taskstr = f'{task}'
    else:
        taskstr = f'{task}#{varname}#'
        metataskstr = f'{cdump}{metatask}'
        metatask_dict = {'metataskname': metataskstr,
                         'varname': f'{varname}',
                         'varval': f'{varval}',
                         'vardict': vardict}

    taskstr = f'{cdump}{taskstr}'
    cycledefstr = cdump if cycledef is None else cycledef

    task_dict = {'taskname': f'{taskstr}',
                 'cycledef': f'{cycledefstr}',
                 'maxtries': '&MAXTRIES;',
                 'command': f'&JOBS_DIR;/{task}.sh',
                 'jobname': f'&PSLOT;_{taskstr}_@H',
                 'account': '&ACCOUNT;',
                 'queue': f'&QUEUE_{task.upper()}_{cdump.upper()};',
                 'walltime': f'&WALLTIME_{task.upper()}_{cdump.upper()};',
                 'native': f'&NATIVE_{task.upper()}_{cdump.upper()};',
                 'memory': f'&MEMORY_{task.upper()}_{cdump.upper()};',
                 'resources': f'&RESOURCES_{task.upper()}_{cdump.upper()};',
                 'log': f'&ROTDIR;/logs/@Y@m@d@H/{taskstr}.log',
                 'envar': envar,
                 'dependency': dependency,
                 'final': final}

    # Detect machine/scheduler once instead of re-probing the filesystem per check
    machine = detectMachine()
    scheduler = get_scheduler(machine)

    # Add PARTITION_BATCH to all non-service jobs on Orion (SLURM)
    if scheduler in ['slurm'] and machine in ['ORION']:
        task_dict['partition'] = '&PARTITION_BATCH;'

    # Add PARTITION_SERVICE to all service jobs (SLURM)
    if scheduler in ['slurm'] and task in ['getic', 'arch', 'earc']:
        task_dict['partition'] = f'&PARTITION_{task.upper()}_{cdump.upper()};'

    if metatask is None:
        task = rocoto.create_task(task_dict)
    else:
        task = rocoto.create_metatask(task_dict, metatask_dict)
    task = ''.join(task)

    return task


def create_firstcyc_task(cdump='gdas'):
    '''
    This task is needed to run to finalize the first half cycle
    '''

    task = 'firstcyc'
    taskstr = f'{task}'

    deps = []
    data = '&EXPDIR;/logs/@Y@m@d@H.log'
    dep_dict = {'type': 'data', 'data': data, 'offset': '24:00:00'}
    deps.append(rocoto.add_dependency(dep_dict))
    dep_dict = {'type': 'cycleexist', 'condition': 'not', 'offset': '-06:00:00'}
    deps.append(rocoto.add_dependency(dep_dict))
    dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)

    task_dict = {'taskname': f'{taskstr}',
                 'cycledef': 'first',
                 'maxtries': '&MAXTRIES;',
                 'final': True,
                 'command': 'sleep 1',
                 'jobname': f'&PSLOT;_{taskstr}_@H',
                 'account': '&ACCOUNT;',
                 'queue': '&QUEUE_SERVICE;',
                 'walltime': f'&WALLTIME_ARCH_{cdump.upper()};',
                 'native': f'&NATIVE_ARCH_{cdump.upper()};',
                 'resources': f'&RESOURCES_ARCH_{cdump.upper()};',
                 'log': f'&ROTDIR;/logs/@Y@m@d@H/{taskstr}.log',
                 'dependency': dependencies}

    # SLURM machines run this in the normal queue on the service partition
    if get_scheduler(detectMachine()) in ['slurm']:
        task_dict['queue'] = '&QUEUE;'
        task_dict['partition'] = '&PARTITION_SERVICE;'

    task = rocoto.create_task(task_dict)

    return ''.join(task)


def get_gfs_interval(gfs_cyc):
    '''
    return interval in hours based on gfs_cyc
    '''
    # Default (also covers gfs_cyc == 0): no gfs cycles, no interval
    interval = None
    if gfs_cyc == 1:
        interval = '24:00:00'
    elif gfs_cyc == 2:
        interval = '12:00:00'
    elif gfs_cyc == 4:
        interval = '06:00:00'
    return interval


def get_resources(machine, cfg, task, reservation, cdump='gdas'):
    '''
    Look up walltime, resources, queue, memory and scheduler-native options
    for a task on the given machine from the sourced config dict cfg.

    Returns (wtimestr, resstr, queuestr, memstr, natstr).
    '''
    scheduler = get_scheduler(machine)

    if cdump in ['gfs'] and f'wtime_{task}_gfs' in cfg.keys():
        wtimestr = cfg[f'wtime_{task}_gfs']
    else:
        wtimestr = cfg[f'wtime_{task}']

    # eomg shares eobs resource settings
    ltask = 'eobs' if task in ['eomg'] else task

    # BUGFIX: the _gfs existence checks below now use ltask, matching the key
    # actually read (previously eomg checked eomg keys but read eobs keys)
    if cdump in ['gfs'] and f'memory_{ltask}_gfs' in cfg.keys():
        memory = cfg.get(f'memory_{ltask}_gfs', None)
    else:
        memory = cfg.get(f'memory_{ltask}', None)

    if cdump in ['gfs'] and f'npe_{ltask}_gfs' in cfg.keys():
        tasks = cfg[f'npe_{ltask}_gfs']
    else:
        tasks = cfg[f'npe_{ltask}']

    if cdump in ['gfs'] and f'npe_node_{ltask}_gfs' in cfg.keys():
        ppn = cfg[f'npe_node_{ltask}_gfs']
    else:
        ppn = cfg[f'npe_node_{ltask}']

    if machine in ['WCOSS2', 'WCOSS_DELL_P3', 'HERA', 'ORION']:
        if cdump in ['gfs'] and f'nth_{ltask}_gfs' in cfg.keys():
            threads = cfg[f'nth_{ltask}_gfs']
        else:
            threads = cfg[f'nth_{ltask}']

    # BUGFIX: np.int/np.float were removed in NumPy >= 1.24; use the builtins
    nodes = int(np.ceil(float(tasks) / float(ppn)))

    memstr = '' if memory is None else str(memory)
    natstr = ''

    if scheduler in ['slurm']:
        natstr = '--export=NONE'

    if machine in ['HERA', 'ORION', 'WCOSS_C', 'WCOSS_DELL_P3', 'WCOSS2']:

        if machine in ['HERA', 'ORION', 'WCOSS2']:
            resstr = f'{nodes}:ppn={ppn}:tpp={threads}'
        else:
            resstr = f'{nodes}:ppn={ppn}'

        if machine in ['WCOSS_C'] and task in ['arch', 'earc', 'getic']:
            resstr += ''

        if machine in ['WCOSS_DELL_P3']:
            if not reservation in ['NONE']:
                natstr = f"-U {reservation} -R 'affinity[core({threads})]'"
            else:
                natstr = f"-R 'affinity[core({threads})]'"

            if task in ['arch', 'earc', 'getic']:
                natstr = "-R 'affinity[core(1)]'"

        if machine in ['WCOSS2']:
            natstr = "-l debug=true"
            # Add place settings
            if task in ['arch', 'earc', 'getic']:
                # dev_transfer shared only
                natstr += ",place=shared"
            else:
                natstr += ",place=vscatter"
            # Set either exclusive or shared - default on WCOSS2 is exclusive when not set
            if memory is None:
                natstr += ":exclhost"
            else:
                natstr += ":shared"

    elif machine in ['WCOSS']:
        resstr = f'{tasks}'

    if task in ['arch', 'earc', 'getic']:
        queuestr = '&QUEUE;' if scheduler in ['slurm'] else '&QUEUE_SERVICE;'
    else:
        queuestr = '&QUEUE;'

    return wtimestr, resstr, queuestr, memstr, natstr


def create_crontab(base, cronint=5):
    '''
    Create crontab to execute rocotorun every cronint (5) minutes
    '''

    # No point creating a crontab if rocotorun is not available.
    rocotoruncmd = find_executable('rocotorun')
    if rocotoruncmd is None:
        print('Failed to find rocotorun, crontab will not be created')
        return

    # Leaving the code for a wrapper around crontab file if needed again later
    # if check_slurm():
    #
    #    cronintstr = '*/%d * * * *' % cronint
    #    rocotorunstr = '%s -d %s/%s.db -w %s/%s.xml' % (rocotoruncmd, base['EXPDIR'], base['PSLOT'], base['EXPDIR'], base['PSLOT'])
    #
    #    wrapper_strings = []
    #    wrapper_strings.append('#!/bin/env tcsh\n')
    #    wrapper_strings.append('\n')
    #    wrapper_strings.append('module load slurm\n')
    #    wrapper_strings.append('module load rocoto/1.3.0-RC4\n')
    #    wrapper_strings.append('\n')
    #    wrapper_strings.append(rocotorunstr)
    #
    #    hostname = 'tfe02'
    #    script_file = os.path.join(base['EXPDIR'], '%s.sh' % base['PSLOT'])
    #
    #    fh = open(script_file, 'w')
    #    fh.write(''.join(wrapper_strings))
    #    os.chmod(script_file,stat.S_IRWXU|stat.S_IRWXG|stat.S_IRWXO)
    #    fh.close()
    #
    #    rocotorunstr = 'ssh %s %s/%s.sh' % (socket.gethostname(), base['EXPDIR'], base['PSLOT'])
    #
    # else:

    rocotorunstr = f'''{rocotoruncmd} -d {base['EXPDIR']}/{base['PSLOT']}.db -w {base['EXPDIR']}/{base['PSLOT']}.xml'''
    cronintstr = f'*/{cronint} * * * *'

    # On WCOSS, rocoto module needs to be loaded everytime cron runs
    if base['machine'] in ['WCOSS']:
        # BUGFIX: drop the stray ')' and terminate with ';' so the wrapped
        # subshell command below is valid shell
        rocotoloadstr = '. /usrx/local/Modules/default/init/sh; module use -a /usrx/local/emc_rocoto/modulefiles; module load rocoto/1.3.0rc2;'
        rocotorunstr = f'({rocotoloadstr} {rocotorunstr})'

    # Optional reply-to address for cron mail; empty disables MAILTO
    REPLYTO = os.environ.get('REPLYTO', '')

    strings = []
    strings.append('\n')
    strings.append(f'''#################### {base['PSLOT']} ####################\n''')
    strings.append(f'MAILTO="{REPLYTO}"\n')
    strings.append(f'{cronintstr} {rocotorunstr}\n')
    strings.append('#################################################################\n')
    strings.append('\n')

    with open(os.path.join(base['EXPDIR'], f'''{base['PSLOT']}.crontab'''), 'w') as fh:
        fh.write(''.join(strings))

    return