# This is a module file designed to store MPE-related functions for use
# in the MPE workflow.
# Logan Karsten
# National Center for Atmospheric Research
# Research Applications Laboratory
# karsten@ucar.edu
# 303-497-2693

import numpy as np
import datetime
import os
import gribMod2
import shutil
import gzip
import errMod

def percentMissingInRfc(statusMeta,gridMeta,st4Grid,rfcMsk,rfcVals,rfcNames):
    """ Function designed to calculate the percent of each RFC region that
        contains missing data from the Stage IV dataset being evaluated.
        Function will return an array with a length equal to the number of
        RFC regions.
    """
    numRegions = len(rfcVals)
    ndvCheck = gridMeta.gribNdv

    # Loop through each RFC region and calculate the number of pixel
    # cells for the region. Then, calculate the number of valid data pixel
    # cells within the region. Finally, calculate the percentage of
    # the RFC region that contains missing data.
    regionCount = np.empty([numRegions],np.int)
    noDataCount = np.empty([numRegions],np.int)
    noDataPercent = np.empty([numRegions],np.float)

    for rfc in range(0,numRegions):
        rfcValTmp = rfcVals[rfc]
        indRfc = np.where(rfcMsk == rfcValTmp)
        if len(indRfc[0]) == 0:
            statusMeta.errMsg = "RFC Mask contains no valid pixel cells " + \
                                "for RFC region: " + rfcNames[rfc]
            raise Exception()
        else:
            regionCount[rfc] = len(indRfc[0])

        indNdv = np.where((rfcMsk == rfcValTmp) & (st4Grid == ndvCheck))
        noDataCount[rfc] = len(indNdv[0])
        noDataPercent[rfc] = (float(noDataCount[rfc])/float(regionCount[rfc]))*100.0

        if noDataPercent[rfc] > 1.0:
            statusMeta.errMsg = "Found greater than 1.0 percent missing data " + \
                                "from RFC: " + rfcNames[rfc] + " for accumulation " + \
                                "timestep: " + statusMeta.dEndAcc.strftime('%Y-%m-%d %H:00:00')
            statusMeta.emcAccOk = False
            raise Exception()

    statusMeta.emcAccOk = True

    return noDataPercent,statusMeta

def genRqiMask(statusMeta,gridMeta,mrmsOkGrid):
    """ Function to loop over the RQI time step (in minutes) specified by the user
        and open MRMS RQI files. Each file's RQI values are checked against the
        user-specified RQI threshold. The mask grid is updated as files are opened,
        and the final mask is returned to the user.
    """
    # First calculate the total number of files (steps) we need to loop over
    # during the accumulation period. Set a temporary datetime object to keep
    # track of the time as we loop over files.
    numMrmsSteps = int(float(statusMeta.accDuration*60.0)/float(statusMeta.rqiStepMin))
    dMrms = statusMeta.dBegAcc

    for mrmsStep in range(0,numMrmsSteps):
        # Compose the filepath to the MRMS files.
        if statusMeta.ncepWcoss == 1:
            mrmsFileCheck = statusMeta.mrmsDir + "/RadarQualityIndex/RadarQualityIndex_00.00_" + \
                            dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'
        else:
            mrmsFileCheck = statusMeta.mrmsDir + "/RadarQualityIndex/" + dMrms.strftime('%Y') + \
                            "/" + dMrms.strftime('%m') + "/MRMS_RadarQualityIndex_00.00_" + \
                            dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'

        if not os.path.isfile(mrmsFileCheck):
            statusMeta.statusMsg = "Expected MRMS file: " + mrmsFileCheck + " not found. " + \
                                   "Moving to next MRMS timestep."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            # Increment the MRMS time by minutes.
            dMrms = dMrms + datetime.timedelta(seconds=statusMeta.rqiStepMin*60.0)
            continue

        statusMeta.statusMsg = "RQI file being used: " + mrmsFileCheck
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

        # We will need to copy the gzipped file over to the scratch space, and unzip it.
        mrmsFileTmpGz = statusMeta.tmpDir + "/" + \
                        "MRMS_RadarQualityIndex_00.00_" + \
                        dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'
        mrmsFileTmp = statusMeta.tmpDir + "/" + \
                      "MRMS_RadarQualityIndex_00.00_" + \
                      dMrms.strftime('%Y%m%d-%H%M') + '00.grib2'

        # If the copy exists, remove it to be safe.
        try:
            if os.path.isfile(mrmsFileTmpGz):
                os.remove(mrmsFileTmpGz)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # If the unzipped file exists, remove it to be safe.
        try:
            if os.path.isfile(mrmsFileTmp):
                os.remove(mrmsFileTmp)
        except:
            statusMeta.errMsg = "Unable to remove temporary GRIB file: " + mrmsFileTmp
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Copy the gzipped file over to the scratch directory.
        try:
            shutil.copy(mrmsFileCheck,mrmsFileTmpGz)
        except:
            statusMeta.errMsg = "Unable to copy: " + mrmsFileCheck + " to: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Unzip the file in place.
        try:
            with gzip.open(mrmsFileTmpGz,'rb') as fTmpGz:
                with open(mrmsFileTmp,'wb') as fTmp:
                    shutil.copyfileobj(fTmpGz,fTmp)
        except:
            statusMeta.errMsg = "Unable to unzip: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Remove the copy of the gzip file.
        try:
            if os.path.isfile(mrmsFileTmpGz):
                os.remove(mrmsFileTmpGz)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # If the grid metadata hasn't been read in yet, read it in from the first file.
        if not gridMeta:
            try:
                gridMeta = gribMod2.gribMeta()
                if statusMeta.ncepWcoss == 1:
                    gridMeta.readMeta(statusMeta,mrmsFileTmp,'RadarQualityIndex')
                else:
                    gridMeta.readMeta(statusMeta,mrmsFileTmp,'0 m above mean sea level')
            except:
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

        # Initialize two MRMS grids. One will store 0/1 flags for the MPE program to
        # use in determining where MRMS should be used for disaggregation. The second
        # is a temporary array used to read in MRMS data from the GRIB files.
        if type(mrmsOkGrid) == type(None):
            mrmsOkGrid = np.empty([gridMeta.ny,gridMeta.nx],np.float64)
            mrmsOkGrid[:,:] = 0.0
        mrmsTmpGrid = np.empty([gridMeta.ny,gridMeta.nx],np.float64)
        # Set the temporary grid to the GRIB NDV value.
        mrmsTmpGrid[:,:] = gridMeta.gribNdv

        # Read in MRMS RQI data.
        try:
            if statusMeta.ncepWcoss == 1:
                gribMod2.getGribVariable(statusMeta,mrmsFileTmp,gridMeta,'RadarQualityIndex',
                                         'RadarQualityIndex_0mabovemeansealevel',
                                         '0 m above mean sea level',0,'unknown',mrmsTmpGrid)
            else:
                gribMod2.getGribVariable(statusMeta,mrmsFileTmp,gridMeta,'0 m above mean sea level',
                                         'var209_8_0_0mabovemeansealevel',
                                         '0 m above mean sea level',0,'unknown',mrmsTmpGrid)
        except:
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Convert no-data and no-coverage values to NDV.
        try:
            indNoCoverage = np.where(mrmsTmpGrid == -3.0)
            indNoData = np.where(mrmsTmpGrid == -1.0)
            mrmsTmpGrid[indNoCoverage] = gridMeta.gribNdv
            mrmsTmpGrid[indNoData] = gridMeta.gribNdv
        except:
            statusMeta.errMsg = "Unable to calculate missing data coverage for: " + mrmsFileTmp
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        if mrmsStep == 0:
            # We are on the first step within the accumulation duration period. Calculate where
            # the RQI value is greater than or equal to the threshold set by the user.
            indContinue = np.where((mrmsTmpGrid >= statusMeta.rqiThresh) & (mrmsTmpGrid != gridMeta.gribNdv))
            mrmsOkGrid[indContinue] = 1.0
        else:
            # Determine where we have either missing data in the ok grid, or missing data in the
            # new grid that was just read in. Set those pixel cells to 0.0 in the ok grid.
            indRemove1 = np.where(mrmsOkGrid == gridMeta.gribNdv)
            mrmsTmpGrid[indRemove1] = gridMeta.gribNdv
            mrmsOkGrid[indRemove1] = 0.0
            indRemove2 = np.where(mrmsTmpGrid == gridMeta.gribNdv)
            mrmsOkGrid[indRemove2] = 0.0
            # Next, determine where the OK grid is 1.0, but we have failed to meet the RQI threshold
            # for this MRMS time step. Those OK values will go to 0.0.
            indRemove3 = np.where((mrmsTmpGrid < statusMeta.rqiThresh) & (mrmsOkGrid == 1.0))
            mrmsOkGrid[indRemove3] = 0.0
            # Determine where we have both the OK grid stating that we have 1.0, AND
            # the RQI grid is greater than or equal to the user threshold. These values
            # will stay 1.0 in the OK grid.
            indContinue = np.where((mrmsOkGrid != gridMeta.gribNdv) & (mrmsTmpGrid != gridMeta.gribNdv) &
                                   (mrmsOkGrid == 1.0) & (mrmsTmpGrid >= statusMeta.rqiThresh))
            mrmsOkGrid[indContinue] = 1.0

        # Increment the MRMS time by minutes.
        dMrms = dMrms + datetime.timedelta(seconds=statusMeta.rqiStepMin*60.0)

        # Remove the temporary copy of the GRIB file.
        try:
            if os.path.isfile(mrmsFileTmp):
                os.remove(mrmsFileTmp)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + mrmsFileTmp
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Set temporary arrays/index arrays to None to free up memory.
        mrmsTmpGrid = None
        indNoCoverage = None
        indNoData = None
        indContinue = None
        indRemove1 = None
        indRemove2 = None
        indRemove3 = None

    if type(mrmsOkGrid) == type(None):
        statusMeta.statusMsg = "No MRMS Files found in accumulation period. Will not use MRMS for disaggregation."
        try:
            errMod.logWarning(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)
        statusMeta.useRqiMsk = False
    else:
        # All files have been read in, and an ok mask has been generated. Set the useRqiMsk status
        # to True, and return the updated statusMeta, gridMeta, and calculated ok grid back to the user.
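        # Illustrative example of the mask semantics (values here are made up, not from the
        # configuration): with rqiThresh = 0.9, a cell whose RQI values across the steps are
        # [0.95, 0.92, 0.91] stays at 1.0 in mrmsOkGrid, while a cell with [0.95, 0.92, 0.88]
        # drops to 0.0 because one step fell below the threshold.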
        statusMeta.useRqiMsk = True

    return statusMeta,gridMeta,mrmsOkGrid

def readHourlyStageIV(statusMeta,gridMeta,st4AccDuration,geoConst):
    """ This is a function to read in hourly Stage IV data over the course of
        the accumulation duration period.
    """
    # Ensure we don't have a None-type gridMeta structure. Stage IV accumulation
    # data is required by the workflow, so by default we should have grid metadata.
    if not gridMeta:
        statusMeta.statusMsg = "Grid metadata for Stage IV is unexpectedly missing."
        try:
            errMod.logWarning(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)
        raise Exception()

    # Set a temporary datetime object to iterate during the accumulation period.
    dSt4 = statusMeta.dBegAcc

    # Initialize the arrays that will be returned back to the main calling program
    # for further use.
    st4HrlyPrecip = np.empty([gridMeta.ny,gridMeta.nx,int(statusMeta.accDuration)],np.float64)
    st4HrlyTotalPrecip = np.empty([gridMeta.ny,gridMeta.nx],np.float64)
    st4FlatInd = np.empty([gridMeta.ny,gridMeta.nx],np.int)
    # Create arrays that will be used for the flat line test.
    #flatTmp = np.empty([gridMeta.ny,gridMeta.nx],np.int)

    # Set the grids to the GRIB missing value.
    st4HrlyPrecip[:,:,:] = gridMeta.gribNdv
    st4HrlyTotalPrecip[:,:] = gridMeta.gribNdv

    for st4Step in range(0,int(statusMeta.accDuration)):
        # Iterate the datetime object. Since files represent the previous hour, we
        # iterate at the beginning of the loop.
        dSt4 = dSt4 + datetime.timedelta(seconds=3600.0)

        # Compose the path to the Stage IV hourly file.
        if statusMeta.ncepWcoss == 1:
            st4FileCheck = statusMeta.st4Dir + "/pcpanl." + dSt4.strftime('%Y%m%d') + \
                           "/st4_conus." + dSt4.strftime('%Y%m%d%H') + ".01h.grb2"
        else:
            st4FileCheck = statusMeta.st4Dir + "/" + dSt4.strftime('%Y') + \
                           "/" + dSt4.strftime('%Y%m%d') + \
                           "/ST4." + dSt4.strftime('%Y%m%d%H') + ".01h.gz"

        if not os.path.isfile(st4FileCheck):
            statusMeta.statusMsg = "Expected Stage IV file: " + st4FileCheck + \
                                   " not found. Hourly Stage IV will not be used in disaggregation."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        statusMeta.statusMsg = "StageIV file being used: " + st4FileCheck
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

        if statusMeta.ncepWcoss == 1:
            st4FileTmp2 = statusMeta.tmpDir + "/" + \
                          "st4_conus." + dSt4.strftime('%Y%m%d%H') + ".01h.grb2"
            # Copy the raw input file over to the scratch directory.
            try:
                shutil.copy(st4FileCheck,st4FileTmp2)
            except:
                statusMeta.errMsg = "ERROR: Unable to copy: " + st4FileCheck + " to: " + st4FileTmp2
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()
        else:
            # We will need to copy the gzipped file over to the scratch space, and unzip it.
            st4FileTmpGz = statusMeta.tmpDir + "/" + \
                           "ST4." + dSt4.strftime('%Y%m%d%H') + ".01h.gz"
            st4FileTmp = statusMeta.tmpDir + "/" + \
                         "ST4." + dSt4.strftime('%Y%m%d%H') + ".01h.grb"
            st4FileTmp2 = statusMeta.tmpDir + "/" + \
                          "ST4." + dSt4.strftime('%Y%m%d%H') + ".01h.grb2"

            # If the copy exists, remove it to be safe.
            try:
                if os.path.isfile(st4FileTmpGz):
                    os.remove(st4FileTmpGz)
            except:
                statusMeta.errMsg = "Unable to remove old file: " + st4FileTmpGz
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

            # If the unzipped file exists, remove it to be safe.
            try:
                if os.path.isfile(st4FileTmp):
                    os.remove(st4FileTmp)
            except:
                statusMeta.errMsg = "Unable to remove temporary GRIB file: " + st4FileTmp
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

            # Copy the gzipped file over to the scratch directory.
            try:
                shutil.copy(st4FileCheck,st4FileTmpGz)
            except:
                statusMeta.errMsg = "ERROR: Unable to copy: " + st4FileCheck + " to: " + st4FileTmpGz
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

            # Unzip the file in place.
            try:
                with gzip.open(st4FileTmpGz,'rb') as fTmpGz:
                    with open(st4FileTmp,'wb') as fTmp:
                        shutil.copyfileobj(fTmpGz,fTmp)
            except:
                statusMeta.errMsg = "Unable to unzip: " + st4FileTmpGz
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

            # Remove the copy of the gzip file.
            try:
                if os.path.isfile(st4FileTmpGz):
                    os.remove(st4FileTmpGz)
            except:
                statusMeta.errMsg = "Unable to remove old file: " + st4FileTmpGz
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

            # Convert the Stage IV GRIB1 file to GRIB2.
            try:
                gribMod2.gribOneToGribTwo(statusMeta,st4FileTmp,st4FileTmp2)
            except:
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

            # Remove the temporary GRIB1 file.
            try:
                os.remove(st4FileTmp)
            except:
                statusMeta.errMsg = "Unable to remove temporary file: " + st4FileTmp
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

        # Double check to make sure the GRIB2 file exists.
        if not os.path.isfile(st4FileTmp2):
            statusMeta.errMsg = "Expected file: " + st4FileTmp2 + " not found."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Read in grid from the Stage IV GRIB file.
        try:
            gribMod2.getGribVariable(statusMeta,st4FileTmp2,gridMeta,'APCP','APCP_surface',
                                     'surface',0,'kg/m^2',st4HrlyPrecip[:,:,st4Step])
        except:
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Set the Channel Islands to missing.
        try:
            st4HrlyPrecip[geoConst.CICol,geoConst.CIRow,st4Step] = gridMeta.gribNdv
        except:
            statusMeta.errMsg = "Unable to assign Channel Islands to missing. " + \
                                "Hourly Stage IV will not be used in disaggregation."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Calculate the total precipitation grid through the duration loop.
        if st4Step == 0:
            st4HrlyTotalPrecip[:,:] = st4HrlyPrecip[:,:,st4Step]
        else:
            indNdvTmp = np.where((st4HrlyTotalPrecip == gridMeta.gribNdv) |
                                 (st4HrlyPrecip[:,:,st4Step] == gridMeta.gribNdv))
            indValidTmp = np.where((st4HrlyTotalPrecip != gridMeta.gribNdv) &
                                   (st4HrlyPrecip[:,:,st4Step] != gridMeta.gribNdv))
            st4HrlyTotalPrecip[indNdvTmp] = gridMeta.gribNdv
            st4Tmp = st4HrlyPrecip[:,:,st4Step]
            st4HrlyTotalPrecip[indValidTmp] = st4HrlyTotalPrecip[indValidTmp] + st4Tmp[indValidTmp]

        # Perform the flat line test for the hourly data.
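        # Illustrative behavior (example values only): a cell reporting exactly 2.5 mm in every
        # hour of the window, with an accumulated total above statusMeta.threshMM, builds up a
        # flat count each hour; only cells that remain flat across every consecutive pair of
        # hours end up flagged (st4FlatInd = 1) after the hourly loop finishes.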
        try:
            if st4Step == 0:
                st4FlatInd[:,:] = 0
            elif st4Step == 1:
                diffTmp = st4HrlyPrecip[:,:,st4Step] - st4HrlyPrecip[:,:,st4Step-1]
                diffTmp = np.absolute(diffTmp)
                indTmp = np.where((st4HrlyPrecip[:,:,st4Step] != gridMeta.gribNdv) &
                                  (st4HrlyPrecip[:,:,st4Step-1] != gridMeta.gribNdv) &
                                  (st4HrlyPrecip[:,:,st4Step] > 0.0) &
                                  (st4HrlyPrecip[:,:,st4Step-1] > 0.0) &
                                  (st4AccDuration != gridMeta.gribNdv) &
                                  (st4AccDuration > statusMeta.threshMM) &
                                  (diffTmp == 0.0))
                st4FlatInd[indTmp] = 1
            else:
                diffTmp = st4HrlyPrecip[:,:,st4Step] - st4HrlyPrecip[:,:,st4Step-1]
                diffTmp = np.absolute(diffTmp)
                indTmp = np.where((st4HrlyPrecip[:,:,st4Step] != gridMeta.gribNdv) &
                                  (st4HrlyPrecip[:,:,st4Step-1] != gridMeta.gribNdv) &
                                  (st4HrlyPrecip[:,:,st4Step] > 0.0) &
                                  (st4HrlyPrecip[:,:,st4Step-1] > 0.0) &
                                  (diffTmp == 0.0))
                st4FlatInd[indTmp] = st4FlatInd[indTmp] + 1
        except:
            st4FlatInd = None

        # Reset temporary arrays and index arrays to free up memory.
        indNdvTmp = None
        indValidTmp = None
        st4Tmp = None
        diffTmp = None
        indTmp = None

        # Remove the temporary copy of the GRIB file.
        try:
            if os.path.isfile(st4FileTmp2):
                os.remove(st4FileTmp2)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + st4FileTmp2
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

    # We will look for any pixel cells where Stage IV data was flat across ALL hours based on the
    # index grid being equal to the number of hours.
    indTmp = np.where(st4FlatInd != int(statusMeta.accDuration-1))
    st4FlatInd[indTmp] = 0
    indTmp = np.where(st4FlatInd == int(statusMeta.accDuration-1))
    st4FlatInd[indTmp] = 1

    # All data was successfully read in. Set the use hourly flag to True to guide the
    # workflow to next steps.
    statusMeta.haveHourlyMPE = True

    return statusMeta,gridMeta,st4HrlyTotalPrecip,st4HrlyPrecip,st4FlatInd

def readHourlyMrmsGc(statusMeta,gridMeta):
    """ This is a function to read in hourly MRMS Gage-Corrected data over the
        course of the accumulation duration period.
    """
    # Set a temporary datetime object to iterate during the accumulation period.
    dMrms = statusMeta.dBegAcc

    mrmsHrlyPrecip = None

    for mrmsStep in range(0,int(statusMeta.accDuration)):
        # Calculate the current time step plus one hour for file path composition.
        dMrmsNext = dMrms + datetime.timedelta(seconds=3600.0)

        # Compose the path to the MRMS hourly file.
        if statusMeta.ncepWcoss == 1:
            mrmsFileCheck = statusMeta.mrmsDir + "/MultiSensorQPE/MRMS_MultiSensor_QPE_01H_Pass2_00.00_" + \
                            dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'
        else:
            mrmsFileCheck = statusMeta.mrmsDir + "/GaugeCorr_QPE_01H/" + dMrms.strftime('%Y') + \
                            "/" + dMrms.strftime('%m') + "/MRMS_GaugeCorr_QPE_01H_00.00_" + \
                            dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'

        if not os.path.isfile(mrmsFileCheck):
            statusMeta.statusMsg = "Expected MRMS MultiSensorQPE file: " + mrmsFileCheck + \
                                   " not found. Hourly MRMS GC will not be used in disaggregation."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        statusMeta.statusMsg = "MultiSensorQPE file being used: " + mrmsFileCheck
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

        # We will need to copy the gzipped file over to the scratch space, and unzip it.
        if statusMeta.ncepWcoss == 1:
            mrmsFileTmpGz = statusMeta.tmpDir + "/" + \
                            "MRMS_MultiSensor_QPE_01H_Pass2_00.00_" + \
                            dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'
            mrmsFileTmp = statusMeta.tmpDir + "/" + \
                          "MRMS_MultiSensor_QPE_01H_Pass2_00.00_" + \
                          dMrms.strftime('%Y%m%d-%H%M') + '00.grib2'
        else:
            mrmsFileTmpGz = statusMeta.tmpDir + "/" + \
                            "MRMS_GaugeCorr_QPE_01H_00.00_" + \
                            dMrms.strftime('%Y%m%d-%H%M') + '00.grib2.gz'
            mrmsFileTmp = statusMeta.tmpDir + "/" + \
                          "MRMS_GaugeCorr_QPE_01H_00.00_" + \
                          dMrms.strftime('%Y%m%d-%H%M') + '00.grib2'

        # If the copy exists, remove it to be safe.
        try:
            if os.path.isfile(mrmsFileTmpGz):
                os.remove(mrmsFileTmpGz)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # If the unzipped file exists, remove it to be safe.
        try:
            if os.path.isfile(mrmsFileTmp):
                os.remove(mrmsFileTmp)
        except:
            statusMeta.errMsg = "Unable to remove temporary GRIB file: " + mrmsFileTmp
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Copy the gzipped file over to the scratch directory.
        try:
            shutil.copy(mrmsFileCheck,mrmsFileTmpGz)
        except:
            statusMeta.errMsg = "Unable to copy: " + mrmsFileCheck + " to: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Unzip the file in place.
        try:
            with gzip.open(mrmsFileTmpGz,'rb') as fTmpGz:
                with open(mrmsFileTmp,'wb') as fTmp:
                    shutil.copyfileobj(fTmpGz,fTmp)
        except:
            statusMeta.errMsg = "Unable to unzip: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Remove the copy of the gzip file.
        try:
            if os.path.isfile(mrmsFileTmpGz):
                os.remove(mrmsFileTmpGz)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + mrmsFileTmpGz
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # If the grid metadata hasn't been read in yet, read it in from the first file.
        if not gridMeta:
            try:
                gridMeta = gribMod2.gribMeta()
                if statusMeta.ncepWcoss == 1:
                    gridMeta.readMeta(statusMeta,mrmsFileTmp,'MultiSensorQPE01H')
                else:
                    gridMeta.readMeta(statusMeta,mrmsFileTmp,'0 m above mean sea level')
            except:
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

        if type(mrmsHrlyPrecip) == type(None):
            # Initialize the arrays that will be returned back to the main calling program
            # for further use.
            mrmsHrlyPrecip = np.empty([gridMeta.ny,gridMeta.nx,int(statusMeta.accDuration)],np.float64)
            mrmsHrlyTotalPrecip = np.empty([gridMeta.ny,gridMeta.nx],np.float64)

            # Set the grids to the GRIB missing value.
            mrmsHrlyPrecip[:,:,:] = gridMeta.gribNdv
            mrmsHrlyTotalPrecip[:,:] = gridMeta.gribNdv

        # Read in grid from MRMS GC GRIB file.
        try:
            if statusMeta.ncepWcoss == 1:
                gribMod2.getGribVariable(statusMeta,mrmsFileTmp,gridMeta,'MultiSensorQPE01H',
                                         'MultiSensorQPE01H_0mabovemeansealevel',
                                         '0 m above mean sea level',0,'unknown',mrmsHrlyPrecip[:,:,mrmsStep])
            else:
                gribMod2.getGribVariable(statusMeta,mrmsFileTmp,gridMeta,'0 m above mean sea level',
                                         'var209_6_9_0mabovemeansealevel',
                                         '0 m above mean sea level',0,'unknown',mrmsHrlyPrecip[:,:,mrmsStep])
        except:
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Filter out negative values, which indicate missing radar data, by setting them to NDV.
        indFilter = np.where(mrmsHrlyPrecip < 0.0)
        mrmsHrlyPrecip[indFilter] = gridMeta.gribNdv

        # Calculate the total precipitation grid through the duration loop.
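        # For example (values are illustrative only): if the running total at a cell is 4.0 mm and
        # the current hourly value is 1.5 mm, the total becomes 5.5 mm; if either value is the GRIB
        # NDV, the total is set to NDV and remains missing for the rest of the window.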
        if mrmsStep == 0:
            mrmsHrlyTotalPrecip[:,:] = mrmsHrlyPrecip[:,:,mrmsStep]
        else:
            indNdvTmp = np.where((mrmsHrlyTotalPrecip == gridMeta.gribNdv) |
                                 (mrmsHrlyPrecip[:,:,mrmsStep] == gridMeta.gribNdv))
            indValidTmp = np.where((mrmsHrlyTotalPrecip != gridMeta.gribNdv) &
                                   (mrmsHrlyPrecip[:,:,mrmsStep] != gridMeta.gribNdv))
            mrmsHrlyTotalPrecip[indNdvTmp] = gridMeta.gribNdv
            mrmsTmp = mrmsHrlyPrecip[:,:,mrmsStep]
            mrmsHrlyTotalPrecip[indValidTmp] = mrmsHrlyTotalPrecip[indValidTmp] + mrmsTmp[indValidTmp]

        # Remove the temporary copy of the GRIB file.
        try:
            if os.path.isfile(mrmsFileTmp):
                os.remove(mrmsFileTmp)
        except:
            statusMeta.errMsg = "Unable to remove old file: " + mrmsFileTmp
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Iterate the datetime object.
        dMrms = dMrms + datetime.timedelta(seconds=3600.0)

        # Reset temporary arrays and index arrays to free up memory.
        indFilter = None
        indNdvTmp = None
        indValidTmp = None
        mrmsTmp = None

    statusMeta.useMrmsGc = True

    return statusMeta,gridMeta,mrmsHrlyTotalPrecip,mrmsHrlyPrecip

def readHourlyHrrrPrecip(statusMeta,gridMeta):
    """ This is a function to read in hourly HRRR forecasted precipitation data
        over the course of the accumulation duration period.
    """
    # Set a temporary datetime object to iterate during the accumulation period.
    dHrrr = statusMeta.dBegAcc

    hrrrHrlyPrecip = None

    # We will need to calculate a cycle datetime object that will direct this function
    # to choose the correct file to open, based on the HRRR forecast hour chosen
    # by the user.
    if statusMeta.hrrrFcstHr == 1:
        dHrrrCycle = dHrrr
    elif statusMeta.hrrrFcstHr == 2:
        dHrrrCycle = dHrrr - datetime.timedelta(seconds=3600.0)
    elif statusMeta.hrrrFcstHr == 3:
        dHrrrCycle = dHrrr - datetime.timedelta(seconds=3600.0*2.0)

    for hrrrStep in range(0,int(statusMeta.accDuration)):
        # Compose the path to the HRRR hourly file.
        fcstHrStr = str(statusMeta.hrrrFcstHr).zfill(2)
        if statusMeta.ncepWcoss == 1:
            hrrrFile = statusMeta.hrrrDir + "/hrrr." + dHrrrCycle.strftime('%Y%m%d') + \
                       '/conus/hrrr.t' + dHrrrCycle.strftime('%H') + \
                       'z.wrfsfcf' + fcstHrStr + '.grib2'
        else:
            hrrrFile = statusMeta.hrrrDir + "/" + dHrrrCycle.strftime('%Y') + \
                       "/" + dHrrrCycle.strftime('%m') + "/" + \
                       dHrrrCycle.strftime('%d') + "/hrrr." + \
                       dHrrrCycle.strftime('%Y%m%d') + '.t' + dHrrrCycle.strftime('%H') + \
                       'z.wrfsfcf' + fcstHrStr + '.grib2'

        if not os.path.isfile(hrrrFile):
            statusMeta.statusMsg = "WARNING: Expected HRRR file: " + hrrrFile + \
                                   " not found. Hourly HRRR will not be used in disaggregation."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        statusMeta.statusMsg = "HRRR file being used: " + hrrrFile
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

        # If this is the first HRRR GRIB file being read in, read in the metadata
        # information.
        if not gridMeta:
            try:
                gridMeta = gribMod2.gribMeta()
                gridMeta.readMeta(statusMeta,hrrrFile,'APCP')
            except:
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

        if type(hrrrHrlyPrecip) == type(None):
            # Initialize the arrays that will be returned back to the main calling program
            # for further use.
            hrrrHrlyPrecip = np.empty([gridMeta.ny,gridMeta.nx,int(statusMeta.accDuration)],np.float64)
            hrrrHrlyTotalPrecip = np.empty([gridMeta.ny,gridMeta.nx],np.float64)

            # Set the grids to the GRIB missing value.
            hrrrHrlyPrecip[:,:,:] = gridMeta.gribNdv
            hrrrHrlyTotalPrecip[:,:] = gridMeta.gribNdv

        # Read in grid from HRRR GRIB file.
        try:
            gribMod2.getGribVariable(statusMeta,hrrrFile,gridMeta,'APCP','APCP_surface','surface',
                                     0,'kg m**-2',hrrrHrlyPrecip[:,:,hrrrStep])
        except:
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Calculate the total precipitation grid through the duration loop.
        if hrrrStep == 0:
            hrrrHrlyTotalPrecip[:,:] = hrrrHrlyPrecip[:,:,hrrrStep]
        else:
            indNdvTmp = np.where((hrrrHrlyTotalPrecip == gridMeta.gribNdv) |
                                 (hrrrHrlyPrecip[:,:,hrrrStep] == gridMeta.gribNdv))
            indValidTmp = np.where((hrrrHrlyTotalPrecip != gridMeta.gribNdv) &
                                   (hrrrHrlyPrecip[:,:,hrrrStep] != gridMeta.gribNdv))
            hrrrHrlyTotalPrecip[indNdvTmp] = gridMeta.gribNdv
            hrrrTmp = hrrrHrlyPrecip[:,:,hrrrStep]
            hrrrHrlyTotalPrecip[indValidTmp] = hrrrHrlyTotalPrecip[indValidTmp] + hrrrTmp[indValidTmp]

        # Iterate the datetime objects.
        dHrrr = dHrrr + datetime.timedelta(seconds=3600.0)
        dHrrrCycle = dHrrrCycle + datetime.timedelta(seconds=3600)

        # Reset temporary arrays and index arrays to free up memory.
        indNdvTmp = None
        indValidTmp = None
        hrrrTmp = None

    statusMeta.useHrrr = True

    return statusMeta,gridMeta,hrrrHrlyTotalPrecip,hrrrHrlyPrecip

def readHourlyRapPrecip(statusMeta,gridMeta):
    """ This is a function to read in hourly RAP forecasted precipitation data
        over the course of the accumulation duration period.
    """
    # Set a temporary datetime object to iterate during the accumulation period.
    dRap = statusMeta.dBegAcc

    rapHrlyPrecip = None

    # We will need to calculate a cycle datetime object that will direct this function
    # to choose the correct file to open, based on the HRRR forecast hour chosen
    # by the user.
    if statusMeta.hrrrFcstHr == 1:
        dRapCycle = dRap
    elif statusMeta.hrrrFcstHr == 2:
        dRapCycle = dRap - datetime.timedelta(seconds=3600.0)
    elif statusMeta.hrrrFcstHr == 3:
        dRapCycle = dRap - datetime.timedelta(seconds=3600.0*2.0)

    for rapStep in range(0,int(statusMeta.accDuration)):
        # Compose the path to the RAP hourly file.
        fcstHrStr = str(statusMeta.hrrrFcstHr).zfill(2)
        if statusMeta.ncepWcoss == 1:
            rapFile = statusMeta.rapDir + "/rap." + dRapCycle.strftime('%Y%m%d') + \
                      "/rap.t" + dRapCycle.strftime('%H') + \
                      'z.awp130pgrbf' + fcstHrStr + '.grib2'
        else:
            rapFile = statusMeta.rapDir + "/" + dRapCycle.strftime('%Y') + \
                      "/" + dRapCycle.strftime('%m') + "/" + \
                      dRapCycle.strftime('%d') + "/rap." + \
                      dRapCycle.strftime('%Y%m%d') + ".t" + dRapCycle.strftime('%H') + \
                      'z.awp130pgrbf' + fcstHrStr + '.grib2'

        if not os.path.isfile(rapFile):
            statusMeta.statusMsg = "Expected RAP file: " + rapFile + \
                                   " not found. Hourly RAP will not be used in disaggregation."
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        statusMeta.statusMsg = "RAP file being used: " + rapFile
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

        # If this is the first RAP GRIB file being read in, read in the metadata
        # information.
        if not gridMeta:
            try:
                gridMeta = gribMod2.gribMeta()
                gridMeta.readMeta(statusMeta,rapFile,'APCP')
            except:
                try:
                    errMod.logWarning(statusMeta)
                except:
                    errMod.errOutScreen(statusMeta)
                raise Exception()

        if type(rapHrlyPrecip) == type(None):
            # Initialize the arrays that will be returned back to the main calling program
            # for further use.
            rapHrlyPrecip = np.empty([gridMeta.ny,gridMeta.nx,int(statusMeta.accDuration)],np.float64)
            rapHrlyTotalPrecip = np.empty([gridMeta.ny,gridMeta.nx],np.float64)

            # Set the grids to the GRIB missing value.
            rapHrlyPrecip[:,:,:] = gridMeta.gribNdv
            rapHrlyTotalPrecip[:,:] = gridMeta.gribNdv

        # Read in grid from RAP GRIB file.
        try:
            gribMod2.getGribVariable(statusMeta,rapFile,gridMeta,'APCP','APCP_surface','surface',
                                     0,'kg m**-2',rapHrlyPrecip[:,:,rapStep])
        except:
            try:
                errMod.logWarning(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            raise Exception()

        # Calculate the total precipitation grid through the duration loop.
        if rapStep == 0:
            rapHrlyTotalPrecip[:,:] = rapHrlyPrecip[:,:,rapStep]
        else:
            indNdvTmp = np.where((rapHrlyTotalPrecip == gridMeta.gribNdv) |
                                 (rapHrlyPrecip[:,:,rapStep] == gridMeta.gribNdv))
            indValidTmp = np.where((rapHrlyTotalPrecip != gridMeta.gribNdv) &
                                   (rapHrlyPrecip[:,:,rapStep] != gridMeta.gribNdv))
            rapHrlyTotalPrecip[indNdvTmp] = gridMeta.gribNdv
            rapTmp = rapHrlyPrecip[:,:,rapStep]
            rapHrlyTotalPrecip[indValidTmp] = rapHrlyTotalPrecip[indValidTmp] + rapTmp[indValidTmp]

        # Iterate the datetime objects.
        dRap = dRap + datetime.timedelta(seconds=3600.0)
        dRapCycle = dRapCycle + datetime.timedelta(seconds=3600)

        # Reset temporary arrays and index arrays to free up memory.
        indNdvTmp = None
        indValidTmp = None
        rapTmp = None

    statusMeta.useRap = True

    return statusMeta,gridMeta,rapHrlyTotalPrecip,rapHrlyPrecip

def combinePrecip(statusMeta,nwmGeoMeta,st4GridMeta,st4AccGrid,st4HrlyTotalPrecip,st4HrlyPrecip,st4FlatInd,
                  mrmsOkGrid,mrmsHrlyPrecip,mrmsHrlyTotalPrecip,mrmsGridMeta,
                  hrrrHrlyPrecip,hrrrHrlyTotalPrecip,hrrrGridMeta,
                  rapHrlyPrecip,rapHrlyTotalPrecip,rapGridMeta,rfcVals,rfcNames,rfcMskGrid):
    """ Function that will combine/subdivide Stage IV / MRMS / HRRR / RAP where needed
        to produce a final MPE grid of precipitation values.
    """
    # If any hourly MPE data were missing, force a full missing-data condition by setting
    # the hourly total Stage IV grid to NDV.
    if not statusMeta.haveHourlyMPE:
        statusMeta.statusMsg = "NO HOURLY MPE FOUND, SETTING HOURLY TO NDV"
        try:
            errMod.logWarning(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)
        st4HrlyTotalPrecip = np.empty([st4GridMeta.ny,st4GridMeta.nx],np.float32)
        st4HrlyTotalPrecip[:,:] = st4GridMeta.gribNdv

    # If the user has provided a list of RFCs to exclude from hourly data usage.
    # This arose during NWM Version 2.0 development as deficiencies in CBRFC's
    # hourly Stage IV data were noted, but 6-hour accumulated data was ok.
    if len(statusMeta.rfcsExcludeHrly[0]) != 0:
        for rfcTmp in statusMeta.rfcsExcludeHrly:
            statusMeta.statusMsg = "OMITTING RFC: " + rfcTmp + " FROM ACCUMULATED DISAGGREGATION"
            try:
                errMod.logMsg(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            rfcValOmit = rfcVals[np.where(rfcNames == rfcTmp)]
            indRfcOmit = np.where(rfcMskGrid == rfcValOmit)
            st4HrlyTotalPrecip[indRfcOmit] = st4GridMeta.gribNdv

    # Perform the missing data test. This identifies where there is MPE for the full accumulation
    # period, but MPE is missing in at least one of the hourly MPE grids.
    missingInd = np.where((st4AccGrid != st4GridMeta.gribNdv) & (st4HrlyTotalPrecip == st4GridMeta.gribNdv))
    missingCount = len(missingInd[0])
    statusMeta.statusMsg = "Num Missing Cells = " + str(missingCount)
    try:
        errMod.logMsg(statusMeta)
    except:
        errMod.errOutScreen(statusMeta)

    # Identify cells where all the hourly MPE and the full-accumulation MPE are valid. These are "good".
    # This count is only needed for informational purposes.
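    # For example (illustrative only): over a 6-hour window, a cell whose 6-hour Stage IV total is
    # valid but whose hour-3 value is NDV falls into the "missing" set above and will be subdivided
    # later; a cell that is valid in every hourly grid and in the 6-hour total is counted as "good".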
    indGood = np.where((st4AccGrid != st4GridMeta.gribNdv) & (st4HrlyTotalPrecip != st4GridMeta.gribNdv))
    goodDataCount = len(indGood[0])
    statusMeta.statusMsg = "Num Good Cells = " + str(goodDataCount)
    try:
        errMod.logMsg(statusMeta)
    except:
        errMod.errOutScreen(statusMeta)

    if statusMeta.flatLine:
        # Determine where the hourly Stage IV data flat-lined.
        indFlat = np.where(st4FlatInd == 1)
        flatCount = len(indFlat[0])
        statusMeta.statusMsg = "Num Flat Cells = " + str(flatCount)
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

    # Identify cells that will be subdivided. Several indices/counts guide this process. The first
    # determines when and where MRMS data should be used to perform subdivision. The others identify
    # cells where subdivision is needed.
    #
    # RQIMaskInd/RQIMaskCount   - Identifies all cells where RQI failed to exceed the RQIThreshold
    #                             throughout the accumulation period.
    # missingInd/missingCount   - Identifies all cells where the hourly MPE is missing data for at
    #                             least one hour of the accumulation period, even though the
    #                             accumulation-period MPE is NOT missing data.
    # gridFlatInd/gridFlatCount - Identifies cells where the hourly MPE flatlines across the
    #                             accumulation period.
    subdivideFlag = np.empty([st4GridMeta.ny,st4GridMeta.nx],np.int)
    subdivideFlag[:,:] = 0
    if missingCount > 0:
        subdivideFlag[missingInd] = 1
    if statusMeta.flatLine:
        if flatCount > 0:
            subdivideFlag[indFlat] = 1
    indTmp = np.where(subdivideFlag == 1)
    subdivideCount = len(indTmp[0])

    statusMeta.statusMsg = "Subdividing ST4/MRMS/HRRR/RAP to generate hourly MPE grids."
    try:
        errMod.logMsg(statusMeta)
    except:
        errMod.errOutScreen(statusMeta)

    # Initialize hourly output grids.
    hourlyOWPGrid = st4HrlyPrecip

    # Subdivide using MRMS GC data.
    if statusMeta.useMrmsGc and statusMeta.useRqiMsk:
        indTmp = np.where((subdivideFlag == 1) & (mrmsOkGrid == 1) &
                          (mrmsHrlyTotalPrecip != mrmsGridMeta.gribNdv) &
                          (mrmsHrlyTotalPrecip > 0.0))
        if len(indTmp[0]) > 0:
            for hrTmp in range(0,int(statusMeta.accDuration)):
                tempOWPGrid = hourlyOWPGrid[:,:,hrTmp]
                tempMRMSGCGrid = mrmsHrlyPrecip[:,:,hrTmp]
                tempOWPGrid[indTmp] = st4AccGrid[indTmp] * (tempMRMSGCGrid[indTmp]/mrmsHrlyTotalPrecip[indTmp])
                hourlyOWPGrid[:,:,hrTmp] = tempOWPGrid
            subdivideFlag[indTmp] = 3 # Subdivided with MRMS GC.
            statusMeta.statusMsg = "SUBDIVIDED WITH MRMS FOR: " + str(len(indTmp[0])) + " CELLS"
            try:
                errMod.logMsg(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)

    # Subdivide using hourly HRRR.
    if statusMeta.useHrrr:
        indTmp = np.where((subdivideFlag == 1) & (hrrrHrlyTotalPrecip != hrrrGridMeta.gribNdv) &
                          (hrrrHrlyTotalPrecip > 0.0))
        if len(indTmp[0]) > 0:
            for hrTmp in range(0,int(statusMeta.accDuration)):
                tempOWPGrid = hourlyOWPGrid[:,:,hrTmp]
                tempHRRRGrid = hrrrHrlyPrecip[:,:,hrTmp]
                tempOWPGrid[indTmp] = st4AccGrid[indTmp] * (tempHRRRGrid[indTmp]/hrrrHrlyTotalPrecip[indTmp])
                hourlyOWPGrid[:,:,hrTmp] = tempOWPGrid
            subdivideFlag[indTmp] = 5 # Subdivided with HRRR.
            statusMeta.statusMsg = "SUBDIVIDED WITH HRRR FOR: " + str(len(indTmp[0])) + " CELLS"
            try:
                errMod.logMsg(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)

    # Subdivide using hourly RAP.
    if statusMeta.useRap:
        indTmp = np.where((subdivideFlag == 1) & (rapHrlyTotalPrecip != rapGridMeta.gribNdv) &
                          (rapHrlyTotalPrecip > 0.0))
        if len(indTmp[0]) > 0:
            for hrTmp in range(0,int(statusMeta.accDuration)):
                tempOWPGrid = hourlyOWPGrid[:,:,hrTmp]
                tempRAPGrid = rapHrlyPrecip[:,:,hrTmp]
                tempOWPGrid[indTmp] = st4AccGrid[indTmp] * (tempRAPGrid[indTmp]/rapHrlyTotalPrecip[indTmp])
                hourlyOWPGrid[:,:,hrTmp] = tempOWPGrid
            subdivideFlag[indTmp] = 6 # Subdivided with RAP.
            statusMeta.statusMsg = "SUBDIVIDED WITH RAP FOR: " + str(len(indTmp[0])) + " CELLS"
            try:
                errMod.logMsg(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)

    # Subdivide uniformly ("flatline").
    indTmp = np.where(subdivideFlag == 1)
    if len(indTmp[0]) > 0:
        for hrTmp in range(0,int(statusMeta.accDuration)):
            tempOWPGrid = hourlyOWPGrid[:,:,hrTmp]
            tempOWPGrid[indTmp] = st4AccGrid[indTmp] / statusMeta.accDuration
            hourlyOWPGrid[:,:,hrTmp] = tempOWPGrid
        subdivideFlag[indTmp] = 8
        statusMeta.statusMsg = "SUBDIVIDED FLAT FOR: " + str(len(indTmp[0])) + " CELLS"
        try:
            errMod.logMsg(statusMeta)
        except:
            errMod.errOutScreen(statusMeta)

    # Assign user-defined missing values wherever the GRIB missing value from the Stage IV data exists.
    indTmp = np.where(hourlyOWPGrid == st4GridMeta.gribNdv)
    if len(indTmp[0]) > 0:
        hourlyOWPGrid[indTmp] = -9999

    # If the user has provided a list of RFCs to mask out completely.
    # This arose during NWM Version 2.0 development as deficiencies in CBRFC's
    # hourly Stage IV data were noted, but 6-hour accumulated data was ok. This option
    # was put in as a configuration fail-safe.
    if len(statusMeta.rfcsMskOut[0]) != 0:
        for rfcTmp in statusMeta.rfcsMskOut:
            statusMeta.statusMsg = "OMITTING RFC: " + rfcTmp + " FROM MPE GENERATION"
            try:
                errMod.logMsg(statusMeta)
            except:
                errMod.errOutScreen(statusMeta)
            rfcValOmit = rfcVals[np.where(rfcNames == rfcTmp)]
            indRfcOmit = np.where(rfcMskGrid == rfcValOmit)
            for hrTmp in range(0,int(statusMeta.accDuration)):
                tempOWPGrid = hourlyOWPGrid[:,:,hrTmp]
                tempOWPGrid[indRfcOmit] = -9999
                hourlyOWPGrid[:,:,hrTmp] = tempOWPGrid

    # Reset temporary arrays and index arrays to free up memory.
    subdivideFlag = None
    missingInd = None
    indFlat = None
    indGood = None
    indTmp = None
    tempOWPGrid = None
    tempMRMSGCGrid = None
    tempHRRRGrid = None
    tempRAPGrid = None

    # Return the hourly MPE data back to the main calling program.
    return hourlyOWPGrid
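
# The block below is a small, self-contained illustration of the subdivision arithmetic used in
# combinePrecip(): an accumulated Stage IV total is distributed across the hours of the window in
# proportion to a donor product's hourly fractions (MRMS GC / HRRR / RAP style), with a uniform
# split as the fallback. All values are made up for demonstration, and the block only runs when
# this module is executed directly; it is not part of the operational MPE workflow.
if __name__ == '__main__':
    ndv = -9999.0
    accDuration = 3
    # Toy 2x2 accumulated Stage IV grid (mm) over a 3-hour window.
    st4Acc = np.array([[6.0, 3.0],
                       [ndv, 9.0]])
    # Toy donor hourly grids, dimensioned [ny,nx,hour] like the arrays above, and their window total.
    donorHrly = np.array([[[1.0, 2.0, 3.0],
                           [0.0, 0.0, 0.0]],
                          [[2.0, 2.0, 2.0],
                           [3.0, 3.0, 3.0]]])
    donorTotal = donorHrly.sum(axis=2)

    hourlyOut = np.empty([2,2,accDuration],np.float64)
    hourlyOut[:,:,:] = ndv
    for hr in range(0,accDuration):
        tmp = hourlyOut[:,:,hr]
        # Proportional subdivision where the donor total is positive and Stage IV is valid.
        indProp = np.where((st4Acc != ndv) & (donorTotal > 0.0))
        tmp[indProp] = st4Acc[indProp] * (donorHrly[:,:,hr][indProp] / donorTotal[indProp])
        # Uniform ("flat") subdivision where Stage IV is valid but the donor total is zero.
        indFlat = np.where((st4Acc != ndv) & (donorTotal == 0.0))
        tmp[indFlat] = st4Acc[indFlat] / float(accDuration)
        hourlyOut[:,:,hr] = tmp

    # The hourly values sum back to the accumulated total wherever Stage IV was valid.
    print(hourlyOut)
    print(hourlyOut.sum(axis=2))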