B
    v9atm                 @   s  d Z ddlZddlZddlmZ ddlZddlmZ ddlZddl	m
Z
 ddlmZ ddlZddlZddlmZ ddlZddlZddlZe ZdaG d	d
 d
Zdd ZdZdZG dd deZG dd dZdd ZG dd deZ G dd deZ!G dd deZ"G dd deZ#dd Z$dd  Z%d1d!d"Z&d#d$ Z'd%d& Z(d'd( Z)da*da+d)d* Z,d+d, Z-G d-d. d.ej.Z/G d/d0 d0ej0Z1e2e dS )2z"Brian Quinlan (brian@sweetapp.com)    N)_base)Full)wait)Queue)partialFc               @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_ThreadWakeupc             C   s   t jdd\| _| _d S )NF)Zduplex)mpZPipe_reader_writer)self r   /usr/lib/python3.7/process.py__init__R   s    z_ThreadWakeup.__init__c             C   s   | j   | j  d S )N)r
   closer	   )r   r   r   r   r   U   s    
z_ThreadWakeup.closec             C   s   | j d d S )N    )r
   Z
send_bytes)r   r   r   r   wakeupY   s    z_ThreadWakeup.wakeupc             C   s   x| j  r| j   qW d S )N)r	   ZpollZ
recv_bytes)r   r   r   r   clear\   s    z_ThreadWakeup.clearN)__name__
__module____qualname__r   r   r   r   r   r   r   r   r   Q   s   r   c              C   sH   da tt } x| D ]\}}|  qW x| D ]\}}|  q0W d S )NT)_global_shutdownlist_threads_wakeupsitemsr   join)r   _thread_wakeuptr   r   r   _python_exita   s    r      =   c               @   s   e Zd Zdd Zdd ZdS )_RemoteTracebackc             C   s
   || _ d S )N)tb)r   r"   r   r   r   r   z   s    z_RemoteTraceback.__init__c             C   s   | j S )N)r"   )r   r   r   r   __str__|   s    z_RemoteTraceback.__str__N)r   r   r   r   r#   r   r   r   r   r!   y   s   r!   c               @   s   e Zd Zdd Zdd ZdS )_ExceptionWithTracebackc             C   s0   t t|||}d|}|| _d| | _d S )N z

"""
%s""")	tracebackformat_exceptiontyper   excr"   )r   r)   r"   r   r   r   r      s    
z _ExceptionWithTraceback.__init__c             C   s   t | j| jffS )N)_rebuild_excr)   r"   )r   r   r   r   
__reduce__   s    z"_ExceptionWithTraceback.__reduce__N)r   r   r   r   r+   r   r   r   r   r$      s   r$   c             C   s   t || _| S )N)r!   	__cause__)r)   r"   r   r   r   r*      s    
r*   c               @   s   e Zd Zdd ZdS )	_WorkItemc             C   s   || _ || _|| _|| _d S )N)futurefnargskwargs)r   r.   r/   r0   r1   r   r   r   r      s    z_WorkItem.__init__N)r   r   r   r   r   r   r   r   r-      s   r-   c               @   s   e Zd ZdddZdS )_ResultItemNc             C   s   || _ || _|| _d S )N)work_id	exceptionresult)r   r3   r4   r5   r   r   r   r      s    z_ResultItem.__init__)NN)r   r   r   r   r   r   r   r   r2      s   r2   c               @   s   e Zd Zdd ZdS )	_CallItemc             C   s   || _ || _|| _|| _d S )N)r3   r/   r0   r1   )r   r3   r/   r0   r1   r   r   r   r      s    z_CallItem.__init__N)r   r   r   r   r   r   r   r   r6      s   r6   c                   s*   e Zd Zd fdd	Z fddZ  ZS )
_SafeQueuer   c               s   || _ t j||d d S )N)ctx)pending_work_itemssuperr   )r   max_sizer8   r9   )	__class__r   r   r      s    z_SafeQueue.__init__c                sl   t |trZtt|||j}tdd||_	| j
|jd }|d k	rh|j| nt || d S )Nz

"""
{}"""r%   )
isinstancer6   r&   r'   r(   __traceback__r!   formatr   r,   r9   popr3   r.   set_exceptionr:   _on_queue_feeder_error)r   eobjr"   	work_item)r<   r   r   rB      s    
z!_SafeQueue._on_queue_feeder_error)r   )r   r   r   r   rB   __classcell__r   r   )r<   r   r7      s   r7   c             g   s0   t | }x"tt|| }|s"d S |V  q
W d S )N)ziptuple	itertoolsislice)	chunksize	iterablesitchunkr   r   r   _get_chunks   s    rO   c                s    fdd|D S )Nc                s   g | ]} | qS r   r   ).0r0   )r/   r   r   
<listcomp>   s    z"_process_chunk.<locals>.<listcomp>r   )r/   rN   r   )r/   r   _process_chunk   s    	rR   c          
   C   s^   y|  t|||d W n@ tk
