B
    u9a۰                 @   s  d Z ddlZddlmZmZmZmZ ddlmZm	Z	 ddl
Z
ddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZ ddlmZ dZG dd	 d	eZG d
d dejZG dd dejZG dd deZG dd deZG dd deZG dd deZ G dd deZ!G dd dej"Z"G dd dej#Z$e%ej&dkdG dd dej#Z'G dd  d ej(Z(G d!d" d"ej#Z)G d#d$ d$ej*Z*G d%d& d&ej+Z+G d'd( d(ej,Z,G d)d* d*ej-Z-G d+d, d,ejZ.G d-d. d.ejZ/e0d/kre1  dS )0z!
Tests for the threading module.
    N)verboseimport_modulecpython_onlyrequires_type_collecting)assert_python_okassert_python_failure)
lock_tests)support)Znetbsd5zhp-ux11c               @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
Counterc             C   s
   d| _ d S )Nr   )value)self r   $/usr/lib/python3.7/test_threading.py__init__"   s    zCounter.__init__c             C   s   |  j d7  _ d S )N   )r   )r   r   r   r   inc$   s    zCounter.incc             C   s   |  j d8  _ d S )Nr   )r   )r   r   r   r   dec&   s    zCounter.decc             C   s   | j S )N)r   )r   r   r   r   get(   s    zCounter.getN)__name__
__module____qualname__r   r   r   r   r   r   r   r   r
   !   s   r
   c               @   s   e Zd Zdd Zdd ZdS )
TestThreadc             C   s,   t jj| |d || _|| _|| _|| _d S )N)name)	threadingThreadr   testcasesemamutexnrunning)r   r   r   r   r   r   r   r   r   r   ,   s
    zTestThread.__init__c          
   C   s   t   d }tr&td| j|d f  | j | j8 | j  trTt| j d | j	
| j d W d Q R X t| trtd| jd | j@ | j  | j	| j d trtd	| j| j f  W d Q R X W d Q R X d S )
Ng     @ztask %s will run for %.1f usecg    .Aztasks are running   Ztaskdoner   z$%s is finished. %d tasks are running)randomr   printr   r   r   r   r   r   r   ZassertLessEqualtimesleepr   ZassertGreaterEqual)r   Zdelayr   r   r   run3   s&    


zTestThread.runN)r   r   r   r   r%   r   r   r   r   r   +   s   r   c               @   s   e Zd Zdd Zdd ZdS )BaseTestCasec             C   s   t j | _d S )N)testr	   Zthreading_setup_threads)r   r   r   r   setUpM   s    zBaseTestCase.setUpc             C   s   t jj| j  t j  d S )N)r'   r	   Zthreading_cleanupr(   Zreap_children)r   r   r   r   tearDownP   s    zBaseTestCase.tearDownN)r   r   r   r)   r*   r   r   r   r   r&   L   s   r&   c               @   sn  e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zeeedd d!d" Zeeedd#d$d% Zd&d' Zeeedd(eeed)d*d+d, Zeejekd-eeedd(eeed)d*d.d/ Zed0d1 Z d2d3 Z!d4d5 Z"d6d7 Z#d8d9 Z$e%d:d; Z&e%d<d= Z'd>S )?ThreadTestsc             C   s   d}t jdd}t  }t }g }xRt|D ]F}td| | |||}|| | |j | 	t
|d |  q,W trtd xL|D ]D}|  | |  | |jd | |j | 	t
|d qW trtd	 | | d d S )
N
   r   )r   z<thread %d>z^<TestThread\(.*, initial\)>$z!waiting for all tasks to completer   z#^<TestThread\(.*, stopped -?\d+\)>$zall tasks done)r   BoundedSemaphoreRLockr
   ranger   appendassertIsNoneidentassertRegexreprstartr   r"   joinassertFalseis_aliveassertNotEqualassertIsNotNoneassertEqualr   )r   ZNUMTASKSr   r   Z
numrunningthreadsitr   r   r   test_various_opsY   s,    

