#!/usr/bin/env python ''' MODULE: rocoto.py ABOUT: Helper module to create tasks, metatasks, and dependencies AUTHOR: Rahul.Mahajan rahul.mahajan@noaa.gov ''' def create_metatask(task_dict, metatask_dict): ''' create a Rocoto metatask given a dictionary containing task and metatask information :param metatask_dict: metatask key-value parameters :type metatask_dict: dict :param task_dict: task key-value parameters :type task_dict: dict :return: Rocoto metatask :rtype: list ''' # Grab metatask info from the metatask_dict metataskname = metatask_dict.get('metataskname', 'demometatask') varname = metatask_dict.get('varname', 'demovar') varval = metatask_dict.get('varval', 1) vardict = metatask_dict.get('vardict', None) strings = [] strings.append('\n' % metataskname) strings.append('\n') strings.append('\t%s\n' % (varname, str(varval))) if vardict is not None: for key in vardict.keys(): value = str(vardict[key]) strings.append('\t%s\n' % (key, value)) strings.append('\n') tasklines = create_task(task_dict) for tl in tasklines: strings.append('%s' % tl) if tl == '\n' else strings.append('\t%s' % tl) strings.append('\n') strings.append('\n') return strings def create_task(task_dict): ''' create a Rocoto task given a dictionary containing task information :param task_dict: task key-value parameters :type task_dict: dict :return: Rocoto task :rtype: list ''' # Grab task info from the task_dict taskname = task_dict.get('taskname', 'demotask') cycledef = task_dict.get('cycledef', 'democycle') maxtries = task_dict.get('maxtries', 3) final = task_dict.get('final', False) command = task_dict.get('command', 'sleep 10') jobname = task_dict.get('jobname', 'demojob') account = task_dict.get('account', 'batch') queue = task_dict.get('queue', 'debug') partition = task_dict.get('partition', None) walltime = task_dict.get('walltime', '00:01:00') log = task_dict.get('log', 'demo.log') native = task_dict.get('native', None) memory = task_dict.get('memory', None) resources = task_dict.get('resources', None) envar = task_dict.get('envar', None) dependency = task_dict.get('dependency', None) str_maxtries = str(maxtries) str_final = ' final="true"' if final else '' envar = envar if isinstance(envar, list) else [envar] strings = [] strings.append('\n' % \ (taskname, cycledef, str_maxtries, str_final)) strings.append('\n') strings.append('\t%s\n' % command) strings.append('\n') strings.append('\t%s\n' % jobname) strings.append('\t%s\n' % account) strings.append('\t%s\n' % queue) if partition is not None: strings.append('\t%s\n' % partition) if resources is not None: strings.append('\t%s\n' % resources) strings.append('\t%s\n' % walltime) if memory is not None: strings.append('\t%s\n' % memory) if native is not None: strings.append('\t%s\n' % native) strings.append('\n') strings.append('\t%s\n' % log) strings.append('\n') if envar[0] is not None: for e in envar: strings.append('\t%s\n' % e) strings.append('\n') if dependency is not None: strings.append('\t\n') for d in dependency: strings.append('\t\t%s\n' % d) strings.append('\t\n') strings.append('\n') strings.append('\n') return strings def add_dependency(dep_dict): ''' create a simple Rocoto dependency given a dictionary with dependency information :param dep_dict: dependency key-value parameters :type dep_dict: dict :return: Rocoto simple dependency :rtype: str ''' dep_condition = dep_dict.get('condition', None) dep_type = dep_dict.get('type', None) if dep_type in ['task', 'metatask']: string = add_task_tag(dep_dict) elif dep_type in ['data']: string = add_data_tag(dep_dict) elif dep_type in ['cycleexist']: string = add_cycle_tag(dep_dict) elif dep_type in ['streq', 'strneq']: string = add_streq_tag(dep_dict) else: msg = 'Unknown dependency type %s' % dep_dict['type'] raise KeyError(msg) if dep_condition is not None: string = '<%s>%s' % (dep_condition, string, dep_condition) return string def add_task_tag(dep_dict): ''' create a simple task or metatask tag :param dep_dict: dependency key-value parameters :type dep_dict: dict :return: Rocoto simple task or metatask dependency :rtype: str ''' dep_type = dep_dict.get('type', None) dep_name = dep_dict.get('name', None) dep_offset = dep_dict.get('offset', None) if dep_name is None: msg = 'a %s name is necessary for %s dependency' % (dep_type, dep_type) raise KeyError(msg) string = '<' string += '%sdep %s="%s"' % (dep_type, dep_type, dep_name) if dep_offset is not None: string += ' cycle_offset="%s"' % dep_offset string += '/>' return string def add_data_tag(dep_dict): ''' create a simple data tag :param dep_dict: dependency key-value parameters :type dep_dict: dict :return: Rocoto simple task or metatask dependency :rtype: str ''' dep_type = dep_dict.get('type', None) dep_data = dep_dict.get('data', None) dep_offset = dep_dict.get('offset', None) if dep_data is None: msg = 'a data value is necessary for %s dependency' % dep_type raise KeyError(msg) if dep_offset is None: if '@' in dep_data: offset_string_b = '' offset_string_e = '' else: offset_string_b = '' offset_string_e = '' else: offset_string_b = '' % dep_offset offset_string_e = '' string = '' string += '%s%s%s' % (offset_string_b, dep_data, offset_string_e) string += '' return string def add_cycle_tag(dep_dict): ''' create a simple cycle exist tag :param dep_dict: dependency key-value parameters :type dep_dict: dict :return: Rocoto simple task or metatask dependency :rtype: str ''' dep_type = dep_dict.get('type', None) dep_offset = dep_dict.get('offset', None) if dep_offset is None: msg = 'an offset value is necessary for %s dependency' % dep_type raise KeyError(msg) string = '' % dep_offset return string def add_streq_tag(dep_dict): ''' create a simple string comparison tag :param dep_dict: dependency key-value parameters :type dep_dict: dict :return: Rocoto simple task or metatask dependency :rtype: str ''' dep_type = dep_dict.get('type', None) dep_left = dep_dict.get('left', None) dep_right = dep_dict.get('right', None) fail = False msg = '' if dep_left is None: msg += 'a left value is necessary for %s dependency' % dep_type fail = True if dep_right is None: if fail: msg += '\n' msg += 'a right value is necessary for %s dependency' % dep_type fail = True if fail: raise KeyError(msg) string = '<%s>%s%s' % (dep_type, dep_left, dep_right, dep_type) return string def _traverse(o, tree_types=(list, tuple)): ''' Traverse through a list of lists or tuples and yeild the value Objective is to flatten a list of lists or tuples :param o: list of lists or not :type o: list, tuple, scalar :param tree_types: trees to travers :type tree_types: tuple :return: value in the list or tuple :rtype: scalar ''' if isinstance(o, tree_types): for value in o: for subvalue in _traverse(value, tree_types): yield subvalue else: yield o def create_dependency(dep_condition=None, dep=None): ''' create a compound dependency given a list of dependendies, and compounding condition the list of dependencies are created using add_dependency :param dep_condition: dependency condition :type dep_condition: boolean e.g. and, or, true, false :param dep: dependency :type dep: str or list :return: Rocoto compound dependency :rtype: list ''' dep = dep if isinstance(dep, list) else [dep] strings = [] if dep_condition is not None: strings.append('<%s>' % dep_condition) if dep[0] is not None: for d in dep: if dep_condition is None: strings.append('%s' % d) else: for e in _traverse(d): strings.append('\t%s' % e) if dep_condition is not None: strings.append('' % dep_condition) return strings def create_envar(name=None,value=None): ''' create an Rocoto environment variable given name and value returns the environment variable as a string :param name: name of the environment variable :type name: str :param value: value of the environment variable :type value: str or float or int or unicode :return: Rocoto environment variable key-value pair :rtype: str ''' string = '' string += '' string += '%s' % name string += '%s' % str(value) string += '' return string