#!/usr/bin/env python
##!/usr/bin/env python3
# Alex Richert, 20200715: Python 3-ification; added COMPATH and RSYNC_OPTIONS to variables_to_copy
# v1.1.0 Kit Menlove 2016-12-21
# v1.1.1 Kit Menlove 2017-06-08 Ignore (do not copy) nodes that have a defstatus of complete.
"""Add dated catchup families to the WCOSS transfer jobs on an ecFlow server.

For every catchup date (PDY) a family named after the date is created under
``destpath``.  It contains a copy of each family in ``families_to_copy`` (taken
from ``sourcepath``) plus a parallel ``check`` family whose tasks run the
check_viable script.  The copied families are left suspended on the server.
"""
##from sys import argv, exit, version_info
##assert version_info.major==3
import os
from datetime import date, timedelta, datetime
from ecflow import *
from subprocess import Popen, PIPE
import argparse

# Families directly under sourcepath that are copied for each catchup date.
families_to_copy = ('wcoss_1', 'wcoss_2', 'wcoss_eval')
# Nodes whose absolute path ends with one of these suffixes are not copied.
nodes_to_not_copy = ('/ecf_scripts', '/cfs/recovery', '/hrrr/nwges', '/opsprod', '/opspara',
                     '/opstest', '/swmf', '/canned', '/logs', '/output', '/deps', '/idsb')

parser = argparse.ArgumentParser(description='Add one or more dated catchup families to the WCOSS transfers to assist in transferring past data.')
parser.add_argument('-e', '--envir', metavar='envir', choices=('prod', 'para', 'test'), default='prod',
                    help="Environment of the transfer jobs to catch up on (prod, para, or test) (default: prod)")
parser.add_argument('-d', '--days', metavar="number_of_days", type=int,
                    help="Number of previous days to catch up")
parser.add_argument('-c', '--chronological', action='store_true',
                    help="Order the dates chronologically instead of reverse chronologically")
parser.add_argument('-f', '--force', action='store_true',
                    help="Do not ask for confirmation")
parser.add_argument('dates', nargs='*',
                    help="Specific date(s) in YYYYMMDD format to catch up")
args = parser.parse_args()


def is_valid_pdy(text):
    """Return True if *text* is a real calendar date written exactly as YYYYMMDD."""
    try:
        # Round-tripping rejects strings strptime would loosely accept (e.g. 1-digit months).
        return datetime.strptime(text, '%Y%m%d').strftime('%Y%m%d') == text
    except ValueError:
        return False


# Validate command-line input up front so bad values fail with a usage message instead of
# producing an empty or malformed set of catchup families.
if args.days is not None and args.days < 1:
    parser.error("the number of days to catch up must be at least 1")
for PDY in args.dates:
    if not is_valid_pdy(PDY):
        parser.error("invalid date '" + PDY + "'; please enter dates in YYYYMMDD format")
# A repeated date would make the script add the same date family twice; keep first occurrence.
args.dates = list(dict.fromkeys(args.dates))

#with open("/lfs/h1/ops/prod/config/hostfile") as f:
#    ecfhost = f.readline().rstrip()
#if args.envir == 'prod':
#    ecfport=31415
#else:
#    ecfport=14142
#print("ECF_HOST " + ecfhost)
#print(ecfport)
##print(args.port)

sourcepath = '/'+args.envir+'/primary/cron/transfer/v2.4/'
destpath = '/'+args.envir+'/primary/cron/transfer/v2.4/'
transfer_job_limit = 20  # Set to 0 to disable limits
check_job_limit = 12     # Set to 0 to disable limits

# Don't want CPUs in the check_viable scripts; do we need to copy the 'ECF_SCRIPT_CMD'?
variables_to_copy = ('ECF_JOB_CMD', 'QUEUE', 'MACHINE_PHASE', 'PACKAGE', 'RSYNC_DIRECTION',
                     'CPUS', 'COMPATH', 'RSYNC_OPTIONS')
# Meter "P" (0-100) attached to every copied transfer task (not to the check tasks).
task_meter = Meter("P", 0, 100, 100)
# Autocancel attached to the catchup root and to each date family
# (presumably 1 day after completion -- confirm against the ecFlow Autocancel docs).
node_autocancel = Autocancel(1)

