#!/usr/bin/env python3 """ Parse a host-model registry XML file and return the captured variables. """ # CCPP framework imports from metavar import VarDictionary from ddt_library import VarDDT, DDTLibrary from parse_tools import ParseContext, CCPPError, ParseInternalError from parse_tools import context_string from parse_tools import FORTRAN_SCALAR_REF_RE ############################################################################### class HostModel(VarDictionary): """Class to hold the data from a host model""" def __init__(self, meta_tables, name_in, run_env): """Initialize this HostModel object. is a list of parsed host metadata tables. is the name for this host model. is the CCPPFrameworkEnv object for this framework run. """ self.__name = name_in self.__var_locations = {} # Local name to module map self.__loop_vars = None # Loop control vars in interface calls self.__used_variables = None # Local names which have been requested self.__deferred_finds = None # Used variables that were missed at first self.__run_env = run_env # First, process DDT headers meta_headers = list() for sect in [x.sections() for x in meta_tables.values()]: meta_headers.extend(sect) # end for # Initialize our dictionaries # Initialize variable dictionary super().__init__(self.name, run_env) self.__ddt_lib = DDTLibrary('{}_ddts'.format(self.name), run_env, ddts=[d for d in meta_headers if d.header_type == 'ddt']) self.__ddt_dict = VarDictionary("{}_ddt_vars".format(self.name), run_env, parent_dict=self) # Now, process the code headers by type self.__metadata_tables = meta_tables for header in [h for h in meta_headers if h.header_type != 'ddt']: title = header.title if run_env.logger is not None: msg = 'Adding {} {} to host model' run_env.logger.debug(msg.format(header.header_type, title)) # End if if header.header_type == 'module': # Set the variable modules modname = header.title for var in header.variable_list(): self.add_variable(var, run_env) lname = var.get_prop_value('local_name') self.__var_locations[lname] = modname self.ddt_lib.check_ddt_type(var, header, lname=lname) if var.is_ddt(): self.ddt_lib.collect_ddt_fields(self.__ddt_dict, var, run_env) # End if # End for elif header.header_type == 'host': if self.__name is None: # Grab the first host name we see self.__name = header.name # End if for var in header.variable_list(): self.add_variable(var, run_env) self.ddt_lib.check_ddt_type(var, header) if var.is_ddt(): self.ddt_lib.collect_ddt_fields(self.__ddt_dict, var, run_env) # End if # End for loop_vars = header.variable_list(std_vars=False, loop_vars=True, consts=False) if loop_vars: # loop_vars are part of the host-model interface call # at run time. As such, they override the host-model # array dimensions. self.__loop_vars = VarDictionary(self.name, run_env) # End if for hvar in loop_vars: std_name = hvar.get_prop_value('standard_name') if std_name not in self.__loop_vars: self.__loop_vars.add_variable(hvar, run_env) else: ovar = self.__loop_vars[std_name] ctx1 = context_string(ovar.context) ctx2 = context_string(hvar.context) lname1 = ovar.get_prop_value('local_name') lname2 = hvar.get_prop_value('local_name') errmsg = ("Duplicate host loop var for {n}:\n" " Dup: {l1}{c1}\n Orig: {l2}{c2}") raise CCPPError(errmsg.format(n=self.name, l1=lname1, c1=ctx1, l2=lname2, c2=ctx2)) # End if # End for else: errmsg = "Invalid host model metadata header type, {} ({}){}" errmsg += "\nType must be 'module' or 'host'" ctx = context_string(header.context) raise CCPPError(errmsg.format(header.title, header.header_type, ctx)) # End if # End while if self.name is None: errmsg = 'No name found for host model, add a host metadata entry' raise CCPPError(errmsg) # End if # Finally, turn on the use meter so we know which module variables # to 'use' in a host cap. self.__used_variables = set() # Local names which have been requested self.__deferred_finds = set() # Used variables that were missed at first @property def name(self): """Return the host model name""" return self.__name @property def loop_vars(self): """Return this host model's loop variables""" return self.__loop_vars @property def ddt_lib(self): """Return this host model's DDT library""" return self.__ddt_lib # XXgoldyXX: v needed? @property def constituent_module(self): """Return the name of host model constituent module""" return "{}_ccpp_constituents".format(self.name) # XXgoldyXX: ^ needed? def argument_list(self, loop_vars=True): """Return a string representing the host model variable arg list""" args = [v.call_string(self) for v in self.variable_list(loop_vars=loop_vars, consts=False)] return ', '.join(args) def metadata_tables(self): """Return a copy of this host models metadata tables""" return dict(self.__metadata_tables) def host_variable_module(self, local_name): """Return the module name for a host variable""" if local_name in self.__var_locations: return self.__var_locations[local_name] # End if return None def variable_locations(self): """Return a set of module-variable and module-type pairs. These represent the locations of all host model data with a listed source location (variables with no source are omitted).""" varset = set() lnames = self.prop_list('local_name') # Attempt to realize deferred lookups if self.__deferred_finds is not None: for std_name in list(self.__deferred_finds): var = self.find_variable(standard_name=std_name) if var is not None: self.__deferred_finds.remove(std_name) # End if # End for # End if # Now, find all the used module variables for name in lnames: module = self.host_variable_module(name) used = self.__used_variables and (name in self.__used_variables) if module and used: varset.add((module, name)) # No else, either no module or a zero-length module name # End if # End for return varset def find_variable(self, standard_name=None, source_var=None, any_scope=False, clone=None, search_call_list=False, loop_subst=False): """Return the host model variable matching or None If is True, substitute a begin:end range for an extent. """ my_var = super().find_variable(standard_name=standard_name, source_var=source_var, any_scope=any_scope, clone=clone, search_call_list=search_call_list, loop_subst=loop_subst) if my_var is None: # Check our DDT library if standard_name is None: if source_var is None: emsg = ("One of or " + "must be passed.") raise ParseInternalError(emsg) # end if standard_name = source_var.get_prop_value('standard_name') # end if # Since we are the parent of the DDT library, only check that dict my_var = self.__ddt_dict.find_variable(standard_name=standard_name, any_scope=False) # End if if loop_subst: if my_var is None: my_var = self.find_loop_subst(standard_name) # End if if my_var is not None: # If we get here, the host does not have the requested # variable but does have a replacement set. Create a new # variable to use to send to suites. ##XXgoldyXX: This cannot be working since find_loop_subst ## returns a tuple new_name = self.new_internal_variable_name(prefix=self.name) ctx = ParseContext(filename='host_model.py') new_var = my_var.clone(new_name, source_name=self.name, source_type="HOST", context=ctx) self.add_variable(new_var, self.__run_env) my_var = new_var # End if # End if if my_var is None: if self.__deferred_finds is not None: self.__deferred_finds.add(standard_name) # End if elif self.__used_variables is not None: lname = my_var.get_prop_value('local_name') # Try to add any index references (should be method?) imatch = FORTRAN_SCALAR_REF_RE.match(lname) if imatch is not None: vdims = [x.strip() for x in imatch.group(2).split(',') if ':' not in x] for vname in vdims: _ = self.find_variable(standard_name=vname) # End for # End if if isinstance(my_var, VarDDT): lname = my_var.get_parent_prop('local_name') # End if self.__used_variables.add(lname) # End if return my_var def add_variable(self, newvar, run_env, exists_ok=False, gen_unique=False, adjust_intent=False): """Add if it does not conflict with existing entries. For the host model, this includes entries in used DDT variables. If is True, attempting to add an identical copy is okay. If is True, a new local_name will be created if a local_name collision is detected. if is True, adjust conflicting intents to inout.""" standard_name = newvar.get_prop_value('standard_name') cvar = self.find_variable(standard_name=standard_name, any_scope=False) if cvar is None: # Check the DDT dictionary cvar = self.__ddt_dict.find_variable(standard_name=standard_name, any_scope=False) # end if if cvar and (not exists_ok): emsg = "Attempt to add duplicate host model variable, {}{}." emsg += "\nVariable originally defined{}" ntx = context_string(newvar.context) ctx = context_string(cvar.context) raise CCPPError(emsg.format(standard_name, ntx, ctx)) # end if # No collision, proceed normally super().add_variable(newvar=newvar, run_env=run_env, exists_ok=exists_ok, gen_unique=gen_unique, adjust_intent=False) def add_host_variable_module(self, local_name, module, logger=None): """Add a module name location for a host variable""" if local_name not in self.__var_locations: if logger is not None: emsg = 'Adding variable, {}, from module, {}' logger.debug(emsg.format(local_name, module)) # End if self.__var_locations[local_name] = module else: emsg = "Host variable, {}, already located in module" raise CCPPError(emsg.format(self.__var_locations[local_name])) # End if def call_list(self, phase): "Return the list of variables passed by the host model to the host cap" hdvars = list() loop_vars = phase == 'run' for hvar in self.variable_list(loop_vars=loop_vars, consts=False): lname = hvar.get_prop_value('local_name') if self.host_variable_module(lname) is None: hdvars.append(hvar) # End if # End for return hdvars ############################################################################### if __name__ == "__main__": # pylint: disable=ungrouped-imports from parse_tools import init_log, set_log_to_null import doctest import sys # pylint: enable=ungrouped-imports _LOGGER = init_log('host_registry') set_log_to_null(_LOGGER) # First, run doctest fail, _ = doctest.testmod() sys.exit(fail) # end if