U ����� gf����������������������@���sZ��d�Z�ddddddgZG�dd��de�Zd d lZd d lZd d lZd d lZd d lZd d l Z d d l Z d d lZd d lZd d l Z d d lZd d lZd d lZd d lZd d lZG�dd��de�Ze ���Ze��aed�Zed�Zd$d d�Zd%dd�Zd&dd�Zdd��Zdd��Zd'dd�Zd(dd�Zdd��Z e�!e�Z"d Z#dd��Z$dd��Z%d)dd�Z&d*d d!�Z'G�d"d#��d#e�Z(d S�)+a���!Internal module that launches and monitors processes. Do not use this module directly: it is part of the internal implementation of the produtil.prog and produtil.run modules. It converts a produtil.prog.Runner object to processes, and monitors the processes until they exit, sending and receiving data as needed. This replaces the built-in "subprocess" module which is not capable of general-purpose pipeline execution.�launch�manage�PIPE�ERR2OUT�kill_all�kill_for_threadc�������������������@���s���e�Zd�ZdZdS�)�NoMoreProcesseszz!Raised when the produtil.sigsafety package catches a fatal signal. Indicates to callers that the thread should exit.N)�__name__� __module__�__qualname__�__doc__��r���r����>/lfs/h1/ops/prod/packages/hmon.v3.2.7/ush/produtil/pipeline.pyr������s���r��������Nc�������������������@���s*���e�Zd�ZdZd dd�Zdd��Zdd��ZdS�) �Constantz+!A class used to implement named constants.Nc�����������������C���s,���||�_�|dkr"dt|�t|��f�}||�_dS�)z�!Creates a named constant @param s the return value of __str__() = str(self) @param r the return value of __repr__() = repr(self)NzConstant(%s)@0x%x)�_Constant__s�repr�id�_Constant__r)�self�s�rr���r���r ����__init__���s����zConstant.__init__c�����������������C���s���|�j�S�)z+!Returns the s argument to the constructor.)r����r���r���r���r ����__str__"���s����zConstant.__str__c�����������������C���s���|�j�S�)z+!Returns the r argument of the constructor.)r���r���r���r���r ����__repr__%���s����zConstant.__repr__)N)r���r ���r ���r���r���r���r���r���r���r���r ���r������s��� r���c�����������������C���s���t�|�tjd|��dS�)a+��!Attempts to modify the given stream to be non-blocking. This only works with streams that have an underlying POSIX fileno, such as those from open. Will re-raise any exception received, other than AttributeError and EnvironmentError. Hence, I/O errors and attempts to make a non-fileno stream non-blocking will produce a False return value, while anything else will raise an exception. @param stream the stream to unblock @param logger a logging.Logger for log messages @returns True on success, False otherwise.r���N)�call_fcntrl�os� O_NONBLOCK)�stream�loggerr���r���r ����unblock8���s���� r ���c�������������� ���C���s����zt�|�t�r|�}n|����}W�nR�ttfk rn�}�z0|dk rX|jdt|��t|�f�dd��W�Y��dS�d}~X�Y�nX�z.t�|tj �}t�|tj ||B�|�@���W�dS��tk r��}�z0|dk r�|jdt|��t|�f�dd��W�Y��dS�d}~X�Y�nX�dS�)z�!Internal function that implements unblock() @param stream the stream to modify @param on flags to turn on @param off flags to turn off @param logger a logging.Logger for messages @returns True on success, False otherwise.Nz?%s: stream has no fileno, cannot switch to non-blocking I/O: %sT��exc_infoFz)%s: cannot switch to non-blocking I/O: %s)� isinstance�int�fileno�AttributeError�EnvironmentError�warningr����str�fcntl�F_GETFL�F_SETFL�error)r����on�offr����fd�ee�flagsr���r���r ���r���G���s0���� ����r���c�������������� ���C���sP���t��B�t���\}}t|tjd|���t|tjd|���||fW��5�Q�R���S�Q�R�X�dS�)z�!Creates a pipe that will be closed on exec. Except that it does not seem to be reliably closed on exec, so there are other workarounds in this module. @param logger a logging.Logger for log messagesr���N)�plockr����piper���r*���� FD_CLOEXEC)r����p1Zp2r���r���r ���r4���d���s ����r4���c�������������� ���C���s���t���t�|���W�5�Q�R�X�dS�)zZ!Adds a file descriptor to the list to close before exec. @param p the file descriptorN)r3����pipes_to_close�add)�pr���r���r ����paddo���s����r:���c�����������������C���sX���t��J�zt�|���W�n"�tk r6�}�zW�5�d}~X�Y�nX�|�tkrJt�|���W�5�Q�R�X�dS�)zv!Closes a file descriptor, removing it from the list that must be closed on exec. @param i the file descriptorN)r3���r����closer'���r7����remove)�i�er���r���r ����pcloseu���s�����r?���c�������������� ���C���s^���t��P�tD�]<}||�kr ||kr ||kr |dk r<|�d|���t�|��q t����W�5�Q�R�X�dS�)z�!Closes all file descriptors sent to padd. @param i my stdin, which should not be closed @param o my stdout, which should not be closed @param e my stderr, which should not be closed @param logger a logging.Logger for debug messagesNzIn child, close old pipe fd %d)r3���r7����debugr���r;����clear)r=����or>���r���r9���r���r���r ���� pclose_all����s����rC���Fc�������������� ���C���s��|dk rt�|t�std��|dkr*td��d}d}d} d} d}d}t�|�d��} |rh| �dt|��f���|tkr�t | �\}}|r�| �d||f���n|}|tkr�t | �\} } |r�| �d| | f���n|} |tkr�t | �\}}|r�| �d ||f���n|t k �r|}t���}|dk�st �|dk�r�|dk �rR|tk �rR|�rJ| �d |���t|��|tk�r�|dk �r�|�rz| �d|���t|��t|��|dk �r�|tk �r�|�r�| �d|���t|��|tk�r�| dk �r�|�r�| �d | ���t| ��t| ��|dk �r,|tk �r,|t k �r,|�r$| �d|���t|��|tk�rd|dk �rd|�rT| �d|���t|��t|��|�r�| �dt||| |f����||| |fS�t�|t��r�t�|��t|||d��|dk �r�|�r�| �d|���t|��|dk �r|�r�| �d|���t�|d��t|��| dk �r2|�r*| �d| ���t| ��| dk �rd|�rP| �d| ���t�| d��t| ��|t k�r�|�r~| �d��t�dd��|dk �r�|�r�| �d|���t|��|dk �r�|�r�| �d|���t�|d��t|��|�r�| �d��t�tjtj��t�tjtj��t�tjtj��t�tjtj��t�tjtj��t�tjtj��|�d��s`t �z\|�r�| �d|�d�d�|�dd���f���|dk�r�t�|�d�|���nt�|�d�|�|��W�nL�tk �r�}�z,| jd|�d�t|�f�dd ��t �!d��W�5�d}~X�Y�nX�dS�)!a���!Starts the specified command (a list), with the specified environment (or None to copy this process's environment). @param stdin,stdout,stderr Specifies the stdin, stdout and stderr streams. The special value PIPE means "make a pipe," and sending stderr=ERR2OUT requests redirection of stderr to stdout. @param cd The optional "cd" argument specifies a directory to cd into, in the child process, before executing the command. Of course, you shouldn't care about any of this because you should be using the produtil.run package. @param cmd the command to run @param env the subprocess's environment, or None to use mine @param debug if True, send debug messagesNz8In produtil.pipeline.launch, cd must be a string or None��z=In produtil.pipeline.launch, cd must not be the empty string.r���zStart %szPipe for stdin: %d<==%dzPipe for stdout: %d<==%dzPipe for stderr: %d<==%dzClose stdin %d on parent.zClose stdinC %d on parent.zClose stdout %d on parent.zClose stdoutC %d on parent.zClose stderr %d on parent.zClose stderrC %d on parent.zOn parent, returning %s)r=���rB���r>���zClose stdinP %d on child.z5Point stdin to stdinC %d on child and close original.zClose stdoutP %d on child.z7Point stdout to stdoutC %d on child and close original.����z#Redirect stderr to stdout on child.����zClose stderrP %d on child.z7Point stderr to stderrC %d on child and close original.zReset signal handlers on child.zRun %s %s on child� z%s: could not exec: %sTr!���)"r#���r)���� TypeError� ValueError�logging� getLoggerr@���r���r���r4���r���r����fork�AssertionErrorr?���r:����chdirrC����dup2�signal�SIGHUP�SIG_DFL�SIGTERM�SIGINT�SIGQUIT�SIGPIPE�SIGCHLD�join�execvp�execvpe� Exceptionr-����sys�exit)�cmd�env�stdin�stdout�stderrr@����cd�stdinPZstdinC�stdoutPZstdoutC�stderrPZstderrCr����pidr>���r���r���r ���r�������s���������� � $ c�����������������C���s2���|�t�ks|�tks|�dkr|�S�t|�t�r*|�S�|����S�)z�!Tries to convert f to a fileno @returns an integer UNIX file descriptor @param f ERR2OUT, PIPE, an integer fileno or a file-like object with a fileno() function.N)r���r���r#���r$���r%���)�fr���r���r ���� filenoify#��s ����� �ri���c�����������������C���s����t��|��}tt|��}|D�]v}zt�|tj��W�n"�tk rR�}�zW�5�d}~X�Y�nX�zt|��� |��W�q�t ttfk r��}�zW�5�d}~X�Y�qX�qdS�)zh!Sends a TERM signal to all processes that the specified thread (a threading.Thread) is waiting for.N) �weakref�ref�set�_manage_setr����killrP���rS���r'���r<���rI����KeyErrorrH���)ZthZthtZkillmer9���r>���r���r���r ���r���4��s���� c������������������C���s���d}�dS�)zF!Sends a TERM signal to all processes that this module is managingTNr���)� _kill_allr���r���r ���r���C��s����c ����������� ��� ���C���s^��t��t����} t| �} |�st�| �|���| s0t�d}t��}t��} d}d}d}t |�}t |�}t |�}|dk r�|dkrxd}|dk r�|� dt|�|f���|�d|g��t ||d��d}|dk �r|dk r�|� d |���|�d |g��|r�t���}nt���}t ||d��d}|dk �r`|dk �r*|� d|���|�d |g��|�rHt���}nt���}t ||d��d}|�D�],}|dk �r�|� d|���|�d |g���qdd}t���}d}|�r�d}d}tdk �r�|dk �r�|� d��|D�] \}}|d k�r�t�|t���q�|t|�k��rV||�\}}|dk�s0|d k�s0|d k�s0t�|dk�r�|dk �r^|� dt|�|�|f���zBt|t��r�t�|||d���}nt�|t||d��dd��}W�nH�tk �r��}�z(|jtjk�s�|jtjk�r�d}n��W�5�d}~X�Y�nX�|�r|dk �r|� d||f���||7�}|t|�k�rT|dk �r>|� d||f���t|��|�|���q�|�rJ|dk �rr|� d|���t|��|�|���q��n�|d k�r�z(|dk �r�|� d|���t�||�}W�nh�tk �r&�}�zH|jtjk�s�|jtjk�r|dk �r|� dt |�|f���d}n��W�5�d}~X�Y�nX�|dk�r`|dk �rJ|� d|���t|��|�|���q�|dk �r�|dk �r�|� dt|�|f���|�r�|�|��n|�t |dd���|�rJ|dk �r�|� d|���t|��|�|���q��nd|d k�rJ|dk �r|� d|���d}t�!|tj"�}|�r2|d�dk�s<|d �dk�r2|dk �rT|� d|���|�|��z| �#|��W�nN�t$t%t&fk �r��}�z(|dk �r�|j d|t |�f�dd��W�5�d}~X�Y�nX�|dk �r$z|�#|��W�nN�t$t%t&fk �r"�}�z(|dk �r|j d|t |�f�dd��W�5�d}~X�Y�nX�|| |<��q�n|dk �rJ|� d|���|d 7�}�q�|�rft���}n8t���}||�d k�r�|�r�|dk �r�|� d ��d}�q�|�r�|dk �r�|� d!t'|����|�r�t�(|��n|�r�t�(d"��n t�(d#���q�|dk �r|� d$��d}|dk �r$|�)��}|�*���d}|dk �rB|�)��}|�*���tdk �rTt+d%��||| fS�)&a���!Watches a list of processes, handles their I/O, returns when all processes have exited and all I/O is complete. @warning You should not be calling this function unless you are modifying the implementation of Pipeline. Use the produtil.run module instead of calling launch() and manage(). @param proclist the list of processes to watch @param inf the input file @param outf the output file @param errf the error file @param instr the input string, instead of an input file @param childset the set of child process ids @param sleeptime sleep time between checks of child processes @param logger Logs to the specified object, at level DEBUG, if a logger is specified. @returns a tuple containing the stdout string (or None), the stderr string (or None) and a dict mapping from process id to the return value from os.wait4 called on that process.i���NFrD���z"Will write instr (%d bytes) to %d.r����r���TzWill read outstr from %d.rE���zWill read errstr from %d.zMonitor process %d.rF���zKill all processes.z&Attempt a write of %d characters to %dZUTF8)�encodingzWrote %d bytes to %d.z)Done writing all %d characters; close %d.z$Force close of in %d due to timeout.zAttempt a read from %dz!Error %s from %d - assume no datazeof reading output %dz!Read %d characters from output %dz!Force close of %d due to timeout.zCheck process %dzProcess %d exitedz)Cannot remove pid %d from _manage_set: %sr!���z&Cannot remove pid %d from childset: %szProcess %d still runningzLNo data two seconds after processes exited. Forcing a close of all streams.zBottom of loop with work=%sg{�G�z�?g�������?zDone monitoring pipeline.z8Master thread caught a signal. This thread should exit.),rj���rk���� threading�current_threadrm���rM����update�list�dictri���r@����len�appendr ����io�BytesIO�StringIO�timerp���r���rn���rS���r#����bytes�writer'����errno�EAGAIN�EWOULDBLOCKr?����pop�readr)����wait4�WNOHANGr<���rI���ro���rH���r����sleep�getvaluer;���r���) Zproclist�inf�outf�errf�instrr���Zchildset� sleeptime�binary�meZms�bufsizeZwork�doneZoutioZerrioZhaveio�procZninZlastprocZ forcecloser=���Zdidproc�jobZtgt�nr>���r���r����nowZoutstr�errstrr���r���r ���r���J��s���� � " � � � � � � " �� �� � �c �����������������C���sv���t�|�||||||�\} } }}t| g| ||||d�\} }}|| �d�}t�|�rXt�|�S�t�|�rnt�|��S�dS�d�S�)N)r����r����r����r���r����rE�������)r���r���r���� WIFEXITED�WEXITSTATUS�WIFSIGNALED�WTERMSIG)r^���r_���r`���ra���rb���r@���rc���r���r����rg���rd���re���rf���Zoutstderr����r�����resultr���r���r ���� simple_run1��s&������������� r����c�������������������@���sj���e�Zd�ZdZddd�Zdd��Zddd �Zd d��Zdd ��Zdd��Z ddd�Z dd��Zdd��Ze dd���ZdS�)�Pipelinez�!This class is a wrapper around launch and manage. It converts Runner objects to calls to "launch", and runs "manage" on the resulting processes.FNc�����������������C���sx���t���|�_t��|�_||�_||�_||�_d|�_d|�_d|�_ d|�_ d|�_d|�_d|�_ d|�_t���|�_t|�|�_|j|�|d��dS�)a4��!Pipeline constructor @param runner the produtil.prog.Runner to convert @param capture if True, capture the stdout of the runner @param logger a logging.Logger for messages @param debug if True, send debug messages @param binary use binary streams for input and outputNrq���)rl����_Pipeline__childrenrv����_Pipeline__quads�_Pipeline__capture�_Pipeline__logger�_Pipeline__debug�_Pipeline__instring�_Pipeline__out�_Pipeline__err�_Pipeline__stdin�_Pipeline__stdout�_Pipeline__stderr�_Pipeline__managed�_Pipeline__last_pidrs����Lock�_Pipeline__lock�bool�_Pipeline__binaryZ_gen)r���Zrunner�capturer���r@���r����r���r���r ���r���F��s ���� zPipeline.__init__c�����������������C���s&���dt�|��t|�j�t|�j�t|�j�f�S�)z7!Return a debug string representation of this Pipeline.z&<Pipeline id=0x%x in=%s out=%s err=%s>)r���r���r����r����r����r���r���r���r ���r���^��s�������zPipeline.__repr__c�����������������C���s����| dkr|n| }|dkr|n|}| dkr,|n| }|dk rBt�}||�_|dkrZ|�jdk rZ|�j}|�jrr|rr|dkrrt�}n|szt�}t||||||�j|�\}}}}|�j�|��|�js�||�_ ||�_||�_ |�j�||||f��||�_t |�|�_dS�)a_��!Adds another produtil.prog.Runner's contents to this Pipeline. @param command the command to run @param endpipe is this the end of a pipeline? @param logger a logging.Logger for log messages @param instring the string for stdin @param stdin a file, fileno or PIPE constant for stdin @param stdout a file, fileno or special constant for stdout @param stderr a file, fileno or special constant for stderr @param env subprocess environment variables @param closein,closeout,closeerr Unused. @param cd subprocess working directoryN)r���r����r����r����r���r����r����r8���r����r����r����ry���r����r����r����)r����commandZendpiper���Zinstringr`���ra���rb���ZsendoutZsenderrZsendinr_���ZcloseinZcloseoutZcloseerrrc���r����ZpinZpoutZperrr9���r=���rB���r>���r���r���r ���� _impl_addc��s*�����zPipeline._impl_addc�����������������C���sD���|�j�D�]8}zt�||��W�q�tk r<�}�zW�5�d}~X�Y�qX�qdS�)z>!Sends a signal to all children. @param sig the signalN)r����r���rn���r'���)r����sigr9���r>���r���r���r ����send_signal���s ���� �zPipeline.send_signalc�����������������C���s���|���tj��dS�)z!Sends SIGTERM to all children.N)r����rP���rS���r���r���r���r ���� terminate���s����zPipeline.terminatec�����������������C���s���|���tj��dS�)z!Sends SIGKILL to all children.N)r����rP����SIGKILLr���r���r���r ���rn������s����z Pipeline.killc�����������������C���st���|�j��d�|�jrW�5�Q�R���dS�tdd��|�jD��|�j|�j|�j|�j|�j|�j ||�j � \}}}||�_||�_||�_W�5�Q�R�X�dS�)a#��!Writes to input, reads from output, waits for child processes, etc. This is just a wrapper around the manage() function. It will return immediately if self.communicate has already completed earlier. @param sleeptime the sleep time in seconds between checksNc�����������������S���s���g�|�]}|d���qS�)r���r���)�.0�qr���r���r ���� <listcomp>���s�����z(Pipeline.communicate.<locals>.<listcomp>) r����r����r���r����r����r����r����r����r����r����r����r����r����)r���r����rB���r>����mr���r���r ����communicate���s"����������� zPipeline.communicatec�����������������C���sN���|�j�}|sdS�||�j�d�}t�|�r0t�|�S�t�|�rFt�|��S�dS�dS�)z�!Returns the exit status of the last element of the pipeline. If the process died due to a signal, returns a negative number.NrE���r����)r����r����r���r����r����r����r����)r���r����r����r���r���r ����poll���s����� z Pipeline.pollc�����������������C���s$���|������|�j}t|t�s t|�}|S�)z�!Calls self.communicate(), and returns the stdout from the pipeline (self.outbytes). The return value will be Null if the pipeline was redirected to a file or if the constructor's capture option was not True.)r�����outr#���r)���)r���rB���r���r���r ���� to_string���s ���� zPipeline.to_stringc�����������������C���s���|�j�S�)z�!The stdout from the pipeline. Will be Null if the pipeline was redirected to a file, or if the constructor's capture option was not True.)r����r���r���r���r ���r�������s����zPipeline.out)FNFF)NNNNNNNNNNNNNF)N)r���r ���r ���r���r���r���r����r����r����rn���r����r����r�����propertyr����r���r���r���r ���r����B��s8������ ������������������������ ( r����)N)N)N)NNNN)NNNNFN)NNNNNNNF)NNNNFNNF))r����__all__�KeyboardInterruptr���r���rP����selectrJ���r\���rz���r}���r����r*���rs���rj����collections�stat�objectr���r����r3���rl���r7���r���r���r ���r���r4���r:���r?���rC���r���ri����defaultdictrm���rp���r���r���r���r����r����r���r���r���r ����<module>���sP��� �` ����� � ������� �h���������