U  g@sdZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddlZddl Z ddlZ ddlZ ddlZ ddddddd d d d d dddddgZGdddeZGdddeZGdddeZGdddeZGdddeZGdddeZedZedZdZdZdZdZdZ d Z!Gd!d d e"Z#Gd"dde"Z$Gd#dde"Z%Gd$ddeZ&Gd%dde%Z'Gd&d'd'e'Z(Gd(d)d)e'Z)d.d+d,Z*Gd-dde%Z+dS)/aQ!Stores products and tasks in an sqlite3 database file. This module maintains an sqlite3 database file that stores information about Products and Tasks. A Product is a file or group of files created by some Task. Both Product and Task classes derive from Datum, which is the base class of anything that can be stored in the Datastore.NDatumException DatumLockHeld InvalidIDInvalidOperationUnknownLocationFAILED UNSTARTEDRUNNINGPARTIAL COMPLETED Datastore TransactionDatumCallbackExceptionsProductTaskc@seZdZdZdS) FakeExceptionzf!This is a fake exception used to get a stack trace. It will never be raised outside this module.N__name__ __module__ __qualname____doc__rr?/lfs/h1/ops/prod/packages/hmon.v3.2.7/ush/produtil/datastore.pyrsrc@seZdZdZdS)rz:!Superclass of all exceptions local to produtil.datastore.Nrrrrrrsc@s(eZdZdZddZddZddZdS) rz3!Raised when a LockDatum is held by another Worker.cCs@t||_t||_t||_t||_t||_t||_dS)a!DatumLockHeld constructor. 
@param did the database ID of the datum whose lock is held @param owner the owner of the lock @param owninfo implementation-defined information about the owner @param ownseen last time the owner checked in @param ownlegacy length of time the lock is valid @param checktime suggestion of how often to check the lockN)strdidintownerowninfoownseen ownlegacy checktime)selfrrrrr r!rrr__init__s     zDatumLockHeld.__init__c CsPd|j|jtj|j|j|j|jtj|j|j|j|j|jf S)z1!Human-readable representation of this exception.zuCannot lock Datum %s at %d (%s) Was locked by worker %d (info %s) Last seen %d = %s Lock legacy time %d Time left: %d) rr!datetimeutcfromtimestampctimerrrr r"rrr__str__>szDatumLockHeld.__str__c Cs@dt|jt|jt|jt|jt|jt|jt|jfS)z&!String representation of this object.z%s(%s,%s,%s,%s,%s,%s)) typerreprrrrrr r!r'rrr__repr__Is zDatumLockHeld.__repr__N)rrrrr#r(r+rrrrrs  c@seZdZdZdS)rzV!Raised when a Datum or subclass receives a prodname or category name that is invalid.NrrrrrrOsc@seZdZdZdS)rz\!Raised when an invalid Datum operation is requested, such as delivering an UpstreamProduct.NrrrrrrQsc@seZdZdZdS)rz:!Raised when delivering data, but no location is provided.NrrrrrrSsz \A.*::.*\Zz \A.*\*\*.*\Zz**task**i c@sXeZdZdZdddZddZejdd Zd d Z d d Z ddZ ddZ ddZ dS)r ae!Stores information about Datum objects in a database. Stores and retrieves Datum objects from an sqlite3 database. Uses file locking workarounds for bugs in RedHat Enterprise Linux's sqlite3 library, which is compiled without proper locking. Has support for caching, and multithreading. Each object in this database has a type, a string location, an integer "available" parameter, and an arbitrary list of (key,value) metadata pairs. This object can safely be accessed by multiple threads in the local process, and handles concurrency between processes via file locking.NTc Cs||_||_d|_||_t|_t|_t|_ |d}|dk rV| d||ft j j ||dddd|_tt|_|}||W5QRXdS)a!Datastore constructor Creates a Datastore for the specified sqlite3 file. 
Uses the given logging.Logger object for logging messages. Set locking=False to disable all file locking. That is generally unwise, and should only be used when reading the database. That functionality is supplied, and critical, for monitoring another user's jobs. One cannot lock another user's file, so the "no locking" option is the only way to analyze the other user's simulation. @param filename the filename passed to sqlite3.connect @param logger a logging.Logger to use for logging messages @param locking should file locking be used? It is unwise to turn off file locking. @warning Setting locking=False will disable file locking at both the Datastore level, and within sqlite3 itself. This can lead to database corruption if two processes try to write at the same time. This functionality is provided for the rare situation where you are unable to write to a database, such as when reading other users' sqlite3 database files.Nz.lockzLockfile is %s for database %si,g?2)logger max_tries sleep_time first_warn)_loggerfilenamedb_lockingdict _connections threadingLock _map_lock_db_lockdebugprodutillockingLockFile _file_lock collections defaultdictlist _transtack transaction _createdb _connection)r"r5r0r@ZlockfileZtxrrrr#s(    zDatastore.__init__c Csftj}|jL||jkr2|j|W5QRSt|j}||j|<|W5QRSW5QRXdS)z\!Gets the current thread's database connection. Each thread has its own connection.N)r:current_threadidentr<r9sqlite3connectr5)r"tidcrrrrIs    zDatastore._connectionc cs,tj}|j|j|VW5QRXdS)z3!Gets the transaction stack for the current thread.N)r:rJrKr<rF)r"rNrrr_mystacks zDatastore._mystackcCs@|js dS|jz|jWn|jYnXdS)z3!Acquires the database lock for the current thread.N)r7r=acquirerBreleaser'rrr_locks  zDatastore._lockcCs"|js dS|j|jdS)z!Releases the database lock from the current thread. 
If the current thread does not have the lock, the results are undefined.N)r7rBrRr=r'rrr_unlocks zDatastore._unlockcCst|S)zq|sd|rT|j|ddd|d<d|d<|d |jfD]\}}} | ||<qt||_d S) a!Replace Datum metadata with database values, add new metadata to database. Given a Datum d, discards and replaces d._meta with the current metadata, location and availability. Will raise an exception if the product does not exist in the database. @param d The Datum. @param or_add If True, then any metadata that does not exist in the database is created from values in d.Fz9SELECT id, available, location FROM products WHERE id = ?Trqrr)r[rrsz0SELECT id, key, value FROM metadata WHERE id = ?N)r8rVrr}ru) r"rxrtfoundr[rrzr_r{r|rrrrwqs   zTransaction.refresh_metacCsT|dkr|d||jfn4|dkr<|dt||jfn|d|j||fdS)aL!Sets metadata key k to value v for the given Datum. Modifies the database entries for key k and datum d to have the value v. If k is location or available, then the product table will be updated instead. @param d The Datum @param k The metadata key. @param v The value, a string.rrz7UPDATE OR IGNORE products SET location = ? WHERE id = ?rqz8UPDATE OR IGNORE products SET available = ? WHERE id = ?z.INSERT OR REPLACE INTO metadata VALUES (?,?,?)N)rprr)r"rxr{r|rrrset_metas zTransaction.set_metacCs*|dkr|dkst|d|j|fdS)a!Delete metadata from the database. Deletes the specified metadata key k, which must not be "location" or "available". @param d The Datum whose metadata is being deleted. @param k The metadata key, which cannot be "available" or "location".rqrrz)DELETE FROM metadata WHERE id=? AND key=?N)rjrpr)r"rxr{rrrdel_metaszTransaction.del_metaN)r)r)T)T)rrrrr#rhrnrVrpr}r~rwrrrrrrr s     c@s<eZdZdZd?ddZddZdd Zd d Zd d ZddZ ddZ ddZ ddZ ddZ ddZddZee dddZee dddZee dddZeedddZee ddd Zeeedd!Zd"d#Zd$d%Zd&d'Zd(d)Zd*d+Zd@d-d.Zd/d0Zd1d2ZdAd3d4ZdBd5d6Z d7d8Z!d9d:Z"d;d<Z#d=d>Z$dS)Cra!Superclass of anything that can be stored in a Datastore. This is the superclass of anything that can be placed in a datastore. 
It has a category, a product name (prodname for short), a location, availability (an int) and arbitrary metadata (key,value) pairs. It caches database metadata in self._meta, which is directly accessed by the Datastore class. Cache data will be discarded once its age is older than self._cacheage.Nr.c Ks|t|t||_|_|_|jd|j|_t|_d|_|sJd|_||dkrdt |_ n t ||_ |dk rt||j d<d|j krt |j d|j d<t |_|j}||W5QRXdS)a!Datum constructor. Creates a Datum in the given Datastore dstore, under the specified category and product name prodname. The datastore id used is "category::prodname". The value for "cache" is the number of seconds to retain cached metadata before going back to disk to reread it. That only applies to data "get" operations: setting a data or metadata value will cause an immediate write to the database. Also, __contains__ ("var" in self) will force a fetch from the database if the requested metadata variable is not in the cached copy of the database. Values for location and meta are only the DEFAULT values, and will be ignored if other values are already set in the database. The location is only used if the product is not already in the database: its location will be set to the provided values upon entry. Similarly, the metadata is only set in this call if there isn't already metadata for the product with the given metadata keys. @param dstore The Datastore for this Datum. @param prodname The product name portion of the Datum ID @param category The category part of the Datum ID @param meta A dict of metadata values. @param cache Metadata cache lifetime in seconds. @param location The initial value for location, if it is not set already in the database. @param kwargs Ignored.z::r.Nrrrq)r_dstore _prodname _category_idtime _cachetime _cacheagevalidater8rurr:RLockrSrGr}) r"dstoreprodnamecategoryr[cacherrkwargsrYrrrr#s"      zDatum.__init__cCs|j|S)zI!Acquires this object's thread lock. 
This is used to manage cached data.)rSrQr'rrrrhs zDatum.__enter__cCs|jdS)z!Releases this object's thread lock. This is used to manage cached data. @param etype,evalue,traceback Exception information.N)rSrR)r"rkrlrmrrrrnszDatum.__exit__cCs8t|jrtd|jt|jr4td|jdS)z:!Validates this object's Datastore, prodname and category.z3%s: the prodname cannot contain a double colon (::)z3%s: the category cannot contain a double colon (::)N) _has_dcolonmatchrrrrr'rrrrs  zDatum.validatecCs|jS)z'!Returns the database ID of this datum.)rr'rrrgetidsz Datum.getidcCs|jS)z%!Returns the datastore of this datum.)rr'rrr getdatastoreszDatum.getdatastorecCs |jS)zF!Creates, but does not lock, a Transaction for this datum's datastore.)rrGr'rrrrGszDatum.transactioncCs t|jS)z!Returns the product type of this Datum. Returns the product type of this Datum. This is generally the name of the Python class that created the entry in the database.)r)rr'rrr getprodtypeszDatum.getprodtypecCs|jS)z2!Returns the product name part of the database ID.rr'rrr getprodnameszDatum.getprodnamecCs|jS)z6!Returns the product category part of the database ID.)rr'rrr getcategory szDatum.getcategorycCs|dS)z=!Returns the "location" field of this Datum's database entry.rrrr'rrr getlocation szDatum.getlocationcCs ||d<dS)z\!Sets the "location" field of this Datum's database entry. 
@param v the new locationrrNr)r"r|rrr setlocationszDatum.setlocationzthe product name (read-only)zthe category (read-only)z?Returns the prodtype for this Datum: its class name (read-only)z2Returns the database id for this Datum (read-only)z>Gets the Datastore object that contains this Datum (read-only)z)The location of this product (read/write)cCst|jt|jAS)z!Integer hash function.)hashrrr'rrr__hash__7szDatum.__hash__cCsd|j|jfS)z*!Human-readable description of this Datum.z %s with id %s)ryrr'rrrr(:sz Datum.__str__cCs$d|jt|jt|jt|jfS)z,!Python code-like description of this Datum.z %s(%s,%s,%s))ryr*rrrr'rrrr+=szDatum.__repr__cCs:t|tstSt|j|j}|dkr2t|j|jn|}|S)zg!Compares two Datums' prodnames and categories. @param other the other datum to compare againstr) isinstancerNotImplementedcmprr)r"otherrOrrr__cmp__As  z Datum.__cmp__c CsN|@t||jd<t||jd<|}||W5QRXW5QRXdS)z!Sets the location and availability of this Datum in a single transaction. @param loc the new location, a string @param avail the new availability, an intrrrqN)rrurrGr~)r"r_r^rYrrr set_loc_availHs  zDatum.set_loc_availFc Csp|jj}|j}|sBt|j}||jkrB|dks<||jkrB|jS|}||t|_W5QRX|jS)a!Requests or forces a cache update. This is the implementation of metadata/location/available caching. It returns self._meta if the cache has not aged out (and k, if provided, is in self._meta) or goes to the Datastore to update the cache, and then returns the resulting self._meta. This MUST be called from within a "with self". @param k The key of interest. @param force If True, forces a cache update even if the cache is not expired.N) rr4rrrrrurGrw)r"r{forcer0rZagerYrrr _getcacheRs    zDatum._getcachec Cs ||jddW5QRXdS)zJ!Discards all cached metadata and refreshes it from the Datastore.T)rNrr'rrrupdategsz Datum.updatec Cs.| ||}||W5QRSQRXdS)z}!Returns the value of the specified metadata key or raises KeyError. 
Forces a cache update if k is not in the cache.Nr)r"r{r[rrr __getitem__ls zDatum.__getitem__c Cs,||||W5QRSQRXdS)a!Return the value of a metadata key Returns the value of the specified metadata key or returns default if it is unset. Does NOT force a cache update if k is missing from the cache. To force a cache update, use __getitem__ instead. @param k The key of interest. @param default The value to return if no value is seen. @returns The metadata value or the default.Nrgetr"r{defaultrrrr[rs z Datum.metac Cs,||||W5QRSQRXdS)a!Alias for self.meta() Returns the value of the specified metadata key or returns default if it is unset. Does NOT force a cache update if k is missing from the cache. To force a cache update, use __getitem__ instead. @param k The key of interest. @param default The value to return if no value is seen. @returns The metadata value or the default.Nrrrrrr~s z Datum.getc Cs@|2|}||||W5QRX||j|<W5QRXdS)zb!Sets the value of the specified metadata key. @param k the key @param v the valueN)rGrru)r"r{r|rYrrr __setitem__s zDatum.__setitem__c CsZ|dkr|dkst|8|$}|||||jkrB|j|=W5QRXW5QRXdS)z|!Deletes the specified metadata key, which must not be "available" or "location". @param k the key to deleterqrrN)rjrGrru)r"r{rYrrr __delitem__s    zDatum.__delitem__c Cs<|dks|dkrdS|||kW5QRSQRXdS)a !Determines if a metadata key is set. @returns True if the specified metadata key is set, and False otherwise. Immediately returns True for 'available' and 'location' without checking the metadata cache. 
@param k the key of interestrqrrTNr)r"r{rrr __contains__szDatum.__contains__c cs|t|}W5QRXd|ks(td|ks4td|dfVd|dfV|D]"\}}|dkrX|dkrX||fVqXdS)zk!Iterates over all metadata (key,value) pairs for this Datum, including "available" and "location".rqrrN)r8rrjrv)r"r[r{r|rrr iteritemss  zDatum.iteritems)Nr.N)NF)N)N)%rrrrr#rhrnrrrrGrrrrrpropertyrrryrrrrrr(r+rrrrrr[rrrrrrrrrrsR 1    c@seZdZdZddZdS)rz!Exception raised when a Product class encounters exceptions while calling its callback functions in Product.call_callbacks.cCs:||_||_|D]}|dt|f7}qt|||dS)z!CallbackExceptions constructor. @param message The beginning of the exception message. Each exception's message will be appended to this. @param exlist The list of exceptions.z %sN)Z messagebaseexlistr Exceptionr#)r"messagerexrrrr#s zCallbackExceptions.__init__N)rrrrr#rrrrrscsreZdZdZdddZddZdddZd d Zd d Zd dZ ddZ ddZ e e e ddZ fddZZS)ra!A piece of data produced by a Task. A Product is a piece of data that can be produced by a Task. Once the product is available, self.available or self.is_available() will be True, and the self.location will be valid. The meaning of self.location is up to the implementer to decide, but it should be a full path to a location on disk for file products. As with all Datum objects, a Product also has arbitrary metadata.NcCsH|dkrt}nt|}||g}d|jkr8t|dt|j|dS)a!Adds a delivery callback function. Adds a delivery callback that is called when the product is delivered. This is intended to do such tasks as running an NCO dbn_alert, or copying to a website, or emailing someone. This function is only added in this local Python instance, not in the database file. Also, it is the responsibility of the subclasses to call self.call_callbacks() from self.deliver() to ensure the callbacks are run. 
Example: @code{.py} def callback(name,*args,**kwargs): print "My fancy product %s was delivered!"%(name,) product.add_callback(callback,[product.prodname]) @endcode @param callback The callback function, which must be able to take any keyword or indexed arguments. @param args The indexed arguments to send. @param states Presently unused.N _callbacks)rE__dict__setattrrre)r"callbackargsstatesZlargsZcalldatarrr add_callbacks zProduct.add_callbackcCsd|jkrdSt|jdkS)zT!Returns True if this Product has any callback functions and False otherwiserFr)rlenrr'rrr has_callbackss zProduct.has_callbacksc Csd|jkrdS|jsdS|dkr8t|jdkr8td}d}|}|jD]l\}}z|||WqJtk r}z6|dkrt}|||dk r|j t |ddW5d}~XYqJXqJ|dk rt d|j f|dS)a!Calls all delivery callback functions. Calls all data delivery callbacks for this Product. Collects any raised Exception subclasses until after all callbacks are called. Will raise CallbackExceptions if any exceptions are caught. Subclasses should call this from either check, or deliver, as appropriate for the product type. @param logger Optional: the logging.Logger for logging messages.rNrzprodutil.datastoreT)exc_infoz2%s: exceptions caught when delivering this product) rrrlogging getLoggerrrrErewarningrrr)r"r0rr[rrerrrcall_callbackss&   &zProduct.call_callbackscKs||jS)a!Asks the product to check its own availability and update the database. Checks to see if this product is available. This is generally not a cheap operation, as it can take seconds or minutes and may fail. One should call "available" instead if cached information is sufficient. @param kwargs Additional keyword arguments are unused. This is for use by subclasses.)rrqr"rrrrchecks z Product.checkcKs tddS)a+!Asks the Product to deliver itself. Delivers a product to its destination. This is not implemented by the base class. Note that this is generally an expensive operation which may take seconds or minutes, and may fail. It may involve copying many files, network access, or even pulling tapes from a silo. 
In the end, the location and availability are expected to be updated in the database. @param kwargs Unused, to be used by subclasses. @post available=True and location is non-empty.z1The Product base class does not implement deliverNrrrrrdelivers zProduct.delivercKs d|_dS)a!"Undelivers" a product. The meaning of this function is implementation-dependent: it could mean deleting an output file, or any number of other actions. Regardless, it should result in self.available=False or an exception being thrown. Note that this is generally an expensive operation that could take seconds or minutes, and may fail. The default implementation simply sets available to False. @post available=FalseFNrqrrrr undeliver+s zProduct.undelivercCstt||d<dS)z!Sets the availability to the specified value. Sets the "available" attribute of this Product in the database after converting the given value to a bool and then int (int(bool(val))). @param val the new availabilityrqN)rrXr"valrrr setavailable8szProduct.setavailablecCstt|dS)z!Is the product available? Returns the "available" attribute of this Product in the database, converted to a boolean value via bool()rq)rXrr'rrr is_available@szProduct.is_availablez7The availability of this product as a bool (read/write)cs,t|jrtd|jtt|dS)a!Validates this object's Datastore, prodname and category. Validates the Datastore, prodname and category of this Product. In addition to the requirements made by Datum, this function requires that the category not contain any double stars ("**").z7%s: Product categories cannot contain double stars (**)N) _has_dstarrrrrsuperrrr' __class__rrrLs zProduct.validate)NN)N)rrrrrrrrrrrrrrqr __classcell__rrrrrs      c@s$eZdZdZdddZd ddZdS) FileProductaF!A subclass of Product that represents file delivery. This subclass of Product represents a file that is delivered by this workflow. The deliver() subroutine actually copies the file, and undeliver() deletes it. 
The produtil.fileop.remove_file() and produtil.fileop.deliver_file() are used for this purpose.TNcCs*|j}|r |r tjj||ddd|_dS)a!Undoes the effect of deliver() Sets this Product's available attribute to False. If delete=True, will also delete the specified file. @param delete if True, the file is deleted @param logger a logging.Logger for log messagesT)r5r0infoFN)rrr?fileop remove_filerq)r"deleter0r_rrrr`szFileProduct.undelivercCst|tstdt|j|dk r@t|ts@tdt|j|}d}|dkrZd}|j}|dkrptd|jtj j |||||d|r| |dnd|_ |j |ddS) a/!Delivers the file to a destination. Delivers the file to a destination location specified. The origin is in the "frominfo" argument. Delivery is done by produtil.fileop.deliver_file. The keep, copier and logger arguments are passed on unmodified. @param location The new location. @param frominfo Where to get the file from. @param keep If True, the original file is always kept. If False, the original file may be moved to the destination instead of copied. @param logger a logging.Logger for log messages @param copier Passed to the copier argument of produtil.fileop.deliver_file() @post The file is at the location specified, and the database location and availability are updated accordingly.zlFileProduct.deliver requires a string filename as its frominfo argument. You provided an object of type %s.NzwFileProduct.deliver requires a location argument that is None or a string filename. You provided an object of type %s.TFzP%s: no location known when delivering product. Specify a location to deliver().)keepr0copierr0)rr TypeErrorr)rrrrrr?r deliver_filerrqr)r"rrfrominforr0rr_setlocrrrrks, zFileProduct.deliver)TN)NNTNN)rrrrrrrrrrrYs  rc@s,eZdZdZd ddZddZd ddZdS) UpstreamFilea!Represents a Product created by an external workflow. This subclass of Product represents a file that is created by an external workflow. It implements a check() call that determines if the file is larger than a minimum size and older than a minimum age. 
Once the file is large enough and old enough, it sets the location and availability. Any calls to undeliver() or deliver() raise InvalidOperation.NcCs|}d}|dkrd}|j}nt|ts8tdt|j|dk sDt|dkr\t|dd}|dkrtt|dd}t j j |||d s|j rd|_ dS|r| |dnd|_ |j|d dS) ak!Checks the specified file to see if it is available. Looks for the file on disk. Updates the "available" and "location" attributes of this Product. Uses two metadata values to decide whether the file is "available" if it exists: self["minsize"] - minimum size in bytes. Default: 0 self["minage"] - minimum age in seconds. Default: 20 Both can be overridden by corresponding arguments. Note that one must be careful with the minimum age on poorly-maintained clusters due to clock skews. @param frominfo Optional: where to look for the file instead of looking at self.location @param minsize Optional: the minimum file size in bytes @param minage Optional: the minimum file modification time in seconds. @param logger Optional: a logging.Logger for log messages.TNFz}UpstreamFile.check requires a frominfo argument that is either None or a string filename. You provided an object of type %s.minsizerminager-)min_size min_mtime_ager)rrrrrr)rrjrrr?r check_filerqrr)r"rrrr0r_rrrrrs.    zUpstreamFile.checkcCs d|_dS)zw!Undelivering an UpstreamFile merely sets the internal "available" flag to False. It does not remove the data.FNrr'rrrrszUpstreamFile.undelivercCs tddS)z!Raises InvalidOperation. You cannot deliver an upstream file. The upstream workflow must do that for you. 
Call check() instead to see if the file has been delivered.zEInternal error: the scripts attempted to deliver an upstream product.Nr)r"rrrrrrrszUpstreamFile.deliver)NNNN)NN)rrrrrrrrrrrrs )rcCs(|dkrdd}t|tr |g}t|tsFt|tsFtdt|jtt}|} t } |D]} t| ts`tdq`|dkrt}|dkrt}| dtt |ft | t |kr|| |krtt}|D]} | | krq| j s| | j rV| dt| jt| jf| | |dk r||| |f|} || | |f|q| dt| jt| j t| jfqtt}|| |krt | t |krtd t|| ||d } | d kr|j n|j}|d t| ft| |d q| dtt | tt |ft | S)a!Waits for products to be available and performs an action on them. Waits for a specified list of products to be available, and performs some action on each product when it becomes available. Sleeps sleeptime seconds between checks. Returns the number of products that were found before the maxtime was reached. @param plist A Product or a list of Product objects. @param logger A logging.Logger object in which to log messages. @param renamer Optional: a function or callable object that provides a new name for each product. This is passed the product, the logger and the contents of *renamer_args. Default: os.path.basename(p.location) @param action Optional: an action to perform on each product. This is passed the product, the output of renamer, the logger and the contents of *action_args. Default: perform no action. @param renamer_args Optional: arguments to renamer. @param action_args Optional: arguments to action. @param sleeptime - after checking availability of all products, if at least one is unavailable, the code will sleep for this much time before rechecking. Will be overridden by 0.01 if it is set to something lower than that. Default: 20 @param maxtime - maximum amount of time to spend in this routine before giving up. @returns the number of products that became available before the maximum wait time was hit. 
NcSstj|jS)N)ospathbasenamerr)plrrrz#wait_for_products..z;In wait_for_products, plist must be a list or tuple, not a z>In wait_for_products, plist must only contain Product objects.zWaiting for %d products.z&Product %s is available at location %sz4Product %s not available (available=%s location=%s).g{Gz?zSleeping %g seconds...zDone sleeping.z3Done waiting for products: found %d of %d products.)rrtuplerErr)rrrsetrrrqrr*rrraddmaxminr>floatsleep)Zplistr0renameractionZ renamer_argsZ action_args sleeptimeZmaxtimenowstartseenrnamesleepnowlogfunrrrwait_for_productssd           rc@seZdZdZd#ddZeddZddZd d Zd d Z ee edd Z eddZ ddZ ee dddZ ddZddZddZddZddZddZedd Zd!d"ZdS)$raj!Represents a process or actor that makes a Product. A Task represents some work that needs to be done to produce Products. A task has a state (stored in the "available" metadata attribute), a location, whose meaning is up to the implementer to decide, and a logger.Logger. As with all Datum subclasses, a Task also has arbitrary metadata.NcKs6|dkrt|}||_tj|f||td|dS)a!Task constructor Creates a new Task from the given dataset and with the given task name. @param dstore the Datastore where this Task will live @param taskname the task name, passed to the Datum as prodname @param logger a logging.Logger for this task to use for log messages @param kwargs other keyword arguments are passed to Datum.__init__N)rrr)rrr4rr# TASK_CATEGORY)r"rtasknamer0rrrrr#%s  z Task.__init__cCstjjS)a!returns the jlogfile logger. Returns a logging.Logger for the jlogfile. The jlogfile is intended to receive only major errors, and per-job start and completion information. This is equivalent to simply accessing produtil.log.jlogger.)r?logjloggerr'rrrjlogfile3sz Task.jlogfilecOstjjj|f||dS)a!same as produtil.log.jlogger.info() Sends a message to the multi-job shared log file at level INFO. 
@param message the message @param args positional arguments for string replacement @param kwargs keyword arguments for string replacement.N)r?rrr)r"rrrrrrpostmsg=sz Task.postmsgcCst||d<dS)aF!Sets the state of this job. Sets the job stat to the specified value. This works by setting the "available" attribute to the specified integer. For compatibility with other scripts, this should be FAILED, UNSTARTED, RUNNING, PARTIAL or COMPLETED. @param val the new job state, an intrqNrrrrrsetstateGsz Task.setstatecCs t|dS)z!Returns the job state. Returns the "available" attribute as an integer. This is used as the state of the Task. Typically, the return value should be one of: FAILED, UNSTARTED, RUNNING, PARTIAL, or COMPLETED.rqrr'rrrgetstatePsz Task.getstatez-The state of this Task as an int (read/write)cCsTt|d}|tkrdS|tkr$dS|tkr0dS|tkrsb` 3      ;> K
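The docstrings above describe a Datum as a row identified by "category::prodname", with an integer "available" field, a string "location", and arbitrary (key,value) metadata. The following is a minimal, self-contained sketch of that storage pattern using only the standard sqlite3 module. It is not the produtil.datastore implementation. The SQL statements in set_meta and refresh_meta are the ones visible in the module's strings, but the table definitions (column types, primary keys), the helper names datum_id and make_tables, and the sample product "post::track_file" are assumptions made for illustration. File locking, caching, and per-thread connections are omitted.

```python
import sqlite3


def datum_id(category, prodname):
    """Build the "category::prodname" ID described in the Datum docstring."""
    for part in (category, prodname):
        if "::" in part:
            raise ValueError("%r cannot contain a double colon (::)" % (part,))
    return category + "::" + prodname


def make_tables(con):
    # ASSUMED schema: the real CREATE TABLE statements are not recoverable
    # from the dump. Only the column names used in its queries are known.
    con.execute("CREATE TABLE IF NOT EXISTS products "
                "(id TEXT PRIMARY KEY, available INTEGER, location TEXT)")
    con.execute("CREATE TABLE IF NOT EXISTS metadata "
                "(id TEXT, key TEXT, value TEXT, PRIMARY KEY (id, key))")


def set_meta(con, did, key, value):
    """Route "location" and "available" to products; everything else to metadata."""
    if key == "location":
        con.execute("UPDATE OR IGNORE products SET location = ? WHERE id = ?",
                    (value, did))
    elif key == "available":
        con.execute("UPDATE OR IGNORE products SET available = ? WHERE id = ?",
                    (int(value), did))
    else:
        con.execute("INSERT OR REPLACE INTO metadata VALUES (?,?,?)",
                    (did, key, value))


def refresh_meta(con, did):
    """Rebuild a metadata dict for one datum from the database."""
    row = con.execute(
        "SELECT id, available, location FROM products WHERE id = ?",
        (did,)).fetchone()
    if row is None:
        raise KeyError(did)
    meta = {"available": row[1], "location": row[2]}
    for _, key, value in con.execute(
            "SELECT id, key, value FROM metadata WHERE id = ?", (did,)):
        meta[key] = value
    return meta


con = sqlite3.connect(":memory:")
make_tables(con)
did = datum_id("post", "track_file")          # hypothetical product
con.execute("INSERT INTO products VALUES (?,?,?)", (did, 0, ""))
set_meta(con, did, "location", "/tmp/track.dat")
set_meta(con, did, "available", 1)
set_meta(con, did, "format", "atcf")
set_meta(con, did, "format", "atcf-v2")       # replaces the earlier value
meta = refresh_meta(con, did)
```

Keeping "available" and "location" in the products table and everything else in a key/value table matches the split that set_meta's docstring describes. The composite primary key on metadata is what lets INSERT OR REPLACE overwrite an existing key in this sketch.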