##@namespace hwrf_alerts
#This module contains the track delivery function and all dbn
#alerts in the HWRF system. It is intended to sit on top of the
#ush/hwrf_expt.py as the NCO alert and NHC data delivery module.
#Generally the functions in this module are run by the scripts/ex*.py
#programs just before handing control over to the hwrf_expt package.

import os, socket, re, logging

import produtil.ecflow
import produtil.fileop
from produtil.dbnalert import DBNAlert
from produtil.run import run, exe, batchexe
from produtil.log import jlogger
from produtil.ecflow import set_ecflow_event
from hwrf.numerics import to_datetime_rel, to_datetime

import hwrf_expt


class TrackDeliveryFailed(Exception):
    """!Raised when the send_nhc_track() is unable to deliver the
    track file."""


def _model_alert(alertname, location='{location}'):
    """!Builds the DBNAlert used throughout this module.

    @param alertname The DBN alert subtype, such as "HWRF_GB2".
    @param location  The location template passed as the last alert
      argument.  Defaults to the product's own '{location}'.
    @returns a new DBNAlert for ['MODEL', alertname, '{job}', location]"""
    return DBNAlert(['MODEL', alertname, '{job}', location])


########################################################################
### EMERGENCY FALLBACK ALERT SYSTEM ####################################
########################################################################

## Sentinel meaning "the caller did not supply email_from".
absent = object()

## Warning logged when no "From:" address is configured.
_NO_FROM_WARNING = ('Not changing email "From:" line. Will use system '
                    'default. This may break email if the system default '
                    'has a domain that is not world-visible.')


def send_email(email_list, subject, email_from=absent,
               content_text=None, content_file=None,
               logger=None):
    """!Entry point for sending email; delivery is presently disabled.

    Every path through this function returns None without sending
    anything:
      * RUN_ENVIR=NCO: imports the "mail" module, reads content_file
        (if given), and logs one "Skipping email" line per address.
      * Otherwise, when not running under ecFlow: logs that the email
        is skipped.
      * Otherwise: returns silently.
    The former ssh-based delivery code is retained, uncalled, in
    _send_email_via_ssh().

    @param email_list   String of destination addresses separated by
      commas, spaces or tabs.
    @param subject      The subject line.
    @param email_from   The "From:" address.  Not used while delivery
      is disabled.
    @param content_text The message body as a string.
    @param content_file Path to a file containing the message body;
      takes precedence over content_text.
    @param logger       A logging.Logger; defaults to the
      "hwrf_alerts.send_email" logger.
    @returns None"""
    # Resolve the default logger first: every branch below logs.
    if logger is None:
        logger = logging.getLogger('hwrf_alerts.send_email')

    i_am_nco = (os.environ.get('RUN_ENVIR', 'EMC').upper() == 'NCO')
    if not i_am_nco:
        if not produtil.ecflow.in_ecflow():
            logger.info('Skipping email because you are not NCO and you '
                        'are not using ecFlow.')
        return

    # Imported here because the module is only expected in NCO runs.
    import mail
    if content_file:
        with open(content_file, 'rt') as f:
            content_text = f.read()
    for address in re.split('[, \t]+', email_list):
        # JY- 01/24, not working for now.
        # mail.send(subject, content_text, to_address=address,
        #           from_name=email_from)
        logger.info('Error: Skipping email now, mail.py not working in '
                    'HWRF. Email to %s ' % (address,))
    return


def _send_email_via_ssh(email_list, subject, email_from, content_text,
                        content_file, logger):
    """!Former delivery path of send_email(): runs "mail" via ssh.

    This code was unreachable inside send_email() and is kept here,
    uncalled, so it can be re-enabled deliberately.

    @param email_list   String of addresses separated by commas,
      spaces or tabs.
    @param subject      The subject line.
    @param email_from   The "From:" address, or the module-level
      "absent" sentinel to use $HWRF_EMAIL_FROM (default
      nco.spa@noaa.gov).  A false value leaves the system default.
    @param content_text The message body; used if content_file is unset.
    @param content_file Path to a file to use as the message body.
    @param logger       A logging.Logger for messages.
    @returns the value returned by produtil.run.run() for the command"""
    cmd = batchexe('ssh')['slogin1', 'mail', '-s', "'" + subject + "'"]
    if content_file:
        cmd = cmd < content_file
    else:
        if content_text is None:
            raise ValueError(
                'Either content_text or content_file must be provided.')
        cmd = cmd << content_text

    # Set the "From:" line if requested:
    if email_from is absent:
        email_from = os.environ.get('HWRF_EMAIL_FROM', 'nco.spa@noaa.gov')
    if email_from:
        cmd = cmd['-r', email_from]
        logger.info('Email "From:" address is: %s' % (repr(email_from),))
    else:
        logger.warning(_NO_FROM_WARNING)

    addresses = re.split('[, \t]+', email_list)
    cmd = cmd[addresses]  # add the addresses to the argument list
    return run(cmd, logger=logger)