rX } z"t||j}|  t||d W d d }~X Y nX d S )N)r5   r4   )r4   )putr2   BaseExceptionr$   r>   )result_queuer3   r5   r4   rC   r)   r   r   r   _sendback_result   s    
rV   c          
   C   s   |d k	r:y||  W n$ t k
r8   tjjddd d S X x| jdd}|d krb|t  d S y|j|j	|j
}W n> t k
r } z t||j}t||j|d W d d }~X Y nX t||j|d ~q<W d S )NzException in initializer:T)exc_info)block)r4   )r5   )rT   r   ZLOGGERZcriticalgetrS   osgetpidr/   r0   r1   r$   r>   rV   r3   )
call_queuerU   initializerinitargsZ	call_itemrrC   r)   r   r   r   _process_worker   s$    "r`   c             C   sx   xr|  rd S y|jdd}W n tjk
r4   d S X | | }|j rh|jt||j|j	|j
dd q| |= qqW d S )NF)rX   T)ZfullrY   queueZEmptyr.   Zset_running_or_notify_cancelrS   r6   r/   r0   r1   )r9   Zwork_idsr\   r3   rE   r   r   r   _add_call_item_to_queue   s     

rb   c          
      sF  d fdd} fdd}|j }	|j }
|	|
g}xt||  dd  D }t|| }d }d}|	|kry|	 }d}W q tk
r } ztt|||j	}W d d }~X Y qX n|
|krd}d }|
  |rt|  d k	rd	_d_d td
}|d k	r tdd| d|_x$| D ]\}}|j| ~q*W |
  x D ]}|  qXW |  d S t|tr|}|  s|  d S nL|d k	r||jd }|d k	r|jr|j|j n|j|j ~~|  | r:y$d k	rd_|s |  d S W n tk
r8   Y nX d q6W d S )Nc                  s   t p d kp jS )N)r   _shutdown_threadr   )executorr   r   shutting_down?  s    z/_queue_management_worker.<locals>.shutting_downc           	      s   t dd  D } | }d}xn||k r| dkrxBt|| D ]2}y d  |d7 }W q> tk
rn   P Y q>X q>W t dd  D } q W    x D ]}|  qW d S )Nc             s   s   | ]}|  V  qd S )N)is_alive)rP   pr   r   r   	<genexpr>E  s    zD_queue_management_worker.<locals>.shutdown_worker.<locals>.<genexpr>r   r   c             s   s   | ]}|  V  qd S )N)rf   )rP   rg   r   r   r   rh   Q  s    )sumvaluesrangeZ
put_nowaitr   r   r   )Zn_children_aliveZn_children_to_stopZn_sentinels_sentirg   )r\   	processesr   r   shutdown_workerC  s    
z1_queue_management_worker.<locals>.shutdown_workerc             S   s   g | ]
}|j qS r   )sentinel)rP   rg   r   r   r   rQ   h  s    z,_queue_management_worker.<locals>.<listcomp>TFzKA child process terminated abruptly, the process pool is not usable anymorez^A process in the process pool was terminated abruptly while the future was running or pending.z
'''
r%   z''')r	   rb   rj   r   ZrecvrT   r&   r'   r(   r>   r   _brokenrc   BrokenProcessPoolr!   r   r,   r   r.   rA   Z	terminater=   intr@   r3   r4   Z
set_resultr5   r   )Zexecutor_referencerm   r9   Zwork_ids_queuer\   rU   r   re   rn   Zresult_readerZwakeup_readerZreadersZworker_sentinelsZreadycauseZ	is_brokenZresult_itemrC   Zbper3   rE   rg   r   )r\   rd   rm   r   _queue_management_worker!  s    (




rt   c           	   C   sh   t rtrttda ytd} W n ttfk
r:   d S X | dkrHd S | dkrTd S d|  attd S )NTSC_SEM_NSEMS_MAX   z@system provides too few semaphores (%d available, 256 necessary))_system_limits_checked_system_limitedNotImplementedErrorrZ   sysconfAttributeError
ValueError)Z	nsems_maxr   r   r   _check_system_limits  s    r~   c             c   s.   x(| D ] }|   x|r$| V  qW qW d S )N)reverser@   )iterableZelementr   r   r   _chain_from_iterable_of_lists  s    
r   c               @   s   e Zd ZdS )rq   N)r   r   r   r   r   r   r   rq     s   rq   c                   sh   e Zd ZdddZdd Zdd Zd	d
 Zejjj	e_	ddd fdd
