"""!Contains the WRFCopyTask, which delivers WRF output and input.

This module contains the implementation of WRFCopyTask, which
delivers WRF input and output files.  It maintains three sets of
deliveries:
* initial - files created before the wrf execution begins
* wrfprod - files created by wrf during its execution
* final - files created at the end of the wrf execution, or just
  after wrf ends

The WRFCopyTask.run() and WRFCopyTask.runpart() functions monitor the
WRF execution and copy files as needed.

Examples:
@code
  wrf=WRFSimulation(...)
  ... add domains to the wrf simulation ...
  sim=ExternalWRFTask(dstore,conf,section,wrf,...more stuff...)
  wrfcopier=WRFCopyTask(dstore,conf,"wrf_copier",sim,
                        "{vit[stormname]}{vit[stormid3]}.{YMDH}.")
  wrfcopier.d_initial('namelist.input')
  wrfcopier.d_initial('wrfinput_d01')
  wrfcopier.d_initial('wrfbdy_d01')
  wrfcopier.d_initial('fort.65')
  fhr=0
  while fhr-0.001<126:
      for product in sim.products(time=3600*fhr,stream="history"):
          wrfcopier.d_wrfprod(product,check=False)
      fhr+=3
  wrfcopier.d_final('track_d03.patcf')
  wrfcopier.run()
@endcode

This example will monitor an external WRF simulation.  It will
deliver four files at the beginning of the simulation:
namelist.input, wrfinput_d01, wrfbdy_d01 and fort.65.  One file
will be delivered at the end: track_d03.patcf.  All three-hourly
history stream files (wrfout_d...) from hour 0 through hour 126 will
be delivered as they become available.

The files will be delivered to the COM directory, with the storm name,
id and cycle prepended to each file name.  For example, because of the
wrfcopier.d_wrfprod() call, this file:

wrfout_d01_2016-08-14_09:00:00

will go here:

/path/to/com/invest97l.2016081400.wrfout_d01_2016-08-14_09:00:00

All of the delivery happens in the last line of the code example, when
wrfcopier.run() is called.  That call will not return until all files
are delivered or the WRF model fails.
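The naming rule above can be sketched as a standalone function. This is a minimal sketch, assuming the prefix template expands the way Python's str.format() does. The real expansion goes through HWRFConfig.strinterp(), which supports more than this. The helper name delivered_path and the vit values are hypothetical, chosen to reproduce the example path:

```python
import os


def delivered_path(comdir, prefix_template, source_file, **values):
    # Hypothetical helper: expand the prefix template (str.format stands in
    # for the real strinterp), then prepend it to the source basename.
    prefix = prefix_template.format(**values)
    return os.path.join(comdir, prefix + os.path.basename(source_file))


path = delivered_path(
    '/path/to/com',
    '{vit[stormname]}{vit[stormid3]}.{YMDH}.',
    'wrfout_d01_2016-08-14_09:00:00',
    vit={'stormname': 'invest', 'stormid3': '97l'},
    YMDH='2016081400')
print(path)  # /path/to/com/invest97l.2016081400.wrfout_d01_2016-08-14_09:00:00
```

Note that a field such as vit[stormname] indexes a dictionary with the string key stormname, which is why the template needs no quotes.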
"""

__all__=['WRFCopyTask']

import os, re, time
import produtil.datastore, produtil.locking, produtil.fileop
import hwrf.hwrftask, hwrf.wrf

from produtil.run import checkrun, run, exe, bigexe, mpi, mpirun
from produtil.datastore import (COMPLETED, RUNNING, UNSTARTED, Product,
                                FileProduct, UpstreamFile)
from hwrf.hwrftask import HWRFTask

class WRFCopyTask(HWRFTask):
    """!@brief wrf file delivery task
    @details This is a Task that copies WRF input and output files
    from the WRF run directory to the COM directory."""

    def __init__(self,dstore,conf,section,wrftask,out_prefix,**kwargs):
        """!@brief WRFCopyTask constructor
        @details Constructor for the WRFCopyTask
        @param dstore the produtil.datastore.Datastore
        @param conf the hwrf.config.HWRFConfig
        @param section the section to use in conf
        @param wrftask the task that runs WRF.  This should be an
          hwrf.wrf.ExternalWRFTask, or a subclass of
          hwrf.fcsttask.WRFTaskBase
        @param out_prefix output file prefix, a string suitable for
          passing into hwrf.config.HWRFConfig.strinterp()
        @param kwargs passed to the superclass constructor.  If no
          outdir is given, the COM directory is used."""

    @property
    def ncks_path(self):
        """!@brief Returns the path to ncks.
        @details Returns the path to the ncks program, used to convert
        between NetCDF 3 and compressed NetCDF 4 file formats.  Returns
        None if ncks cannot be found.  This function will only search
        for ncks once, and will cache the result.  Set
        self._ncks_path=False to force a recheck."""

    def compression_copier(self,src,vsubset=None):
        """!@brief creates and returns a compression_copier for deliver_file
        @details Returns the object that should be sent as the "copier"
        argument to produtil.fileop.deliver_file() to copy the given
        source file.  This is either None, or a function that calls ncks
        to compress NetCDF files.  The vsubset argument (a
        comma-separated list of variables to retain) is accepted but not
        yet used, so the file is never subsetted.
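The copier-factory idea described here can be sketched without produtil. This is a standalone sketch under stated assumptions. It assumes NetCDF files can be recognized by their leading magic bytes: 'CDF' plus a version byte for NetCDF 3, and the HDF5 signature for NetCDF 4. The names looks_like_netcdf and make_compression_copier, and the injectable run callable, are hypothetical. The ncks options are standard NCO flags: -4 (NetCDF-4 output), -L 6 (deflate level 6) and -O (overwrite).

```python
import os
from typing import Callable, List, Optional

# NetCDF 3 files start with b'CDF' plus a version byte: 1 (classic),
# 2 (64-bit offset) or 5 (CDF-5).  NetCDF 4 files are HDF5 files,
# which normally begin with an 8-byte signature.
NETCDF3_HEADERS = {b'CDF' + bytes([v]) for v in (1, 2, 5)}
HDF5_SIGNATURE = bytes([0x89, 0x48, 0x44, 0x46, 0x0D, 0x0A, 0x1A, 0x0A])


def looks_like_netcdf(path: str) -> bool:
    with open(path, 'rb') as f:
        head = f.read(8)
    return head[:4] in NETCDF3_HEADERS or head == HDF5_SIGNATURE


def make_compression_copier(ncks: Optional[str], src: str,
                            run: Callable[[List[str]], None]):
    # None means: no special copier, so the caller does a plain copy.
    if not ncks or not looks_like_netcdf(src):
        return None

    def copy(s: str, t: str, x) -> None:
        # Same copy(s,t,x) shape as described above; x is ignored here.
        if os.path.exists(t):
            os.remove(t)
        run([ncks, '-4', '-L', '6', '-O', s, t])

    return copy
```

In practice run could be something like lambda cmd: subprocess.run(cmd, check=True, stdin=subprocess.DEVNULL). It is injected here so the sketch can be exercised without ncks installed.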
        @param src the source file
        @param vsubset unused, but may one day be used to subset the file
        @returns None if ncks cannot be found or the source file is not
          NetCDF.  Otherwise, a copy(s,t,x) function is returned,
          suitable for passing as the copier argument of
          produtil.fileop.deliver_file()"""

    def decompression_copier(self,src):
        """!@brief returns a decompression copier for deliver_file
        @details Returns an object that has the reverse effect of
        self.compression_copier: it will uncompress files that
        compression_copier would compress.  NetCDF files will all be
        converted to 64-bit offset NetCDF 3 files.
        @param src the source file
        @returns None if ncks cannot be found or the source file is not
          NetCDF.  Otherwise, a copy(s,t,x) function is returned,
          suitable for passing as the copier argument of
          produtil.fileop.deliver_file()"""

    def comfile(self,orig,destname=None):
        """!@brief get the full path to a file
        @details Generates a full path to the delivery location of the
        specified source file.  Returns the full path and the basename
        in a tuple.
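This method derives three name variants from the source basename before a destname format is expanded. The sketch below shows them. The regular expression mirrors the date-stamp pattern used by this module, written with {n} repetition counts. The helper name name_variants is hypothetical, and callable replacements are used only to keep backslashes out of this docstring.

```python
import os
import re

# YYYY?MM?DD?HH?MM?SS where each '?' is '_', '.' or '-'; anything after
# the seconds field (up to the end, with no '/') is dropped.
STAMP = re.compile('([0-9]{4})[_.-]([0-9]{2})[_.-]([0-9]{2})'
                   '[_.-]([0-9]{2})[_.-]([0-9]{2})[_.-]([0-9]{2})[^/]*$')


def name_variants(orig):
    bn = os.path.basename(str(orig))
    colon = STAMP.sub(lambda m: '%s-%s-%s_%s:%s:%s' % m.groups(), bn)
    colon_s00 = STAMP.sub(lambda m: '%s-%s-%s_%s:%s:00' % m.groups()[:5], bn)
    return {'inname': bn, 'inname_colon': colon,
            'inname_colon_s00': colon_s00}


names = name_variants('/run/dir/wrfout_d01_2016-08-14_09_00_30')
print(names['inname_colon'])      # wrfout_d01_2016-08-14_09:00:30
print(names['inname_colon_s00'])  # wrfout_d01_2016-08-14_09:00:00
```

Names without such a stamp, and names that already use colons, pass through unchanged. A destname format can then refer to any of these keys, for example {inname_colon_s00}.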
        @returns a tuple (path,basename) where the path is the full
          path to the file, and the basename is os.path.basename(path)
        @param orig the original filename, or a Product whose location
          is used
        @param destname optional: the desired destination name format,
          which will be sent through
          hwrf.hwrftask.HWRFTask.confstrinterp() to generate the final
          destination filename.  The format may refer to {inname} (the
          source basename), {inname_colon} (the basename with its date
          stamp rewritten as YYYY-MM-DD_HH:MM:SS) and {inname_colon_s00}
          (the same, with the seconds set to 00)."""

    def d_initial(self,inprod,check=None,destname=None):
        """!@brief deliver a file generated before WRF starts
        @details Requests delivery of a file that is created before the
        wrf.exe invocation.  The "inprod" may be a Product or a filename
        relative to the WRF run directory.  The optional "check"
        argument enables calling the potentially expensive check()
        method on the upstream product every time it is considered for
        delivery.  If the input is a Product and check=False, then only
        its "available" flag is consulted, which will not be updated
        unless another Task marks the product as available.  The default
        is check=False for Products and check=True for filenames.
        @param inprod the input produtil.datastore.Product, or a filename
          relative to the WRF run directory
        @param check do we need to call the product's check() function?
          This is needed if it is a produtil.datastore.UpstreamFile.
        @param destname optional: the destname argument to comfile(),
          which will be used to generate the delivered filename in the
          COM directory.
        @returns the produtil.datastore.FileProduct that will be delivered"""

    def d_final(self,inprod,check=None,destname=None):
        """!@brief deliver a file generated at the end of the WRF run
        @details Requests delivery of a file created by WRF that is not
        complete until WRF exits.  Examples of this are the wrfdiag,
        hifreq and patcf files.  These files will be delivered when the
        underlying WRF Task has a state of COMPLETED.
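The state rule stated here can be illustrated with a sketch of which delivery groups are scanned for a given state of the WRF task. This assumes the rule above for final files, and also assumes that initial and wrfprod files become eligible once WRF has started (RUNNING or COMPLETED). The string states and the function name groups_to_scan are hypothetical stand-ins for the produtil.datastore state constants.

```python
from typing import List

# Hypothetical stand-ins for produtil.datastore's task states.
UNSTARTED, RUNNING, COMPLETED = 'UNSTARTED', 'RUNNING', 'COMPLETED'


def groups_to_scan(wrf_state: str) -> List[str]:
    started = wrf_state in (RUNNING, COMPLETED)
    completed = wrf_state == COMPLETED
    groups = []
    if started:
        # Inputs and in-flight history/restart files can be copied while WRF runs.
        groups += ['initial', 'wrfprod']
    if completed:
        # Files such as wrfdiag, hifreq and patcf are only whole after WRF exits.
        groups.append('final')
    return groups
```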
        The optional "check" argument enables calling the potentially
        expensive check() method on the upstream product every time it
        is considered for delivery.  If the input is a Product and
        check=False, then only its "available" flag is consulted, which
        will not be updated unless another Task marks the product as
        available.  The default is check=False for Products and
        check=True for filenames.
        @param inprod the input produtil.datastore.Product, or a filename
          relative to the WRF run directory
        @param check do we need to call the product's check() function?
          This is needed if it is a produtil.datastore.UpstreamFile.
        @param destname optional: the destname argument to comfile(),
          which will be used to generate the delivered filename in the
          COM directory.
        @returns the produtil.datastore.FileProduct that will be delivered"""

    def _deliver_to_group(self,group,inprod,check=None,destname=None):
        """!@brief internal function that arranges for future file delivery
        @details Do not call this function directly.  It is the internal
        implementation of d_initial and d_final; call those functions
        instead.
        @param group which group does this belong to?  Should be
          self._initial, self._wrfprod or self._final
        @param inprod the input produtil.datastore.Product, or a filename
          relative to the WRF run directory
        @param check do we need to call the product's check() function?
          This is needed if it is a produtil.datastore.UpstreamFile.
        @param destname optional: the destname argument to comfile(),
          which will be used to generate the delivered filename in the
          COM directory.
        @returns the produtil.datastore.FileProduct that will be delivered"""

    def d_wrfprod(self,product,check=False,destname=None):
        """!@brief deliver a file generated during the WRF simulation, such as a history or restart file
        @details Requests delivery of a WRF I/O subsystem output file.
        The "product" argument should be a Product object; a filename
        relative to the WRF run directory is also accepted.
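The "check" flag semantics for this method can be shown with a toy model. With check=False only the cached available flag is consulted, so a file that already exists goes unnoticed until something marks it available. With check=True, check() refreshes the flag from the file system. ToyUpstreamFile and ready_for_delivery are hypothetical stand-ins; they do not reproduce the real produtil.datastore.UpstreamFile.

```python
import os
import tempfile


class ToyUpstreamFile:
    # Hypothetical stand-in: 'available' is a cached flag that changes
    # only when check() runs or another party sets it.
    def __init__(self, location: str) -> None:
        self.location = location
        self.available = False

    def check(self) -> None:
        self.available = os.path.exists(self.location)


def ready_for_delivery(prod: ToyUpstreamFile, check: bool) -> bool:
    if not prod.available and check:
        prod.check()  # the potentially expensive part, e.g. a stat()
    return prod.available


with tempfile.TemporaryDirectory() as rundir:
    path = os.path.join(rundir, 'wrfout_d01')
    open(path, 'w').close()
    prod = ToyUpstreamFile(path)
    print(ready_for_delivery(prod, check=False))  # False: flag never refreshed
    print(ready_for_delivery(prod, check=True))   # True: check() found the file
```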
        The optional argument "check" enables calling the potentially
        expensive check() method on the product every time it is
        considered for delivery.  If check=False, then only the
        "available" flag is consulted, which will not be updated unless
        another Task marks the product as available.
        @param product the produtil.datastore.Product
        @param check do we need to call the product's check() function?
          This is needed if it is a produtil.datastore.UpstreamFile.
          Must be a bool; any other type raises TypeError.
        @param destname optional: the destname argument to comfile(),
          which will be used to generate the delivered filename in the
          COM directory.
        @returns the produtil.datastore.FileProduct that will be delivered"""

    def run(self,check_all=False,raise_all=False):
        """!@brief watch for files to show up, delivering them when they do
        @details Keeps watching for WRF files to show up, copying them
        when they do and sleeping five seconds between passes.  This is
        just a simple wrapper around self.run_helper(), and does not
        return until the state is set to something other than RUNNING.
        @param check_all if True, all non-delivered products have
          product.check() called on them"""

    def deliver_group(self,group,check_all=False,raise_all=False):
        """!deliver files to COM
        @protected
        This is an internal implementation function.  Do not call it
        directly.  Takes a list of tuples containing an upstream product,
        a downstream product to deliver, and a boolean telling whether to
        check() the upstream product.  Delivers available products.
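One delivery pass over a group, as described here, can be sketched in simplified, self-contained form. The sketch assumes each entry is a (source path, destination path) pair of plain files, and that a source counts as available once it exists. It returns the same (did_something, more_to_do) pair described for this method. Concurrent copiers are kept apart with an exclusive lock file created through os.open() with O_CREAT | O_EXCL. That locking is a swapped-in technique for this sketch, not necessarily how produtil.locking works. The function deliver_pass and the lock-file naming are hypothetical.

```python
import os
import shutil
from typing import Iterable, Tuple


def deliver_pass(group: Iterable[Tuple[str, str]],
                 lockdir: str) -> Tuple[bool, bool]:
    did_something = False
    more_to_do = False
    for src, dest in group:
        if os.path.exists(dest):
            continue  # delivered on an earlier pass
        if not os.path.exists(src):
            more_to_do = True  # not yet available; try again next pass
            continue
        lock = os.path.join(lockdir, os.path.basename(dest) + '.lock')
        try:
            fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        except FileExistsError:
            more_to_do = True  # another process is delivering this file
            continue
        try:
            tmp = dest + '.part'
            shutil.copyfile(src, tmp)
            os.replace(tmp, dest)  # readers never see a half-written file
            did_something = True
        finally:
            os.close(fd)
            os.remove(lock)
    return did_something, more_to_do
```

A caller would repeat such passes, sleeping between them, until more_to_do comes back False.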
        Returns a tuple containing two booleans: the first is True iff
        something was delivered, and the second is True iff something is
        left in the group that has not been delivered yet.  The check_all
        argument can be used to force a check on all products by setting
        check_all=True.
        @param group either self._initial, self._wrfprod or self._final
        @param check_all if True, run product.check() on all products
        @param raise_all if True, an exception raised while delivering a
          file is re-raised after being logged, instead of only logged"""

    def unrun(self):
        """!@brief delete delivered files
        @details Deletes delivered files from the destination by calling
        the undeliver() function of the Product objects."""

    def runpart(self,check_all=False):
        """!@brief deliver one output file and return.
        @details Delivers one output file and returns.  Sets the state to
        COMPLETED if all files are delivered.
        @param check_all Optional.  If True, forces a call to check() on
          all undelivered products, even if those products are not
          checked by default."""

    def run_helper(self,runpart,check_all=False,raise_all=False):
        """!@brief internal implementation of run() and runpart()
        @details This is the internal implementation of run and runpart.
        It delivers files, and returns False if all files are delivered.
        @param runpart If runpart=True, run_helper() will return
          immediately after delivering one file.
        @param check_all Optional.
          If True, forces a call to check() on all undelivered products,
          even if those products are not checked by default."""