def fallback_callback(allow, name, details):
    """!Called when an emergency fallback is requested.  Return True
    to allow, False to deny.

    Builds an operator notification and hands it to send_email(), then
    returns the incoming "allow" value unchanged.

    @param allow True or False, whether the other fallback filters
      have allowed this fallback thus far.
    @param name The name of the fallback.
    @param details Human-readable details about this fallback.
    @returns the "allow" argument"""
    conf = hwrf_expt.conf
    logger = conf.log('fallbacks')
    ucname = name.upper().replace('_', ' ')
    subject = conf.strinterp(
        'config',
        "HWRF {FALLNAME}: {cyc}Z HWRF{storm_num} {vit[basinname]} "
        "Tropical System {vit[stormname]} ({vit[stormid3]})",
        FALLNAME=ucname)
    # NOTE(review): the line breaks in this template were reconstructed;
    # confirm the layout against the expected operator email format.
    content = conf.timestrinterp('config', """ATTN Operator:

HWRF storm slot {storm_num} enacted a fallback mechanism to disable an
optional part of the HWRF workflow. This is not necessarily an error
but should merit operator attention.

Storm = {vit[basinname]} Tropical System {vit[stormname]} {vit[stormid3]}
R.S.M.C. = {vit[center]}
Job = {ENV[job|-unknown]}
Storm slot = hwrf{storm_num}
Fcst. cycle = {ayear}-{amonth}-{aday} {aHH}
Fallback name = {fallback_name}

Details: {fallback_details}

Sincerely,
HWRF
""", fallback_name=name, fallback_details=details)

    # Determine destination addresses.  Environment wins over the
    # config file, which wins over the built-in default:
    fallback_email_list = os.environ.get('HWRF_FALLBACK_EMAIL_LIST', '') \
        or conf.getstr('config', 'fallback_email_list', '') \
        or 'nco.spa@noaa.gov'
    email_from = os.environ.get('HWRF_EMAIL_FROM', 'nco.spa@noaa.gov')

    jlogger.info('Contacting operator about fallback ' + name)
    send_email(fallback_email_list, subject, email_from,
               content_text=content, logger=logger)
    return allow  # use default value from [config] allow_fallbacks option


########################################################################
### TRACK FILE DELIVERY ################################################
########################################################################

