U `gj7@sxdZddgZddlZddlZddlZddlZddlZGdddeZ Gddde Z dd Z e e Z Gd dde ZdS) zX!Contains the WorkPool class, which maintains pools of threads that perform small tasks.WorkPool WrongThreadNc@seZdZdZdS)rz!Raised when a thread unrelated to a WorkPool attempts to interact with the WorkPool. Only the thread that called the constructor, and the threads created by the WorkPool can interact with it.N)__name__ __module__ __qualname____doc__rr>/lfs/h1/ops/para/packages/hafs.v2.0.7/ush/produtil/workpool.pyr sc@sreZdZdZdddZeddZddZd d Zd d Z ee eed Z ddZ ddZ ddZ ee e e dZdS)WorkTaskz!Stores a piece of work. This is an internal implementation class. Do not use it directly. It stores one piece of work to be done by a worker thread in a WorkPool.NcCs.||_d|_d|_|dkr tnt||_dS)z!Create a WorkTask whose job is to call work() @param work the function to call @param args the arguments to workFN)work_WorkTask__done_WorkTask__exceptionlist_WorkTask__args)selfr argsrrr __init__szWorkTask.__init__cCs|jS)z#!The arguments to the work function)rrrrr r#sz WorkTask.argscCsd|_||_|jS)z!Sets the exception that was raised by the work function. Sets the done status to False. @returns the exception @param e the exception.Fr r )rerrr _set_exception(szWorkTask._set_exceptioncCs d|_dS)z|q|||n4t||}|j|| dt |f|j dS)a!Adds a piece of work to be done. It must be a callable object. If there are no worker threads, the work() is called immediately. The args are passed, if present. @param work a callable object @param args a list of arguments to the work functionRIn WorkPool.add_work, thread %s is not the master thread and is not a work thread.Nz Added work %s) r&r.r/rstrr9r r%appendrRreprr(release)rr rrOZworktaskrrr add_works     zWorkPool.add_workcCstj|j S)zV!Return True if worker threads should keep running, False if they should exit.)r>r?checksigdierrrr _worker_exit_checks zWorkPool._worker_exit_checkcCs4t}||jkrdS|jD]}||krdSqdS)zw!Returns True if this is the thread that called the constructor or any worker thread. Returns False otherwise.TF)r&r.r/r-)rrOtrrr _valid_threads  zWorkPool._valid_threadc Cst}|s"tdt|f|r|j}|j}t|t j sFt d}zf| d| |}|tkr~| dWdS| d|jr|j}|j|n|d|_Wq"tk r}z$|dk r||_|jdddnW5d}~XYq"Xq"dS) z9!Main function for worker threads. Do not call directly.zVIn WorkPool._worker_main, thread %s is not the master thread and is not a work thread.NzReady for work. terminatez ... working ... Tz ...failed.rS)r&r.rerr\rcr(r%r=r#r$AssertionErrorrRacquirepopleft TERMINATErr r!r6r rW)rrOwswqr rrrrr _worker_mains<     zWorkPool._worker_mainc Cs|dks tt|tst|dkr&dSt}||jkrJtdt|fd|_t |D]}|j |j d}d}z6dd}tj ||gd}|j |||_ |WnTttfk r}z2||j kr|jd t|d d |j |W5d}~XYnXW5QRXqXdS) z!Starts n new threads. Can only be called from the thread that made this object. @param n number of threads to start, an integer greater than 0rN=In WorkPool.kill_threads, thread %s is not the master thread.Fr[cSs |dS)N)rm)arrr doit(sz$WorkPool.start_threads..doit)targetrzERROR: TrS)rgr=intr&r.r/rr\rbranger1r2Threadr-addstartr6r7rWremove)rnrOitidrHrprrrr r5s6        zWorkPool.start_threadsc Cst}||jkr$tdt|fd|_|j}|j}|jt |j }|D]<}t |t j svtdt|jt|f|t|qL|D],}|dt|ftj||qW5QRX|ddS)z]!Kills all worker threads. Can only be called from the thread that made this object.rnFz4self._work_queue should be a deque but it is a %s %szKill worker thread %szDone killing worker threads.N)r&r.r/rr\rbr%r(r1r)r-r=r#r$ TypeErrortyperr^ appendleftrjr_rRr>pipelinekill_for_threadjoin)rrOrlrkkillmerHrrr r88s4      zWorkPool.kill_threadsc Cs4|jdkrdS|s(tdt|ft}||jkr|d|j|dt t |j D]}| |j qd|dt |jt |j krtdq|j|j|jW5QRXW5QRXn^|d|j D]:}||kr|j|j||jW5QRXdSqtd t|fdS) aJ!Waits for all threads to reach the barrier function. This can only be called by the master thread. Upon calling, the master thread adds a WorkTask for each thread, telling the thread to call self.barrier(). Once all threads have reached that point, the barrier returns in all threads.rNrZzBARRIER (master)zRequest barrier on all threads.z&Wait for all workers to reach barrier.g{Gz?zBARRIER (worker)zSIn WorkPool.barrier, thread %s is not the master thread and is not a worker thread.)r9rerr\r&r.r/rRr1rsrYr-r`r;r*timesleepr, notify_allclearruwait)rrOryrHrrr r;UsB           zWorkPool.barrier)NF)F)F)N)rrrrrr:rGr<rPrRrWrArr9r`rcrermr5r8r;rrrr rVs$      %")r__all__r&r#rprodutil.pipeliner>produtil.sigsafetyr6robjectr r"rjrrrrr s<