# Program Name: nwm_grib2netcdf # Author: Donald Johnson (donald.w.johnson@noaa.gov) # Abstract: This program converts grib2 files produced by other models # and creates netcdf copies containing the variables needed by NWM # forcing engine # Usage: python nwm_grib2netcdf.py # Condition Codes from pathlib import Path import sys import subprocess import logging import argparse import os from datetime import datetime import time import shutil import re seconds_in_a_day = 86400 logger = logging.getLogger('nwm_grb2netcdf') logger.setLevel(logging.DEBUG) log_handler = logging.FileHandler('nwm_grib2netcdf.log',mode='w') log_handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') log_handler.setFormatter(formatter) logger.addHandler(log_handler) grib2_products_global = ["GFS_Production_GRIB2", "GFS_Test", "NAM_Conus_Nest_GRIB2", "HRRR_Conus_GRIB2", "RAP_Conus_GRIB2", "CFSv2_6Hr_Global_GRIB2", "WRF_ARW_Hawaii_GRIB2", "GFS_Production_025d_GRIB2", "NAM_Nest_3km_Hawaii", "NAM_Nest_3km_PuertoRico", "NAM_Nest_3km_Alaska", "MRMS_RadarOnly_QPE", "MRMS_RadarQualityIndex","MRMS_GaugeCorr_QPE", "MRMS_Hawaii_MultiSensor", "MRMS_Hawaii_RadarQualityIndex", "MRMS_Conus_Hawaii_MultiSensor_QPE", "NAM_Nest_3km_Puerto_Rico","WRF_ARW_Puerto_Rico_GRIB2"] arw_pattern = re.compile(".*\.(f..)\..+") rap_conus_pattern = re.compile(".*z\.awp130bgrb(f..)\..+") hrrr_conus_pattern = re.compile(".*z\.wrfsfc(f..)\..+") gfs_conus_pattern = re.compile(".*z\.sfluxgrb(f.+)\..+") nam_nest_pattern = re.compile(".*z\..+\.hires(f..)\..+") cfs_pattern = re.compile(".*flxf(.+)\.[0-9][0-9]\.(.+)\..+") # class for returning all file paths recursivly that match a pattern in target directory class RecursiveFilePatternMatch: def __init__(self, basepath, pattern): self.basepath = basepath self.pattern = pattern def get_paths(self): search_path = Path(self.basepath) attempts = 0 while attempts < 3: try: paths = search_path.glob(self.pattern) return list(paths) except FileNotFoundError: attempts = attempts + 1 if attempts >= 3: raise except: print("An unexpected error occured when searching the file system.") raise # class for converting a grib2 file to a netcdf file wiith the required variable # for the NWM forcing engine class ForcingNetcdfConverter: def __init__(self, product): self.gribvars = [] self.netcdfvars = ['TMP_2maboveground','SPFH_2maboveground','UGRD_10maboveground' 'VGRD_10maboveground','PRATE_surface','DSWRF_surface','DLWRF_surface','PRES_surface'] self.gribvars = ['TMP','SPFH','UGRD','VGRD','PRATE','DSWRF','DLWRF','PRES'] self.griblevels = ['2 m above ground','2 m above ground','10 m above ground','10 m above ground', 'surface','surface','surface','surface'] self.product = product self.products = frozenset(grib2_products_global) # setup logging self.logger = logging.getLogger('ForcingNetcdfConverter') self.logger.setLevel(logging.DEBUG) self.logger.addHandler(log_handler) def warn(self,message): self.logger.warn(message) def error(self,message): self.logger.error(message) def critical(self,message): self.logger.critical(message) def info(self,message): self.logger.info(message) def debug(self,message): self.logger.debug(message) def convert(self, src_path, dst_path, sentinel_file): # get the parent of the distination and check that it exists pp = Path(dst_path).parent if not pp.exists(): # make parents if necessary pp.mkdir(parents=True) if self.product in self.products: if self.product in ["MRMS_RadarOnly_QPE", "MRMS_RadarQualityIndex", "MRMS_GaugeCorr_QPE", "MRMS_Hawaii_MultiSensor", \ "MRMS_Hawaii_RadarQualityIndex", "MRMS_Conus_Hawaii_MultiSensor_QPE"]: try: # set up GRIB2TABLE if needed: if not os.environ.get('GRIB2TABLE'): g2path = os.path.join("grib2.tbl") with open(g2path, 'wt') as g2t: g2t.write( "209:1:0:0:161:1:6:30:MultiSensorQPE01H:" "Multi-sensor estimated precipitation accumulation 1-hour:mm\n" "209:1:0:0:161:1:6:37:MultiSensorQPE01H:" "Multi-sensor estimated precipitation accumulation 1-hour:mm\n" ) os.environ['GRIB2TABLE'] = g2path src_file = Path(src_path + ".gz") dst_file = Path(dst_path) #print("Source file:" + str(src_file)) #print("Destination file:" + str(dst_file)) tmp_file = dst_file.parent.joinpath(src_file.name) #print("Temp file: " + str(tmp_file)) #make sure the dest directory exists dst_file.parent.mkdir(parents=True, exist_ok=True) #copy the src file to the tmp location shutil.copy(str(src_file), str(tmp_file)) #unzip the compressed grib file subprocess.run(["gunzip " + str(tmp_file)],shell=True) #update the tmp_file name to match decompressed file tmp_file = tmp_file.with_suffix('') #use wgrib2 for conversion cmd = "$WGRIB2 " + str(tmp_file) + " -netcdf " + str(dst_path[0:-3]) + ".nc" self.info(self.product + " Running command: \"" + cmd +"\"") # call wgrib2 program cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) sentinel_file.touch() tmp_file.unlink() except: self.info("Failed to find hour in : " + src_path) elif self.product in ["NAM_Nest_3km_Hawaii","NAM_Nest_3km_Puerto_Rico"]: try: hr = int( nam_nest_pattern.match(src_path).groups()[0][1:] ) #command to extract need NWM variable this gets some extra variables if ( 1 <= hr and hr <= 48 ): cmd = "$WGRIB2 " +src_path + \ " -match \":(TMP):80 m above ground:|:(TMP|SPFH):2 m above ground:|:(UGRD|VGRD):10 m above ground:|:(PRES|APCP|HGT|PRATE|DSWRF|DLWRF):surface:\"" + \ " -netcdf " + dst_path self.info(self.product + " Running command: \"" + cmd +"\"") # call wgrib2 program cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: self.info("NAM hour not in range 1-48 found in " + src_path + " ignoring file.") except: self.info("Failed to find hour in " + src_path) elif self.product in ["WRF_ARW_Hawaii_GRIB2","WRF_ARW_Puerto_Rico_GRIB2"]: # command to extract need NWM variable this gets some extra variables try: hr = int( arw_pattern.match(src_path).groups()[0][1:] ) prevhr = hr - 1 if ( 1 <= hr and hr <= 48 ): cmd = "$WGRIB2 " +src_path + \ " -match ':(TMP):80 m above ground:" + str(hr) + " hour fcst:|" + \ ":(SPFH):2 m above ground:" + str(hr) + " hour fcst:|" + \ ":(UGRD|VGRD):10 m above ground:" + str(hr) +" hour fcst:|" + \ ":APCP:surface:" + str(prevhr) + "-"+ str(hr) + " hour acc fcst:|" + \ ":PRES:surface:" + str(hr) + " hour fcst:|:(HGT):surface:'" + \ " -netcdf " + dst_path self.info(self.product + " Running command: \"" + cmd +"\"") # call wgrib2 program cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: self.info("ARW hour not in range 1-48 found in " + src_path + " ignoring file.") except: self.info("Failed to find hour in " + src_path) elif self.product in ["HRRR_Conus_GRIB2"]: # command to extract need NWM variables try: hr = int( hrrr_conus_pattern.match(src_path).groups()[0][1:] ) prevhr = hr - 1 if ( 1 <= hr and hr <= 18 ): cmd = "$WGRIB2 " +src_path + \ " -match ':(TMP|SPFH):2 m above ground:" + str(hr) +" hour fcst:|" + \ ":(UGRD|VGRD):10 m above ground:"+ str(hr) + " hour fcst:|" + \ ":(PRES|DLWRF|DSWRF):surface:" + str(hr) + " hour fcst:|" + \ ":APCP:surface:" + str(prevhr) + "-" + str(hr) + " hour acc fcst:|" + \ ":HGT:surface:" + str(hr) + " hour fcst:' -netcdf " + dst_path self.info(self.product + " Running command: \"" + cmd +"\"") # call wgrib2 program$$ cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: self.info("HRRR outside the range 1-18 found in " + src_path + " ignoring file.") except: self.info("Failed to find hour in " + src_path) elif self.product in ["RAP_Conus_GRIB2"]: # command to extract need NWM variables try: hr = int( rap_conus_pattern.match(src_path).groups()[0][1:] ) prevhr = hr - 1 if ( 1 <= hr and hr <= 18 ): cmd = "$WGRIB2 " +src_path + \ " -match ':(TMP|SPFH):2 m above ground:" + str(hr) +" hour fcst:|" + \ ":(UGRD|VGRD):10 m above ground:"+ str(hr) + " hour fcst:|" + \ ":(PRES|DLWRF|DSWRF):surface:" + str(hr) + " hour fcst:|" + \ ":APCP:surface:" + str(prevhr) + "-" + str(hr) + " hour acc fcst:|" + \ ":HGT:surface:" + str(hr) + " hour fcst:' -netcdf " + dst_path self.info(self.product + " Running command: \"" + cmd +"\"") # call wgrib2 program cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: self.info("RAP hour outside the range of 1-18 found in " + src_path + " ignoring file.") except: self.info("Failed to find hour in " + src_path) elif self.product in ["GFS_Production_025d_GRIB2"]: try: hr = int( gfs_conus_pattern.match(src_path).groups()[0][1:] ) prevhr = hr - 1 if ( 1 <= hr and hr <= 240 ): if ( hr <= 120 or hr % 3 == 0 ): # command to extract need NWM variable this gets some extra variables cmd = "$WGRIB2 " +src_path + \ " -match \":(TMP|SPFH):2 m above ground:|:(UGRD|VGRD):10 m above ground:|:(PRES|PRATE|DLWRF|DSWRF|HGT|APCP):surface:\"" + \ " -netcdf " + dst_path # call wgrib2 program cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: self.info("GFS hour greater than 120 found that is not divisible by 3 in " + src_path + "ignorning file.") else: self.info("GFS hour greater than 240 found in " + src_path + " ignoring file.") except: self.info("Failed to find hour in " + src_path) elif self.product in ["CFSv2_6Hr_Global_GRIB2"]: try: # test for regexp match and then extract CFS date strings from path match = cfs_pattern.match(src_path) #make datetime objects and get timedelta dt1 = datetime.strptime(match.groups()[0],'%Y%m%d%H') dt2 = datetime.strptime(match.groups()[1],'%Y%m%d%H') if ( (dt1 - dt2).days <= 30 ): # command to extract need NWM variable this gets some extra variables cmd = "$WGRIB2 " +src_path + \ " -match \":(TMP|SPFH):2 m above ground:|:(UGRD|VGRD):10 m above ground:|:(PRES|PRATE|DLWRF|DSWRF|HGT|APCP):surface:\"" + \ " -netcdf " + dst_path # call wgrib2 program cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: self.info("CFS dates found in " + src_path + " are greateer than 30 days apart. Ignoring this file. ") except: self.info("Failed to parse CFS forecast dates.") else: # command to extract need NWM variable this gets some extra variables cmd = "$WGRIB2 " +src_path + \ " -match \":(TMP|SPFH):2 m above ground:|:(UGRD|VGRD):10 m above ground:|:(PRES|PRATE|DLWRF|DSWRF|HGT|APCP):surface:\"" + \ " -netcdf " + dst_path # call wgrib2 program$$$ cmdOutput = subprocess.Popen([cmd],stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True) out, err = cmdOutput.communicate() exitcode = cmdOutput.returncode if exitcode != 0: print("FATAL ERROR: wgrib2 return non zero error code for file " + src_path) Path(dst_path).unlink(missing_ok=True) sentinel_file.touch() else: #unexpect product self.error("Conversion requested for unknown product: " + self.product) return # function to return the differnce between two lists def diff(lst1, lst2): temp = set(lst2) lst3 = [value for value in lst1 if value not in temp] return lst3 def main(): # get the command line parser parser = argparse.ArgumentParser(description='Convert GRIB2 model output files to netcdf for ingest by NWM forcing engine.') parser.add_argument('-i', '--input-dir', required=True, type=str, help='The root directory to search for input files') parser.add_argument('-o', '--output-dir', required=True, type=str, help='The root directory for converted files') parser.add_argument('-p', '--product', required=True, type=str, choices=grib2_products_global, help='The product to find input files for') parser.add_argument('-t', '--temp-dir', required=True, help='The location to store temporary files') args = parser.parse_args() #gribpath = '/gpfs/dell1/nco/ops/com/gfs/prod' #netcdfpath = 'converted' gribpath = args.input_dir netcdfpath = args.output_dir product = args.product temp_dir = args.temp_dir # pattern to match all the input and output files input_pattern = { 'GFS_Production_025d_GRIB2' : 'gfs.????????/??/atmos/gfs.*sfluxgrbf*.grib2.idx', 'GFS_Test' : 'gfs.*/*/atmos/gfs.*sfluxgrbf*.grib2.idx', 'HRRR_Conus_GRIB2' : 'hrrr.????????/conus/hrrr.t??z.*wrfsfcf*.grib2.idx', 'CFSv2_6Hr_Global_GRIB2' : 'cfs.????????/*/6hrly_grib*/flxf*.grb2.idx', 'RAP_Conus_GRIB2' : 'rap.????????/rap.*z.awp130bgrb*.grib2.idx', 'MRMS_RadarOnly_QPE' : 'conus/RadarOnly_QPE/RadarOnly_QPE_01H_00.00_*0000.grib2.gz', 'MRMS_RadarQualityIndex' : '**/RadarQualityIndex/RadarQualityIndex_00.00_*0000.grib2.gz', 'MRMS_GaugeCorr_QPE' : '**/GaugeCorr_QPE/GaugeCorr_QPE_01H_00.00_*0000.grib2.gz', 'WRF_ARW_Hawaii_GRIB2' : 'hiresw.????????/hiresw.t*z.arw_2p5km.f[01234]?.hi.grib2.idx', 'NAM_Nest_3km_Hawaii' : 'nam.????????/nam.t*z.hawaiinest.hiresf[01234]?.tm00.grib2.idx', 'MRMS_Hawaii_MultiSensor' : '**/MultiSensor/*/MRMS_EXP_MultiSensor_QPE_??H_Pass1_00.00_*-??0000.grib2.gz', 'MRMS_Hawaii_RadarQualityIndex' : '**/RadarQualityIndex/*/MRMS_EXP_RadarQualityIndex_00.00_*-??0000.grib2.gz', 'NAM_Nest_3km_Puerto_Rico' : 'nam.????????/nam.t*z.priconest.hiresf[01234]?.tm00.grib2.idx', 'WRF_ARW_Puerto_Rico_GRIB2' : 'hiresw.????????/hiresw.t*z.arw_2p5km.f[01234]?.pr.grib2.idx', 'MRMS_Conus_Hawaii_MultiSensor_QPE' : '[c,h][o,a][n,w]*/MultiSensorQPE/MRMS_MultiSensor_QPE_01H_Pass?_00.00_*.grib2.gz' } output_pattern = { 'GFS_Production_025d_GRIB2' : 'gfs.????????/??/atmos/gfs.*sfluxgrbf*.nc', 'GFS_Test' : '/gfs.*/*/atmos/gfs.*sfluxgrbf*.nc', 'HRRR_Conus_GRIB2' : 'hrrr.????????/conus/hrrr.*wrfsfcf*.nc', 'CFSv2_6Hr_Global_GRIB2' : 'cfs.????????/*/6hrly_grib*/flxf*.nc', 'RAP_Conus_GRIB2' : 'rap.????????/rap.*z.awp130bgrb*.nc', 'MRMS_RadarOnly_QPE' : 'conus/RadarOnly_QPE/RadarOnly_QPE_01H_00.00_*00.nc', 'MRMS_RadarQualityIndex' : '/RadarQualityIndex/RadarQualityIndex_00.00_*00.nc', 'MRMS_GaugeCorr_QPE' : '/GaugeCorr_QPE/GaugeCorr_QPE_01H_00.00_*00.nc', 'WRF_ARW_Hawaii_GRIB2' : 'hiresw.????????/hiresw.t*z.arw_2p5km.f*.hi.nc', 'NAM_Nest_3km_Hawaii' : 'nam.????????/nam.t*z.hawaiinest.hiresf*.tm00.nc', 'MRMS_Hawaii_MultiSensor' : '/MultiSensor/*/MRMS_EXP_MultiSensor_QPE_??H_Pass1_00.00_*-??0000.nc', 'MRMS_Hawaii_RadarQualityIndex' : '/RadarQualityIndex/*/MRMS_EXP_RadarQualityIndex_00.00_*-??0000.nc', 'NAM_Nest_3km_Puerto_Rico' : 'nam.????????/nam.t*z.priconest.hiresf[01234]?.tm00.nc', 'WRF_ARW_Puerto_Rico_GRIB2' : 'hiresw.????????/hiresw.t*z.arw_2p5km.f[01234]?.pr.nc', 'MRMS_Conus_Hawaii_MultiSensor_QPE' : '[c,h][o,a][n,w]*/MultiSensorQPE/MRMS_MultiSensor_QPE_01H_Pass?_00.00_*.nc' } pattern1 = input_pattern[product] pattern2 = output_pattern[product] # check for sentinel value sentinel_file = Path(temp_dir) / (product+'.sentinel') if sentinel_file.exists(): lockfile_update_time = os.path.getmtime(str(sentinel_file)) ct = time.time() if ct - lockfile_update_time < 5 * 60 : logger.info("Current Lock file detected, terminating process.") return 1 else: logger.info("Old lock file located, ignoring lock file.") sentinel_file.touch() else: sentinel_file.touch() #sometimes date arrives late so check for files twice once at startup and once after the files # found at startup have been processed for run in ["main","failsafe"]: # make the pattern matching files pm1 = RecursiveFilePatternMatch(gribpath,pattern1) #print(pattern1) # get the paths of sflux files with and index file # the index file is only created after the full file is downloaded target_file_paths = pm1.get_paths() #print(target_file_paths) #remove the grib2.index from the end of each path #change paths to be relative to the search directory and remove .grib2.index target_file_paths = [path.relative_to(gribpath).with_suffix('').with_suffix('') for path in target_file_paths] logger.info("Found " + str(len(target_file_paths)) + " conversion targets ") #now we need get the list of converted files nc_p = Path(netcdfpath) # if the output path does not exist creat it if nc_p.exists() == False: logger.info("Creating output path: " + str(nc_p)) nc_p.mkdir() # list the files that have been converted pm2 = RecursiveFilePatternMatch(netcdfpath,pattern2) converted_paths = pm2.get_paths() logger.info("Found " + str(len(converted_paths)) + " already converted files") #remove the .netcdf so that the file name are comparable converted_paths = [path.relative_to(netcdfpath).with_suffix('') for path in converted_paths] #print(target_file_paths) #print(converted_paths) #calculate the files that in the prod directories but not converted to_convert = diff(target_file_paths, converted_paths) logger.info(str(len(to_convert)) + " files to convert") #calculate the file that are converted but are no longer in prod to_remove = diff(converted_paths, target_file_paths) logger.info(str(len(to_remove)) + " files to remove") # convert each grib2 file netcdf_converter = ForcingNetcdfConverter(product) for path in to_convert: src_path = Path(gribpath).joinpath(path) dst_path = Path(netcdfpath).joinpath(path) # try to read the maximum age in days of a file to process from the enviorment variable 'PRECONVERSION_MAX_DAYS' # if it does not exist or can not be read set this value to 2 try: max_days_str = os.getenv("PRECONVERSION_MAX_DAYS") if max_days_str is None or max_days_str == "": max_days = 2 else: max_days = int(max_days_str) except: max_days = 2 try: # check modification date to see if we want to copy if product in ["MRMS_RadarOnly_QPE", "MRMS_RadarQualityIndex","MRMS_GaugeCorr_QPE","MRMS_Hawaii_MultiSensor",\ "MRMS_Hawaii_RadarQualityIndex","MRMS_Conus_Hawaii_MultiSensor_QPE"]: tpath = str(src_path)+".grib2.gz" elif product in ["CFSv2_6Hr_Global_GRIB2"]: tpath = str(src_path)+".grb2" else: tpath = str(src_path)+".grib2" file_mod_time = os.path.getmtime(tpath) current_time = time.time() # check file modification file to ignore old data if ( current_time - file_mod_time < max_days * seconds_in_a_day): # print number of days old file is for debug print((current_time - file_mod_time) / seconds_in_a_day) # convert src_path to dest path if product in ["CFSv2_6Hr_Global_GRIB2"]: logger.info("Converting "+str(src_path)+".grb2 to "+str(dst_path)+".nc") print("Converting "+str(src_path)+".grb2 to "+str(dst_path)+".nc") netcdf_converter.convert(str(src_path)+".grb2",str(dst_path)+".nc",sentinel_file) else: logger.info("Converting "+str(src_path)+".grib2 to "+str(dst_path)+".nc") print("Converting "+str(src_path)+".grib2 to "+str(dst_path)+".nc") netcdf_converter.convert(str(src_path)+".grib2",str(dst_path)+".nc",sentinel_file) else: # print files that are skipped # print("old Path: " + str(tpath) + " skipping") pass except Exception as ex: import traceback print(ex) traceback.print_exc() print("unable to check modification date for " + str(tpath)) logger.info("Done converting files") # remove out of date files for path in to_remove: #print(path) if path.exists(): logger.info("Removing "+str(path)) path.unlink() logger.info("Done removing files") # end for run loop #close the log file log_handler.close() #remove sentinel sentinel_file.unlink() return if __name__ == "__main__": main()