U  g v@sPdZdgZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl Z ddlZddlZddlZddlZddlZddlZddlZddlmZddlmZmZmZmZddlmZmZmZddlmZddl m Z m!Z!dd l m"Z"m#Z#m$Z$dd l%m&Z&m'Z'd d Z(d dZ)Gddde*Z+e*Z,Gdddej-j.Z/dS)z!Declares GRIBTask, which automates regribbing operations. This module contains the GRIBTask, which is an HWRFTask that runs the regribbing described in an hwrf.regrib.RegribMany object.GRIBTaskN)set_ecflow_meter)GRIBBaseGRIBOp RegribManyGRIB2Op) to_datetimeto_datetime_rel to_fraction) GribberError)TempDirNamedDir)Product COMPLETED UNSTARTED)runexecCsdS)z!Returns None.Nrrr:/lfs/h1/ops/prod/packages/hmon.v3.2.7/ush/hwrf/gribtask.py nonemakersrcCs ttS)zf!Creates a collections.defaultdict that has a default value of None for any key that has no value.) collections defaultdictrrrrrnonedictsrc@s eZdZdZddZddZdS) LockDummyz3!A Python "with" block construct that does nothing.cCsdS)z'!Do nothing at entry to a "with" block.Nrselfrrr __enter__%szLockDummy.__enter__cCsdS)zo!Do nothing upon exiting a "with" block. @param etype,evalue,traceback exception information to ignore.Nr)retypeevalue tracebackrrr__exit__'szLockDummy.__exit__N)__name__ __module__ __qualname____doc__rr rrrrr#srcseZdZdZd!fdd ZddZddZd"d d Zd#d d ZddZ d$ddZ d%ddZ ddZ ddZ ddZd&ddZddZdd ZZS)'ra!An hwrf.hwrftask.HWRFTask that performs regribbing operations. An HWRFTask that runs regribbing for a list of input/output times. This class keeps track of many different grids and grib files, each of which is given a name. The regribbing operations are specified in an hwrf.regrib.RegribMany object. It is possible for multiple jobs to run the same GRIBTask at the same time: the task uses lock files and a produtil.datastore.Datastore to communicate between them.Nc  s0|s ttt|j|||f| | dkr4tjj} | |_|} tj |||t |_ |j j |_ |j j|_|j j|_t|j } t|j|_t|_t|_tj ||||_|jD]B} tj | |j \}}d|j||f}tjj|j|| d|j| <q|dk r|j |_ ||_!|"||_#d|_$dS)aJ!Creates a new GRIBTask: @param dstore the produtil.datastore.Datastore to use @param conf the HWRFConfig to use for configuration options. This conf is passed down to the RegribMany during regribbing operations. @param section the config section to use. @param regribmany the hwrf.regrib.RegribMany that describes the regribbing operations @param start,end,step the start and end times and timestep for the list of times to regrib. @param atime the analysis time of the simulation. This is needed by some of the regribbing operations. Note that this analysis time may differ from the conf.cycle, especially for data assimilation first guess jobs. @param kwargs passed to the HWRFTask constructorNz%s.f%02dh%02dm)tasknamelogger)%has_deliveriesAssertionErrorsuperr__init__produtillog masterloggerhwrfnumerics TimeArrayr _regribstorestartendtimesteplenrrmake_time_array_list _deliveriesset _all_productslist _regribnames _subtaskstimes fcst_hr_minr% datastoreTaskdstoreatime_rm make_productsr&Z workerdesc)rrAconfsection regribmanyr2r3steprBr-kwargsr&ntimeZihriminr% __class__rrr*:s8        zGRIBTask.__init__cCstj|j|j|jS)z!Create a time-indexed array of None. Creates a new hwrf.numerics.TimeArray that maps from list of times to be processed to None.)r.r/r0r2r3r4rrrrmake_time_arrayyszGRIBTask.make_time_arraycCstj|j|j|jtS)a !Create an array of times to be processed. Returns a new hwrf.numerics.TimeArray for the list of times to be processed. Unlike make_time_array, this TimeArray will assume any times that don't have data assigned have an empty list().)r.r/r0r2r3r4r:rrrrr6szGRIBTask.make_time_array_listFcKs|dk s t|dkrdS||j|kr*dS|j||}|tkrDdSt|trr|j}|sj|rj||j}|srdS|d}| d||f|S)a!Returns intermediate results. Used by RegribMany to retrieve intermediate results. The extra update flag is not used by RegribMany, but rather used by GRIBTask.run() to force a recheck of the database to see if another GRIBTask has completed a regribbing operation by sending update=True. @param name the name of the result type @param time the time at which the result is requested @param update if True, call update() on the result @param kwargs unusedN %Y%m%d.%H%M%Szrecalling result - %s %s) r(r1SHOULD_BE_MADE isinstancer availableupdatestrftimer,info)rnamerKrTrIgotavstrtimerrrget_datas$   zGRIBTask.get_datac Ks*|dk s t|dkr|}|d}|dkr2dS|d||f||j|ksVt|j||}t|tjjr||j k}|j d||t |t |j t |f|j ||d||j kr&|j ||} | D]0} | |kr|j d||| j f| j |dqn |d||f||j||<dS) a!Stores intermediate results. Used by RegribMany to store intermediate results. Will also deliver any output products. @param name the name of the result being set @param data the data, any implementation-defined object. If this is a product that is delivered, it is sent to the frominfo= argument of Product.deliver() @param time the time at which the result is set @param logger a logging.Logger to use for logging messages @param kwargs ignoredNrPzresult obtained - %s %szIresult obtained - %s %s: deliver (keep=%s) destination = %s frominfo = %s)frominfokeepz%s %s: also deliver to %s)r\z,result obtained - %s %s: non-product result.)r(r,rUrVr1rRr+r?rr7r-reprlocationdeliver) rrWdatarKr&rIrZZmydatar]productsproductrrrset_datas:     zGRIBTask.set_datac Cs|j}tt}t}t}t}t}|}|jD]2\}} ||t | t rb||q:||q:|j D]\}} ||qxt} |j D]F\} }} } }}|| | | | ||g|| kr| ||j |q|j}|j}|jD]}|d}|D]:}|jj|||dr|d||ft|j||<q|D]B}|jj|||dslqN|d||f||krtjj}ntjj}d}||kr||}|dkst|dkrL||||||d}|j|||j||<|dk r|D]F\} } } }}|||||| | | |}|j||j|| |qnD|d\} } } }}|||||| | | |d }|j|||j||<qN|j|D]}|d ||fqqdS) z!Creates Product objects. Called automatically from run(), this creates Product objects for all output products. Note that this is potentially an expensive function if the gribtask has a large number of products.rPrKtaskz!%s: make non-file %s at this timez%s: make file %s at this timeN)catr)wherelocrhprodz%s: final list contains: %s)r%rrr:r8r,rCGRIBOpsaddrRr nonGRIBOps deliveriesappendr; _make_productr1r=rU input_validdebugrQr.regrib GRIB2Product GRIB1Productr5r9r7keys)rcategoryroZgrib2sZgrib1sZallsZnongribr&rWopZ regribsetrirjrhrkr]r%Z make_productrKfcststrZptypeZdlistrcrrrrDs                   zGRIBTask.make_productsccsZ|dk r|s|dk r |j|}|dk rl||j|kr:dS|j||}|dkrTdS|rd||fVq|Vn<|j|D],\}}t|tjjrz|r||fVqz|Vqzn|jD]}|dk r||j|krdS|j||d}t|tjjrT|r ||fVn|Vq|j|D]2\}}t|tjjr |rJ||fVn|Vq qdS)a!Iterates over products: Loops over all products that meet the specifications, yielding each one in turn. @param name the name of the products (same name as in the RegribMany). @param time the forecast time of the product @param yieldtime if True, instead of iterating over products, this will iterate over tuples containing the time and product.N) r1neartimeitemsrRr+r?r datatimesget)rrWrKZ yieldtimer{foundrcrtimerrrrbs>         zGRIBTask.productsc Cs|dkr d}|dkr|j}nd|kr0|||}|dkrV|dksDtd|j||f}nd|krj|||}|dkrd||f}nd|kr|||}tj|j||}|dk r|dkst|dk r|dkst|dk r|dkst||j|||dS)a]!implementation of make_products The underlying implementation of make_products, this makes one product. @param cls the class to create. This is a constructor which will be called like so: @code cls(self.dstore,category=cat,prodname=prod,location=loc) @endcode @param name the name of the product from the RegribMany @param fcststr a string representation of the forecast time, used for logging and product location generation @param time the forecast time @param where "com" or "intercom". Default: "intercom" @param loc the product location. A reasonable default is chosen if this is unspecified @param cat the product category. Default: self.taskname @param prod the product's prodname. Default: name.Nintercom{z%s/%s.%sz%s/%s)rxprodnamer_) r%conftimestrinterpr(ospathjoinrEgetdirrA) rclsrWrzrKrirjrhrkrrrrqHs(    zGRIBTask._make_productc Cs|}|j|jD]>}t|tjjs@|dt |ft|tjjsRt t |_ q|j D]}|js~|d|jfq`|j}|dkrtj|s|d|j|f|q|d|j|fq`|d|jfd|_q`t |_ W5QRXdS) aH!Marks all tasks as unstarted and products as unavailable. Marks this task and all subtasks as incomplete, ensuring that the next call to run or runpart will attempt to produce any products that are not delivered. All products that are "available" but are not on disk are marked as unavailable.z:Somehow ended up with a non-Task in the array of tasks: %sz%s: not availablerz%s: no file at %sz%s: already at %sz%%s: no location, but set to availableFN)r,rA transactionr<rRr+r?r@criticalr^r(rstater9rSrVdidr_fileop isnonemptywarning undeliver)rr&rfrcrjrrr uncompletess6     zGRIBTask.uncompletecCs|}|jdd}d}d}|jD]r}|d7}|jr&|d7}|jr|rn|d|j|jf|q|d|j|jfq&|d|jfq&|jd|jd|||fd S) z[!Calls the Product.call_callback function for all completed and delivered products.zDCalling all callbacks for products that have already been completed.rrgzC%s: Calling all callbacks for this product. Product location is %szD%s: Already completed, but has no callbacks. Product location is %sz<%s: Product has no location. I will not call its callbacks.z6Done calling callbacks for already completed products.zTCalled %d callbacks out of %d completed products, of %d total products for this job.N) r,r-rVr9rSr_ has_callbacksrcall_callbacks)rr&Z ncompleteZntotalZncalledrcrrrcall_completed_callbackss2      z!GRIBTask.call_completed_callbacksc Csd|jP|jD](}|jdks$t||jdkstq|jD] }|qBt|_ W5QRXdS)zY!Deletes all output products, and marks this task and all subtasks as incomplete.rN) rArr9r_r(rr<unrunrr)rrcrfrrrrs    zGRIBTask.unrunTc' Cs|r dS|}|r"|d|dd}|dd}tjtj | d|j tj | d|j } tjj| |d tjj|j |d t d d } d } |jD]"} | d } |d|j | fqd}d}d}d}d}t}d}d}| dkrtjd|sR|r|n|}|r*|jjn|j}|d|ft||dd}|}t}|j`|jD]N} |j| }|s||s||| ft||krvqƐqvW5QRX||krl|jdd dd|D|rl|rl|d \}}||j j!}t"t#$t%|d}|dkrH|d8}n|d8}|d krdt&||~~~~t|d kr|jdq|D]\}} | d } |d| f|r|d| fq|j'(}||_)d}d}|j'*D]L}|| f}|| krH| |dkr0|+d | |fqn|d!| || |f|j| } |d"| |f|| kr|d#| |fqz|,|| dk r|d$|| fWq|j-|| |d%sd&| |f}!||!d}d}|r|rt.|!Wq|d'|| ftj | d(| |f}"tj/j0|"dddd)}#d}|#|j,|| dd*dk rz|d$|| fW5QRWqt1d+|j | |f|j dd|d,|2|r|d-| |fn|jd.| |f|j-|| |d%r|j3|| |d%|,|| dkr,d/|| f}!|4|!d}|r,t.|!W5QRXW5QRXWntjj5k r~}$z|d0|| fW5d}$~$XYntj/j6k r}%z|d1|| fd}W5d}%~%XYnt7k r<}&zd|j4d2|| t8|&fdd3|r| d7} | |d| |<| |dkr(|9d4| |fd}W5d}&~&XYnXq|rl|d5| ft:|_;|r|dSn|d6| f|+d7| fqqt:|_;dS)8a!Internal function that underlies run() and runpart() Called from run() and runpart() to perform the actual work. @param one True for runpart(), False for run(). If True, exits after one product is processed. @param worker obsolete and ignored @param raiseall If true, nearly all exceptions are raised. @param now If true, the function will not sleep or wait.Nz$Will re-raise any caught exceptions. waitsleep readysleeprlockdir)r&cSsdS)Nrrrrrrz%GRIBTask.run_helper..rrPz%s: will process time %sTFdz4Gave up: more than 100 regribbing operations failed.z+Sleeping %d s while waiting for work to do.zDone sleeping.zConsidering these times: %sz, cSsg|]\}}|dqS)z %Y%m%d-%H%M%S)rU).0strrr sz'GRIBTask.run_helper..irgzMNo subtasks incomplete. I think I am done running. Will exit regribber now.z%s: examine this time...z%s: already done.z#%s %s: skip: failed too many times.z%s %s: fail count is %dz%s: consider product %sz%s: no %s product at this time.z%s %s: already donerez%s: %s: not ready yet.z%s %s: will need lockz%s.%s)filenamer& max_tries giveup_quiet)rTz %s/%s.%s.)prefixdirr] keep_on_errorr&z%s: %s: calculate this grid.z%%s: %s: process this grib1/2 product.z6%s %s: somehow regribber.make did not deliver anythingz;%s %s: internal error: product does not exist at this time.z%s %s: lock held, move on.z%s %s: failed with exception %s)exc_infoz%s %s: failed too many timesz %s: done.z %s: not done.z%s: on to next time....)< is_completedr,rVconfintr+rmakedirsrrrrr%workdirrrr1r=rUrr. exceptions PostFailedr-rKsleepr:rArr<rTrpr5rEcycleintmathceilr rrCcopy_datanamesrsr[is_readyr lockingLockFiler is_gridmakerNoProductErrorLockHeld Exceptionstrerrorrr)'roneZworkerraiseallnowZ ecflow_meterr&rrrZ fail_countsZ all_failsrrZZfirst_nZfirst_time_throughZnotreadyZlast_nincompleteZ nincompletedummyZattempted_something sleeptimeZloggifysubtaskrrZmeterdtZmeterfhr regribberZthisdonerWZfailkeyZrstmsglockfilelockerZnpelherrr run_helpers                                       zGRIBTask.run_helperc Ks&t|j|jd|W5QRXdS)ay!Performs all regribbing, returning when complete. Runs all regribbing. Does not return until all regribbing is complete, or a fatal error happens. It is safe to run this in multiple threads at the same time. Through file locking and database usage, the jobs will work together. @param kwargs keyword arguments passed to run_helper()FN)Fr rrrrIrrrrs z GRIBTask.runc Ks&t|j|jd|W5QRXdS)zl!Performs a small amount of work and returns. @param kwargs keyword arguments passed to run_helper()TN)Trrrrrrunparts zGRIBTask.runpart)NN)NF)NN)NNF)NNNN)FNTFF)r!r"r#r$r*rOr6r[rdrDrbrqrrrrrr __classcell__rrrMrr0s. ?  (N . +"!  C )0r$__all__ros.pathrKsocketrerrprodutil.datastorer+ produtil.cdprodutil.fileop produtil.log hwrf.postr.hwrf.exceptions hwrf.regrib hwrf.hwrftask hwrf.confighwrf.storminfo hwrf.numericsprodutil.ecflowrrrrrrr r r r r rrr produtil.runrrrrobjectrrQhwrftaskHWRFTaskrrrrrs$8