zThreadTests.test_various_opsc          	      sr   |  t j  fdd}t  g t * t|d} 	  | 
d | W d Q R X tjd = d S )Nc                  s    t j    d S )N)r0   r   currentThreadr2   setr   )r    r2   r   r   f{   s    z9ThreadTests.test_ident_of_no_threading_threads.<locals>.fr   r   )r:   r   r@   r2   Eventr	   wait_threads_exit_threadstart_new_threadwaitr;   _active)r   rB   tidr   )r    r2   r   "test_ident_of_no_threading_threadsx   s    
z.ThreadTests.test_ident_of_no_threading_threadsc             C   sR   t rtd ytd W n  tjk
r:   tdY nX |   td d S )Nz!with 256 KiB thread stack size...i   z4platform does not support changing thread stack sizer   )	r   r"   r   
stack_sizerE   errorunittestSkipTestr?   )r   r   r   r   test_various_ops_small_stack   s    z(ThreadTests.test_various_ops_small_stackc             C   sR   t rtd ytd W n  tjk
r:   tdY nX |   td d S )Nzwith 1 MiB thread stack size...i   z4platform does not support changing thread stack sizer   )	r   r"   r   rK   rE   rL   rM   rN   r?   )r   r   r   r   test_various_ops_large_stack   s    z(ThreadTests.test_various_ops_large_stackc          	   C   s   dd }t  }|  t  t||f}|  W d Q R X | |t j | 	t j| t j
 | t j|   | tt j| d t j|= d S )Nc             S   s   t   |   d S )N)r   current_threadrelease)r   r   r   r   rB      s    z*ThreadTests.test_foreign_thread.<locals>.f_DummyThread)r   Lockacquirer	   rD   rE   rF   assertInrH   assertIsInstancerS   
assertTruer8   r3   r4   )r   rB   r   rI   r   r   r   test_foreign_thread   s    
zThreadTests.test_foreign_threadc       	         s  t d}|jj}|j|jf|_G dd dt | }t }| 	|t
 | |d y|||}xqfW W n  k
r   Y nX | d y| |d W n tk
r   Y nX t t G  fdddtj}| }d	|_|  trtd
 tr
td |d|}| |d tr.td  }| | trNtd | |j trhtd ||j|}| |d trtd jdd | |j trtd |jr|  d S )Nctypesc               @   s   e Zd ZdS )z<ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.AsyncExcN)r   r   r   r   r   r   r   AsyncExc   s   r[   r   zAsyncExc not raisedr   c                   s   e Zd Z fddZdS )z:ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Workerc                sT   t  | _d| _yx  td qW W n"  k
rN   d| _  Y nX d S )NFg?T)r   	get_identidfinishedrA   r#   r$   )r   )r[   worker_saw_exceptionworker_startedr   r   r%      s    
z>ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Worker.runN)r   r   r   r%   r   )r[   r_   r`   r   r   Worker   s   ra   Tz    started worker threadz     trying nonsensical thread idz,    waiting for worker thread to get startedz"    verifying worker hasn't exitedz2    attempting to raise asynch exception in workerz5    waiting for worker to say it caught the exceptionr,   )timeoutz    all OK -- joining worker)r   Z	pythonapiZPyThreadState_SetAsyncExcZc_ulongZ	py_objectZargtypes	Exceptionr   r\   rW   intZassertGreaterZfailr;   UnboundLocalErrorrC   r   daemonr5   r   r"   rG   rX   r7   r^   r]   r6   )	r   rZ   Zset_async_excZ	exceptionrI   resultra   r>   retr   )r[   r_   r`   r   test_PyThreadState_SetAsyncExc   sd    




z*ThreadTests.test_PyThreadState_SetAsyncExcc             C   sX   dd }t j}|t _z6t jdd d}| t j|j | |t jkd W d |t _X d S )Nc              W   s   t  d S )N)r   ThreadError)argsr   r   r   fail_new_thread  s    z7ThreadTests.test_limbo_cleanup.<locals>.fail_new_threadc               S   s   d S )Nr   r   r   r   r   <lambda>      z0ThreadTests.test_limbo_cleanup.<locals>.<lambda>)targetz:Failed to cleanup _limbo map on failure of Thread.start().)r   _start_new_threadr   assertRaisesrk   r5   r7   Z_limbo)r   rm   rq   r>   r   r   r   test_limbo_cleanup  s    
zThreadTests.test_limbo_cleanupc             C   s(   t d tdd\}}}| |d d S )NrZ   z-caN  if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            *   )r   r   r;   )r   rcouterrr   r   r   test_finalize_runnning_thread  s    z)ThreadTests.test_finalize_runnning_threadc             C   s   t dd d S )Nz-caP  if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            )r   )r   r   r   r   test_finalize_with_trace>  s    z$ThreadTests.test_finalize_with_tracec             C   s0   t dd\}}}| | d | |d d S )Nz-ca  if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            s5   Woke up, sleep function is: <built-in function sleep>ro   )r   r;   strip)r   ru   rv   rw   r   r   r   test_join_nondaemon_on_shutdownW  s
    
