#! /usr/bin/env python ############################################################################### # Module name: resume_forecast # # # # Author : Zhengtao Cui (Zhengtao.Cui@noaa.gov) # # # # Initial version date: # # # # Last modification date: 5/11/2022 # # # # Description: Restart the real-time NWM jobs on WCOSS for a given start # # time in the past # # # ############################################################################### import os, sys, time, urllib, getopt, copy import subprocess import getpass import gzip, shutil, re import numpy as np import json from string import * from datetime import datetime, timedelta from Graph import Graph from distutils.dir_util import copy_tree def timedelta_total_seconds(timedelta): """ convert timedelta to seconds """ return ( timedelta.microseconds + 0.0 + (timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 def main(argv): """ function to get input arguments """ nwdir = '' pdy = '' cycle = '' queue = 'devhigh' runOneCycle=False listrestarts=False useecflow=False machine = 'cray' version='X.X' tag='' try: opts, args = getopt.getopt(argv,"hd:m:t:c:q:oleb:v:g:",\ ["help", "dir=", "comroot=", "pdy=", "cyc=", "queue=",\ "onecycle", "list_restarts", "useecflow", "machine", "version", \ "tag" ]) except getopt.GetoptError: print( 'resume_forecast.py -d -m -t -c -q [-o] [-l] [-e] [-b ] [-v ] [-g ]' ) sys.exit(2) for opt, arg in opts: print( opt, arg ) if opt == '-h': print( 'resume_forecast.py -d -m -t -c -q [-o] [-l] [-e] [-b ] [-v ] [-g ]' ) sys.exit() elif opt in ('-d', "--dir"): nwdir = arg if not os.path.exists( nwdir ): if not os.path.isdir( nwdir ): print( 'nw dir ', nwdir, ' does not exist!' ) sys.exit() elif opt in ('-m', "--comroot"): comroot = arg if not os.path.exists( comroot ): if not os.path.isdir( comroot ): print( 'com root ', comroot, ' does not exist!' ) sys.exit() elif opt in ('-t', "--pdy" ): pdy = arg elif opt in ('-c', "--cyc" ): cycle=arg elif opt in ('-q', "--queue" ): queue=arg elif opt == '-o' or opt == '--onecycle': runOneCycle = True elif opt == '-l' or opt == '--list_restarts': listrestarts = True elif opt == '-e' or opt == '--ecflow': useecflow = True elif opt == '-b' or opt == '--machine': machine = arg elif opt == '-v' or opt == '--version': version = arg elif opt == '-g' or opt == '--tag': tag = arg # print 'com dir is "', comdir, '"' # print 'pdy is "', pdy, '"' # print 'cyc is "', cycle, '"' return (nwdir, comroot, pdy, cycle, queue, runOneCycle, listrestarts,\ useecflow, machine, version, tag) class NWM_Job_Runner: "Manage and run NWM jobs" def __init__(self, nwdir, comroot, ecffile, pdy, cyc, mem = None, \ depend = None, startmin = 0, delaycycles = 0): """ Initialize the job runner """ self.nwdir = nwdir self.comroot = comroot self.ecffilename = nwdir + '/' + ecffile self.pdy = pdy self.cyc = cyc self.mem = mem self.jobid = None self.depend = depend self.status = None self.startmin = startmin self.delaycycles = delaycycles def addDependent( self, dep ): """ add a dependent to the job """ dep.printjob() if self.depend: self.depend.append( dep ) else: self.depend = [ dep ] def submitjob(self, queuename, depend = None, options = None ): """ submit a NWM job to a WCOSS queue """ if self.isSubmitted(): return; with open ( self.ecffilename ) as ecffile: ecf = ecffile.read() ecf = ecf.replace( "%CYC%", self.cyc ) ecf = ecf.replace( "%STIME%", datetime.now().strftime("%H%M")) ecf = ecf.replace( "$ECFTESTDIR", self.nwdir + "/ecf/test" ) ecf = ecf.replace( "# CALL executable job script here", \ "export PDY=" + self.pdy +\ "\n# CALL executable job script here" ) if self.mem: ecf = ecf.replace( "%MEM%", self.mem ) ecf = re.sub( '(q "dev")|(q dev)', 'q '+ queuename, ecf) # # do not attach dependency if the job has done. # if you submit a job with dependency that has done or exited, # the job will not run. # depend_not_done = [] if depend: for d in depend: if d.isSubmitted(): status = d.jobstatus() if status == "R" or status == "Q" or status == "H": depend_not_done.append( d ) print( "Warning: dependent job is submitted. " ) else: # depend_not_done.append( d ) print( "Warning: dependent job not found in the submitted job list, assuming it is done. " ) bsubcmd = 'qsub' if options: bsubcmd += ' ' + options try: if depend_not_done: woption = ' -W depend=afterok' for d in depend_not_done: woption += ':' + d.jobid proc = subprocess.Popen( 'echo \'' + \ ecf + '\' | ' + bsubcmd + woption, \ shell=True, cwd=(comroot+'/com/logs'), \ stdout=subprocess.PIPE) #print( 'XXXXXXX echo \'' + ecf + \ # '\' | ' + bsubcmd + woption) else: #print( ecf ) #print( bsubcmd ) proc = subprocess.Popen('echo \'' + ecf + '\' | ' + \ bsubcmd, shell=True, cwd=(comroot+'/com/logs'), \ stdout=subprocess.PIPE) except OSError as e: print( e ) raise RuntimeError( 'RuntimeError: Non-existent file!' ) except ValueError as e: print( e ) raise RuntimeError( \ 'Popen is called with invalid arguments!' ) output = proc.stdout.read().decode('utf-8').strip() #print( output ) #print( "----" ) m = re.match(r"(\d+)\.[a-z0-9]+", output) if m: self.jobid = output #print( self.jobid) else: raise RuntimeError( 'Could not obtain job id!' ) print( "Job is submitted successfully: ", end=' ' ) self.printjob() def jobstatus(self ): """ Check the job status of this job runner on WCOSS """ if self.status is not None: # E Job is exiting after having run # F Job is finished if self.status == "E" or self.status == "F": return self.status try: proc = subprocess.Popen('qstat -x -f -F json ' + self.jobid, shell=True, stdout=subprocess.PIPE) except OSError as e: print( e ) raise RuntimeError( 'RuntimeError: Non-existent file!' ) except ValueError as e: print( e ) raise RuntimeError( \ 'Popen is called with invalid arguments!' ) output = proc.stdout.read().decode('utf-8') jobstat = json.loads( output ) #print( jobstat["Jobs"][ self.jobid ][ "job_state" ] ) self.status = jobstat["Jobs"][ self.jobid ][ "job_state" ] return self.status def jobname(self ): """ Return the job name of this job runner """ try: proc = subprocess.Popen('qstat -x -f -F json ' + self.jobid, shell=True, stdout=subprocess.PIPE) except OSError as e: print( e ) raise RuntimeError( 'RuntimeError: Non-existent file!' ) except ValueError as e: print( e ) raise RuntimeError( \ 'Popen is called with invalid arguments!' ) output = proc.stdout.read().decode('utf-8') jobstat = json.loads( output ) jobname = jobstat["Jobs"][ self.jobid ][ "Job_Name" ] return jobname def isSubmitted(self): """ Check if the job has been submitted """ return not self.jobid is None def get_jobid(self): """ Check if the job has been submitted """ return self.jobid def ecf(self): """ Get the ECF filename of this job runner """ return os.path.basename( self.ecffilename ) def get_startmin(self): return self.startmin def get_delayedCycles(self): return self.delaycycles def get_PDY(self): return self.pdy def get_CYC(self): return self.cyc def printjob(self): """ Print the job information to the stdout """ print( self.ecf() ) print( " pdy=", self.pdy, " cyc=", self.cyc ) if self.mem : print( " mem=", self.mem ) if self.isSubmitted(): print( " jobid = ", self.jobid, " status=", \ self.jobstatus(), " jobname=", self.jobname() ) else: print( " job not submitted " ) def __key(self): return ( self.nwdir, \ self.comroot, \ self.ecffilename, \ self.pdy, \ self.cyc ) \ if self.mem is None else \ ( self.nwdir, \ self.comroot, \ self.ecffilename, \ self.pdy, \ self.cyc, \ self.mem ) def __hash__(self): return hash(self.__key()) def __eq__(self, other): if isinstance(other, NWM_Job_Runner): return self.__key() == other.__key() return NotImplemented # def __eq__(self, other): # if not isinstance( other, NWM_Job_Runner ): # return NotImplemented # else: # return self.nwdir == other.nwdir and \ # self.comroot == other.comroot and \ # self.ecffilename == other.ecffilename and \ # self.pdy == other.pdy and \ # self.cyc == other.cyc if self.mem is None \ # else \ # self.nwdir == other.nwdir and \ # self.comroot == other.comroot and \ # self.ecffilename == other.ecffilename and \ # self.pdy == other.pdy and \ # self.cyc == other.cyc and \ # self.mem == other.mem # # def __hash__(self): # return hash( (self.nwdir, self.comroot, self.ecffilename, \ # self.pdy, self.cyc ) ) if self.mem is None \ # else \ # hash( (self.nwdir, self.comroot, self.ecffilename, \ # self.pdy, self.cyc, self.mem ) ) class NWM_Jobs: "Manage and run all NWM jobs in a cycle" def __init__( self, nwdir, comroot, pdy, cyc, machine = "cray" ): """ Initialize the NWM_jobs object """ self.nwdir = nwdir self.comroot = comroot self.machine = machine self.alljobs = Graph( ) self.pdy = pdy self.cyc = cyc self.restarts = [] self.forcingfiles = [] def find_job_by_ecf( self, ecf ): """ find the job for given ECF filename """ for job in self.alljobs.vertices(): if job.ecf() == ecf: return job def find_job( self, ecf, pdy, cyc, mem=None ): """ Find the job by PDY, cycle and/or member """ found_job = None for job in self.alljobs.vertices(): if job.ecf() == ecf and job.pdy == pdy and job.cyc == cyc: if mem: if job.mem == mem: found_job = job break else: found_job = job break return found_job def find_job_by_job( self, job ): found_job = None for j in self.alljobs.vertices(): if j == job: found_job = j break return found_job def add_job( self, job, dependents=None ): self.alljobs.add_vertex( job ) if dependents: for d in dependents: self.alljobs.add_vertex( d ) self.alljobs.add_edge( (job, d) ) def start_jobs( self, queuename='devhigh' ): currenttime = \ datetime.strptime( datetime.now().strftime("%Y%m%d%H"), \ "%Y%m%d%H" ) currentmin = datetime.now().minute jobtime = datetime.strptime( self.pdy+self.cyc, "%Y%m%d%H" ) for j in self.alljobs.vertices(): if jobtime == currenttime: if currentmin < j.get_startmin(): self.submit_jobs( j, queuename, \ f'-b {self.cyc}:{currentmin:02d}' ) else: self.submit_jobs( j, queuename ) else: self.submit_jobs( j, queuename ) def start_jobs2( self, queuename='devhigh' ): currenttime = \ datetime.strptime( datetime.now().strftime("%Y%m%d%H"), \ "%Y%m%d%H" ) currentmin = datetime.now().minute #print( "start_jobs2!") for r in self.alljobs.find_root_vertices(): for j in reversed( self.alljobs.breadth_first_order( r ) ): neighbors = self.alljobs.find_neighbors( j ) jobtime = datetime.strptime(\ j.get_PDY() + j.get_CYC(), "%Y%m%d%H" ) if jobtime == currenttime - \ timedelta( hours = j.get_delayedCycles()): if currentmin < j.get_startmin(): # j.submitjob( queuename, neighbors, \ # f'-b {self.cyc}:{j.get_startmin():02d}' ) # print( "bsub options: " + \ # f"-b {self.cyc}:{j.get_startmin():02d}" ) print( "current time: " +\ currenttime.strftime("%Y-%m-%d %H:%M")) print( f"job start time: {self.cyc}:{j.get_startmin():02d}" ) print( f"Job not submitted, let the cron submit the job." ) else: j.submitjob( queuename, neighbors ) else: j.submitjob( queuename, neighbors ) #print( "done start_jobs2!") def submit_jobs( self, root_job, queuename='devhigh', \ options = None ): """ Submit a NWM job by a given queue """ neighbors = self.alljobs.find_neighbors( root_job ) if neighbors: for v in neighbors: self.submit_jobs( v, queuename, options ) root_job.submitjob(queuename, depend = neighbors,\ options = options ) print( "---------------------------------------------" ) print( "Job: " + root_job.ecf() + '( ' + root_job.pdy + \ ' ' + root_job.cyc + ' ' + \ root_job.get_jobid() + ' )' + \ " submitted with dependents:" ) for v in neighbors: print( " ", v.ecf(), v.pdy, v.cyc, v.get_jobid() ) print( "=============================================" ) else: root_job.submitjob(queuename, depend=None, \ options=options ) print( "---------------------------------------------" ) print( "Job: " + root_job.ecf() + ' ( ' + root_job.pdy + \ ' ' + root_job.cyc + ' ' + root_job.get_jobid() + ' )' +\ "submitted without dependent" ) print( "=============================================" ) def hasAllJobsDone( self ): """ Check if all jobs have been done """ done = True for j in self.alljobs.vertices(): j.printjob() if j.isSubmitted() : status = j.jobstatus() if status == "R" or status == "Q" or status == "H": done = False break; return done def hasJobDone( self, ecf, pdy, cyc, mem=None ): """ Check if the job identified by ECF, PDY, cycle, and/or member has been done """ #print("Has job done*******************") done = True job = self.find_job( ecf, pdy, cyc, mem) if job.isSubmitted() : #print("Has job done*******************Job submitted") status = job.jobstatus() #print("Has job done*******************", status) if status == "R" or status == "Q" or status == "H": #print("Has job done*******************Done False") done = False else: print( "Warning: job not submitted! " ) job.printjob() return done def get_ecf( self, ecffile ): #return { "cray" : lambda x: x, \ # "ibm" : lambda x: re.sub( '.ecf$', '_ibm.ecf', x) }\ # .get( self.machine )( ecffile ) return ecffile # # create Hawaii or Puertorico jobs # @staticmethod def createHawaiiPuertoricoJobs(nwdir, comroot, pdy, cyc, domain, isfirstcyc=False, machine = "cray" ): if not ( domain == 'hawaii' or domain == 'puertorico' ): print( "ERROR: Unknown domain - " + domain + "!!" ) return None jobs = NWM_Jobs(nwdir, comroot, pdy, cyc, machine ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # if domain == 'hawaii': # thisjobtime -= timedelta( hours = 1 ) forcing_ana = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_forcing_analysis_assim_{domain}.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, \ 25 if domain=='hawaii' else 25, \ 0 if domain=='hawaii' else 0 ) ana = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_analysis_assim_{domain}.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, \ [ forcing_ana ], \ 30 if domain=='hawaii' else 50, \ 0 if domain=='hawaii' else 0 ) ana_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_analysis_assim_{domain}_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, \ [ ana ], \ 40 if domain=='hawaii' else 58, \ 0 if domain=='hawaii' else 0 ) #add coastal AnA ana_coastal = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_analysis_assim_coastal_{domain}.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, \ [ ana_no_da ], \ 45 if domain=='hawaii' else 55, \ 0 if domain=='hawaii' else 0 ) jobs.add_job( forcing_ana, [] ) jobs.add_job( ana, [ forcing_ana ] ) jobs.add_job( ana_no_da, [ ana ] ) jobs.add_job( ana_coastal, [ ana_no_da ] ) if cyc == ( '02' if domain == 'hawaii' else '08' ) or \ cyc == ( '14' if domain == 'hawaii' else '20' ): thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 2 ) forcing_short = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_forcing_short_range_{domain}.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, \ 55 if domain=='hawaii' else 50, 2 ) short = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_short_range_{domain}.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_short ], \ 59 if domain=='hawaii' else 58, 2 ) jobs.add_job( short, [ forcing_short ] ) short_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_short_range_{domain}_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_short ], \ 35 if domain=='hawaii' else 50, 3 ) jobs.add_job( short_no_da, [short] ) short_coastal = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_short_range_coastal_{domain}.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ short ], \ 40 if domain=='hawaii' else 15, 3 ) jobs.add_job( short_coastal, [ short ] ) #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}_no_da_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #coastal jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_{"hawaii_" if domain=="hawaii" else "prvi_"}' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #forcing for the first cycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_{domain}/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #discharge for the first cycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/analysis_assim_{domain}/' f'nwm.t{thisjobtime.strftime("%H")}z.analysis_assim.channel_rt.tm00{"00.hawaii" if domain=="hawaii" else ".puertorico"}.nc' ) if isfirstcyc: #For AnAs thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 3 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #coastal jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_{"hawaii_ana_" if domain=="hawaii" else "prvi_ana_"}' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #forcing for the first cycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_{domain}/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #discharge for the first cycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_{domain}_no_da/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.CHRTOUT_DOMAIN1' ) #For short if domain == 'hawaii': if cyc == '01' or cyc == '02': thisjobtime = datetime.strptime( pdy+'00', "%Y%m%d%H" ) elif cyc == '13' or cyc == '14': thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) else: thisjobtime = None else: if cyc == '07' or cyc == '08': thisjobtime = datetime.strptime( pdy+'06', "%Y%m%d%H" ) elif cyc == '19' or cyc == '20': thisjobtime = datetime.strptime( pdy+'18', "%Y%m%d%H" ) else: thisjobtime = None if thisjobtime: jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_{domain}_no_da_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_{"hawaii_" if domain=="hawaii" else "prvi_"}' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #no da forcing files jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range_{domain}_no_da/' f'{thisjobtime.strftime("%Y%m%d%H")}' ) #Coastal forcing for the first cycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_{domain}/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #coastal forcing jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range_{domain}/' f'{thisjobtime.strftime("%Y%m%d%H")}' ) #discharge for the first cycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/analysis_assim_{domain}/' f'nwm.t{thisjobtime.strftime("%H")}z.analysis_assim.channel_rt.tm00' f'{"00.hawaii" if domain=="hawaii" else ".puertorico"}.conus.nc' ) #discharge for coastal if domain == 'hawaii': jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/short_range_{domain}/' f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.' f'f04800.{domain}.nc' ) for i in range(0, 48): for j in ['00', '15', '30', '45' ]: jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/short_range_{domain}/' f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.' f'f{i:03d}{j}.{domain}.nc' ) else: for i in range(0, 49): jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/short_range_{domain}/' f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.' f'f{i:03d}.{domain}.nc' ) # # # # PR/HI SR no da runs at the next cycle of PR SR. For cycle 9z/3z and # # 21z/15z the SR has already done. Therefore create SR no da and # # copy forcing files. # # # if domain == 'hawaii': # if cyc == '03' or cyc == '15': # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 3 ) # pre_short_no_da = NWM_Job_Runner( nwdir, comroot, \ # jobs.get_ecf( f'/ecf/test/jnwm_short_range_{domain}_no_da.ecf' ), \ # thisjobtime.strftime("%Y%m%d"), \ # thisjobtime.strftime("%H"), \ # None, None, 35, 3 ) # jobs.add_job( pre_short_no_da ) # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range_{domain}_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # pre_short_coastal = NWM_Job_Runner( nwdir, comroot, \ # jobs.get_ecf( f'/ecf/test/jnwm_short_range_coastal_{domain}.ecf' ), \ # thisjobtime.strftime("%Y%m%d"), \ # thisjobtime.strftime("%H"), \ # None, None, 40, 3 ) # # jobs.add_job( pre_short_coastal ) # # #forcing for the first cycle # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_{domain}/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range_{domain}/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' # f'hotstart_analysis_assim_coastal_hawaii' # f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) # # # # Add discharge from COM # #discharge for the first cycle # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/analysis_assim_{domain}/' # f'nwm.t{thisjobtime.strftime("%H")}z.analysis_assim.channel_rt.tm0000.hawaii.nc' ) # # #discharge for the last cycle # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/short_range_{domain}/' # f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.' # f'f04800.{domain}.nc' ) # for i in range(0, 48): # for j in ['00', '15', '30', '45' ]: # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/short_range_{domain}/' # f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.' # f'f{i:03d}{j}.{domain}.nc' ) # # else: # if cyc == '09' or cyc == '21': # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 3 ) # pre_short_no_da = NWM_Job_Runner( nwdir, comroot, \ # jobs.get_ecf( f'/ecf/test/jnwm_short_range_{domain}_no_da.ecf' ), \ # thisjobtime.strftime("%Y%m%d"), \ # thisjobtime.strftime("%H"), \ # None, None, 15, 3 ) # jobs.add_job( pre_short_no_da ) # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range_{domain}_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # pre_short_coastal = NWM_Job_Runner( nwdir, comroot, \ # jobs.get_ecf( f'/ecf/test/jnwm_short_range_coastal_{domain}.ecf' ), \ # thisjobtime.strftime("%Y%m%d"), \ # thisjobtime.strftime("%H"), \ # None, None, 15, 3 ) # jobs.add_job( pre_short_coastal ) # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_{domain}/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range_{domain}/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' # f'hotstart_analysis_assim_coastal_puertorico' # f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) # # # # # Add discharge from COM # #discharge for the first cycle # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/analysis_assim_{domain}/' # f'nwm.t{thisjobtime.strftime("%H")}z.analysis_assim.channel_rt.tm00.{domain}.nc' ) # for i in range(0, 49): # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/short_range_{domain}/' # f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.' # f'f{i:03d}.{domain}.nc' ) return jobs # # create Alaska jobs # @staticmethod def createAlaskaJobs(nwdir, comroot, pdy, cyc, isfirstcyc=False, machine = "cray" ): jobs = NWM_Jobs(nwdir, comroot, pdy, cyc, machine ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) forcing_ana = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_forcing_analysis_assim_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, \ 20, \ 0 ) ana = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_analysis_assim_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, \ [ forcing_ana ], \ 25, \ 0 ) ana_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_analysis_assim_alaska_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, \ [ ana ], \ 50, \ 0 ) jobs.add_job( ana_no_da, [ ana ] ) jobs.add_job( ana, [ forcing_ana ] ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) if cyc == '23' or ( isfirstcyc and ( cyc == '21' or cyc == 22 ) ): thisjobtime = datetime.strptime( pdy+'20', "%Y%m%d%H" ) # # use AnA extend restart files at 23z # i.e. get 20z restart files jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_alaska_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') else: thisjobtime -= timedelta( hours = 3 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # if isfirstcyc and ( cyc == '21' or cyc == 22 ): # # thisjobtime = datetime.strptime( pdy+'20', "%Y%m%d%H" ) # # # # use AnA extend restart files at 23z # # i.e. get 20z restart files # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') # #no da restart # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_alaska_no_da/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') if cyc in [ '01', '04', '07', '10', '13', '16', '19', '22' ]: thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 1 ) forcing_short = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_forcing_short_range_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, \ 15, 1 ) short = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( f'/ecf/test/jnwm_short_range_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_short ], \ 35, 1 ) jobs.add_job( short, [ forcing_short, ana ] ) if isfirstcyc: if cyc in [ '01', '04', '07', '10', '13', '16', '19', '22' ]: thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 1 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') if cyc == '20': thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) forcing_ana_ext = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_analysis_assim_extend_alaska.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, None, 30, 0 ) ana_ext = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_extend_alaska.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_ana_ext ], 45, 0) jobs.add_job( ana_ext, [ forcing_ana_ext ] ) ana_ext_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_extend_alaska_no_da.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ana_ext ], 55, 0) jobs.add_job( ana_ext_no_da, [ ana_ext ] ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 32 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_alaska_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') if isfirstcyc: if int(cyc) <= 20: thisjobtime = datetime.strptime( pdy+'20', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 32 ) else: thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_alaska_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # # AnA extended needs AnA forcing files from 12z previous day # #endtime = datetime.strptime( pdy+cyc, "%Y%m%d%H") endtime = datetime.strptime( pdy+cyc, "%Y%m%d%H") - \ timedelta( hours = 3 ) #thisjobtime += timedelta( hours = 1 ) #while thisjobtime <= endtime: # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_alaska/' # f'{thisjobtime.strftime("%Y%m%d%H")}/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # thisjobtime += timedelta( hours = 1 ) jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_alaska/' f'{thisjobtime.strftime("%Y%m%d%H")}/' ) if cyc == '05' or cyc == '11' or cyc == '17' or cyc == '23': thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 5 ) forcing_medium = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_medium_range_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 5, 5 ) forcing_medium_blend = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_medium_range_blend_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 5, 5 ) for mem in range(1, 7): medium_range = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ f'{mem:d}', [ forcing_medium ], 50, 5 ) jobs.add_job( medium_range, [ forcing_medium ] ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') if mem == 1: medium_range_no_da = NWM_Job_Runner( nwdir, \ comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ medium_range ], 40, 6 ) jobs.add_job( medium_range_no_da, [ medium_range ] ) #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_no_da_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') medium_range_blend = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_blend_alaska.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ f'{mem:d}', [ forcing_medium_blend ], 35, 5 ) jobs.add_job( medium_range_blend, [ forcing_medium_blend ] ) if isfirstcyc: if int(cyc) >= 0 and int(cyc) <= 5: thisjobtime = datetime.strptime( pdy+'00', "%Y%m%d%H" ) elif int(cyc) > 5 and int(cyc) <= 11: thisjobtime = datetime.strptime( pdy+'06', "%Y%m%d%H" ) elif int(cyc) > 11 and int(cyc) <= 17: thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) elif int(cyc) > 17 and int(cyc) <= 23: thisjobtime = datetime.strptime( pdy+'18', "%Y%m%d%H" ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_no_da_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #medium range_blend forcings #jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range_blend_alaska/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # Medium range needs forcing files from 30 cycs ago # endtime = thisjobtime - timedelta( hours = 6) thisjobtime -= timedelta( hours = 30 ) while thisjobtime <= endtime: jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range_alaska/' f'{thisjobtime.strftime("%Y%m%d%H")}' ) thisjobtime += timedelta( hours = 6 ) # #The MR open loop starts on the next cycle after MR. It is #possible that the MR has already done if the cycle is #0z, 6z, 12z or 18z. Therefore create a new no jobs and #copy restart and forcing files. # # if cyc == "00" or cyc=="06" or cyc=="12" or cyc=="18": # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 6 ) # # medium_range_no_da = NWM_Job_Runner( nwdir, \ # comroot, \ # jobs.get_ecf( '/ecf/test/jnwm_medium_range_alaska_no_da.ecf' ), \ # thisjobtime.strftime("%Y%m%d"), \ # thisjobtime.strftime("%H"), \ # None, None, 40, 6 ) # jobs.add_job( medium_range_no_da ) # #no da restart # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_alaska_no_da_forecast/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # # # # Medium range no da forcing files # # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range_alaska_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # return jobs # # create timeslices jobs # @staticmethod def createTimeslicesJobs(nwdir, comroot, pdy, cyc, machine = "cray" ): tsjobs = NWM_Jobs(nwdir, comroot, pdy, cyc, machine ) usgs_ts = NWM_Job_Runner( nwdir, comroot, \ tsjobs.get_ecf( '/ecf/test/jnwm_usgs_timeslices.ecf'), pdy, cyc ) ace_ts = NWM_Job_Runner( nwdir, comroot, \ tsjobs.get_ecf( '/ecf/test/jnwm_ace_timeslices.ecf'), pdy, cyc ) ca_ts = NWM_Job_Runner(nwdir, comroot, \ tsjobs.get_ecf( '/ecf/test/jnwm_canada_timeslices.ecf'), pdy, cyc ) merge_usgs_ca_ts = NWM_Job_Runner(nwdir, comroot, \ tsjobs.get_ecf( '/ecf/test/jnwm_merge_usgs_ca_timeslices.ecf'), pdy, cyc, None, [usgs_ts, ace_ts] ) rfc_ts = NWM_Job_Runner( nwdir, comroot, \ tsjobs.get_ecf( '/ecf/test/jnwm_rfc_timeseries.ecf'), pdy, cyc ) tsjobs.add_job( usgs_ts ) tsjobs.add_job( ace_ts ) tsjobs.add_job( ca_ts ) tsjobs.add_job( merge_usgs_ca_ts, [usgs_ts, ace_ts] ) tsjobs.add_job( rfc_ts ) return tsjobs # # create CONUS jobs # @staticmethod def createConusJobs(nwdir, comroot, pdy, cyc, \ isfirstcyc=False, machine = "cray" ): jobs = NWM_Jobs(nwdir, comroot, pdy, cyc, machine ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) if cyc == '01' or cyc == '13': forcing_ana = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_analysis_assim.ecf'), thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 45, 0 ) analysis_assim = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_ana ], 45, 0 ) else: forcing_ana = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_analysis_assim.ecf'), thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 30, 0 ) analysis_assim = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_ana ], 30, 0 ) jobs.add_job( analysis_assim, [ forcing_ana ] ) if cyc == '02' or cyc == '14': analysis_assim_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ analysis_assim ], 5, 1 ) # thisjobtime -= timedelta( hours = 1 ) # pre_analysis_assim_no_da = NWM_Job_Runner( nwdir, comroot, \ # jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_no_da.ecf' ), \ # thisjobtime.strftime("%Y%m%d"), \ # thisjobtime.strftime("%H"), \ # None, [ analysis_assim ], 5, 1 ) else: analysis_assim_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ analysis_assim ], 58, 0 ) jobs.add_job( analysis_assim_no_da, [ analysis_assim ] ) # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 1 ) analysis_assim_coastal_pacific = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_coastal_pacific.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ analysis_assim ], 0, 1 ) jobs.add_job( analysis_assim_coastal_pacific, [ analysis_assim_no_da ] ) analysis_assim_coastal_atlgulf = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_coastal_atlgulf.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ analysis_assim ], 0, 1 ) jobs.add_job( analysis_assim_coastal_atlgulf, [ analysis_assim_no_da ] ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 1 ) forcing_short = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_short_range.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 30, 1 ) short_range = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_short_range.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ analysis_assim ], 40, 1 ) jobs.add_job( short_range, [ forcing_short ] ) # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 2 ) short_range_coastal_pacific = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_short_range_coastal_pacific.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ short_range ], 0, 2 ) jobs.add_job( short_range_coastal_pacific, [ short_range, analysis_assim_coastal_pacific ] ) short_range_coastal_atlgulf = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_short_range_coastal_atlgulf.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ short_range ], 0, 2 ) jobs.add_job( short_range_coastal_atlgulf, [ short_range, analysis_assim_coastal_atlgulf ] ) short_range_coastal_atlgulf_psurge = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_short_range_coastal_atlgulf_psurge.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ short_range ], 5, 2 ) jobs.add_job( short_range_coastal_atlgulf_psurge, [ short_range, analysis_assim_coastal_atlgulf ] ) # # AT 02z and 14z, the open loop AnA runs at the next cycle as # the AnA # if isfirstcyc and ( cyc == '02' or cyc == '14' ) : # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 1 ) # # #no da restart # thisjobtime -= timedelta( hours = 3 ) # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_no_da/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # # #no da forcing files # endtime = datetime.strptime( pdy+cyc, "%Y%m%d%H") - \ # timedelta( hours = 1 ) # thisjobtime += timedelta( hours = 1 ) # while thisjobtime <= endtime: # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # thisjobtime += timedelta( hours = 1 ) # restarts forcings for short ranges # # short range and coastal AnA runs at the next cycle of model AnA if isfirstcyc : thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 1 ) #inland short range restarts jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') # # # #short range coastal starts 2 cycles later # # # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 2 ) #short coastal restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_pacific_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_atlgulf_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #short range forcing # copy the first from AnA jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # copy the rest from short jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/short_range/' f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # Add discharge from COM # # #the first sycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/analysis_assim/' f'nwm.t{thisjobtime.strftime("%H")}z.analysis_assim.channel_rt.tm00.conus.nc' ) # for i in range(1, 19): jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/short_range/' f'nwm.t{thisjobtime.strftime("%H")}z.short_range.channel_rt.f{i:03}.conus.nc' ) # #restarts forcings for AnAs if isfirstcyc: thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 3 ) if cyc == '19': # # use AnA extend restart files at 19z # i.e. get 16z restart files jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #coastal jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_extend_coastal_pacific_ana' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_extend_coastal_atlgulf_ana' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) else: jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #coastal AnA restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_pacific_ana_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_atlgulf_ana_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #forcing for coastal AnA #needs the first step forcing and discharge from the previous AnA # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #discharge jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_no_da/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.CHRTOUT_DOMAIN1' ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) if cyc == '00' or cyc == '06' or cyc == '12' or cyc == '18': analysis_assim_long = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_long.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_ana ], 35, 0 ) jobs.add_job( analysis_assim_long, [ forcing_ana ] ) analysis_assim_long_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_long_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ analysis_assim_long ], 58, 0 ) jobs.add_job( analysis_assim_long_no_da, [ analysis_assim_long ] ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 12 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') if isfirstcyc: if int(cyc) > 0 and int(cyc) <= 6: thisjobtime = datetime.strptime( pdy+'06', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 12 ) elif int(cyc) > 6 and int(cyc) <= 12: thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 12 ) elif int(cyc) > 12 and int(cyc) <= 18: thisjobtime = datetime.strptime( pdy+'18', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 12 ) elif int(cyc) > 18 : thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # restarts for two AnA long cycles are needed thisjobtime += timedelta( hours = 6 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') thisjobtime -= timedelta( hours = 6 ) # # AnA long needs AnA forcing files from 12 cycs ago # endtime = datetime.strptime( pdy+cyc, "%Y%m%d%H") - \ timedelta( hours = 3 ) thisjobtime += timedelta( hours = 1 ) while thisjobtime <= endtime: jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) thisjobtime += timedelta( hours = 1 ) if cyc == '18': thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 6 ) forcing_mpe = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_mpe.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 0, 6 ) thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 2 ) forcing_ana_ext = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_analysis_assim_extend.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_mpe ], 20, 2 ) jobs.add_job( forcing_ana_ext, [ forcing_mpe ] ) ana_ext = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_extend.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ forcing_ana_ext ], 30, 2) jobs.add_job( ana_ext, [ forcing_ana_ext ] ) ana_ext_no_da = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_extend_no_da.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ana_ext ], 20, 2) jobs.add_job( ana_ext_no_da, [ ana_ext ] ) ana_ext_coastal_atlgulf = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_extend_coastal_atlgulf.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ana_ext ], 20, 3) jobs.add_job( ana_ext_coastal_atlgulf, [ ana_ext_no_da ] ) ana_ext_coastal_pacific = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_analysis_assim_extend_coastal_pacific.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ana_ext ], 20, 3) jobs.add_job( ana_ext_coastal_pacific, [ ana_ext_no_da ] ) if isfirstcyc: if int( cyc ) <= 18: thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 24 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #the hot start files jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_extend_coastal_pacific_ana_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_extend_coastal_atlgulf_ana_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #input for extended AnA forcing jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' ) #first forcing for coastal extended AnA from the previous day extended AnA jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #first discharge for coastal extended AnA from the previous day extended AnA jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend_no_da/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.CHRTOUT_DOMAIN1' ) #the rest will be generated in this run of extended AnA and forcing else: thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') #the hot start files jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_extend_coastal_pacific_ana_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_extend_coastal_atlgulf_ana_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_extend_no_da/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' ) #first forcing for coastal extended AnA from the previous day extended AnA jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #first discharge for coastal extended AnA from the previous day extended AnA jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend_no_da/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.CHRTOUT_DOMAIN1' ) #the rest will be generated in this run of extended AnA and forcing # #there is no need to do this if cyc is 18z # if isfirstcyc and ( int( cyc ) == 19 or int( cyc ) < 18 ) : # thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) # thisjobtime -= timedelta( hours = 24 ) # #Coastal restart # #the hot start files # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' # f'hotstart_analysis_assim_extend_coastal_pacific_ana_' # f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' # f'hotstart_analysis_assim_extend_coastal_atlgulf_ana_' # f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) # # # #the forcing for the first cycle only # #the rest of the forcing is avaialble from extend AnA forcing job # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # #the streamflow # #the first hour # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.CHRTOUT_DOMAIN1' ) # # # #need extended AnA forcing and streamflow here # # # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # #the forcing # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend/' ) # #the discharge # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend_no_da/' ) # # if isfirstcyc and int( cyc ) > 19: # thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) # #Coastal restart # #the hot start files # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' # f'hotstart_analysis_assim_extend_coastal_pacific_ana_' # f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' # f'hotstart_analysis_assim_extend_coastal_atlgulf_ana_' # f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) # # # #the forcing for the first cycle only # #the rest of the forcing is avaialble from extend AnA forcing job # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) # #the streamflow ## #the first hour # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H%M")}.CHRTOUT_DOMAIN1' ) # # # #need extended AnA forcing and streamflow here # # # thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) # #the forcing # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend/' ) # #the discharge # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim_extend_no_da/' ) if cyc == '04' or cyc == '10' or cyc == '16' or cyc == '22': thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 4 ) forcing_medium = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_medium_range.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 45, 4 ) for mem in range(1, 7): medium_range = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ f'{mem:d}', [], 45, 5 ) jobs.add_job( medium_range, [ forcing_medium ] ) if mem == 1: medium_range_no_da = NWM_Job_Runner( nwdir, \ comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_no_da.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ], 10, 7 ) jobs.add_job( medium_range_no_da, [ medium_range ] ) medium_range_coastal_pacific = NWM_Job_Runner( nwdir, \ comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_coastal_pacific.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ '1', [ ], 0, 7 ) jobs.add_job( medium_range_coastal_pacific, [ medium_range ] ) medium_range_coastal_atlgulf = NWM_Job_Runner( nwdir, \ comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_coastal_atlgulf.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ '1', [ ], 0, 7 ) jobs.add_job( medium_range_coastal_atlgulf, [ medium_range ] ) medium_range_coastal_atlgulf_psurge = NWM_Job_Runner( nwdir, \ comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_coastal_atlgulf_psurge.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ '1', [ ], 0, 7 ) jobs.add_job( medium_range_coastal_atlgulf_psurge, [ medium_range ] ) forcing_medium_blend = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_medium_range_blend.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), None, None, 45, 4 ) medium_range_blend = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_blend.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ f'{mem:d}', [ ], 15, 5 ) jobs.add_job( medium_range_blend, [ forcing_medium_blend ] ) medium_range_blend_coastal_pacific = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_blend_coastal_pacific.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ], 0, 7 ) jobs.add_job( medium_range_blend_coastal_pacific, [ medium_range_blend ] ) medium_range_blend_coastal_atlgulf = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_blend_coastal_atlgulf.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ], 0, 7 ) jobs.add_job( medium_range_blend_coastal_atlgulf, [ medium_range_blend ] ) medium_range_blend_coastal_atlgulf_psurge = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_medium_range_blend_coastal_atlgulf_psurge.ecf' ), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ None, [ ], 0, 7 ) jobs.add_job( medium_range_blend_coastal_atlgulf_psurge, [ medium_range_blend ] ) if isfirstcyc: if int(cyc) > 0 and int(cyc) <= 4: thisjobtime = datetime.strptime( pdy+'00', "%Y%m%d%H" ) elif int(cyc) > 6 and int(cyc) <= 10: thisjobtime = datetime.strptime( pdy+'06', "%Y%m%d%H" ) elif int(cyc) > 12 and int(cyc) <= 16: thisjobtime = datetime.strptime( pdy+'12', "%Y%m%d%H" ) elif int(cyc) > 18 and int(cyc) <= 22: thisjobtime = datetime.strptime( pdy+'18', "%Y%m%d%H" ) else: thisjobtime = None if thisjobtime: jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') # #blend forcing # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range_blend/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # Medium range needs forcing files from 30 cycs ago # endtime = thisjobtime - timedelta( hours = 6) thisjobtime -= timedelta( hours = 30 ) while thisjobtime <= endtime: jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range/' f'{thisjobtime.strftime("%Y%m%d%H")}' ) thisjobtime += timedelta( hours = 6 ) #no da restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_no_da_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') #coastal medium range restart jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_atlgulf_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_coastal/' f'hotstart_analysis_assim_coastal_pacific_' f'{thisjobtime.strftime("%Y%m%d")}_{thisjobtime.strftime("%H")}00.nc' ) #coastal forcing # copy the first from AnA jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/analysis_assim/' f'{thisjobtime.strftime("%Y%m%d%H%M")}.LDASIN_DOMAIN1' ) #the first sycle jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/analysis_assim/' f'nwm.t{thisjobtime.strftime("%H")}z.analysis_assim.channel_rt.tm00.conus.nc' ) # # # #The MR open loop starts on the next cycle after MR. It is # #possible that the MR has already done if the cycle is # #1z, 7z, 13z or 19z. Therefore create a new no jobs and # #copy restart and forcing files. # # # # # # Medium range no da forcing files # # # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range_no_da/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # # copy the rest from medium range # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/nwges/nwm/medium_range/' # f'{thisjobtime.strftime("%Y%m%d%H")}' ) # # # # get discharge from COM # # # # # # # for i in range(1, 241): # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/medium_range_mem1/' # f'nwm.t{thisjobtime.strftime("%H")}z.medium_range.channel_rt.f{i:03d}.conus.nc' ) # jobs.forcingfiles.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/medium_range_blend/' # f'nwm.t{thisjobtime.strftime("%H")}z.short_range_blend.channel_rt.f{i:03d}.conus.nc' ) # if cyc == "08" or cyc == '14' or cyc=="20" or cyc == '02': thisjobtime = datetime.strptime( pdy+cyc, "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 8 ) for mem in range(1, 5): forcing_long = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_forcing_long_range.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), f'{mem}', None, 10, 8 ) long_range = NWM_Job_Runner( nwdir, comroot, \ jobs.get_ecf( '/ecf/test/jnwm_long_range.ecf'), \ thisjobtime.strftime("%Y%m%d"), \ thisjobtime.strftime("%H"), \ f'{mem}', [ forcing_long ], 45, 8 ) jobs.add_job( long_range, [ forcing_long ] ) # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_long_forecast/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_long_forecast/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') # jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' # f'{thisjobtime.strftime("%Y%m%d")}/restart_long_forecast/' # f'nwm.rst.{thisjobtime.strftime("%H")}/' # f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') if isfirstcyc: if int(cyc) > 2 and int(cyc) <= 8: thisjobtime = datetime.strptime( pdy+'08', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 8 ) elif int(cyc) > 8 and int(cyc) <= 14: thisjobtime = datetime.strptime( pdy+'14', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 8 ) elif int(cyc) > 14 and int(cyc) <= 20: thisjobtime = datetime.strptime( pdy+'20', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 8 ) elif int(cyc) > 20: thisjobtime = datetime.strptime( pdy+'18', "%Y%m%d%H" ) elif int(cyc) <= 2: thisjobtime = datetime.strptime( pdy+'18', "%Y%m%d%H" ) thisjobtime -= timedelta( hours = 24 ) jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'HYDRO_RST.{thisjobtime.strftime("%Y-%m-%d_%H:%M")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'RESTART.{thisjobtime.strftime("%Y%m%d%H")}_DOMAIN1') jobs.restarts.append( comroot + f'/com/nwm/v3.0/nwm.' f'{thisjobtime.strftime("%Y%m%d")}/restart_long_forecast/' f'nwm.rst.{thisjobtime.strftime("%H")}/' f'nudgingLastObs.{thisjobtime.strftime("%Y-%m-%d_%H:%M:%S")}.nc') return jobs class JobQueue: "Current jobs in the queues" def __init__( self ): """ Initialize the list jobs """ self.jobids=None self.update() def update( self ): try: proc = subprocess.Popen('qselect -u $USER', \ shell=True, stdout=subprocess.PIPE) except OSError as e: print( e ) raise RuntimeError( 'RuntimeError: Non-existent file!' ) except ValueError as e: print( e ) raise RuntimeError( \ 'Popen is called with invalid arguments!' ) self.jobids = proc.stdout.read().decode('utf-8').split() def getJobNames( self ): jobnames = [] for j in self.jobids: try: proc = subprocess.Popen(\ f'qstat -x -f -F json {j}', \ shell=True, stdout=subprocess.PIPE) except OSError as e: print( e ) raise RuntimeError( 'RuntimeError: Non-existent file!' ) except ValueError as e: print( e ) raise RuntimeError( \ 'Popen is called with invalid arguments!' ) output = proc.stdout.read().decode('utf-8') jobstat = json.loads( output ) jobnames.append( jobstat[ 'Jobs' ][ j ][ "Job_Name" ] ) return jobnames # #kill jobs except pre-conversion jobs # def killjobs( self ): jobnames = self.getJobNames() for j, n in zip( self.jobids,jobnames ): # don't kill pre-conversion jobs #if not re.match(r"nwm_.*_grib2netcdf_[0-2][0-9]_[0-2][0-9][0-5][0-9]", n ): print( "killjobs: " + n + " " + j ) try: proc = subprocess.Popen( f'qdel -W force {j}', \ shell=True, stdout=subprocess.PIPE) except OSError as e: print( e ) raise RuntimeError( 'RuntimeError: Non-existent file!' ) except ValueError as e: print( e ) raise RuntimeError( \ 'Popen is called with invalid arguments!' ) def print( self ): jobnames = self.getJobNames() for j, n in zip( self.jobids,jobnames ): print(j, n) def list_restart_files( jobslist ): restartfiles = [] for j in jobslist: for restartfile in j.restarts: if restartfile not in restartfiles: restartfiles.append( restartfile ) for f in restartfiles: print( f ) def list_forcing_files( jobslist ): files = [] for j in jobslist: for file in j.forcingfiles: if file not in files: files.append( file ) for f in files: print( f ) def check_required_files( jobslist ): files = [] missingfiles = [] for j in jobslist: for file in j.forcingfiles: if file not in files: files.append( file ) for file in j.restarts: if file not in files: files.append( file ) for f in files: if not os.path.exists( f ): missingfiles.append( f ) if missingfiles: print( "****Missing files: " ) for f in missingfiles: print( " * " + f ) print( "****Please copy missing forcing files and try again!" ) sys.exit( 0 ) # # The main program # if __name__ == "__main__": pgmopt = main(sys.argv[1:]) nwdir = pgmopt[0] comroot = pgmopt[1] pdy = pgmopt[2] cycle = pgmopt[3] queue = pgmopt[4] oneCyc = pgmopt[5] listrestarts = pgmopt[6] useecflow = pgmopt[7] machine = pgmopt[8] version = pgmopt[9] tag = pgmopt[10] print( nwdir, pdy, cycle, queue, oneCyc, listrestarts, version, tag ) onehour = timedelta( hours = 1 ) start_datetime = datetime.strptime( pdy+cycle, "%Y%m%d%H" ) pdy = start_datetime.strftime( "%Y%m%d" ) cyc = start_datetime.strftime("%H" ) conusjobs = NWM_Jobs.createConusJobs(nwdir, comroot, pdy, cyc, True ) tsjobs = NWM_Jobs.createTimeslicesJobs(nwdir, comroot, pdy, cyc ) hijobs = NWM_Jobs.createHawaiiPuertoricoJobs(nwdir, comroot, pdy, cyc, \ 'hawaii', True) prjobs = NWM_Jobs.createHawaiiPuertoricoJobs(nwdir, comroot, pdy, cyc, \ 'puertorico', True) akjobs = NWM_Jobs.createAlaskaJobs(nwdir, comroot, pdy, cyc, \ 'alaska', True) if listrestarts: list_restart_files( [ conusjobs, tsjobs, hijobs, prjobs, akjobs ] ) list_forcing_files( [ conusjobs, tsjobs, hijobs, prjobs, akjobs ] ) sys.exit(0) #check_required_files( [ conusjobs, tsjobs, hijobs, prjobs, akjobs ] ) jobq = JobQueue() cron_marker_start='################# START CRONTAB FOR NWM v'+ version + tag + \ ' #################' cron_marker_end='################# END CRONTAB FOR NWM v' + version + tag + \ ' #################' pre_conversion_cron_marker_start='################# START CRONTAB FOR NWM v' + \ version + tag + ' PRE-CONVERSION #################' pre_conversion_cron_marker_end='################# END CRONTAB FOR NWM v' + \ version + tag + ' PRE-CONVERSION #################' if not oneCyc: if not useecflow: ##remove crontab jobs #print( "crontab -l | cat - | sed '/" + # cron_marker_start + "/,/" + # cron_marker_end + # "/d' - | sed '/^SHELL/,/^PATH/d' - | sed '1i SHELL=\/bin\/bash\ -l\\nPATH=\$PATH:\/opt\/pbs\/bin' - | crontab - ") subprocess.check_call( "crontab -l | cat - | sed '/" + cron_marker_start + "/,/" + cron_marker_end + "/d' - | sed '/^SHELL/,/^PATH/d' - | sed '1i SHELL=\/bin\/bash\ -l\\nPATH=\$PATH:\/opt\/pbs\/bin' - | crontab - ", shell=True) # subprocess.call( "bkill -q " + queue + " 0", shell=True) jobq = JobQueue() jobq.killjobs() currenttime = \ datetime.strptime( datetime.now().strftime("%Y%m%d%H"), "%Y%m%d%H" ) #get the coastal AnAs # pre_coastal_ana_atl = conusjobs.find_job( \ conusjobs.get_ecf( "jnwm_analysis_assim_coastal_atlgulf.ecf" ), pdy, cyc ) pre_coastal_ana_pac = conusjobs.find_job( \ conusjobs.get_ecf( "jnwm_analysis_assim_coastal_pacific.ecf" ), pdy, cyc ) pre_coastal_ana_hi = hijobs.find_job( \ hijobs.get_ecf( "jnwm_analysis_assim_coastal_hawaii.ecf" ), pdy, cyc ) pre_coastal_ana_pr = prjobs.find_job( \ prjobs.get_ecf( "jnwm_analysis_assim_coastal_puertorico.ecf" ), pdy, cyc ) while start_datetime <= currenttime: print( "Working on PDY=", start_datetime.strftime("%Y%m%d"), " CYC=", \ start_datetime.strftime( "%H"), \ " Current time: ", currenttime.strftime("%Y-%m-%d %H:%M") ) #copy time slices from production run nudg_dir = comroot + "/com/nwm/v" + version + "/nwm." + pdy + \ "/merged_usgs_and_ca_timeslices" if not os.path.exists( nudg_dir ): os.makedirs( nudg_dir ) try: copy_tree( "/lfs/h1/ops/prod/com/nwm/v2.2/nwm." + pdy + \ "/merged_usgs_and_ca_timeslices", nudg_dir ) except Exception: pass tsjobs.start_jobs2( queue ) conusjobs.start_jobs2( queue ) hijobs.start_jobs2( queue ) prjobs.start_jobs2( queue ) akjobs.start_jobs2( queue ) if oneCyc: break currenttime = \ datetime.strptime( datetime.now().strftime("%Y%m%d%H"), "%Y%m%d%H" ) if start_datetime < currenttime: ana = conusjobs.find_job( \ conusjobs.get_ecf( "jnwm_analysis_assim.ecf" ), pdy, cyc ) hi_ana = hijobs.find_job( \ hijobs.get_ecf( "jnwm_analysis_assim_hawaii.ecf" ), \ pdy, cyc ) pr_ana = prjobs.find_job( \ prjobs.get_ecf( "jnwm_analysis_assim_puertorico.ecf" ), pdy, cyc ) ak_ana = akjobs.find_job( \ akjobs.get_ecf( "jnwm_analysis_assim_alaska.ecf" ), pdy, cyc ) while not ( conusjobs.hasJobDone( \ "jnwm_analysis_assim.ecf" if machine != 'ibm' \ else "jnwm_analysis_assim_ibm.ecf", pdy, cyc ) ) \ or not ( hijobs.hasJobDone( \ "jnwm_analysis_assim_hawaii.ecf" if machine != 'ibm' \ else "jnwm_analysis_assim_hawaii_ibm.ecf", \ pdy, cyc ) ) \ or not ( prjobs.hasJobDone( \ "jnwm_analysis_assim_puertorico.ecf" if machine != 'ibm' \ else "jnwm_analysis_assim_puertorico_ibm.ecf", pdy, cyc ) ) \ or not ( akjobs.hasJobDone( "jnwm_analysis_assim_alaska.ecf", pdy, cyc ) ): print( "=========================" ) print( "Waiting 60 seconds for pdy = ", pdy, \ ", cyc = ", cyc, " to be done, now time :", \ datetime.now().strftime( "%Y-%m-%d %H:%M" ) ) ana.printjob() hi_ana.printjob() pr_ana.printjob() ak_ana.printjob() print( "-------------------------" ) time.sleep( 60 ) start_datetime += timedelta( hours=1 ) pdy = start_datetime.strftime( "%Y%m%d" ) cyc = start_datetime.strftime("%H" ) conusjobs = NWM_Jobs.createConusJobs(nwdir, comroot, pdy, cyc ) tsjobs = NWM_Jobs.createTimeslicesJobs(nwdir, comroot, pdy, cyc ) hijobs = NWM_Jobs.createHawaiiPuertoricoJobs(nwdir, comroot, pdy, cyc, \ 'hawaii') prjobs = NWM_Jobs.createHawaiiPuertoricoJobs(nwdir, comroot, pdy, cyc, \ 'puertorico' ) akjobs = NWM_Jobs.createAlaskaJobs(nwdir, comroot, pdy, cyc, \ 'alaska' ) #get the coastal AnAs # coastal_ana_atl = conusjobs.find_job( \ conusjobs.get_ecf( "jnwm_analysis_assim_coastal_atlgulf.ecf" ), pdy, cyc ) coastal_ana_pac = conusjobs.find_job( \ conusjobs.get_ecf( "jnwm_analysis_assim_coastal_pacific.ecf" ), pdy, cyc ) coastal_ana_hi = hijobs.find_job( \ hijobs.get_ecf( "jnwm_analysis_assim_coastal_hawaii.ecf" ), pdy, cyc ) coastal_ana_pr = prjobs.find_job( \ prjobs.get_ecf( "jnwm_analysis_assim_coastal_puertorico.ecf" ), pdy, cyc ) # Add new dependencies # Coastal AnA must wait for the AnA in the previous cycle # conusjobs.add_job( coastal_ana_atl, [ pre_coastal_ana_atl ] ) conusjobs.add_job( coastal_ana_pac, [ pre_coastal_ana_pac ] ) hijobs.add_job( coastal_ana_hi, [ pre_coastal_ana_hi ] ) prjobs.add_job( coastal_ana_pr, [ pre_coastal_ana_pr ] ) coastal_ana_hi.addDependent( pre_coastal_ana_hi ) pre_coastal_ana_atl = coastal_ana_atl pre_coastal_ana_pac = coastal_ana_pac pre_coastal_ana_hi = coastal_ana_hi pre_coastal_ana_pr = coastal_ana_pr if not oneCyc: if not useecflow: ##remove pre-coversion crontab jobs # subprocess.check_call( "crontab -l | cat - | sed '/" + # pre_conversion_cron_marker_start + "/,/" + # pre_conversion_cron_marker_end + # "/d' - | crontab - ", shell=True) #install crontab jobs #subprocess.check_call( "crontab -l | cat - " + dir + "/doc/crontab.v1.1 | sed 's/q\\\\ dev\/ \//q\\\\\\ devhigh\/ \//g' - | sed '1,3d' - | crontab -", shell=True ) subprocess.check_call( "( crontab -l | cat - ; " "sed \"s/NWM vX\\\\.X/NWM v" + version + tag + "/\" " + nwdir + \ "/doc/crontab.v3.0 ) | sed " + \ " -e '/" + cron_marker_start + "/,/" + cron_marker_end + "/{s|-e \"s\|\\\\\\\$ECFTESTDIR|-e \"s\|^#PBS -q dev\|#PBS -q " + queue + "\|\" -e \"s\|\\\\\\\$ECFTESTDIR|g}'" + \ " -e '/" + cron_marker_start + "/,/" + cron_marker_end + "/{s|LOGDIR|" + comroot + "/com/logs|g}'" + \ " -e '/" + cron_marker_start + "/,/" + cron_marker_end + "/{s|ECFDIR|" + nwdir + "/ecf/test|g}' " + \ "- | crontab -", shell=True ) # "|\" " + \ # " -e 's|\.ecf|_ibm\.ecf|g' - " if machine == 'ibm' else " - " + \ # " | sed '1,3d' - | crontab -", shell=True ) else: subprocess.check_call("touch $HOME/ecflow/COMPLETEDGRIB2NETCDF", \ shell=True)