##################################
#
# A collection of functions for building the various
# NWM model configurations
#
##################################
from os.path import join

from ecflow import Defs, Suite, Family, Task, Edit, Trigger


# (ecFlow variable name, key in the domains configuration) for the batch
# resources that are looked up per domain / cycle / component.
_RESOURCE_PARAMS = (
    ("WALLTIME", "walltime"),
    ("NUM_COMPUTE_NODES", "num_compute_nodes"),
    ("MEMORY_GB", "memory_gb"),
    ("NUM_CPUS", "num_cpus"),
)


def _cycle_name(cycle, member=None):
    """
    Return the cycle key used for configuration lookups.

    :param cycle: The model cycle (e.g. analysis, short_range, etc)
    :param member: Ensemble member. Default is None (no ensemble)
    :return: ``cycle`` when member is None, otherwise ``{cycle}_mem{member}``
    """
    return cycle if member is None else f"{cycle}_mem{member}"


def _resource_edits(domains, domain, nwm_cycle, component):
    """
    Look up WALLTIME, NUM_COMPUTE_NODES, MEMORY_GB and NUM_CPUS for one component.

    :param domains: The domains configuration dict
    :param domain: The name of the NWM domain
    :param nwm_cycle: The cycle key (see _cycle_name)
    :param component: The component key (e.g. 'fe', 'model', 'schism')
    :return: A dict of ecFlow variable name -> configured value (None if undefined)
    """
    return {
        variable: get_param_value(domains, [domain, key, nwm_cycle, component])
        for variable, key in _RESOURCE_PARAMS
    }


def _coastal_domains(domains, domain):
    """
    Return the coastal domains configured for a domain.

    A domain without a 'coastal' entry is treated as having no coastal
    domains, so callers can always iterate over the result.

    :param domains: The domains configuration dict
    :param domain: The name of the NWM domain
    """
    return get_param_value(domains, [domain, 'coastal']) or []


def create_forcings_family(cycle, domains, coastal=True, member=None):
    """
    Create a family of forcing tasks, including coastal forcings if desired.
    Forcing tasks are created for all domains and coastal domains
    for the requested cycle. Domains that define no cycle length for the
    cycle are skipped.

    :param cycle: The model cycle (e.g. analysis, short_range, etc)
    :param domains: The domains configuration dict
    :param coastal: Include coastal domains (default is True)
    :param member: Ensemble member. Default is None (no ensemble)
    :return: The "forcings" Family
    """
    forcings_family = Family("forcings",
                             Edit(NWM_CYCLE=cycle,
                                  NWM_ENSEMBLE_MEM="" if not member else member))
    nwm_cycle = _cycle_name(cycle, member)

    for domain in domains:
        # Use the wildcard-aware lookup (same test as create_output_family) so
        # that the forcings, model and output families always agree on which
        # domains run in this cycle.
        cycle_length = get_param_value(domains, [domain, 'cycle_length', nwm_cycle])
        if cycle_length is None:
            continue

        def lookup(key):
            return get_param_value(domains, [domain, key, nwm_cycle])

        # The forcing engine ('fe') resources are shared by the domain family
        # and every coastal forcing family below it.
        fe_resources = _resource_edits(domains, domain, nwm_cycle, 'fe')

        nwm_family = Family(domain,
                            Edit(NWM_DOMAIN=domain),
                            Edit(FORCING_ARCHIVE_DIR=lookup('forcing_archive_dir'),
                                 DOMAIN_DIR=lookup('domain_dir'),
                                 GEOGRID_FILE=lookup('geogrid_file'),
                                 SPATIAL_METADATA_FILE=lookup('spatial_metadata_file'),
                                 FORCING_CONFIGS_DIR=lookup('forcing_configs_dir'),
                                 FORCING_PARAMS_DIR=lookup('forcing_params_dir'),
                                 FORCING_INPUT_DIR=lookup('forcing_input_dir'),
                                 LENGTH_HRS=cycle_length,
                                 **fe_resources))
        nwm_family += Task("nwm_forcings")
        forcings_family += nwm_family

        if coastal:
            jobcount = lookup('coastal_forcings_jobcount')
            for coastal_domain in _coastal_domains(domains, domain):
                coastal_family = Family(f"coastal_{coastal_domain}",
                                        Edit(COASTAL_DOMAIN=coastal_domain,
                                             COASTAL_FORCING_JOBCOUNT=jobcount,
                                             **fe_resources))
                # Coastal forcings wait for the domain's own forcings task.
                coastal_family += Task("nwm_forcings_coastal",
                                       Trigger("../nwm_forcings == complete"),
                                       Edit(COASTAL_WORK_DIR=join('coastal', coastal_domain)))
                nwm_family += coastal_family

    return forcings_family


###################### Model families ##################################