Z
dddZejjj	e_	  ZS )ProcessPoolExecutorNr   c             C   s  t   |d kr6t pd| _tjdkrntt| j| _n8|dkrHtdn tjdkrh|tkrhtdt || _|d kr~t	
 }|| _|d k	rt|std|| _|| _d | _i | _d| _t | _d| _d| _i | _| jt }t|| j| jd| _d	| j_| | _t  | _!t" | _#d S )
Nr   win32r   z"max_workers must be greater than 0zmax_workers must be <= zinitializer must be a callableF)r;   r8   r9   T)$r~   rZ   	cpu_count_max_workerssysplatformmin_MAX_WINDOWS_WORKERSr}   r   Zget_context_mp_contextcallable	TypeError_initializer	_initargs_queue_management_thread
_processesrc   	threadingZLock_shutdown_lockrp   _queue_count_pending_work_itemsEXTRA_QUEUED_CALLSr7   _call_queueZ_ignore_epipeZSimpleQueue_result_queuera   r   	_work_idsr   _queue_management_thread_wakeup)r   max_workersZ
mp_contextr]   r^   Z
queue_sizer   r   r   r     sF    






zProcessPoolExecutor.__init__c          	   C   sv   | j d krr| jfdd}|   tjtt| || j| j	| j
| j| j| jfdd| _ d| j _| j   | jt| j < d S )Nc             S   s   t jd |  d S )Nz?Executor collected: triggering callback for QueueManager wakeup)r   utildebugr   )r   r   r   r   r   
weakref_cbA  s    zFProcessPoolExecutor._start_queue_management_thread.<locals>.weakref_cbZQueueManagerThread)targetr0   nameT)r   r   _adjust_process_countr   ZThreadrt   weakrefrefr   r   r   r   r   Zdaemonstartr   )r   r   r   r   r   _start_queue_management_thread<  s     



z2ProcessPoolExecutor._start_queue_management_threadc             C   sT   xNt t| j| jD ]8}| jjt| j| j| j	| j
fd}|  || j|j< qW d S )N)r   r0   )rk   lenr   r   r   ZProcessr`   r   r   r   r   r   pid)r   r   rg   r   r   r   r   W  s    z)ProcessPoolExecutor._adjust_process_countc           	   O   s   t | dkr| ^}}} n>| s&tdn0d|krB|d}| ^}} ntdt | d  |j |jrnt|j|jr|tdtrtdt	
 }t||| |}||j|j< |j|j | jd7  _|j  |  |S Q R X d S )N   zEdescriptor 'submit' of 'ProcessPoolExecutor' object needs an argumentr/   z6submit expected at least 1 positional argument, got %dr   z*cannot schedule new futures after shutdownz6cannot schedule new futures after interpreter shutdown)r   r   r@   r   rp   rq   rc   RuntimeErrorr   r   ZFuturer-   r   r   r   rS   r   r   r   )r0   r1   r   r/   fwr   r   r   submitb  s0    




zProcessPoolExecutor.submitr   )timeoutrK   c               s:   |dk rt dt jtt|t|d|i|d}t|S )Nr   zchunksize must be >= 1.rK   )r   )r}   r:   mapr   rR   rO   r   )r   r/   r   rK   rL   results)r<   r   r   r     s    zProcessPoolExecutor.mapTc          	   C   s   | j  d| _W d Q R X | jr6| j  |r6| j  d | _| jd k	rd| j  |r^| j  d | _d | _	d | _
| jr| j  d | _d S )NT)r   rc   r   r   r   r   r   r   Zjoin_threadr   r   )r   r   r   r   r   shutdown  s"    





zProcessPoolExecutor.shutdown)NNNr   )T)r   r   r   r   r   r   r   r   Executor__doc__r   r   rF   r   r   )r<   r   r     s    
J!
r   )NN)3
__author__atexitrZ   concurrent.futuresr   ra   r   Zmultiprocessingr   Zmultiprocessing.connectionr   Zmultiprocessing.queuesr   r   r   	functoolsr   rI   r   r&   WeakKeyDictionaryr   r   r   r   r   r   	Exceptionr!   r$   r*   objectr-   r2   r6   r7   rO   rR   rV   r`   rb   rt   rx   ry   r~   r   ZBrokenExecutorrq   r   r   registerr   r   r   r   <module>.   sT   		

(& ! L