z+ThreadTests.test_join_nondaemon_on_shutdownc          	   C   s   t j}t }zbx\tddD ]N}t|d  t jdd d}|  |  | }| 	||d||f  qW W d t| X d S )Nr   d   g-C6*?c               S   s   d S )Nr   r   r   r   r   rn   s  ro   z7ThreadTests.test_enumerate_after_join.<locals>.<lambda>)rp   z&#1703448 triggered after %d trials: %s)
r   	enumeratesysgetswitchintervalr/   setswitchintervalr   r5   r6   assertNotIn)r   enumold_intervalr=   r>   lr   r   r   test_enumerate_after_joink  s    z%ThreadTests.test_enumerate_after_joinc             C   s   G dd dt }|dd}t|}|j  ~| j| dt|  d |dd}t|}|j  ~| j| dt|  d d S )Nc               @   s   e Zd Zdd Zdd ZdS )zDThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunctionc             S   s.   || _ tj| j| fd| id| _| j  d S )Nyet_another)rp   rl   kwargs)should_raiser   r   _runthreadr5   )r   r   r   r   r   r   ~  s
    zMThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction.__init__c             S   s   | j r
td S )N)r   
SystemExit)r   Z	other_refr   r   r   r   r     s    zIThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction._runN)r   r   r   r   r   r   r   r   r   RunSelfFunction}  s   	r   F)r   z%d references still around)msgT)objectweakrefrefr   r6   r1   r~   getrefcount)r   r   Zcyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectr   r   r   test_no_refcycle_through_target|  s    





z+ThreadTests.test_no_refcycle_through_targetc          	   C   sh   t  }|  |d |  |d | td |  W d Q R X t 	 }|
  t   d S )NTr   zuse is_alive())r   r   ZisDaemonZ	setDaemonZgetNameZsetNameZassertWarnsRegexPendingDeprecationWarningZisAliverC   ZisSetactiveCount)r   r>   er   r   r   test_old_threading_api  s    

z"ThreadTests.test_old_threading_apic             C   s2   t  }| dt| d|_| dt| d S )Nrg   T)r   r   r   r4   rg   rV   )r   r>   r   r   r   test_repr_daemon  s    zThreadTests.test_repr_daemonc             C   sH   t  }| |j t jdd}| |j t jdd}| |j d S )NF)rg   T)r   r   r7   rg   rX   )r   r>   r   r   r   test_daemon_param  s    zThreadTests.test_daemon_paramforkztest needs fork()c             C   s0   d}t d|\}}}| |d | |d d S )Na  if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        z-cro   )r   r;   )r   code_rv   rw   r   r   r   test_dummy_thread_after_fork  s    z(ThreadTests.test_dummy_thread_after_forkzneeds os.fork()c             C   s   t  }| t j| tjd xtdD ]~}tjdd d}|	  t
 }|dkrpt
| rhdnd q,|  t
|d\}}| t
| | dt
| q,W d S )	Ngư>   c               S   s   d S )Nr   r   r   r   r   rn     ro   z6ThreadTests.test_is_alive_after_fork.<locals>.<lambda>)rp   r      r,   )r~   r   
addCleanupr   r'   r	   r/   r   r   r5   osr   _exitr8   r6   waitpidrX   	WIFEXITEDr;   WEXITSTATUS)r   r   r=   r>   pidstatusr   r   r   test_is_alive_after_fork  s    z$ThreadTests.test_is_alive_after_forkc                sh   t  } |jd  |jt  j  |jt    fdd}t j|d}|  |	  d S )NZ
MainThreadc                  s     t jt j d S )N)r9   r   main_threadr2   rQ   r   )r   r   r   rB     s    z'ThreadTests.test_main_thread.<locals>.f)rp   )
r   r   r;   r   r2   rQ   r\   r   r5   r6   )r   mainrB   thr   )r   r   test_main_thread  s    zThreadTests.test_main_threadztest needs os.fork()r   ztest needs os.waitpid()c             C   s@   d}t d|\}}}| dd}| |d | |d d S )Nak  if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        z-c ro   zMainThread
True
True
)r   decodereplacer;   )r   r   r   rv   rw   datar   r   r   test_main_thread_after_fork  s
    z'ThreadTests.test_main_thread_after_forkzdue to known OS bugc             C   s@   d}t d|\}}}| dd}| |d | |d d S )Na  if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        z-cr   r   ro   zThread-1