def create_model_family(cycle, model_exe, tables_dir, domains, coastal=True, useda=True, requiresCycle=None,
                        restartCycle=None, member=None, forcingsCycle=None):
    """
    Create a family of tasks for running the NWM. Domains that define no
    cycle length for the cycle are skipped.

    :param cycle: The model cycle (e.g. analysis, short_range, etc)
    :param model_exe: The path to the model executable
    :param tables_dir: The directory containing model .TBL files
    :param domains: The domains configuration dict
    :param coastal: If True, include coastal tasks. Default is True
    :param useda: If True, use Data Assimilation. Default is True
    :param requiresCycle: If not None, set up a trigger to wait for the required cycle's NWM task to complete.
                          Default is None
    :param restartCycle: The name of the cycle to get RESTART files from. If None, use RESTARTS from the current cycle.
                         Default is None
    :param forcingsCycle: Forcings are used from this cycle instead of the current cycle.
                          Default is None (use forcings from the current cycle)
    :param member: Ensemble member. Default is None (no ensemble)
    :return: The "nwm_model" Family
    """
    nwm_cycle = _cycle_name(cycle, member)
    if forcingsCycle is None:
        forcingsCycle = nwm_cycle
    if restartCycle is None:
        # An empty NWM_RESTART_CYCLE stands for "restart from the current cycle".
        restartCycle = ""

    model_family = Family("nwm_model",
                          Edit(MODEL_EXECUTABLE=model_exe,
                               NWM_RESTART_CYCLE=restartCycle,
                               NWM_CYCLE=cycle,
                               NWM_BASE_CYCLE=forcingsCycle,
                               USE_DA="true" if useda else "false",
                               NWM_ENSEMBLE_MEM="" if not member else member,
                               TIMESLICE_ARCHIVE_DIR=""))

    for domain in domains:
        # Wildcard-aware lookup, consistent with create_output_family.
        cycle_length = get_param_value(domains, [domain, 'cycle_length', nwm_cycle])
        if cycle_length is None:
            continue

        nwm_family = Family(domain,
                            Edit(DOMAIN_DIR=get_param_value(domains, [domain, 'domain_dir']),
                                 NWM_DOMAIN=domain,
                                 MODEL_TABLES_DIR=tables_dir,
                                 HYDRO_RESTART="",
                                 HRLDAS_RESTART="",
                                 LENGTH_HRS=cycle_length,
                                 **_resource_edits(domains, domain, nwm_cycle, 'model')))

        if requiresCycle:
            # Hold the whole domain family until the required cycle's model run is done.
            nwm_family += Trigger(f"../../{requiresCycle}/nwm_model/{domain}/nwm_model == complete")

        # The model itself waits for this domain's forcings in the forcings cycle.
        nwm_family += Task("nwm_model",
                           Trigger(f"../../../{forcingsCycle}/forcings/{domain}/nwm_forcings == complete"))
        model_family += nwm_family

        if coastal and useda:  # Coastal only used on DA runs
            for coastal_domain in _coastal_domains(domains, domain):
                nwm_family += create_coastal_family(domain, domains, coastal_domain, cycle, forcingsCycle,
                                                    restartCycle, member)

    return model_family