def send_nhc_track(*args, **kwargs):
    """!Sends the track file to special NHC disk locations on the
    WCOSS supercomputer.

    Does nothing unless $SENDCOM is YES.  The destination directory is
    the tracker's COMOUTatcf directory, falling back to $COMOUTatcf.
    Non-NCO runs refuse to write to a destination starting with /com.
    If $COMOUTgltrk is set (required when RUN_ENVIR=NCO), the track,
    with each line cut to 112 characters, is also appended to
    tracks.atcfunix.YY in that directory.

    @param args,kwargs Ignored.
    @exception TrackDeliveryFailed if no COMOUTatcf destination is
      configured.
    @exception KeyError if RUN_ENVIR=NCO and $COMOUTgltrk is unset.
    @note This does not sort the track file, and does not grep out
      the correct storm, since that is already done before this
      function starts."""
    from hwrf_expt import tracker, conf
    logger = tracker.log('to_nhc')
    ENV = os.environ

    if ENV.get('SENDCOM', 'NO').upper() != 'YES':
        logger.warning('Not delivering track file to NHC locations '
                       'because SENDCOM is not YES')
        return

    COMOUTatcf = tracker.getdir('COMOUTatcf', 'missing')
    if COMOUTatcf == 'missing':
        if 'COMOUTatcf' in ENV:
            COMOUTatcf = ENV['COMOUTatcf']
        else:
            msg = ("ABORTING: Cannot deliver track file: $COMOUTatcf is "
                   "unset, and is not in hwrf.conf. Don't know where to "
                   "send track file.")
            logger.critical(msg)
            raise TrackDeliveryFailed(msg)

    i_am_nco = (ENV.get('RUN_ENVIR', 'EMC').upper() == 'NCO')
    if not i_am_nco and COMOUTatcf[0:4] == '/com':
        logger.warning('Not delivering track file to NHC locations '
                       'because you are not NCO and your destination '
                       'directory is inside /com or /com2')
        return

    # src = the track file (name is automatically generated by tracker.py)
    src = tracker.product('cleanatcfunix').location

    # tgt = something like /com/nhc/prod/atcf/bb062013/ncep_abb062013.dat
    relative_tgt = ('{hwrfbasin2}{stnum:02d}{when.year}/'
                    'ncep_a{hwrfbasin2}{stnum:02d}{when.year}.dat')
    tgt = os.path.join(
        COMOUTatcf,
        relative_tgt.format(**tracker.storminfo.__dict__).lower())

    logger.info('%s: will read this track file' % (src,))
    logger.info('%s: will append track here' % (tgt,))

    # Strip trailing whitespace from each line, then build the full
    # track and a copy with each line limited to 112 characters.
    with open(src, 'rt') as f:
        lines = [line.rstrip() for line in f]
    outfull = ''.join(line + '\n' for line in lines)
    out112 = ''.join(line[0:112] + '\n' for line in lines)

    if 'COMOUTgltrk' in ENV or i_am_nco:
        # Deliberately a hard lookup: under NCO an unset $COMOUTgltrk
        # raises KeyError rather than silently skipping this delivery.
        gltrkdir = ENV['COMOUTgltrk']
        yy = tracker.storminfo.when.year % 100
        glatuxarch = os.path.join(gltrkdir, 'tracks.atcfunix.%02d' % (yy,))
        logger.info('%s: will also append track here' % (glatuxarch,))
        produtil.fileop.makedirs(gltrkdir, logger=logger)
        # Append the track in a single operation:
        with open(glatuxarch, 'at') as o:
            o.write(out112)

    # Make the /com/nhc/prod/atcf/bb062013 part:
    tgtdir = os.path.dirname(tgt)
    produtil.fileop.makedirs(tgtdir, logger=logger)

    # Append the track to the target file in a single operation:
    with open(tgt, 'at') as f:
        f.write(outfull)
    logger.info('%s: track delivered from %s' % (tgt, src))
    set_ecflow_event('SentTrackToNHC', logger=logger)


########################################################################
### DBN ALERTS #########################################################
########################################################################

def add_wave_alerts():
    """!Adds dbn alerts for the WRF program's wrfdiag files, which are
    used by the downstream wave model.

    Ensures the wrfdiag files have the necessary dbn_alerts to trigger
    delivery to the NCEP FTP server and ecFlow events (if required).
    Those files will be delivered by the NHCProducts object at the end
    of the JHWRF_PRODUCTS job, after the hwrf_nhc_products program
    completes.  We run the hwrf_nhc_products program before delivering
    the wrfdiag files to ensure that the files are plausible before
    delivering them to the wave model."""
    from hwrf_expt import nhcp, conf
    logger = conf.log('add_wave_alerts')
    nalerts = 0
    # Loop over all wrfdiag files:
    for prod in nhcp.wrfdiag_products():
        # Add a dbn_alert to send as "HWRF_NC"
        logger.info('%s@%s: added alert for this product'
                    % (repr(prod), repr(prod.did)))
        nalerts += 1
        prod.add_callback(_model_alert('HWRF_NC'))
    logger.info('Added %d wave alerts' % nalerts)