# ecFlow limits that throttle the catchup transfer tasks and the check tasks respectively.
if transfer_job_limit > 0:
    transfers_limit = Limit("catchup_jobs", transfer_job_limit)
    transfers_inlimit = InLimit("catchup_jobs")
else:
    transfers_limit = None
    transfers_inlimit = None

if check_job_limit > 0:
    check_limit = Limit("check_jobs", check_job_limit)
    check_inlimit = InLimit("check_jobs")
else:
    check_limit = None
    check_inlimit = None


def copy_variables(source_node, dest_node):
    """Copy selected ecFlow variables from *source_node* to *dest_node*.

    A variable is copied when its name is listed in ``variables_to_copy`` or
    ends with ``_ver``.
    """
    for var in source_node.variables:
        if var.name() in variables_to_copy or var.name().endswith('_ver'):
            dest_node.add_variable(var.name(), var.value())


def copy_node(source_node, dest_node, task_meter=None, task_repeat=None, task_inlimit=None):
    """Recursively copy the children of *source_node* into *dest_node*.

    Families are recreated with their selected variables and descended into.
    Tasks are recreated with their trigger, selected variables, an empty label
    "l", and the optional meter/repeat/inlimit given here.  Children whose
    absolute path ends with an entry of ``nodes_to_not_copy``, or whose
    defstatus is complete, are skipped.

    Args:
        source_node: ecFlow node whose children are copied.
        dest_node: ecFlow node that receives the copies.
        task_meter: Meter added to every copied task, or None.
        task_repeat: Repeat added to every copied task, or None.
        task_inlimit: InLimit added to every copied task, or None.
    """
    for child_node in source_node.nodes:
        if child_node.get_abs_node_path().endswith(nodes_to_not_copy):
            continue
        if child_node.get_defstatus() == DState.complete:
            continue

        if isinstance(child_node, Family):
            dest_fam = Family(child_node.name())
            copy_variables(child_node, dest_fam)
            dest_node.add_family(dest_fam)
            copy_node(child_node, dest_fam, task_meter, task_repeat, task_inlimit)
        elif isinstance(child_node, Task):
            dest_task = Task(child_node.name())
            if task_meter:
                dest_task.add_meter(task_meter)
            if task_repeat:
                dest_task.add_repeat(task_repeat)
            trigger = child_node.get_trigger()
            if trigger:
                # NOTE(review): sourcepath and destpath are currently identical, so this
                # replace() is a no-op and copied triggers still reference the live
                # transfer tasks -- confirm that is intended.
                dest_task.add_trigger(trigger.get_expression().replace(sourcepath, destpath))
            copy_variables(child_node, dest_task)
            if task_inlimit:
                dest_task.add_inlimit(task_inlimit)
            dest_task.add_label("l", '')
            dest_node.add_task(dest_task)
        else:
            # The type must be turned into text before concatenation; otherwise this
            # error report itself raises TypeError.
            print("ERROR! handling " + child_node.name() + " of type " + type(child_node).__name__)


def get_number_of_catchup_days():
    """Prompt until the user enters an integer number of days between 1 and 14."""
    while True:
        try:
            number_of_catchup_days = int(input("Please enter the number of days to catch up: "))
        except ValueError:
            print("You have entered an invalid number of days to catch up. Please try again.")
            continue
        if 1 <= number_of_catchup_days <= 14:
            return number_of_catchup_days
        print("You have entered an invalid number of days to catch up. Please try again.")


# Explicit dates take precedence over --days; with neither, ask interactively.
if args.dates:
    number_of_catchup_days = len(args.dates)
elif args.days:
    number_of_catchup_days = args.days
else:
    number_of_catchup_days = get_number_of_catchup_days()

# Build the list of catchup dates and, unless --force was given, ask the user to confirm it.
# Any answer other than Y/yes discards dates given on the command line and asks for a number
# of days instead.
while True:
    print("The following days will be added to " + destpath + ":")
    if args.dates:
        catchup_dates = args.dates
    else:
        # One day back is yesterday; --chronological lists the oldest day first.
        if args.chronological:
            days_back_list = range(number_of_catchup_days, 0, -1)
        else:
            days_back_list = range(1, number_of_catchup_days + 1)
        catchup_dates = [(date.today() - timedelta(days=days_back)).strftime("%Y%m%d")
                         for days_back in days_back_list]
    for PDY in catchup_dates:
        print("\t" + PDY)
    if args.force:
        break
    response = input("Is this correct? (Y/N) ")
    if response.strip().lower() in ('y', 'yes'):
        break
    args.dates = None
    number_of_catchup_days = get_number_of_catchup_days()