True
True
)r   r   r   r;   )r   r   r   rv   rw   r   r   r   r   /test_main_thread_after_fork_from_nonmain_thread
  s
    z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadc             C   sB   d}t d|\}}}| }| |d | | dgd  d S )Na  if 1:
            import gc, threading

            main_thread = threading.current_thread()
            assert main_thread is threading.main_thread()  # sanity check

            class RefCycle:
                def __init__(self):
                    self.cycle = self

                def __del__(self):
                    print("GC:",
                          threading.current_thread() is main_thread,
                          threading.main_thread() is main_thread,
                          threading.enumerate() == [main_thread])

            RefCycle()
            gc.collect()  # sanity check
            x = RefCycle()
        z-cro   zGC: True True True   )r   r   r;   
splitlines)r   r   r   rv   rw   r   r   r   r    test_main_thread_during_shutdown'  s    
z,ThreadTests.test_main_thread_during_shutdownc             C   s$   d}t d|\}}}| |d d S )Na  if 1:
            import os
            import threading
            import time
            import random

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_Finalize() is called.
                random_sleep()
                tls.x = Sleeper()
                random_sleep()

            threading.Thread(target=f).start()
            random_sleep()
        z-cro   )r   r;   )r   r   ru   rv   rw   r   r   r   test_finalization_shutdownE  s    z&ThreadTests.test_finalization_shutdownc                s   t  t         fdd}tj|d}| |jd  |    | |	  |j}| 
|jddd    | |jddd | |	  |  | 
|	  | |j |  d S )Nc                  s         td d S )Ng{Gz?)rR   rU   r#   r$   r   )finishstartedr   r   rB   n  s    z'ThreadTests.test_tstate_lock.<locals>.f)rp   r   )rc   F   )rE   allocate_lockrU   r   r   ZassertIs_tstate_lockr5   rX   r8   r7   rR   r1   r6   )r   rB   r>   tstate_lockr   )r   r   r   test_tstate_lockh  s&    zThreadTests.test_tstate_lockc                s   t  t         fdd}tj|d}|    | dt|    d}x(t	dD ]}|t|krP t
d qpW | |t| |  d S )Nc                  s         d S )N)rR   rU   r   )r   r   r   r   rB     s    z(ThreadTests.test_repr_stopped.<locals>.f)rp   r   Zstoppedi  g{Gz?)rE   r   rU   r   r   r5   rV   r4   rR   r/   r#   r$   r6   )r   rB   r>   ZLOOKING_FORr=   r   )r   r   r   test_repr_stopped  s"    zThreadTests.test_repr_stoppedc                s   xt ddD ]}t|  fddt |D }x|D ]}|  q6W x|D ]}|  qLW  fddt |D }x|D ]}|  qxW x|D ]}|  qW | t j qW d S )Nr   r,   c                s   g | ]}t j jd qS ))rp   )r   r   rU   ).0r   )bsr   r   
<listcomp>  s   z;ThreadTests.test_BoundedSemaphore_limit.<locals>.<listcomp>c                s   g | ]}t j jd qS ))rp   )r   r   rR   )r   r   )r   r   r   r     s   )r/   r   r-   r5   r6   rr   
ValueErrorrR   )r   limitr<   r>   r   )r   r   test_BoundedSemaphore_limit  s    






z'ThreadTests.test_BoundedSemaphore_limitc          	      s   fdddd  fdd d  _ t }t z8t dd l}|  xtdD ]
}   qbW W d t| X d S )	Nc                s    S )Nr   )frameeventarg)
noop_tracer   r   r     s    z9ThreadTests.test_frame_tstate_tracing.<locals>.noop_tracec               s   s   x
dV  qW d S )N	generatorr   r   r   r   r   r     s    z8ThreadTests.test_frame_tstate_tracing.<locals>.generatorc                  s    j d kr  _ t j S )N)gennextr   )callbackr   r   r   r     s    
z7ThreadTests.test_frame_tstate_tracing.<locals>.callbackr   r   )r   r~   gettracesettracer   	_testcapiZcall_in_temporary_c_threadr/   )r   Z	old_tracer   r'   r   )r   r   r   r   test_frame_tstate_tracing  s    