def add_ww3prod_alerts():
    """!Adds dbn alerts for the HWRF WW3 wave products.

    Adds calls to the dbnet alerts so that the WW3 wave product files
    will be delivered to the NCEP FTP server."""
    from hwrf_expt import ww3post, conf
    logger = conf.log('add_ww3prod_alerts')
    # (product name, alert name), in registration order:
    ww3_alerts = (
        ('ww3grb2', 'HWRF_WW3GB2'),
        ('ww3grb2idx', 'HWRF_WW3GB2_WIDX'),
        ('ww3outpbull', 'HWRF_WW3TAR'),
        ('ww3outpcbull', 'HWRF_WW3TAR'),
        ('ww3outpcsbull', 'HWRF_WW3TAR'),
        ('ww3outpspec', 'HWRF_WW3TAR'),
    )
    for prodname, alertname in ww3_alerts:
        # Only the first product registered under each name is alerted.
        ww3post._products[prodname][0].add_callback(_model_alert(alertname))
    logger.info('Added ww3prod alerts')


########################################################################

def _add_grib2_alerts(gribber, mapping, times):
    """!Adds a GRIB2 alert and a matching index-file alert to the first
    gribber product found for each product name and time.

    @param gribber The object whose products(name=,time=) are alerted.
    @param mapping Dict from gribber product name to DBN alert name.
    @param times   Iterable of forecast times to alert."""
    for (prodname, alertname) in mapping.items():
        for when in times:
            for p in gribber.products(name=prodname, time=when):
                p.add_callback(_model_alert(alertname))
                p.add_callback(
                    _model_alert(alertname + '_WIDX', '{location}.idx'))
                # NOTE(review): indentation reconstructed; only the
                # first matching product per name and time is alerted.
                break


def add_regrib_alerts():
    """!Adds dbn alerts for GRIB products by adding DBNAlert objects to
    the hwrf_expt.gribber.

    Adds calls to the dbnet alerts so that GRIB files will be
    delivered to the NCEP FTP server.  Both the non-satellite (p123_*)
    and satellite (s123_*) GRIB2 products are alerted at every
    three-hourly output time from the analysis time through [config]
    forecast_length (default 126 hours)."""
    from hwrf_expt import cycle, gribber, conf

    # Array of datetime objects that refer to three-hourly output times:
    flen = conf.getint('config', 'forecast_length', 126)
    nthree = flen // 3 + 1
    atime = to_datetime(cycle)
    three_hourly = [to_datetime_rel(3600 * 3 * x, atime)
                    for x in range(nthree)]

    # Alert all three-hourly GRIB2 files for non-satellite post:
    nonsat_mapping = dict(p123_core='HWRF_GB2',
                          p123_storm='HWRF_GB2',
                          p123_global='HWRF_GB2',
                          p123_synop='HWRF_GB2')
    _add_grib2_alerts(gribber, nonsat_mapping, three_hourly)

    # Alert all three-hourly GRIB2 files for satellite post:
    sat_mapping = dict(s123_core='HWRF_GB2',
                       s123_storm='HWRF_GB2',
                       s123_global='HWRF_GB2',
                       s123_synop='HWRF_GB2')
    _add_grib2_alerts(gribber, sat_mapping, three_hourly)


########################################################################