task_repeat = None

try:
    # Create objects to interface with ecFlow
    cli = Client()
    # cli.set_host_port(ecfhost,ecfport)
    cli.get_server_defs()
    defs = cli.get_defs()
    newdef = Defs()

    # Look up every source node before building anything, so a missing node aborts the
    # script before the server is modified.
    sourceroot = defs.find_abs_node(sourcepath.rstrip('/'))
    if sourceroot is None:
        print("ERROR! " + sourcepath.rstrip('/') + " not found!")
        raise SystemExit(1)
    source_families = []
    for family_name in families_to_copy:
        sourcefam = defs.find_abs_node(sourcepath + family_name)
        if sourcefam is None:
            print("ERROR! " + sourcepath+family_name + " not found! Please check the families_to_copy list within the script")
            raise SystemExit(1)
        source_families.append((family_name, sourcefam))

    # Create the new catchup suite and add appropriate ecFlow variables
    destpath_parts = destpath.strip('/').split('/')
    destsuite = newdef.add_suite(destpath_parts[0])
    destroot = destsuite
    for path_part in destpath_parts[1:]:
        destroot = destroot.add_family(path_part)
    for sourcevar in sourceroot.variables:
        destroot.add_variable(sourcevar)
    destroot.add_autocancel(node_autocancel)
    if transfers_limit:
        destroot.add_limit(transfers_limit)
    if check_limit:
        destroot.add_limit(check_limit)

    suspend_list = []
    for catchup_date in catchup_dates:
        date_fam = destroot.add_family(catchup_date)
        date_fam.add_variable("PDY", catchup_date)
        date_fam.add_autocancel(node_autocancel)

        # Copy all of the transfer families from the transfer suite to the catchup suite
        for family_name, sourcefam in source_families:
            destfam = date_fam.add_family(family_name)
            copy_node(sourcefam, destfam, task_meter, task_repeat, transfers_inlimit)
            for family_var in sourcefam.variables:
                destfam.add_variable(family_var)
            suspend_list.append(destfam.get_abs_node_path())

        # Add the chk_viable jobs to the suite; they run check.ecf.sh instead of the
        # source family's ECF_SCRIPT_CMD.
        check_viable_backup_family = date_fam.add_family("check")
        for family_name, sourcefam in source_families:
            destfam = check_viable_backup_family.add_family(family_name)
            copy_node(sourcefam, destfam, None, None, check_inlimit)
            for family_var in sourcefam.variables:
                if family_var.name() != "ECF_SCRIPT_CMD":
                    destfam.add_variable(family_var)
            destfam.add_variable("ECF_SCRIPT_CMD", "%PACKAGEHOME%/ecf/check.ecf.sh %ECF_NAME% %MACHINE_PHASE:%")
        suspend_list.append(check_viable_backup_family.get_abs_node_path())

    # Add the catchup suite to the graph if it does not exist. If it does exist, add the new date families.
    # dest_node is a snapshot taken before this script suspends anything, so its
    # is_suspended() reflects the state the node was in when the script started.
    dest_node = defs.find_abs_node(destpath.rstrip('/'))
    if dest_node is not None:
        if not dest_node.is_suspended():
            cli.suspend(destpath)
        for catchup_date in catchup_dates:
            cli.replace(destpath + catchup_date, destsuite.get_defs(), True, True)
    else:
        cli.load(destsuite.get_defs())

    # Suspend all of the families under the top level
    cli.suspend(suspend_list)

    if dest_node is not None:
        # If the node wasn't suspended when this script was started, resume it
        if not dest_node.is_suspended():
            cli.resume(destpath)
    else:
        # begin_suite() expects a suite name, not the full path of the catchup family.
        cli.begin_suite(destsuite.name())

    # print("Setup of catchup jobs in " + destpath + " on " + ecfhost + "(" + str(ecfport) + ")" + " is complete.")
    print("Setup of catchup jobs in " + destpath + " is complete.")
except RuntimeError as e:
    print(str(e))
    # Report failure to the caller instead of exiting with status 0.
    raise SystemExit(1)