z%ThreadTests.test_frame_tstate_tracingc          
   C   s   xdD ]}| j |dl t }tj|j|d}|  |j}|sR| |tj n| 	|tj |
  |  | 	|tj W d Q R X qW d S )N)FT)rg   )rp   rg   )ZsubTestr   rC   r   rG   r5   r   rV   Z_shutdown_locksr   rA   r6   )r   rg   r   r   r   r   r   r   test_shutdown_locks  s    
zThreadTests.test_shutdown_locksN)(r   r   r   r?   rJ   rO   rP   rY   rj   rs   rx   ry   r{   r   r   r   r   r   rM   
skipUnlesshasattrr   r   r   r   r   skipIfr~   platformplatforms_to_skipr   r   r   r   r   r   r   r   r   r   r   r   r   r   r+   U   s<   X!##(r+   c               @   s   e Zd Zdd Zdd Zeeedde	e
jekddd	 Zeeedde	e
jekdd
d Ze	e
jekddd Zeeedde	e
jekddd Zeeedddd ZdS )ThreadJoinOnShutdownc             C   s8   d| }t d|\}}}| dd}| |d d S )Na  if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        
z-cr   r   zend of main
end of thread
)r   r   r   r;   )r   scriptru   rv   rw   r   r   r   r   _run_and_join  s    z"ThreadJoinOnShutdown._run_and_joinc             C   s   d}|  | d S )Nzif 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            )r   )r   r   r   r   r   test_1_join_on_shutdown  s    	z,ThreadJoinOnShutdown.test_1_join_on_shutdownr   zneeds os.fork()zdue to known OS bugc             C   s   d}|  | d S )NaG  if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            )r   )r   r   r   r   r   test_2_join_in_forked_process  s    z2ThreadJoinOnShutdown.test_2_join_in_forked_processc             C   s   d}|  | d S )Na:  if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            )r   )r   r   r   r   r   !test_3_join_in_forked_from_thread)  s    z6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadc             C   s"   d}t d|\}}}| | d S )Na  if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    in_f = open(os.__file__, 'rb')
                    stuff = in_f.read(200)
                    null_f = open(os.devnull, 'wb')
                    null_f.write(stuff)
                    time.sleep(random.random() / 1995)
                    null_f.close()
                    in_f.close()
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            z-c)r   r7   )r   r   ru   rv   rw   r   r   r   test_4_daemon_threadsB  s    'z*ThreadJoinOnShutdown.test_4_daemon_threadsc             C   sV   dd }g }x.t dD ]"}tj|d}|| |  qW x|D ]}|  qBW d S )Nc              S   s,   t  } | dkrt | d n
t d d S )Nr   )r   r   r   r   )r   r   r   r   do_fork_and_waits  s    zIThreadJoinOnShutdown.test_reinit_tls_after_fork.<locals>.do_fork_and_wait   )rp   )r/   r   r   r0   r5   r6   )r   r   r<   r=   r>   r   r   r   test_reinit_tls_after_forkm  s    	

z/ThreadJoinOnShutdown.test_reinit_tls_after_forkc             C   s   g }x2t dD ]&}tjdd d}|| |  qW t }|dkrptt	 dkrdt
d qt
d nt|d\}}| d| x|D ]}|  qW d S )Nr   c               S   s
   t dS )Ng333333?)r#   r$   r   r   r   r   rn     ro   zKThreadJoinOnShutdown.test_clear_threads_states_after_fork.<locals>.<lambda>)rp   r   r   )r/   r   r   r0   r5   r   r   lenr~   _current_framesr   r   r;   r6   )r   r<   r=   r>   r   r   r   r   r   r   $test_clear_threads_states_after_fork  s    

z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)r   r   r   r   r   rM   r   r   r   r   r~   r   r   r   r   r   r   r   r   r   r   r   r     s   +r   c               @   s(   e Zd Zdd Zdd Zedd ZdS )SubinterpThreadingTestsc             C   sb   t  \}}| t j| | t j| d|f }tj|}| |d | t |dd d S )Na  if 1:
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
            r   r      x)	r   piper   closer'   r	   run_in_subinterpr;   read)r   rwr   ri   r   r   r   test_threads_join  s    