def create_coastal_family(domain, domains, coastal_domain, cycle, forcingsCycle, restartCycle, member=None):
    """
    Create coastal tasks for the given domain and cycle

    :param domain: The name of the NWM domain
    :param domains: The domains configuration dict
    :param coastal_domain: The name of NWM coastal domain
    :param cycle: The name of the model cycle
    :param forcingsCycle: The name of the cycle used for forcings
    :param restartCycle: The name of the cycle to get RESTART files from. If None or empty, use RESTARTS from
                         the current cycle.
    :param member: Ensemble member. Default is None (no ensemble)
    :return: The "nwm_coastal_<coastal_domain>" Family
    """
    nwm_cycle = _cycle_name(cycle, member)
    coastal_job_dir = join('coastal', coastal_domain)

    def coastal_walltime(step):
        return get_param_value(domains, [domain, 'walltime', nwm_cycle, 'coastal', step])

    # don't run until the base NWM cycle and base coastal forcings are complete
    trigger_exp = (
        f"../../../{forcingsCycle}/forcings/{domain}/coastal_{coastal_domain}/nwm_forcings_coastal == complete and "
        f"nwm_model == complete and ../../output/{domain}/push_output_model == complete"
    )

    # wait for the coastal model to complete in the restart cycle too, so we have a restart, if this is a forecast
    if restartCycle:
        trigger_exp += (f" and ../../../{restartCycle}/output/{domain}/{coastal_domain}"
                        "/push_output_coastal == complete")
    else:
        # None and "" both mean "restart from the current cycle"; testing only
        # for "" would put a literal "None" into the trigger path.
        restartCycle = nwm_cycle

    coastal_family = Family(f"nwm_coastal_{coastal_domain}",
                            Edit(COASTAL_DOMAIN=coastal_domain,
                                 COASTAL_WORK_DIR=coastal_job_dir,
                                 NWM_RESTART_CYCLE=restartCycle,
                                 NWM_BASE_CYCLE=forcingsCycle,
                                 COASTAL_ESTOFS_FILE="",
                                 COASTAL_PSURGE_FILE="",
                                 REGRIDDED_ESTOFS_FILE=""),
                            Trigger(trigger_exp))

    coastal_family += Task("nwm_coastal_update_params")

    # Water-level branch: regrid ESTOFS, then merge with P-Surge.
    coastal_family += Task("nwm_coastal_regrid_estofs",
                           Edit(WALLTIME=coastal_walltime('regrid_estofs')))
    # NOTE(review): "etsofs" looks like a typo for "estofs", but it is the
    # runtime task name (and is referenced by the nwm_coastal trigger below);
    # presumably a task script of the same name exists, so it is left unchanged.
    coastal_family += Task("nwm_coastal_merge_etsofs_psurge",
                           Edit(WALLTIME=coastal_walltime('merge_psurge')),
                           Trigger("nwm_coastal_regrid_estofs == complete"))

    # Discharge branch: initial discharge -> combine sink/source -> merge source/sink.
    coastal_family += Task("nwm_coastal_initial_discharge",
                           Edit(WALLTIME=coastal_walltime('initial_discharge')),
                           Trigger("../nwm_model == complete"))
    coastal_family += Task("nwm_coastal_combine_sink_source",
                           Edit(WALLTIME=coastal_walltime('combine_sink_source')),
                           Trigger("nwm_coastal_initial_discharge == complete"))
    coastal_family += Task("nwm_coastal_merge_source_sink",
                           Edit(WALLTIME=coastal_walltime('merge_source_sink')),
                           Trigger("nwm_coastal_combine_sink_source == complete"))

    # The coastal model runs once both branches and the parameter update are done.
    coastal_family += Task("nwm_coastal",
                           Edit(**_resource_edits(domains, domain, nwm_cycle, 'schism')),
                           Trigger("nwm_coastal_merge_source_sink == complete and "
                                   "nwm_coastal_merge_etsofs_psurge == complete and "
                                   "nwm_coastal_update_params == complete"))
    coastal_family += Task("nwm_coastal_filter_data",
                           Edit(WALLTIME=coastal_walltime('filter_data')),
                           Trigger("nwm_coastal == complete"))

    return coastal_family


def create_output_family(cycle, domains, member=None, coastal=True):
    """
    Create data output tasks. Domains that define no cycle length for the
    cycle are skipped.

    :param cycle: The name of the model cycle
    :param domains: The domains configuration dict
    :param member: Ensemble member. Default is None (no ensemble)
    :param coastal: If True, include coastal output. Default is True
    :return: The "output" Family
    """
    ensemble_mem = "" if not member else member
    output_family = Family("output", Edit(NWM_CYCLE=cycle, NWM_ENSEMBLE_MEM=ensemble_mem))
    nwm_cycle = _cycle_name(cycle, member)

    for domain in domains:
        length_hrs = get_param_value(domains, [domain, 'cycle_length', nwm_cycle])
        if length_hrs is None:
            continue

        nwm_family = Family(domain,
                            Edit(NWM_DOMAIN=domain,
                                 NWM_ENSEMBLE_MEM=ensemble_mem,
                                 LENGTH_HRS=length_hrs))
        nwm_family += Task("push_output_model",
                           Trigger(f"../../nwm_model/{domain}/nwm_model == complete"))

        # Add any coastal output
        if coastal:
            for c in _coastal_domains(domains, domain):
                # Each coastal output waits for the matching nwm_coastal_<c>
                # family under this cycle's nwm_model family.
                nwm_family += Family(c,
                                     Edit(COASTAL_DOMAIN=c),
                                     Task("push_output_coastal",
                                          Trigger(f"../../../nwm_model/{domain}/nwm_coastal_{c} == complete")))

        output_family += nwm_family

    return output_family


def get_param_value(params, levels):
    """
    Get the value of a parameter at whatever level of the parameter tree is defined, or None.
    Wildcards '*' and partial wildcards (e.g. 'analysis*') may be used in the params definition.

    At each level the lookup order is: exact key, then the first partial
    wildcard key whose text (with every '*' removed) occurs anywhere in the
    requested key, then the plain '*' key. If a non-dict value is reached
    before all levels are consumed, that value is returned as-is.

    :param params: The parameters dictionary
    :param levels: The key to use at each tree level
    :return: The resolved value, or None if no key matches at some level
    """
    val = params
    for level in levels:
        if not isinstance(val, dict):
            # Defined at a shallower level than requested: it applies to
            # everything below.
            return val

        if level in val:
            val = val[level]
            continue

        # Partial wildcards only make sense between strings; skipping other
        # key types avoids a TypeError on e.g. integer keys.
        partial = None
        if isinstance(level, str):
            partial = next(
                (key for key in val
                 if isinstance(key, str) and '*' in key and key != '*'
                 and key.replace('*', "") in level),
                None,
            )

        if partial is not None:
            val = val[partial]
        elif '*' in val:
            val = val['*']
        else:
            return None

    return val