# Top level program to call the MPE workflow for generating hourly MPE
# analysis from STAGE IV products on NCEP's WCOSS servers. This workflow
# requires at a minimum the following Python packages:
# 1.) sqlite3
# 2.) netCDF4
#
# When the program is initialized, it will first search for a
# sqlite file that contains information on the status of processing
# for each time step over the past XXXX days. If this sqlite file
# is not found, the program will assume it's either been removed,
# or a new one needs to be created.

# Logan Karsten
# National Center for Atmospheric Research
# Research Applications Laboratory
# karsten@ucar.edu
# 303-497-2693

# Import necessary libraries/packages
import os
import sys
import datetime
import numpy as np
import argparse
import logging
import warnings
warnings.filterwarnings("ignore")

# Set the Python path to include package specific functions.
# workPath ends up as the directory holding this script (with a trailing
# '/'), and libPath as '<that directory>/lib/Python'.
prPath = os.path.realpath(__file__)
pathSplit = prPath.split('/')
workPath = '/'
libPath = '/'
for j in range(1,len(pathSplit)-1):
    workPath = workPath + pathSplit[j] + '/'
    libPath = libPath + pathSplit[j] + '/'
libPath = libPath + 'lib/Python'
sys.path.insert(0,libPath)

#import dbMod
import statusMod
import errMod
import dateMod
import ioMod
import mpeMod
import ESMF_regrid
import nwmGeospatialMod


def _logMsg(statusMeta, msg):
    """Store msg on statusMeta.statusMsg and pass it to errMod.logMsg.

    If logging itself fails, control is handed to errMod.errOutScreen.
    """
    statusMeta.statusMsg = msg
    try:
        errMod.logMsg(statusMeta)
    except Exception:
        errMod.errOutScreen(statusMeta)


def _logWarning(statusMeta, msg):
    """Store msg on statusMeta.statusMsg and pass it to errMod.logWarning.

    If logging itself fails, control is handed to errMod.errOutScreen.
    """
    statusMeta.statusMsg = msg
    try:
        errMod.logWarning(statusMeta)
    except Exception:
        errMod.errOutScreen(statusMeta)


def _logErr(statusMeta, msg=None):
    """Pass the current error to errMod.logErr.

    When msg is given it replaces statusMeta.errMsg first; otherwise the
    message already stored on statusMeta (set by the failing callee) is
    logged. If logging itself fails, control is handed to
    errMod.errOutScreen.
    """
    if msg is not None:
        statusMeta.errMsg = msg
    try:
        errMod.logErr(statusMeta)
    except Exception:
        errMod.errOutScreen(statusMeta)


def _acquireLock(lockPath):
    """Create the process LOCK file at lockPath.

    If a LOCK file already exists and was modified less than 15 minutes
    ago, another process is assumed to be running and the program exits
    with status 0. An older LOCK file is assumed to be stale and is
    replaced. The new file holds the literal header "PID" followed by this
    process's PID.
    """
    if os.path.isfile(lockPath):
        print("LOCK FILE FOUND")
        # Age of the lock file based on its last modification time.
        lockAge = datetime.datetime.now() - \
            datetime.datetime.fromtimestamp(os.path.getmtime(lockPath))
        # total_seconds() is required here: timedelta.seconds discards whole
        # days, which would make a day-old lock look only minutes old.
        if lockAge.total_seconds()/60.0 < 15.0:
            # We are going to assume a previous process is still running on the system.
            print("ASSUMING PROCESS IS STILL RUNNING")
            print("PLEASE WAIT 15 MINUTES OR MANUALLY REMOVE: " + lockPath)
            sys.exit(0)
        # We are going to assume the process is no longer running and will
        # remove the LOCK file.
        print("ASSUMING PROCESS IS NO LONGER RUNNING")
        os.remove(lockPath)
    with open(lockPath,'w') as lockObj:
        lockObj.write('"PID"\n')
        lockObj.write(str(os.getpid()))