z)SubinterpThreadingTests.test_threads_joinc             C   sb   t  \}}| t j| | t j| d|f }tj|}| |d | t |dd d S )Na  if 1:
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
            r   r   r   )	r   r   r   r   r'   r	   r   r;   r   )r   r   r   r   ri   r   r   r   test_threads_join_2  s    
z+SubinterpThreadingTests.test_threads_join_2c          	   C   sH   d}d|f }t j  td|\}}}W d Q R X | d|  d S )NaA  if 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            z[if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            z-cz:Fatal Python error: Py_EndInterpreter: not the last thread)r'   r	   ZSuppressCrashReportr   rV   r   )r   Zsubinterp_coder   ru   rv   rw   r   r   r   test_daemon_threads_fatal_error  s    
z7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)r   r   r   r   r   r   r   r   r   r   r   r     s   'r   c               @   s|   e Zd Zdd Zdd Zdd Zdd Zd	d
 Ze	e
jdkoFej ddd Zdd Zedd Zdd Zdd ZdS )ThreadingExceptionTestsc             C   s*   t  }|  | t|j |  d S )N)r   r   r5   rr   RuntimeErrorr6   )r   r   r   r   r   test_start_thread_again  s    z/ThreadingExceptionTests.test_start_thread_againc             C   s   t  }| t|j d S )N)r   rQ   rr   r   r6   )r   rQ   r   r   r   test_joining_current_thread  s    z3ThreadingExceptionTests.test_joining_current_threadc             C   s   t  }| t|j d S )N)r   r   rr   r   r6   )r   r   r   r   r   test_joining_inactive_thread  s    z4ThreadingExceptionTests.test_joining_inactive_threadc             C   s.   t  }|  | tt|dd |  d S )Nrg   T)r   r   r5   rr   r   setattrr6   )r   r   r   r   r   test_daemonize_active_thread  s    z4ThreadingExceptionTests.test_daemonize_active_threadc             C   s   t  }| t|j d S )N)r   rT   rr   r   rR   )r   lockr   r   r   test_releasing_unacquired_lock  s    z6ThreadingExceptionTests.test_releasing_unacquired_lockdarwinztest macosx problemc             C   sh   d}d}t jtjd|gt jt jd}| \}}| dd}| |j	dd|   | || d S )	Na  if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            zend of main thread
z-c)stdoutstderrr   r   r   zUnexpected error: )

subprocessPopenr~   
executablePIPEZcommunicater   r   r;   
returncode)r   r   Zexpected_outputpr   r   r   r   r   r   test_recursion_limit  s    z,ThreadingExceptionTests.test_recursion_limitc             C   s\   d}t d|\}}}| |d | }| d| | d| | d| | d| d S )Na  if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            z-cro   zException in threadz"Traceback (most recent call last):ZeroDivisionErrorzUnhandled exception)r   r;   r   rV   r   )r   r   ru   rv   rw   r   r   r   test_print_exception:  s    z,ThreadingExceptionTests.test_print_exceptionc             C   s\   d}t d|\}}}| |d | }| d| | d| | d| | d| d S )Na  if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            z-cro   zException in threadz"Traceback (most recent call last):r  zUnhandled exception)r   r;   r   rV   r   )r   r   ru   rv   rw   r   r   r   %test_print_exception_stderr_is_none_1U  s    z=ThreadingExceptionTests.test_print_exception_stderr_is_none_1c             C   s4   d}t d|\}}}| |d | d|  d S )Na  if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            z-cro   zUnhandled exception)r   r;   r   r   )r   r   ru   rv   rw   r   r   r   %test_print_exception_stderr_is_none_2s  s    z=ThreadingExceptionTests.test_print_exception_stderr_is_none_2c                sX   dd  G  fdddt j}| }|  |  | |j | |jt d |_d S )Nc                S   s    d S )Nr   r   r   r   r   
bare_raise  s    zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.bare_raisec                   s   e Zd ZdZ fddZdS )zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558Nc          
      s8   y
   W n( t k
r2 } z
|| _W d d }~X Y nX d S )N)rd   exc)r   r  )r  r   r   r%     s    
zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558.run)r   r   r   r  r%   r   )r  r   r   
Issue27558  s   r  )r   r   r5   r6   r:   r  rW   r   )r   r  r   r   )r  r   #test_bare_raise_in_brand_new_thread  s    	z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadN)r   r   r   r   r   r   r   r   rM   r   r~   r   r'   r	   Zpython_is_optimizedr  r  r   r  r  r	  r   r   r   r   r     s    r   c               @   s$   e Zd Zdd Zdd Zdd ZdS )
TimerTestsc             C   s   t |  g | _t | _d S )N)r&   r)   callback_argsr   rC   callback_event)r   r   r   r   r)     s    
zTimerTests.setUpc             C   s   t d| j}|  | j  |jd d|jd< | j	  t d| j}|  | j  | 
t| jd | 
| jdi fdi fg |  |  d S )Ng{Gz?ZblahZbarZfoor   r   )r   ZTimer_callback_spyr5   r  rG   rl   r0   r   clearr;   r   r  r6   )r   Ztimer1Ztimer2r   r   r    test_init_immutable_default_args  s    



