gc @sdZddddddgZdefdYZd d lZd d lZd d lZd d lZd d lZd d l Z d d l Z d d l Z d d l Z d d l Z d d lZd d lZd d lZd d l Z d d l Z d efd YZe jZeaedZedZd d Zd dZd dZdZdZd d d d dZd d d d e d dZ!dZ"ej#eZ$d Z%dZ&dZ'd d d d d d d e dZ(d d d d e d d e dZ)defdYZ*d S(s!Internal module that launches and monitors processes. Do not use this module directly: it is part of the internal implementation of the produtil.prog and produtil.run modules. It converts a produtil.prog.Runner object to processes, and monitors the processes until they exit, sending and receiving data as needed. This replaces the built-in "subprocess" module which is not capable of general-purpose pipeline execution.tlaunchtmanagetPIPEtERR2OUTtkill_alltkill_for_threadtNoMoreProcessescBseZdZRS(sz!Raised when the produtil.sigsafety package catches a fatal signal. Indicates to callers that the thread should exit.(t__name__t __module__t__doc__(((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRsiNtConstantcBs,eZdZddZdZdZRS(s+!A class used to implement named constants.cCsA||_|dkr4dt|t|f}n||_dS(s!Creates a named constant @param s the return value of __str__() = str(self) @param r the return value of __repr__() = repr(self)sConstant(%s)@0x%xN(t _Constant__stNonetreprtidt _Constant__r(tselftstr((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt__init__s  cCs|jS(s+!Returns the s argument to the constructor.(R (R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt__str__#scCs|jS(s+!Returns the r argument of the constructor.(R(R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt__repr__&sN(RRR R RRR(((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyR s  cCst|tjd|dS(s+!Attempts to modify the given stream to be non-blocking. This only works with streams that have an underlying POSIX fileno, such as those from open. Will re-raise any exception received, other than AttributeError and EnvironmentError. Hence, I/O errors and attempts to make a non-fileno stream non-blocking will produce a False return value, while anything else will raise an exception. @param stream the stream to unblock @param logger a logging.Logger for log messages @returns True on success, False otherwise.iN(t call_fcntrltost O_NONBLOCK(tstreamtlogger((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pytunblock9s cCs y(t|tr|}n |j}WnRttfk r|}|dk rx|jdt|t|fdt nt SXy<t j |t j }t j |t j ||B|@t SWnLtk r}|dk r|jdt|t|fdt nt SXdS(s!Internal function that implements unblock() @param stream the stream to modify @param on flags to turn on @param off flags to turn off @param logger a logging.Logger for messages @returns True on success, False otherwise.s?%s: stream has no fileno, cannot switch to non-blocking I/O: %stexc_infos)%s: cannot switch to non-blocking I/O: %sN(t isinstancetinttfilenotAttributeErrortEnvironmentErrorR twarningR tstrtTruetFalsetfcntltF_GETFLtF_SETFLterror(RtontoffRtfdteetflags((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRHs$   #  #c CsYtMtj\}}t|tjd|t|tjd|||fSWdQXdS(s!Creates a pipe that will be closed on exec. Except that it does not seem to be reliably closed on exec, so there are other workarounds in this module. @param logger a logging.Logger for log messagesiN(tplockRtpipeRR&t FD_CLOEXEC(Rtp1tp2((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyR0es cCsttj|WdQXdS(sZ!Adds a file descriptor to the list to close before exec. @param p the file descriptorN(R/tpipes_to_closetadd(tp((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pytpaddpsc CsTtHytj|Wntk r-}nX|tkrJtj|nWdQXdS(sv!Closes a file descriptor, removing it from the list that must be closed on exec. @param i the file descriptorN(R/RtcloseR!R4tremove(tite((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pytpclosevs cCsttxbtD]Z}||kr||kr||kr|dk rX|jd|ntj|qqWtjWdQXdS(s!Closes all file descriptors sent to padd. @param i my stdin, which should not be closed @param o my stdout, which should not be closed @param e my stderr, which should not be closed @param logger a logging.Logger for debug messagessIn child, close old pipe fd %dN(R/R4R tdebugRR8tclear(R:toR;RR6((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt pclose_alls $ cCs|d!k r+t|t r+tdn|dkrFtdnd!}d!}d!} d!} d!} d!} tj|d} |r| jdt|fn|t krt | \}}|r| jd||fqn|}|t kr(t | \} } |r.| jd| | fq.n|} |t krot | \} } |r| jd| | fqn|t k r|} nt j }|dkst|dkr|d!k r|t k r|r| jd |nt|n|t kr6|d!k r6|r| jd |nt|t|n|d!k ru|t k ru|rh| jd |nt|n|t kr| d!k r|r| jd | nt| t| n|d!k r |t k r |t k r |r| jd |nt|n|t krR| d!k rR|r;| jd| nt| t| n|r~| jdt||| | fn||| | fSt|trt j|ntd|d|d||d!k r|r| jd|nt|n|d!k r<|r| jd|nt j|dt|n| d!k ro|rb| jd| nt| n| d!k r|r| jd| nt j| dt| n|t kr|r| jdnt jddn| d!k r|r | jd| nt| n| d!k r]|r@| jd| nt j| dt| n|rs| jdntjtjtjtjtjtjtjtjtjtjtjtjtjtjtjtjtjtj|dstyo|r;| jd|ddj|dfn|d!kr^t j|d|nt j|d||WnGtk r}| j d|dt|fd t!t"j#dnXd!S("s!Starts the specified command (a list), with the specified environment (or None to copy this process's environment). @param stdin,stdout,stderr Specifies the stdin, stdout and stderr streams. The special value PIPE means "make a pipe," and sending stderr=ERR2OUT requests redirection of stderr to stdout. @param cd The optional "cd" argument specifies a directory to cd into, in the child process, before executing the command. Of course, you shouldn't care about any of this because you should be using the produtil.run package. @param cmd the command to run @param env the subprocess's environment, or None to use mine @param debug if True, send debug messagess8In produtil.pipeline.launch, cd must be a string or Nonets=In produtil.pipeline.launch, cd must not be the empty string.isStart %ssPipe for stdin: %d<==%dsPipe for stdout: %d<==%dsPipe for stderr: %d<==%dsClose stdin %d on parent.sClose stdinC %d on parent.sClose stdout %d on parent.sClose stdoutC %d on parent.sClose stderr %d on parent.sClose stderrC %d on parent.sOn parent, returning %sR:R?R;sClose stdinP %d on child.s5Point stdin to stdinC %d on child and close original.sClose stdoutP %d on child.s7Point stdout to stdoutC %d on child and close original.is#Redirect stderr to stdout on child.isClose stderrP %d on child.s7Point stderr to stderrC %d on child and close original.sReset signal handlers on child.sRun %s %s on childt s%s: could not exec: %sRN($R RR#t TypeErrort ValueErrortloggingt getLoggerR=R RR0RRtforktAssertionErrorR<R7tchdirR@tdup2tsignaltSIGHUPtSIG_DFLtSIGTERMtSIGINTtSIGQUITtSIGPIPEtSIGCHLDtjointexecvptexecvpet ExceptionR)R$tsystexit(tcmdtenvtstdintstdouttstderrR=tcdtstdinPtstdinCtstdoutPtstdoutCtstderrPtstderrCRtpidR;((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRs                $                 + 'cCsE|tks$|tks$|dkr(|St|tr;|S|jS(s!Tries to convert f to a fileno @returns an integer UNIX file descriptor @param f ERR2OUT, PIPE, an integer fileno or a file-like object with a fileno() function.N(RRR RRR(tf((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt filenoify$s $cCstj|}tt|}xo|D]g}ytj|tjWntk rX}nXyt|j |Wq&t t t fk r}q&Xq&WdS(sh!Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for.N( tweakreftreftsett _manage_setRtkillRKRNR!R9RDtKeyErrorRC(tthtthttkillmeR6R;((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyR5s cCs t}dS(sF!Sends a TERM signal to all processes that this module is managingN(R$(t _kill_all((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRDsc Cs tjtj} t| } |s+t| j|| sDtd} t} t} d#}d#}t }t |}t |}t |}|d#k r |d#krd}n|d#k r|j dt ||fn| jd|gt|d|t}n|d#k r|d#k r7|j d|n| jd|g|r_tj}n tj}t|d|t}n|d#k r|d#k r|j d|n| jd|g|rtj}n tj}t|d|t}nxA|D]9}|d#k r*|j d |n| jd |gqWd}tj}t }x| rd}t }td#k r|d#k r|j d nx6| D]+\}}|d krtj|tqqWnxK|t | kr| |\}}|dks|dks|d kst|dkr|d#k rZ|j d t |||fnyNt|trtj|||}n"tj|t||d d}WnCtk r}|jtjks|jtjkrd#}qnX|r'|d#k r|j d||fn||7}n|t |kr||d#k r_|j d||fnt || j!|qn|r|d#k r|j d|nt || j!|qqnM|dkroy6|d#k r|j d|ntj"|| }Wnotk ru}|jtjks:|jtjkro|d#k rf|j dt#||fnd#}qvnX|dkr|d#k r|j d|nt || j!|qn|d#k r)|d#k r|j dt ||fn|r |j|q)|jt#|d dn|r|d#k rO|j d|nt || j!|qqn|d kr|d#k r|j d|nt}tj$|tj%}|r|ddks|ddkr|d#k r|j d|n| j!|y| j&|WnNt't(t)fk rj}|d#k rk|j d|t#|fdtqknX|d#k ry|j&|Wqt't(t)fk r}|d#k r|j d|t#|fdtqqXn|| |/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRKsl                              *   &$               $                  &   !  $               c  Cst|||||||\} } } } t| gd| d| d| d|d|\} }}|| d}tj|rtj|Stj|rtj| SdSdS(NRRRRRii(RRRt WIFEXITEDt WEXITSTATUSt WIFSIGNALEDtWTERMSIG(RYRZR[R\R]R=R^RRReR_RaRctoutstderRRtresult((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt simple_run2s'$ tPipelinecBseZdZed eedZdZd d d d d d d d d d d d d edZdZdZ dZ d dZ dZ d Z ed ZRS( s!This class is a wrapper around launch and manage. It converts Runner objects to calls to "launch", and runs "manage" on the resulting processes.cCst|_t|_||_||_||_d|_d|_ d|_ d|_ d|_ d|_ d|_d|_tj|_t||_|j|d|dS(s4!Pipeline constructor @param runner the produtil.prog.Runner to convert @param capture if True, capture the stdout of the runner @param logger a logging.Logger for messages @param debug if True, send debug messages @param binary use binary streams for input and outputRN(Rjt_Pipeline__childrenRwt_Pipeline__quadst_Pipeline__capturet_Pipeline__loggert_Pipeline__debugR t_Pipeline__instringt_Pipeline__outt_Pipeline__errt_Pipeline__stdint_Pipeline__stdoutt_Pipeline__stderrt_Pipeline__managedt_Pipeline__last_pidRttLockt_Pipeline__locktboolt_Pipeline__binaryt_gen(RtrunnertcaptureRR=R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRGs              cCs5dt|t|jt|jt|jfS(s7!Return a debug string representation of this Pipeline.s&(RR RRR(R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyR_s cCs\| dkr|n| }|dkr*|n|}| dkrB|n| }|dk rft}||_n|dkr|jdk r|j}n|jr|r|dkrt}n|st}nt|| ||||j|\}}}}|jj||j s||_ n||_||_ |j j ||||f||_ t||_dS(s_!Adds another produtil.prog.Runner's contents to this Pipeline. @param command the command to run @param endpipe is this the end of a pipeline? @param logger a logging.Logger for log messages @param instring the string for stdin @param stdin a file, fileno or PIPE constant for stdin @param stdout a file, fileno or special constant for stdout @param stderr a file, fileno or special constant for stderr @param env subprocess environment variables @param closein,closeout,closeerr Unused. @param cd subprocess working directoryN(R RRRRRRRR5RRRRzRRR(RtcommandtendpipeRtinstringR[R\R]tsendouttsenderrtsendinRZtcloseintcloseouttcloseerrR^RtpintpouttperrR6R:R?R;((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt _impl_addds*     -    cCsBx;|jD]0}ytj||Wq tk r9}q Xq WdS(s>!Sends a signal to all children. @param sig the signalN(RRRlR!(RtsigR6R;((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt send_signals cCs|jtjdS(s!Sends SIGTERM to all children.N(RRKRN(R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt terminatescCs|jtjdS(s!Sends SIGKILL to all children.N(RRKtSIGKILL(R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRlscCs|j|jrdStg|jD]}|d^q$|j|j|j|j|j|j ||j \}}}||_||_ ||_ WdQXdS(s#!Writes to input, reads from output, waits for child processes, etc. This is just a wrapper around the manage() function. It will return immediately if self.communicate has already completed earlier. @param sleeptime the sleep time in seconds between checksNi( RRRRRRRRRRRRR(RRtqR?R;tm((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt communicates    cCse|j}|sdS||jd}tj|r@tj|Stj|r]tj| SdSdS(s!Returns the exit status of the last element of the pipeline. If the process died due to a signal, returns a negative number.iiN(RR RRRRRR(RRR((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pytpolls  cCs5|j|j}t|ts1t|}n|S(s!Calls self.communicate(), and returns the stdout from the pipeline (self.outbytes). The return value will be Null if the pipeline was redirected to a file or if the constructor's capture option was not True.(RtoutRR#(RR?((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt to_strings   cCs|jS(s!The stdout from the pipeline. Will be Null if the pipeline was redirected to a file, or if the constructor's capture option was not True.(R(R((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRsN(RRR R%R RRRRRRlRRRtpropertyR(((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyRCs       $     (+R t__all__tKeyboardInterruptRRRKtselectRERWR{R~RR&RtRht collectionststattobjectR RR/RjR4RRR RRR0R7R<R@R%RRgt defaultdictRkRqRRRRR(((s>/lfs/h1/ops/prod/packages/hafs.v2.0.7/ush/produtil/pipeline.pyt s8 $