def _regridHourlyStack(statusMeta, hrlyPrecip, wghtPath, ndv, ny, nx, nHours):
    """Regrid each hourly slice hrlyPrecip[:,:,i] with ESMF_regrid.with_weights.

    Returns a new float32 array of shape [ny, nx, nHours] holding the
    regridded slices. Any exception from the allocation or the regridding
    propagates to the caller.
    """
    regridded = np.empty([ny, nx, nHours], np.float32)
    for i in range(nHours):
        regridded[:,:,i] = ESMF_regrid.with_weights(statusMeta, hrlyPrecip[:,:,i],
                                                    wghtPath, ndv)
    return regridded


def main(argv):
    """Run the hourly MPE generation workflow.

    argv is the list of command-line arguments (without the program name).
    Recognised options are --dateLookBack (YYYYMMDDHH), --numHoursLookBack
    (positive integer) and --wk (directory holding MPE.config).

    For every accumulation period the workflow reads the accumulated
    Stage IV grid and the RFC mask, reads and regrids hourly Stage IV,
    MRMS, HRRR and RAP data, combines them into hourly MPE grids on the
    HRAP grid, regrids each hour to the NWM grid and writes it to NetCDF.
    Progress is tracked per hour in keySlot: positive values mark the last
    completed step, negative values mark the step that failed.

    Invalid arguments, a missing working directory or a missing
    configuration file end the program through sys.exit(1). Failures of the
    set-up calls are handed to errMod.errOutScreen.
    NOTE(review): errMod.errOutScreen presumably terminates the program -
    confirm against errMod.
    """
    # Parse arguments.
    parser = argparse.ArgumentParser(description='Main program to generate NWM MPE hourly files')
    parser.add_argument('--dateLookBack',metavar='dateLookBack',type=str,nargs='?',
                        help='Optional date (YYYYMMDDHH format) for lookback. Used for retrospective generation')
    parser.add_argument('--numHoursLookBack',metavar='numHoursLookBack',type=int,nargs='?',
                        help='Optional number of hours to look back to override what is in mpe.config')
    parser.add_argument('--wk',metavar='tmpDir',type=str,nargs='?',
                        help='Working Directory holds MPE.config file')

    # Process optional arguments. argv is honoured so that main() can be
    # driven programmatically; the script entry point passes sys.argv[1:].
    args = parser.parse_args(argv)

    # Initialize our optional values.
    optDateEnd = None
    optNumHoursBack = None

    if args.dateLookBack:
        try:
            optDateEnd = datetime.datetime.strptime(args.dateLookBack,'%Y%m%d%H')
        except ValueError:
            print("Invalid optional look-back date of: " + args.dateLookBack + " passed to main program.")
            sys.exit(1)

    # Compare against None so that an explicit 0 is rejected instead of
    # being silently ignored.
    if args.numHoursLookBack is not None:
        if args.numHoursLookBack <= 0:
            print("Optional lookback hours must be greater than 0.")
            sys.exit(1)
        optNumHoursBack = int(args.numHoursLookBack)

    # First establish the expected path to the configuration file.
    configPath = workPath + "MPE.config"
    if args.wk:
        if not os.path.exists( args.wk ):
            print("wkDir: " + args.wk + " does not exist!")
            sys.exit(1)
        configPath = args.wk + "/MPE.config"

    # Initialize the status object containing metadata about the workflow
    statusMeta = statusMod.statusMeta()

    # Ensure the configuration file is present. This is necessary to run the
    # MPE workflow.
    if not os.path.isfile(configPath):
        print("ERROR: Unable to find mandatory config file: " + configPath)
        sys.exit(1)
    statusMeta.configPath = configPath

    # Read in workflow parameters, paths, etc
    try:
        statusMeta.readConfig()
    except Exception:
        errMod.errOutScreen(statusMeta)

    # Read in geospatial metadata from the NWM land metadata file.
    nwmGeoMeta = nwmGeospatialMod.nwmGeoMeta()
    try:
        nwmGeoMeta.readMeta(statusMeta)
    except Exception:
        errMod.errOutScreen(statusMeta)

    # Initialize datetime variables for the workflow.
    try:
        dateMod.initDatesMpe(statusMeta)
    except Exception:
        errMod.errOutScreen(statusMeta)

    # Re-initialize dates based on optional arguments.
    try:
        dateMod.initOptDatesMpe(statusMeta,optDateEnd,optNumHoursBack)
    except Exception:
        errMod.errOutScreen(statusMeta)

    # Initialize the geoConstants object which contains global parameters.
    try:
        geoConst = ioMod.geoConstants()
    except Exception:
        errMod.errOutScreen(statusMeta)

    # Create a LOCK file for this process. Upon successful completion of the
    # workflow, the LOCK file will be removed. If there is an existing LOCK
    # file, its modification time decides whether a previous process is
    # assumed to still be running (see _acquireLock).
    lockPath = statusMeta.logDir + "/MPE_PROCESS.LOCK"
    _acquireLock(lockPath)
    statusMeta.lockPath = lockPath

    # Number of hours in one accumulation period.
    nHours = int(statusMeta.accDuration)

    # Temporary array holding the status value of each hour in the
    # accumulation period. np.float64 is the dtype the removed np.float
    # alias used to resolve to.
    keySlot = np.empty([nHours],np.float64)

    # Initialize the RFC mask grid. It is read once and then reused for all
    # accumulation periods.
    rfcMskGrid = None

    for accPeriod in range(0,int(statusMeta.nLoopSteps)):
        # Establish the beginning and ending date for the accumulation period.
        statusMeta.dBegAcc = statusMeta.dInit + \
            datetime.timedelta(seconds=(accPeriod*statusMeta.accDuration)*3600.0)
        statusMeta.dEndAcc = statusMeta.dInit + \
            datetime.timedelta(seconds=((accPeriod+1)*statusMeta.accDuration)*3600.0)

        # Establish a log file to hold all information on the workflow for this accumulation period.
        statusMeta.logFile = statusMeta.logDir + "/MPE_LOG_" + statusMeta.dBegAcc.strftime('%Y%m%d%H') + \
                             "_" + statusMeta.dEndAcc.strftime('%Y%m%d%H')

        # Reset status values for this accumulation period.
        statusMeta.haveAccData = False
        statusMeta.emcAccOk = False
        statusMeta.useRqiMsk = False
        statusMeta.haveHourlyMPE = False
        statusMeta.flatLine = False
        statusMeta.useMrmsGc = False
        statusMeta.useHrrr = False
        statusMeta.useRap = False

        # Initialize the Log file
        try:
            errMod.initLog(statusMeta)
        except Exception:
            errMod.errOutScreen(statusMeta)

        _logMsg(statusMeta, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
        _logMsg(statusMeta, "BEGINNING OF PROCESSING. CURRENT TIME = " +
                statusMeta.dReal.strftime('%Y-%m-%d %H:%M'))

        # Initialize our key status array to 0.0.
        keySlot[:] = 0.0

        # Initialize HRRR/MRMS/RAP/STAGE IV metadata (derived from GRIB files) to
        # None type until they get read in later.
        st4GridMeta = None
        mrmsGridMeta = None
        hrrrGridMeta = None
        rapGridMeta = None

        # Initialize the total accumulation grid read in from a single file.
        st4AccGrid = None
        statusMeta.haveAccData = False

        # Initialize our hourly Stage IV related variables
        st4HrlyPrecip = None
        st4HrlyTotalPrecip = None
        st4FlatInd = None

        # Initialize the MRMS hourly grids.
        mrmsHrlyPrecip = None
        mrmsHrlyTotalPrecip = None

        # Initialize the HRRR/RAP hourly grids.
        hrrrHrlyPrecip = None
        hrrrHrlyTotalPrecip = None
        rapHrlyPrecip = None
        rapHrlyTotalPrecip = None

        # Initialize HRAP regridded arrays to None.
        mrmsOkGridHrap = None
        mrmsHrlyPrecipHrap = None
        mrmsHrlyTotalPrecipHrap = None
        hrrrHrlyPrecipHrap = None
        hrrrHrlyTotalPrecipHrap = None
        rapHrlyPrecipHrap = None
        rapHrlyTotalPrecipHrap = None

        # Initialize the output arrays to None.
        hourlyHrapMPEGrids = None
        hourlyMPEGrid = None

        for hourIter in range(0,nHours):
            numHoursTmp = accPeriod*statusMeta.accDuration + hourIter + 1
            statusMeta.dCurr = statusMeta.dInit + datetime.timedelta(seconds=numHoursTmp*3600.0)

            _logMsg(statusMeta, "CURRENT DATE = " + statusMeta.dCurr.strftime('%Y-%m-%d %H'))
            _logMsg(statusMeta, "BEGINNING CURRENT ACCUMULATION PERIOD = " +
                    statusMeta.dBegAcc.strftime('%Y-%m-%d %H'))
            _logMsg(statusMeta, "END CURRENT ACCUMULATION PERIOD = " +
                    statusMeta.dEndAcc.strftime('%Y-%m-%d %H'))

            # If we have previous error statuses from the early steps, reset
            # to 0.0 and try to re-run the beginning of the workflow.
            if hourIter == 0 and float(keySlot[hourIter]) in (-0.1, -0.2, -0.25, -0.3):
                keySlot[:] = 0.0

            if keySlot[hourIter] == 0.0:
                # Stage IV data, along with HRRR/RAP/MRMS, need to be read in,
                # checked, and processed into hourly HRAP MPE grids.
                if st4AccGrid is None and not statusMeta.haveAccData:
                    # Try to read in the Stage IV accumulation file. This is necessary
                    # for the workflow to continue.
                    _logMsg(statusMeta, "GETTING ACCUMULATED STAGE IV DATA")
                    try:
                        st4AccGrid,st4GridMeta = ioMod.getStage4Acc(st4GridMeta,statusMeta)
                        keySlot[:] = 0.1
                    except Exception:
                        keySlot[:] = -0.1
                        statusMeta.haveAccData = False
                        st4AccGrid = None
                        st4GridMeta = None
                        _logErr(statusMeta)
                else:
                    # This implies we have already read in the accumulation file.
                    keySlot[:] = 0.1

            if keySlot[hourIter] == 0.1:
                # We have successfully read in the accumulated STAGE IV file.
                # Next, read in the grid of RFC values on the HRAP grid
                # corresponding to each region.
                if rfcMskGrid is None and not statusMeta.haveRfcMsk:
                    _logMsg(statusMeta, "GETTING RFC MASK DATA")
                    try:
                        (hrapXCoords,hrapYCoords,rfcVals,rfcNames,
                         rfcMskGrid,st4GridMeta) = ioMod.getRfcMsk(st4GridMeta,statusMeta)
                        keySlot[:] = 0.2
                    except Exception:
                        keySlot[:] = -0.2
                        rfcMskGrid = None
                        statusMeta.haveRfcMsk = False
                        _logErr(statusMeta)
                else:
                    # RFC mask data has properly been read in once.
                    keySlot[:] = 0.2

            if keySlot[hourIter] == 0.2:
                # First, assign Channel Island HRAP cells to the GRIB missing value.
                if hourIter == 0:
                    _logMsg(statusMeta, "ASSIGNING MISSING VALUES TO ACCUMULATED ST4 GRID FOR CHANNEL ISLANDS")
                    try:
                        st4AccGrid[geoConst.CICol,geoConst.CIRow] = st4GridMeta.gribNdv
                        keySlot[:] = 0.25
                    except Exception:
                        keySlot[:] = -0.25
                        statusMeta.haveAccData = False
                        st4AccGrid = None
                        _logErr(statusMeta, "Unable to assign missing values to Channel Island " +
                                " pixel cells for: " + statusMeta.dCurr.strftime('%Y-%m-%d %H:00:00'))

            if keySlot[hourIter] == 0.25:
                # Calculate the percent of accumulation data that is missing in each RFC.
                if hourIter == 0:
                    _logMsg(statusMeta, "CALCULATING PERCENT MISSING FROM ACC ST4")
                    try:
                        noDataPercent,statusMeta = mpeMod.percentMissingInRfc(statusMeta,st4GridMeta,st4AccGrid,
                                                                              rfcMskGrid,rfcVals,rfcNames)
                        keySlot[:] = 0.3
                    except Exception:
                        keySlot[:] = -0.3
                        statusMeta.haveAccData = False
                        st4AccGrid = None
                        statusMeta.emcAccOk = False
                        _logErr(statusMeta)

            if keySlot[hourIter] == 0.3:
                _logMsg(statusMeta, "CALCULATING RQI MASK FROM MRMS DATA")
                if hourIter == 0:
                    # NOTE: generation of the MRMS RQI mask (mpeMod.genRqiMask)
                    # is currently disabled, so no RQI grid exists to regrid to
                    # HRAP and mrmsOkGridHrap stays None. If the mask is
                    # re-enabled, its regridding with statusMeta.mrmsWghtPath
                    # must be restored here as well.
                    keySlot[:] = 0.4  # Advance status value to default to read in hourly Stage IV.
                    statusMeta.useRqiMsk = False
                    mrmsGridMeta = None
                    _logWarning(statusMeta, "Skipped function to generate RQI mask")

            if keySlot[hourIter] == 0.4:
                # Read in the hourly Stage IV data for each hour during the accumulation period.
                _logMsg(statusMeta, "READING HOURLY STAGE IV DATA")
                if hourIter == 0:
                    try:
                        (statusMeta,st4GridMeta,st4HrlyTotalPrecip,
                         st4HrlyPrecip,st4FlatInd) = mpeMod.readHourlyStageIV(statusMeta,st4GridMeta,
                                                                              st4AccGrid,geoConst)
                        keySlot[:] = 0.5
                    except Exception:
                        st4HrlyPrecip = None
                        st4GridMeta = None
                        st4HrlyTotalPrecip = None
                        statusMeta.haveHourlyMPE = False
                        st4FlatInd = None
                        keySlot[:] = 0.5  # Advance workflow to read in hourly MRMS data.
                        _logWarning(statusMeta, "Unable to read in hourly Stage IV data. Will not use in disagregation.")
                    if statusMeta.haveHourlyMPE:
                        if st4FlatInd is None:
                            statusMeta.flatLine = False
                            _logWarning(statusMeta, "Unable to perform flatline test for hourly Stage IV data.")
                        else:
                            statusMeta.flatLine = True

            if keySlot[hourIter] == 0.5:
                # Read in hourly MRMS GC data for each hour during the accumulation period.
                _logMsg(statusMeta, "READING IN MRMS GC DATA")
                if hourIter == 0:
                    try:
                        (statusMeta,mrmsGridMeta,mrmsHrlyTotalPrecip,
                         mrmsHrlyPrecip) = mpeMod.readHourlyMrmsGc(statusMeta,mrmsGridMeta)
                        keySlot[:] = 0.6
                    except Exception:
                        mrmsHrlyPrecip = None
                        mrmsHrlyTotalPrecip = None
                        statusMeta.useMrmsGc = False
                        keySlot[:] = 0.6  # Advance workflow to read in hourly HRRR data.
                        _logWarning(statusMeta, "Unable to read in MRMS GC for duration. Will not use in disagregation.")

                    if mrmsHrlyPrecip is not None:
                        _logMsg(statusMeta, "Regridding hourly MRMS to HRAP.")
                        # The output allocation happens inside the try so that a
                        # missing st4GridMeta (hourly Stage IV read failed)
                        # degrades to "skip MRMS" instead of an uncaught error.
                        try:
                            mrmsHrlyPrecipHrap = _regridHourlyStack(statusMeta,mrmsHrlyPrecip,
                                                                    statusMeta.mrmsWghtPath,mrmsGridMeta.gribNdv,
                                                                    st4GridMeta.ny,st4GridMeta.nx,nHours)
                            keySlot[:] = 0.6
                        except Exception:
                            mrmsHrlyPrecip = None
                            mrmsHrlyTotalPrecip = None
                            mrmsHrlyPrecipHrap = None
                            keySlot[:] = 0.6
                            statusMeta.useMrmsGc = False
                            _logWarning(statusMeta, "Failure to regrid hourly MRMS GC to HRAP. MRMS will not be used in disagregation.")

                    if mrmsHrlyTotalPrecip is not None:
                        _logMsg(statusMeta, "Regridding MRMS Total GC to HRAP")
                        try:
                            mrmsHrlyTotalPrecipHrap = ESMF_regrid.with_weights(statusMeta,mrmsHrlyTotalPrecip,
                                                                               statusMeta.mrmsWghtPath,
                                                                               mrmsGridMeta.gribNdv)
                            keySlot[:] = 0.6
                        except Exception:
                            mrmsHrlyPrecip = None
                            mrmsHrlyTotalPrecip = None
                            mrmsHrlyPrecipHrap = None
                            mrmsHrlyTotalPrecipHrap = None
                            keySlot[:] = 0.6
                            statusMeta.useMrmsGc = False
                            _logWarning(statusMeta, "Failure to regrid MRMS Total GC to HRAP. MRMS will not be used in disagregation.")

                    # Reset temporary arrays to free up memory
                    mrmsHrlyPrecip = None
                    mrmsHrlyTotalPrecip = None

            if keySlot[hourIter] == 0.6:
                # Read in hourly HRRR data for each hour during the accumulation period.
                _logMsg(statusMeta, "READING IN HRRR DATA")
                if hourIter == 0:
                    try:
                        (statusMeta,hrrrGridMeta,hrrrHrlyTotalPrecip,
                         hrrrHrlyPrecip) = mpeMod.readHourlyHrrrPrecip(statusMeta,hrrrGridMeta)
                        keySlot[:] = 0.7
                    except Exception:
                        hrrrHrlyPrecip = None
                        hrrrHrlyTotalPrecip = None
                        statusMeta.useHrrr = False
                        keySlot[:] = 0.7  # Advance workflow to read in hourly RAP data.
                        _logWarning(statusMeta, "Failure to read in hourly HRRR data. HRRR will not be used in disagregation.")

                    if hrrrHrlyPrecip is not None:
                        _logMsg(statusMeta, "REGRIDDING HRRR TO HRAP DATA")
                        try:
                            hrrrHrlyPrecipHrap = _regridHourlyStack(statusMeta,hrrrHrlyPrecip,
                                                                    statusMeta.hrrrWghtPath,hrrrGridMeta.gribNdv,
                                                                    st4GridMeta.ny,st4GridMeta.nx,nHours)
                            keySlot[:] = 0.7
                        except Exception:
                            hrrrHrlyPrecip = None
                            hrrrHrlyTotalPrecip = None
                            statusMeta.useHrrr = False
                            hrrrHrlyPrecipHrap = None
                            keySlot[:] = 0.7
                            _logWarning(statusMeta, "Failure to regrid hourly HRRR to HRAP. HRRR will not be used in disagregation.")

                    if hrrrHrlyTotalPrecip is not None:
                        _logMsg(statusMeta, "REGRIDDING HRRR TOTAL TO HRAP")
                        try:
                            hrrrHrlyTotalPrecipHrap = ESMF_regrid.with_weights(statusMeta,hrrrHrlyTotalPrecip,
                                                                               statusMeta.hrrrWghtPath,
                                                                               hrrrGridMeta.gribNdv)
                            keySlot[:] = 0.7
                        except Exception:
                            hrrrHrlyPrecip = None
                            hrrrHrlyTotalPrecip = None
                            statusMeta.useHrrr = False
                            hrrrHrlyPrecipHrap = None
                            hrrrHrlyTotalPrecipHrap = None
                            keySlot[:] = 0.7
                            _logWarning(statusMeta, "Failure to regrid total HRRR to HRAP. HRRR will not be used in disagregation.")

                    # Reset arrays to free up memory
                    hrrrHrlyPrecip = None
                    hrrrHrlyTotalPrecip = None

            if keySlot[hourIter] == 0.7:
                # Read in hourly RAP data for each hour during the accumulation period.
                _logMsg(statusMeta, "READING IN RAP DATA")
                if hourIter == 0:
                    try:
                        (statusMeta,rapGridMeta,rapHrlyTotalPrecip,
                         rapHrlyPrecip) = mpeMod.readHourlyRapPrecip(statusMeta,rapGridMeta)
                        keySlot[:] = 0.8
                    except Exception:
                        rapHrlyPrecip = None
                        rapHrlyTotalPrecip = None
                        statusMeta.useRap = False
                        keySlot[:] = 0.8
                        _logWarning(statusMeta, "Failure to read in hourly RAP data. RAP will not be used in disagregation.")

                    if rapHrlyPrecip is not None:
                        _logMsg(statusMeta, "REGRIDDING RAP TO HRAP.")
                        try:
                            rapHrlyPrecipHrap = _regridHourlyStack(statusMeta,rapHrlyPrecip,
                                                                   statusMeta.rapWghtPath,rapGridMeta.gribNdv,
                                                                   st4GridMeta.ny,st4GridMeta.nx,nHours)
                            keySlot[:] = 0.8
                        except Exception:
                            rapHrlyPrecip = None
                            rapHrlyTotalPrecip = None
                            rapHrlyPrecipHrap = None
                            statusMeta.useRap = False
                            keySlot[:] = 0.8
                            _logWarning(statusMeta, "Failure to regrid hourly RAP data to HRAP. Will not use in disagregation.")

                    if rapHrlyTotalPrecip is not None:
                        _logMsg(statusMeta, "REGRIDDING TOTAL RAP TO HRAP.")
                        try:
                            rapHrlyTotalPrecipHrap = ESMF_regrid.with_weights(statusMeta,rapHrlyTotalPrecip,
                                                                              statusMeta.rapWghtPath,
                                                                              rapGridMeta.gribNdv)
                            keySlot[:] = 0.8
                        except Exception:
                            rapHrlyPrecip = None
                            rapHrlyTotalPrecip = None
                            rapHrlyPrecipHrap = None
                            rapHrlyTotalPrecipHrap = None
                            statusMeta.useRap = False
                            keySlot[:] = 0.8
                            _logWarning(statusMeta, "Failure to regrid total RAP data to HRAP. Will not use in disagregation.")

                    # Reset arrays to free up memory
                    rapHrlyPrecip = None
                    rapHrlyTotalPrecip = None

            if keySlot[hourIter] == 0.8:
                _logMsg(statusMeta, "COMBINING PRECIPITATION INTO FINAL MPE GRIDS....")
                try:
                    hourlyHrapMPEGrids = mpeMod.combinePrecip(statusMeta,nwmGeoMeta,st4GridMeta,st4AccGrid,
                                                              st4HrlyTotalPrecip,st4HrlyPrecip,st4FlatInd,
                                                              mrmsOkGridHrap,mrmsHrlyPrecipHrap,
                                                              mrmsHrlyTotalPrecipHrap,mrmsGridMeta,
                                                              hrrrHrlyPrecipHrap,hrrrHrlyTotalPrecipHrap,hrrrGridMeta,
                                                              rapHrlyPrecipHrap,rapHrlyTotalPrecipHrap,rapGridMeta,
                                                              rfcVals,rfcNames,rfcMskGrid)
                    keySlot[:] = 0.9
                except Exception:
                    keySlot[:] = -0.9
                    hourlyHrapMPEGrids = None
                    _logErr(statusMeta, "Failure to disagregate ST4/MRMS/HRRR/RAP for generation of hourly MPE grids.")

                # Reset input arrays to free up memory; only the combined
                # HRAP grids are needed from here on.
                st4AccGrid = None
                st4HrlyTotalPrecip = None
                st4HrlyPrecip = None
                st4FlatInd = None
                mrmsOkGridHrap = None
                mrmsHrlyPrecipHrap = None
                mrmsHrlyTotalPrecipHrap = None
                hrrrHrlyPrecipHrap = None
                hrrrHrlyTotalPrecipHrap = None
                rapHrlyPrecipHrap = None
                rapHrlyTotalPrecipHrap = None

            # Regrid the HRAP MPE data for this hour to the NWM grid. From
            # here on the status is tracked per hour, not for the whole period.
            if keySlot[hourIter] == 0.9:
                _logMsg(statusMeta, "Regridding HRAP MPE to NWM.")
                try:
                    hourlyMPEGrid = ESMF_regrid.with_weights(statusMeta,hourlyHrapMPEGrids[:,:,hourIter],
                                                             statusMeta.hrapNwmWghtPath,-9999)
                    keySlot[hourIter] = 0.95
                except Exception:
                    keySlot[hourIter] = -0.95
                    hourlyMPEGrid = None
                    _logErr(statusMeta, "Failure to regrid hourly MPE from HRAP to NWM.")

            if keySlot[hourIter] == 0.95:
                _logMsg(statusMeta, "Creating MPE NetCDF file for hour:" + str(hourIter) + ".")
                try:
                    ioMod.outputMpeNetCDF(statusMeta,nwmGeoMeta,hourlyMPEGrid)
                    keySlot[hourIter] = 1.0
                except Exception:
                    keySlot[hourIter] = -1.0
                    _logErr(statusMeta, "Failure to create output MPE NetCDF files.")

            # Reset arrays to free up memory
            hourlyMPEGrid = None

        # All hours are done; release the combined HRAP grids. This must not
        # happen inside the hour loop because every hour indexes this array.
        hourlyHrapMPEGrids = None

        # Summarize the status flags for this accumulation period.
        _logMsg(statusMeta, "DO WE HAVE RFC MASK? " + str(statusMeta.haveRfcMsk))
        _logMsg(statusMeta, "ACCUMULATED FILE STATUS = " + str(statusMeta.haveAccData))
        _logMsg(statusMeta, "DID WE FIND HOURLY STAGE 4 DATA? " + str(statusMeta.emcAccOk))
        _logMsg(statusMeta, "ARE WE USING THE RQI MASK? " + str(statusMeta.useRqiMsk))
        _logMsg(statusMeta, "DO WE HAVE HOURLY MPE? " + str(statusMeta.haveHourlyMPE))
        _logMsg(statusMeta, "DID WE EXECUTE A FLAT LINE TEST? " + str(statusMeta.flatLine))
        _logMsg(statusMeta, "ARE WE USING HOURLY MRMS GC DATA? " + str(statusMeta.useMrmsGc))
        _logMsg(statusMeta, "ARE WE USING HRRR? " + str(statusMeta.useHrrr))
        _logMsg(statusMeta, "ARE WE USING RAP? " + str(statusMeta.useRap))
        _logMsg(statusMeta, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
        _logMsg(statusMeta, "----------------------------------------------------")
        _logMsg(statusMeta, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

        # Close the Log file
        try:
            errMod.closeLog(statusMeta)
        except Exception:
            errMod.errOutScreen(statusMeta)

    # Remove the LOCK file for a graceful exit.
    if os.path.isfile(lockPath):
        try:
            os.remove(lockPath)
        except OSError:
            print("ERROR: Unable to remove LOCK file: " + lockPath)
            sys.exit(1)


if __name__ == "__main__":
    main(sys.argv[1:])