z+TimerTests.test_init_immutable_default_argsc             O   s*   | j |d d  | f | j  d S )N)r  r0   copyr  rA   )r   rl   r   r   r   r   r    s    zTimerTests._callback_spyN)r   r   r   r)   r  r  r   r   r   r   r
    s   r
  c               @   s   e Zd ZeejZdS )	LockTestsN)r   r   r   staticmethodr   rT   locktyper   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )PyRLockTestsN)r   r   r   r  r   Z_PyRLockr  r   r   r   r   r    s   r  zRLock not implemented in Cc               @   s   e Zd ZeejZdS )CRLockTestsN)r   r   r   r  r   _CRLockr  r   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )
EventTestsN)r   r   r   r  r   rC   Z	eventtyper   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )ConditionAsRLockTestsN)r   r   r   r  r   	Conditionr  r   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )ConditionTestsN)r   r   r   r  r   r  Zcondtyper   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )SemaphoreTestsN)r   r   r   r  r   Z	Semaphoresemtyper   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )BoundedSemaphoreTestsN)r   r   r   r  r   r-   r  r   r   r   r   r    s   r  c               @   s   e Zd ZeejZdS )BarrierTestsN)r   r   r   r  r   ZBarrierZbarriertyper   r   r   r   r    s   r  c               @   s   e Zd Zdd ZdS )MiscTestCasec             C   s&   dh}ddh}t j| td||d d S )Nrk   r@   r   )r   rE   )extra	blacklist)r	   Zcheck__all__r   )r   r   r!  r   r   r   test__all__  s    
zMiscTestCase.test__all__N)r   r   r   r"  r   r   r   r   r    s   r  c               @   s$   e Zd Zdd Zdd Zdd ZdS )InterruptMainTestsc          	   C   sF   dd }t j|d}| t |  |  W d Q R X |  d S )Nc               S   s   t   d S )N)rE   interrupt_mainr   r   r   r   call_interrupt  s    zHInterruptMainTests.test_interrupt_main_subthread.<locals>.call_interrupt)rp   )r   r   rr   KeyboardInterruptr5   r6   )r   r%  r>   r   r   r   test_interrupt_main_subthread  s    z0InterruptMainTests.test_interrupt_main_subthreadc          	   C   s"   |  t t  W d Q R X d S )N)rr   r&  rE   r$  )r   r   r   r   test_interrupt_main_mainthread  s    z1InterruptMainTests.test_interrupt_main_mainthreadc          
   C   sV   t t j}z4t  t jt j t  t  t jt j t  W d t  t j| X d S )N)signal	getsignalSIGINTSIG_IGNrE   r$  SIG_DFL)r   Zhandlerr   r   r   test_interrupt_main_noerror  s    z.InterruptMainTests.test_interrupt_main_noerrorN)r   r   r   r'  r(  r.  r   r   r   r   r#    s   r#  __main__)2__doc__Ztest.supportr'   r   r   r   r   Ztest.support.script_helperr   r   r!   r~   rE   r   r#   rM   r   r   r   r)  r   r	   r   r   r
   r   r   ZTestCaser&   r+   r   r   r   r
  r  Z
RLockTestsr  r   r  r  r  r  r  r  r  r  r  r#  r   r   r   r   r   r   <module>   sZ   
!	     ) '_ $
 