def email_afos_to_sdm(afos, *args, **kwargs):
    """!Emails the AFOS file to the NOAA Senior Duty Meteorologist (SDM)

    This function is called from scripts.exhwrf_output to email the
    AFOS file (simplified track file) to the Senior Duty Meteorologist
    (NOAA SDM).  It is controlled by a number of environment
    variables, requested by NCEP Central Operations:

    * EMAIL_SDM=YES/NO --- turn on or off the email
    * HWRF_TRACK_EMAIL_LIST=(list of emails) --- list of email
      addresses to receive the AFOS file.
    * HWRF_EMAIL_SSH_MACHINE --- no longer used.  This was the machine
      to ssh into, to send the mail.  The mail is now sent directly
      from the machine running the exhwrf_output script.

    The following configuration settings have the same meaning as the
    environment variables:

    * [config] email_sdm = $EMAIL_SDM
    * [config] track_email_list = $HWRF_TRACK_EMAIL_LIST
    * [config] email_ssh_machine = $HWRF_EMAIL_SSH_MACHINE

    For the address list, the environment variable overrides the
    config file setting (by request of NCO), with SDM@noaa.gov as the
    final default.

    @note As implemented, the email is attempted only when BOTH
      $EMAIL_SDM (default YES) and [config] email_sdm (default no)
      enable it.  The "emailSDM" ecFlow event is set after send_email()
      returns, whatever it did.

    @param afos The AFOS product; must provide "available" and
      "location".
    @param args,kwargs Ignored."""
    from hwrf_expt import nhcp, conf
    if not afos.available:
        jlogger.error('Not emailing AFOS file: Product is not available.')
        return

    logger = conf.log('email_sdm')

    email_sdm_str = os.environ.get('EMAIL_SDM', 'YES').upper()
    if email_sdm_str != 'YES':
        jlogger.error('EMAIL_SDM=%s in environment - disable email to SDM'
                      % (email_sdm_str,))
        return

    if not conf.getbool('config', 'email_sdm', False):
        jlogger.error('email_sdm=no in config files - disable email to SDM')
        return

    afosfile = afos.location
    if not afosfile or not produtil.fileop.isnonempty(afosfile):
        # str() so a None location is reported instead of raising.
        jlogger.error('Not emailing AFOS file: is empty or nonexistent: '
                      + str(afosfile))
        return

    subject = conf.strinterp(
        'config',
        "{cyc}Z HWRF Output for {vit[basinname]} "
        "Tropical System {vit[stormname]} ({vit[stormid3]})")

    # Set the "From:" line if requested:
    email_from = os.environ.get('HWRF_EMAIL_FROM', 'nco.spa@noaa.gov')
    if email_from:
        logger.info('Email "From:" address is: %s' % (repr(email_from),))
    else:
        logger.warning(_NO_FROM_WARNING)

    # Add the destination addresses:
    track_email_list = os.environ.get('HWRF_TRACK_EMAIL_LIST', '') \
        or conf.getstr('config', 'track_email_list', '') \
        or 'SDM@noaa.gov'

    jlogger.info('Sending AFOS file to SDM.')
    send_email(track_email_list, subject, email_from,
               content_file=afosfile, logger=logger)
    set_ecflow_event('emailSDM', logger=logger)


########################################################################

def add_nhc_alerts():
    """!Adds dbn alerts for the hwrf_nhc_products program's output by
    adding DBNAlert objects to the hwrf_expt.nhcp object's Products."""
    from hwrf_expt import nhcp
    # (product name, alert name), in registration order.
    # Emailing the AFOS file (email_afos_to_sdm) moved to the output job.
    nhc_alerts = (
        ('afos', 'HWRF_AFOS'),
        ('htcf', 'HWRF_ASCII'),
        ('rainfall', 'HWRF_ASCII'),
        ('wind10m', 'HWRF_ASCII'),
        ('wind10hrly', 'HWRF_ASCII'),
        ('stats', 'HWRF_STATS'),
        ('swath', 'HWRF_GB2'),
        ('swathidx', 'HWRF_GB2_WIDX'),
    )
    for prodname, alertname in nhc_alerts:
        nhcp.product(prodname).add_callback(_model_alert(alertname))


########################################################################

def add_tracker_alerts():
    """!Adds dbn alerts for the tracker and requests delivery of the
    tracker to NHC deck locations.

    The three track products each get an HWRF_ASCII alert; the
    complete cleaned track additionally gets send_nhc_track() as a
    callback."""
    from hwrf_expt import tracker
    hr3 = tracker.product('atcf3hourly')
    hr6 = tracker.product('atcfshort6hr')
    tr = tracker.product('cleanatcfunix')

    logger = hwrf_expt.conf.log('tracker_alerts')
    logger.debug('Add tracker DBN alert: cleaned 3hourly track is %s'
                 % (hr3.did,))
    logger.debug('Add tracker DBN alert: cleaned, 112-char-line, 6hourly '
                 'subset track is %s' % (hr6.did,))
    logger.debug('Add tracker DBN alert: cleaned, complete track is %s'
                 % (tr.did,))

    hr3.add_callback(_model_alert('HWRF_ASCII'))
    hr6.add_callback(_model_alert('HWRF_ASCII'))
    tr.add_callback(_model_alert('HWRF_ASCII'))
    tr.add_callback(send_nhc_track)

    # Sanity checks that the callbacks were registered:
    assert(hr3.has_callbacks())
    assert(hr6.has_callbacks())
    assert(tr.has_callbacks())