produtil.pipeline
Source file: /lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.py

Internal module that launches and monitors processes.

Do not use this module directly: it is part of the internal
implementation of the produtil.prog and produtil.run modules. It
converts a produtil.prog.Runner object to processes, and monitors the
processes until they exit, sending and receiving data as needed. This
replaces the built-in "subprocess" module which is not capable of
general-purpose pipeline execution.

__all__: launch, manage, PIPE, ERR2OUT, kill_all, kill_for_thread


class NoMoreProcesses
    Raised when the produtil.sigsafety package catches a fatal signal.
    Indicates to callers that the thread should exit.


class Constant
    A class used to implement named constants.

    __init__
        Creates a named constant
        @param s the return value of __str__() = str(self)
        @param r the return value of __repr__() = repr(self)

    __str__
        Returns the s argument to the constructor.

    __repr__
        Returns the r argument of the constructor.


unblock
    Attempts to modify the given stream to be non-blocking. This only
    works with streams that have an underlying POSIX fileno, such as
    those from open.

    Will re-raise any exception received, other than AttributeError
    and EnvironmentError. Hence, I/O errors and attempts to make a
    non-fileno stream non-blocking will produce a False return value,
    while anything else will raise an exception.

    @param stream the stream to unblock
    @param logger a logging.Logger for log messages
    @returns True on success, False otherwise.


call_fcntrl
    Internal function that implements unblock()
    @param stream the stream to modify
    @param on flags to turn on
    @param off flags to turn off
    @param logger a logging.Logger for messages
    @returns True on success, False otherwise.

    Log messages:
        "%s: stream has no fileno, cannot switch to non-blocking I/O: %s"
        "%s: cannot switch to non-blocking I/O: %s"


pipe
    Creates a pipe that will be closed on exec. Except that it does
    not seem to be reliably closed on exec, so there are other
    workarounds in this module.
    @param logger a logging.Logger for log messages


padd
    Adds a file descriptor to the list to close before exec.
    @param p the file descriptor


pclose
    Closes a file descriptor, removing it from the list that must be
    closed on exec.
    @param i the file descriptor


pclose_all
    Closes all file descriptors sent to padd.
    @param i my stdin, which should not be closed
    @param o my stdout, which should not be closed
    @param e my stderr, which should not be closed
    @param logger a logging.Logger for debug messages


launch
    Starts the specified command (a list), with the specified
    environment (or None to copy this process's environment).

    @param stdin,stdout,stderr Specifies the stdin, stdout and stderr
    streams. The special value PIPE means "make a pipe," and sending
    stderr=ERR2OUT requests redirection of stderr to stdout.
    @param cd The optional "cd" argument specifies a directory to cd
    into, in the child process, before executing the command. Of
    course, you shouldn't care about any of this because you should be
    using the produtil.run package.
    @param cmd the command to run
    @param env the subprocess's environment, or None to use mine
    @param debug if True, send debug messages

    Error messages:
        "In produtil.pipeline.launch, cd must be a string or None"
        "In produtil.pipeline.launch, cd must not be the empty string."
        "%s: could not exec: %s"


filenoify
    Tries to convert f to a fileno
    @returns an integer UNIX file descriptor
    @param f ERR2OUT, PIPE, an integer fileno or a file-like object
    with a fileno() function.


kill_for_thread
    Sends a TERM signal to all processes that the specified thread (a
    threading.Thread) is waiting for.


kill_all
    Sends a TERM signal to all processes that this module is managing


manage
    Watches a list of processes, handles their I/O, returns when all
    processes have exited and all I/O is complete.

    @warning You should not be calling this function unless you are
    modifying the implementation of Pipeline. Use the produtil.run
    module instead of calling launch() and manage().

    @param proclist the list of processes to watch
    @param inf the input file
    @param outf the output file
    @param errf the error file
    @param instr the input string, instead of an input file
    @param childset the set of child process ids
    @param sleeptime sleep time between checks of child processes
    @param logger Logs to the specified object, at level DEBUG, if a
    logger is specified.
    @returns a tuple containing the stdout string (or None), the
    stderr string (or None) and a dict mapping from process id to the
    return value from os.wait4 called on that process.

    Messages:
        "No data two seconds after processes exited. Forcing a close of all streams."
        "Master thread caught a signal. This thread should exit."


simple_run
    (no docstring)


class Pipeline
    This class is a wrapper around launch and manage. It converts
    Runner objects to calls to "launch", and runs "manage" on the
    resulting processes.

    __init__
        Pipeline constructor
        @param runner the produtil.prog.Runner to convert
        @param capture if True, capture the stdout of the runner
        @param logger a logging.Logger for messages
        @param debug if True, send debug messages
        @param binary use binary streams for input and output

    __repr__
        Return a debug string representation of this Pipeline.

    _impl_add
        Adds another produtil.prog.Runner's contents to this Pipeline.
        @param command the command to run
        @param endpipe is this the end of a pipeline?
        @param logger a logging.Logger for log messages
        @param instring the string for stdin
        @param stdin a file, fileno or PIPE constant for stdin
        @param stdout a file, fileno or special constant for stdout
        @param stderr a file, fileno or special constant for stderr
        @param env subprocess environment variables
        @param closein,closeout,closeerr Unused.
        @param cd subprocess working directory

    send_signal
        Sends a signal to all children.
        @param sig the signal

    terminate
        Sends SIGTERM to all children.

    kill
        Sends SIGKILL to all children.

    communicate
        Writes to input, reads from output, waits for child processes,
        etc. This is just a wrapper around the manage() function. It
        will return immediately if self.communicate has already
        completed earlier.
        @param sleeptime the sleep time in seconds between checks

    poll
        Returns the exit status of the last element of the pipeline.
        If the process died due to a signal, returns a negative number.

    to_string
        Calls self.communicate(), and returns the stdout from the
        pipeline (self.outbytes). The return value will be Null if the
        pipeline was redirected to a file or if the constructor's
        capture option was not True.

    out (property)
        The stdout from the pipeline. Will be Null if the pipeline was
        redirected to a file, or if the constructor's capture option
        was not True.
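
The call_fcntrl docstring describes switching a stream to non-blocking I/O by turning file status flags on and off. The sketch below illustrates that general POSIX technique with the standard fcntl module. It is not the module's actual implementation: the function name set_fd_flags is hypothetical, and it accepts only an integer file descriptor.

```python
import fcntl
import os


def set_fd_flags(fd, on=0, off=0):
    """Hypothetical helper: turn file status flags on and off for an
    integer file descriptor. Returns True on success, False if the
    operating system rejects the request."""
    try:
        # Read the current flags, set the "on" bits, clear the "off" bits.
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, (flags | on) & ~off)
        return True
    except OSError:
        return False


if __name__ == "__main__":
    read_end, write_end = os.pipe()
    set_fd_flags(read_end, on=os.O_NONBLOCK)
    try:
        # With O_NONBLOCK set, reading an empty pipe fails at once
        # instead of waiting for data.
        os.read(read_end, 1)
    except BlockingIOError:
        print("no data available yet")
    os.close(read_end)
    os.close(write_end)
```

Returning False rather than raising mirrors the documented contract of unblock() for I/O errors. A caller that needs the failure reason would have to log it before returning.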