a
    äze+Ò  ã                   @   sR  d dl Zd dl mZmZmZmZ d dlmZmZ d dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ dZG dd„ deƒZG d	d
„ d
ejƒZG dd„ dejƒZG dd„ deƒZG dd„ deƒZG dd„ deƒZ G dd„ deƒZ!G dd„ dejƒZ"G dd„ deƒZ#G dd„ deƒZ$G dd„ dej%ƒZ%G dd„ dej&ƒZ'e (ej)du d¡G d d!„ d!ej&ƒƒZ*G d"d#„ d#ej+ƒZ+G d$d%„ d%ej&ƒZ,G d&d'„ d'ej-ƒZ-G d(d)„ d)ej.ƒZ.G d*d+„ d+ej/ƒZ/G d,d-„ d-ej0ƒZ0G d.d/„ d/ejƒZ1G d0d1„ d1ejƒZ2G d2d3„ d3ejƒZ3e4d4krNe 5¡  dS )5é    N)ÚverboseÚimport_moduleÚcpython_onlyÚunlink)Úassert_python_okÚassert_python_failure)Ú
lock_tests)Úsupport)Znetbsd5zhp-ux11c                   @   s,   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	S )
ÚCounterc                 C   s
   d| _ d S )Nr   ©Úvalue©Úself© r   ú)/usr/lib/python3.9/test/test_threading.pyÚ__init__#   s    zCounter.__init__c                 C   s   |  j d7  _ d S ©Né   r   r   r   r   r   Úinc%   s    zCounter.incc                 C   s   |  j d8  _ d S r   r   r   r   r   r   Údec'   s    zCounter.decc                 C   s   | j S ©Nr   r   r   r   r   Úget)   s    zCounter.getN)Ú__name__Ú
__module__Ú__qualname__r   r   r   r   r   r   r   r   r
   "   s   r
   c                   @   s   e Zd Zdd„ Zdd„ ZdS )Ú
TestThreadc                 C   s,   t jj| |d || _|| _|| _|| _d S )N©Úname)Ú	threadingÚThreadr   ÚtestcaseÚsemaÚmutexÚnrunning)r   r   r    r!   r"   r#   r   r   r   r   -   s
    zTestThread.__init__c              	   C   s&  t   ¡ d }tr&td| j|d f ƒ | jä | jB | j ¡  trTt| j ¡ dƒ | j	 
| j ¡ d¡ W d   ƒ n1 s|0    Y  t |¡ tr¢td| jdƒ | jJ | j ¡  | j	 | j ¡ d¡ trätd	| j| j ¡ f ƒ W d   ƒ n1 sø0    Y  W d   ƒ n1 s0    Y  d S )
Ng     ˆÃ@ztask %s will run for %.1f usecg    €„.Aztasks are runningé   ZtaskÚdoner   z$%s is finished. %d tasks are running)Úrandomr   Úprintr   r!   r"   r#   r   r   r    ZassertLessEqualÚtimeÚsleepr   ZassertGreaterEqual)r   Údelayr   r   r   Úrun4   s*    ÿ
2

ÿzTestThread.runN)r   r   r   r   r+   r   r   r   r   r   ,   s   r   c                   @   s   e Zd Zdd„ Zdd„ ZdS )ÚBaseTestCasec                 C   s   t j ¡ | _d S r   )Útestr	   Zthreading_setupÚ_threadsr   r   r   r   ÚsetUpN   s    zBaseTestCase.setUpc                 C   s   t jj| jŽ  t j ¡  d S r   )r-   r	   Zthreading_cleanupr.   Úreap_childrenr   r   r   r   ÚtearDownQ   s    zBaseTestCase.tearDownN)r   r   r   r/   r1   r   r   r   r   r,   M   s   r,   c                   @   sœ  e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Ze eedƒd ¡d!d"„ ƒZe eedƒd#¡d$d%„ ƒZe eedƒd ¡d&d'„ ƒZd(d)„ Ze eedƒd*¡e eed+ƒd,¡d-d.„ ƒƒZe ejev d/¡e eedƒd*¡e eed+ƒd,¡d0d1„ ƒƒƒZd2d3„ Z d4d5„ Z!d6d7„ Z"d8d9„ Z#d:d;„ Z$e%d<d=„ ƒZ&e%d>d?„ ƒZ'd@dA„ Z(dBdC„ Z)dDdE„ Z*dFS )GÚThreadTestsc           	      C   s0  d}t jdd}t  ¡ }tƒ }g }t|ƒD ]F}td| | |||ƒ}| |¡ |  |j¡ |  	t
|ƒd¡ | ¡  q*tt dƒr¸tdd„ |D ƒƒt  ¡ hB }|  d |¡ |  t|ƒ|d	 ¡ trÄtd
ƒ |D ]D}| ¡  |  | ¡ ¡ |  |jd¡ |  |j¡ |  	t
|ƒd¡ qÈtrtdƒ |  | ¡ d¡ d S )Né
   r$   r   z<thread %d>z^<TestThread\(.*, initial\)>$Úget_native_idc                 s   s   | ]}|j V  qd S r   )Ú	native_id)Ú.0Útr   r   r   Ú	<genexpr>n   ó    z/ThreadTests.test_various_ops.<locals>.<genexpr>r   z!waiting for all tasks to completer   z#^<TestThread\(.*, stopped -?\d+\)>$zall tasks done)r   ÚBoundedSemaphoreÚRLockr
   Úranger   ÚappendÚassertIsNoneÚidentÚassertRegexÚreprÚstartÚhasattrÚsetr4   ÚassertNotInÚassertEqualÚlenr   r'   ÚjoinÚassertFalseÚis_aliveÚassertNotEqualÚassertIsNotNoner   )	r   ZNUMTASKSr!   r"   Z
numrunningÚthreadsÚir7   Z
native_idsr   r   r   Útest_various_opsZ   s4    


zThreadTests.test_various_opsc                    s†   |   t ¡ j¡ ‡ ‡fdd„}t ¡ ‰ g ‰t ¡ 4 t |d¡}ˆ  	¡  |  
ˆd |¡ W d   ƒ n1 sl0    Y  tjˆd = d S )Nc                      s   ˆ  t ¡ j¡ ˆ  ¡  d S r   )r=   r   ÚcurrentThreadr?   rD   r   ©r%   r?   r   r   Úf   s    z9ThreadTests.test_ident_of_no_threading_threads.<locals>.fr   r   )rL   r   rP   r?   ÚEventr	   Úwait_threads_exitÚ_threadÚstart_new_threadÚwaitrF   Ú_active)r   rR   Útidr   rQ   r   Ú"test_ident_of_no_threading_threads~   s    
.z.ThreadTests.test_ident_of_no_threading_threadsc                 C   sP   t rtdƒ zt d¡ W n tjy8   t d¡‚Y n0 |  ¡  t d¡ d S )Nz!with 256 KiB thread stack size...i   ú4platform does not support changing thread stack sizer   ©	r   r'   r   Ú
stack_sizerU   ÚerrorÚunittestZSkipTestrO   r   r   r   r   Útest_various_ops_small_stackŽ   s    ÿ
z(ThreadTests.test_various_ops_small_stackc                 C   sP   t rtdƒ zt d¡ W n tjy8   t d¡‚Y n0 |  ¡  t d¡ d S )Nzwith 1 MiB thread stack size...i   r[   r   r\   r   r   r   r   Útest_various_ops_large_stackš   s    ÿ
z(ThreadTests.test_various_ops_large_stackc                 C   s®   dd„ }t  ¡ }| ¡  t ¡ & t ||f¡}| ¡  W d   ƒ n1 sL0    Y  |  |t j¡ |  	t j| t j
¡ |  t j|  ¡ ¡ |  tt j| ƒd¡ t j|= d S )Nc                 S   s   t  ¡  |  ¡  d S r   )r   Úcurrent_threadÚrelease)r"   r   r   r   rR   §   s    z*ThreadTests.test_foreign_thread.<locals>.fÚ_DummyThread)r   ÚLockÚacquirer	   rT   rU   rV   ÚassertInrX   ÚassertIsInstancerd   Ú
assertTruerJ   r@   rA   )r   rR   r"   rY   r   r   r   Útest_foreign_thread¥   s    
&zThreadTests.test_foreign_threadc           	         sÂ  t dƒ}|jj}|j|jf|_G dd„ dtƒ‰ | ˆ ¡}t ¡ }|  	|t
¡ |  |d¡ z|||ƒ}qdW n ˆ yz   Y n0 |  d¡ z|  |d¡ W n ty¨   Y n0 t ¡ ‰t ¡ ‰G ‡ ‡‡fdd„dtjƒ}|ƒ }d	|_| ¡  trôtd
ƒ trtdƒ |d|ƒ}|  |d¡ tr&tdƒ ˆ ¡ }|  |¡ trFtdƒ |  |j¡ tr`tdƒ ||j|ƒ}|  |d¡ tr†tdƒ ˆjtjd |  |j¡ tr®tdƒ |jr¾| ¡  d S )NÚctypesc                   @   s   e Zd ZdS )z<ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.AsyncExcN)r   r   r   r   r   r   r   ÚAsyncExcÂ   s   rl   r   zAsyncExc not raisedr   c                       s   e Zd Z‡ ‡‡fdd„ZdS )z:ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Workerc                    sN   t  ¡ | _d| _zˆ ¡  t d¡ qW n  ˆ yH   d| _ˆ ¡  Y n0 d S )NFgš™™™™™¹?T)r   Ú	get_identÚidÚfinishedrD   r(   r)   r   ©rl   Zworker_saw_exceptionZworker_startedr   r   r+   æ   s    
z>ThreadTests.test_PyThreadState_SetAsyncExc.<locals>.Worker.runN©r   r   r   r+   r   rp   r   r   ÚWorkerå   s   rr   Tz    started worker threadz     trying nonsensical thread idéÿÿÿÿz,    waiting for worker thread to get startedz"    verifying worker hasn't exitedz2    attempting to raise asynch exception in workerz5    waiting for worker to say it caught the exception©Útimeoutz    all OK -- joining worker)r   Z	pythonapiZPyThreadState_SetAsyncExcZc_ulongZ	py_objectÚargtypesÚ	Exceptionr   rm   rh   ÚintZassertGreaterZfailrF   ÚUnboundLocalErrorrS   r   ÚdaemonrB   r   r'   rW   ri   rI   ro   rn   r	   ÚSHORT_TIMEOUTrH   )	r   rk   Zset_async_excÚ	exceptionrY   Úresultrr   r7   Úretr   rp   r   Útest_PyThreadState_SetAsyncExc¼   sb    




z*ThreadTests.test_PyThreadState_SetAsyncExcc                 C   s^   dd„ }t j}|t _z<t jdd„ d}|  t j|j¡ |  |t jv d¡ W |t _n|t _0 d S )Nc                  W   s   t  ¡ ‚d S r   )r   ÚThreadError©Úargsr   r   r   Úfail_new_thread  s    z7ThreadTests.test_limbo_cleanup.<locals>.fail_new_threadc                   S   s   d S r   r   r   r   r   r   Ú<lambda>  r9   z0ThreadTests.test_limbo_cleanup.<locals>.<lambda>©Útargetz:Failed to cleanup _limbo map on failure of Thread.start().)r   Ú_start_new_threadr   ÚassertRaisesr€   rB   rI   Ú_limbo)r   rƒ   r‡   r7   r   r   r   Útest_limbo_cleanup  s    þzThreadTests.test_limbo_cleanupc                 C   s(   t dƒ tddƒ\}}}|  |d¡ d S )Nrk   ú-caN  if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            é*   )r   r   rF   ©r   ÚrcÚoutÚerrr   r   r   Útest_finalize_running_thread#  s    z(ThreadTests.test_finalize_running_threadc                 C   s   t ddƒ d S )Nr‹   aP  if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            )r   r   r   r   r   Útest_finalize_with_traceD  s    z$ThreadTests.test_finalize_with_tracec                 C   s0   t ddƒ\}}}|  | ¡ d¡ |  |d¡ d S )Nr‹   a§  if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            s5   Woke up, sleep function is: <built-in function sleep>r9   )r   rF   Ústripr   r   r   r   Útest_join_nondaemon_on_shutdown]  s
    
ÿz+ThreadTests.test_join_nondaemon_on_shutdownc              	   C   sˆ   t j}t ¡ }zhtddƒD ]N}t |d ¡ t jdd„ d}| ¡  | ¡  |ƒ }|  	||d||f ¡ qW t |¡ nt |¡ 0 d S )Nr   éd   g-Cëâ6*?c                   S   s   d S r   r   r   r   r   r   r„   y  r9   z7ThreadTests.test_enumerate_after_join.<locals>.<lambda>r…   z&#1703448 triggered after %d trials: %s)
r   Ú	enumerateÚsysÚgetswitchintervalr<   Úsetswitchintervalr   rB   rH   rE   )r   ÚenumÚold_intervalrN   r7   Úlr   r   r   Útest_enumerate_after_joinq  s    
ÿz%ThreadTests.test_enumerate_after_joinc                 C   sŒ   G dd„ dt ƒ}|dd}t |¡}|j ¡  ~| j|ƒ dt |ƒ ¡ d |dd}t |¡}|j ¡  ~| j|ƒ dt |ƒ ¡ d d S )Nc                   @   s   e Zd Zdd„ Zdd„ ZdS )zDThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunctionc                 S   s.   || _ tj| j| fd| id| _| j ¡  d S )NÚyet_another)r†   r‚   Úkwargs)Úshould_raiser   r   Ú_runÚthreadrB   )r   r    r   r   r   r   „  s    þzMThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction.__init__c                 S   s   | j r
t‚d S r   )r    Ú
SystemExit)r   Z	other_refrž   r   r   r   r¡     s    zIThreadTests.test_no_refcycle_through_target.<locals>.RunSelfFunction._runN)r   r   r   r   r¡   r   r   r   r   ÚRunSelfFunctionƒ  s   	r¤   F)r    z%d references still around)ÚmsgT)ÚobjectÚweakrefÚrefr¢   rH   r>   r—   Úgetrefcount)r   r¤   Zcyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectr   r   r   Útest_no_refcycle_through_target‚  s&    



ÿÿ



ÿÿz+ThreadTests.test_no_refcycle_through_targetc                 C   sH   t  ¡ }| ¡  | d¡ | ¡  | d¡ t  ¡ }| ¡  t  ¡  d S )NTr   )	r   r   ÚisDaemonÚ	setDaemonÚgetNameÚsetNamerS   ÚisSetÚactiveCount)r   r7   Úer   r   r   Útest_old_threading_api¡  s    

z"ThreadTests.test_old_threading_apic                 C   s2   t  ¡ }|  dt|ƒ¡ d|_|  dt|ƒ¡ d S ©Nrz   T)r   r   rE   rA   rz   rg   ©r   r7   r   r   r   Útest_repr_daemon­  s    zThreadTests.test_repr_daemonc                 C   sH   t  ¡ }|  |j¡ t jdd}|  |j¡ t jdd}|  |j¡ d S )NF©rz   T)r   r   rI   rz   ri   r´   r   r   r   Útest_daemon_param³  s    zThreadTests.test_daemon_paramÚforkúneeds os.fork()c                 C   s:   t  d¡}td|ƒ\}}}|  |d¡ |  | ¡ d¡ d S )Na“  
            import atexit
            import os
            import sys
            from test.support import wait_process

            # Import the threading module to register its "at fork" callback
            import threading

            def exit_handler():
                pid = os.fork()
                if not pid:
                    print("child process ok", file=sys.stderr, flush=True)
                    # child process
                    sys.exit()
                else:
                    wait_process(pid, exitcode=0)

            # exit_handler() will be called after threading._shutdown()
            atexit.register(exit_handler)
        r‹   r9   s   child process ok)ÚtextwrapÚdedentr   rF   Úrstrip©r   ÚcodeÚ_r   r   r   r   r   Útest_fork_at_exit»  s    
zThreadTests.test_fork_at_exitztest needs fork()c                 C   s0   d}t d|ƒ\}}}|  |d¡ |  |d¡ d S )NaŠ  if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        r‹   r9   ©r   rF   r½   r   r   r   Útest_dummy_thread_after_forkØ  s    z(ThreadTests.test_dummy_thread_after_forkc                 C   sŠ   t  ¡ }|  t j|¡ tj d¡ tdƒD ]Z}tjdd„ d}| 	¡  t
 ¡ }|dkrnt
 | ¡ rfdnd¡ q*| ¡  tj|dd	 q*d S )
Ngíµ ÷Æ°>é   c                   S   s   d S r   r   r   r   r   r   r„   þ  r9   z6ThreadTests.test_is_alive_after_fork.<locals>.<lambda>r…   r   é   r3   ©Úexitcode)r—   r˜   Ú
addCleanupr™   r-   r	   r<   r   r   rB   Úosr¸   Ú_exitrJ   rH   Úwait_process)r   r›   rN   r7   Úpidr   r   r   Útest_is_alive_after_forkó  s    z$ThreadTests.test_is_alive_after_forkc                    sh   t  ¡ }ˆ  |jd¡ ˆ  |jt  ¡ j¡ ˆ  |jt  ¡ ¡ ‡ fdd„}t j|d}| ¡  | 	¡  d S )NÚ
MainThreadc                      s   ˆ   t ¡ jt ¡ j¡ d S r   )rK   r   Úmain_threadr?   rb   r   r   r   r   rR     s    ÿz'ThreadTests.test_main_thread.<locals>.fr…   )
r   rÎ   rF   r   r?   rb   rm   r   rB   rH   )r   ÚmainrR   Úthr   r   r   Útest_main_thread  s    zThreadTests.test_main_threadztest needs os.fork()Úwaitpidztest needs os.waitpid()c                 C   s@   d}t d|ƒ\}}}| ¡  dd¡}|  |d¡ |  |d¡ d S )Na£  if 1:
            import os, threading
            from test import support

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                support.wait_process(pid, exitcode=0)
        r‹   úÚ r9   zMainThread
True
True
©r   ÚdecodeÚreplacerF   ©r   r¾   r¿   r   r   Údatar   r   r   Útest_main_thread_after_fork  s
    z'ThreadTests.test_main_thread_after_forkúdue to known OS bugc                 C   s@   d}t d|ƒ\}}}| ¡  dd¡}|  |d¡ |  |d¡ d S )NaÔ  if 1:
            import os, threading, sys
            from test import support

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    support.wait_process(pid, exitcode=0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        r‹   rÓ   rÔ   r9   zThread-1
True
True
rÕ   rØ   r   r   r   Ú/test_main_thread_after_fork_from_nonmain_thread*  s
    z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadc                 C   sB   d}t d|ƒ\}}}| ¡ }|  |d¡ |  | ¡ dgd ¡ d S )Na€  if 1:
            import gc, threading

            main_thread = threading.current_thread()
            assert main_thread is threading.main_thread()  # sanity check

            class RefCycle:
                def __init__(self):
                    self.cycle = self

                def __del__(self):
                    print("GC:",
                          threading.current_thread() is main_thread,
                          threading.main_thread() is main_thread,
                          threading.enumerate() == [main_thread])

            RefCycle()
            gc.collect()  # sanity check
            x = RefCycle()
        r‹   r9   zGC: True True Trueé   )r   rÖ   rF   Ú
splitlinesrØ   r   r   r   Ú test_main_thread_during_shutdownH  s    
ÿz,ThreadTests.test_main_thread_during_shutdownc                 C   s$   d}t d|ƒ\}}}|  |d¡ d S )Na¤  if 1:
            import os
            import threading
            import time
            import random

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_Finalize() is called.
                random_sleep()
                tls.x = Sleeper()
                random_sleep()

            threading.Thread(target=f).start()
            random_sleep()
        r‹   r9   rÁ   ©r   r¾   rŽ   r   r   r   r   r   Útest_finalization_shutdowne  s    z&ThreadTests.test_finalization_shutdownc                    sÚ   t  ¡ ‰t  ¡ ‰ ˆ ¡  ˆ  ¡  ‡ ‡fdd„}tj|d}|  |jd ¡ | ¡  ˆ ¡  |  | 	¡ ¡ |j}|  
|jddd¡ ˆ  ¡  |  |jtjdd¡ |  | 	¡ ¡ | ¡  |  
| 	¡ ¡ |  |j¡ | ¡  d S )Nc                      s   ˆ  ¡  ˆ  ¡  t d¡ d S )Nç{®Gáz„?)rc   rf   r(   r)   r   ©ZfinishÚstartedr   r   rR   Ž  s    z'ThreadTests.test_tstate_lock.<locals>.fr…   r   rt   F)rU   Úallocate_lockrf   r   r   ÚassertIsÚ_tstate_lockrB   ri   rJ   rI   rc   r	   r{   r>   rH   )r   rR   r7   Útstate_lockr   rã   r   Útest_tstate_lockˆ  s&    zThreadTests.test_tstate_lockc                    sª   t  ¡ ‰t  ¡ ‰ ˆ ¡  ˆ  ¡  ‡ ‡fdd„}tj|d}| ¡  ˆ ¡  |  dt|ƒ¡ ˆ  ¡  d}t	dƒD ]}|t|ƒv r‚ qŽt
 d¡ qn|  |t|ƒ¡ | ¡  d S )Nc                      s   ˆ  ¡  ˆ  ¡  d S r   )rc   rf   r   rã   r   r   rR   ±  s    z(ThreadTests.test_repr_stopped.<locals>.fr…   rä   Ústoppediô  râ   )rU   rå   rf   r   r   rB   rg   rA   rc   r<   r(   r)   rH   )r   rR   r7   ZLOOKING_FORrN   r   rã   r   Útest_repr_stopped«  s"    zThreadTests.test_repr_stoppedc                    s    t ddƒD ]}t |¡‰ ‡ fdd„t |ƒD ƒ}|D ]}| ¡  q2|D ]}| ¡  qD‡ fdd„t |ƒD ƒ}|D ]}| ¡  ql|D ]}| ¡  q~|  tˆ j¡ q
d S )Nr   r3   c                    s   g | ]}t jˆ jd ‘qS ©r…   )r   r   rf   ©r6   r¿   ©Úbsr   r   Ú
<listcomp>Ê  s   ÿz;ThreadTests.test_BoundedSemaphore_limit.<locals>.<listcomp>c                    s   g | ]}t jˆ jd ‘qS rì   )r   r   rc   rí   rî   r   r   rð   Ð  s   ÿ)r<   r   r:   rB   rH   rˆ   Ú
ValueErrorrc   )r   ÚlimitrM   r7   r   rî   r   Útest_BoundedSemaphore_limitÆ  s"    

ÿ


ÿ

z'ThreadTests.test_BoundedSemaphore_limitc              	      sŠ   ‡fdd„‰dd„ ‰‡ ‡fdd„‰ d ˆ _ t ¡ }t ˆ¡ z>t ˆ¡ dd l}| ˆ ¡ tdƒD ]
}ˆ ƒ  q`W t |¡ nt |¡ 0 d S )	Nc                    s   ˆ S r   r   )ÚframeÚeventÚarg)Ú
noop_tracer   r   r÷   à  s    z9ThreadTests.test_frame_tstate_tracing.<locals>.noop_tracec                   s   s   dV  q d S )NÚ	generatorr   r   r   r   r   rø   ä  s    z8ThreadTests.test_frame_tstate_tracing.<locals>.generatorc                      s   ˆ j d u rˆƒ ˆ _ tˆ j ƒS r   )ÚgenÚnextr   )Úcallbackrø   r   r   rû   è  s    
z7ThreadTests.test_frame_tstate_tracing.<locals>.callbackr   r$   )rù   r—   ÚgettraceÚsettracer   Ú	_testcapiZcall_in_temporary_c_threadr<   )r   Z	old_tracerþ   r-   r   )rû   rø   r÷   r   Útest_frame_tstate_tracingØ  s    



z%ThreadTests.test_frame_tstate_tracingc              	   C   s    dD ]–}| j |dv t ¡ }tj|j|d}| ¡  |j}|sP|  |tj¡ n|  	|tj¡ | 
¡  | ¡  |  	|tj¡ W d   ƒ q1 s0    Y  qd S )N)FTr¶   )r†   rz   )ZsubTestr   rS   r   rW   rB   rç   rg   Ú_shutdown_locksrE   rD   rH   )r   rz   rõ   r¢   rè   r   r   r   Útest_shutdown_locks   s    zThreadTests.test_shutdown_locksc                 C   s$   t ddƒ\}}}|  | ¡ d¡ d S )Nr‹   a(  if 1:
            import threading

            class Atexit:
                def __del__(self):
                    print("thread_dict.atexit = %r" % thread_dict.atexit)

            thread_dict = threading.local()
            thread_dict.atexit = "value"

            atexit = Atexit()
        s   thread_dict.atexit = 'value')r   rF   r¼   r   r   r   r   Útest_locals_at_exit  s    zThreadTests.test_locals_at_exitc                 C   sD   dd„ }t  ¡   tj|d ¡  W d   ƒ n1 s60    Y  d S )Nc                   S   s   d S r   r   r   r   r   r   Únoop,  r9   z0ThreadTests.test_leak_without_join.<locals>.noopr…   )r	   rT   r   r   rB   )r   r  r   r   r   Útest_leak_without_join)  s    
z"ThreadTests.test_leak_without_joinc                 C   s6   t  d¡}td|ƒ\}}}|  |d¡ |  |d¡ d S )Na¨  
            import _thread
            import sys

            event = _thread.allocate_lock()
            event.acquire()

            def import_threading():
                import threading
                event.release()

            if 'threading' in sys.modules:
                raise Exception('threading is already imported')

            _thread.start_new_thread(import_threading, ())

            # wait until the threading module is imported
            event.acquire()
            event.release()

            if 'threading' not in sys.modules:
                raise Exception('threading is not imported')

            # don't wait until the thread completes
        r‹   r9   )rº   r»   r   rF   rà   r   r   r   Útest_import_from_another_thread1  s    
z+ThreadTests.test_import_from_another_threadN)+r   r   r   rO   rZ   r`   ra   rj   r   rŠ   r‘   r’   r”   r   rª   r²   rµ   r·   r_   Ú
skipUnlessrC   rÈ   rÀ   rÂ   rÌ   rÑ   rÚ   ÚskipIfr—   ÚplatformÚplatforms_to_skiprÜ   rß   rá   ré   rë   ró   r   rÿ   r  r  r  r  r   r   r   r   r2   V   sR   $X!


##
'
r2   c                   @   sÔ   e Zd Zdd„ Zdd„ Ze eedƒd¡e 	e
jev d¡dd	„ ƒƒZe eedƒd¡e 	e
jev d¡d
d„ ƒƒZe 	e
jev d¡dd„ ƒZe eedƒd¡e 	e
jev d¡dd„ ƒƒZe eedƒd¡dd„ ƒZdS )ÚThreadJoinOnShutdownc                 C   s8   d| }t d|ƒ\}}}| ¡  dd¡}|  |d¡ d S )Na…  if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        
r‹   rÓ   rÔ   zend of main
end of thread
rÕ   )r   ÚscriptrŽ   r   r   rÙ   r   r   r   Ú_run_and_joinU  s    
öz"ThreadJoinOnShutdown._run_and_joinc                 C   s   d}|   |¡ d S )Nzõif 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            ©r  ©r   r  r   r   r   Útest_1_join_on_shutdownf  s    z,ThreadJoinOnShutdown.test_1_join_on_shutdownr¸   r¹   rÛ   c                 C   s   d}|   |¡ d S )Na½  if 1:
            from test import support

            childpid = os.fork()
            if childpid != 0:
                # parent process
                support.wait_process(childpid, exitcode=0)
                sys.exit(0)

            # child process
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            r  r  r   r   r   Útest_2_join_in_forked_processr  s    z2ThreadJoinOnShutdown.test_2_join_in_forked_processc                 C   s   d}|   |¡ d S )Na¸  if 1:
            from test import support

            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    # parent process
                    support.wait_process(childpid, exitcode=0)
                    sys.exit(0)

                # child process
                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            r  r  r   r   r   Ú!test_3_join_in_forked_from_thread‡  s    z6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadc                 C   s"   d}t d|ƒ\}}}|  |¡ d S )Nac  if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    with open(os.__file__, 'rb') as in_f:
                        stuff = in_f.read(200)
                        with open(os.devnull, 'wb') as null_f:
                            null_f.write(stuff)
                            time.sleep(random.random() / 1995)
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            r‹   ©r   rI   ©r   r  rŽ   r   r   r   r   r   Útest_4_daemon_threads¤  s    !z*ThreadJoinOnShutdown.test_4_daemon_threadsc                 C   sN   dd„ }g }t dƒD ]"}tj|d}| |¡ | ¡  q|D ]}| ¡  q<d S )Nc                  S   s.   t  ¡ } | dkr tj| dd n
t  d¡ d S )Nr   é2   rÅ   )rÈ   r¸   r	   rÊ   rÉ   )rË   r   r   r   Údo_fork_and_waitÓ  s    zIThreadJoinOnShutdown.test_reinit_tls_after_fork.<locals>.do_fork_and_waité   r…   )r<   r   r   r=   rB   rH   )r   r  rM   rN   r7   r   r   r   Útest_reinit_tls_after_forkÍ  s    	

z/ThreadJoinOnShutdown.test_reinit_tls_after_forkc                 C   s   g }t dƒD ]&}tjdd„ d}| |¡ | ¡  qt ¡ }|dkrltt 	¡ ƒdkr`t 
d¡ qzt 
d¡ ntj|dd	 |D ]}| ¡  q~d S )
Nr  c                   S   s
   t  d¡S )Ng333333Ó?)r(   r)   r   r   r   r   r„   ì  r9   zKThreadJoinOnShutdown.test_clear_threads_states_after_fork.<locals>.<lambda>r…   r   r   é3   é4   rÅ   )r<   r   r   r=   rB   rÈ   r¸   rG   r—   Ú_current_framesrÉ   r	   rÊ   rH   )r   rM   rN   r7   rË   r   r   r   Ú$test_clear_threads_states_after_forkå  s    

z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)r   r   r   r  r  r_   r  rC   rÈ   r  r—   r  r	  r  r  r  r  r  r   r   r   r   r
  S  s   
(r
  c                   @   s0   e Zd Zdd„ Zdd„ Zdd„ Zedd„ ƒZd	S )
ÚSubinterpThreadingTestsc                 C   sF   t  ¡ \}}|  t j|¡ |  t j|¡ tt dƒr>t  |d¡ ||fS )NÚset_blockingF)rÈ   ÚpiperÇ   ÚcloserC   r  )r   ÚrÚwr   r   r   r  ÿ  s    
zSubinterpThreadingTests.pipec                 C   sL   |   ¡ \}}t d|f ¡}tj |¡}|  |d¡ |  t |d¡d¡ d S )Naþ  
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
        r   r   ó   x©	r  rº   r»   r-   r	   Zrun_in_subinterprF   rÈ   Úread©r   r!  r"  r¾   r~   r   r   r   Útest_threads_join  s    îz)SubinterpThreadingTests.test_threads_joinc                 C   sL   |   ¡ \}}t d|f ¡}tj |¡}|  |d¡ |  t |d¡d¡ d S )Na§  
            import os
            import random
            import threading
            import time

            def random_sleep():
                seconds = random.random() * 0.010
                time.sleep(seconds)

            class Sleeper:
                def __del__(self):
                    random_sleep()

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                random_sleep()
                tls.x = Sleeper()
                os.write(%d, b"x")

            threading.Thread(target=f).start()
            random_sleep()
        r   r   r#  r$  r&  r   r   r   Útest_threads_join_2#  s    çz+SubinterpThreadingTests.test_threads_join_2c                 C   sh   dt jj› d}d|f }t j ¡   td|ƒ\}}}W d   ƒ n1 sJ0    Y  |  d| ¡ ¡ d S )Nzõif 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(zJ)
            threading.Thread(target=f, daemon=True).start()
            z[if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            r‹   z:Fatal Python error: Py_EndInterpreter: not the last thread)r-   r	   r{   ZSuppressCrashReportr   rg   rÖ   )r   Zsubinterp_coder  rŽ   r   r   r   r   r   Útest_daemon_threads_fatal_errorH  s    øü.ÿz7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)r   r   r   r  r'  r(  r   r)  r   r   r   r   r  þ  s
   %r  c                   @   sd   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ ZdS )ÚThreadingExceptionTestsc                 C   s*   t  ¡ }| ¡  |  t|j¡ | ¡  d S r   )r   r   rB   rˆ   ÚRuntimeErrorrH   ©r   r¢   r   r   r   Útest_start_thread_againc  s    z/ThreadingExceptionTests.test_start_thread_againc                 C   s   t  ¡ }|  t|j¡ d S r   )r   rb   rˆ   r+  rH   )r   rb   r   r   r   Útest_joining_current_threadi  s    z3ThreadingExceptionTests.test_joining_current_threadc                 C   s   t  ¡ }|  t|j¡ d S r   )r   r   rˆ   r+  rH   r,  r   r   r   Útest_joining_inactive_threadm  s    z4ThreadingExceptionTests.test_joining_inactive_threadc                 C   s.   t  ¡ }| ¡  |  tt|dd¡ | ¡  d S r³   )r   r   rB   rˆ   r+  ÚsetattrrH   r,  r   r   r   Útest_daemonize_active_threadq  s    z4ThreadingExceptionTests.test_daemonize_active_threadc                 C   s   t  ¡ }|  t|j¡ d S r   )r   re   rˆ   r+  rc   )r   Úlockr   r   r   Útest_releasing_unacquired_lockw  s    z6ThreadingExceptionTests.test_releasing_unacquired_lockc                 C   sh   d}d}t jtjd|gt jt jd}| ¡ \}}| ¡  dd¡}|  |j	dd| ¡  ¡ |  ||¡ d S )	Na  if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            zend of main thread
r‹   )ÚstdoutÚstderrrÓ   rÔ   r   zUnexpected error: )
Ú
subprocessÚPopenr—   Ú
executableÚPIPEÚcommunicaterÖ   r×   rF   Ú
returncode)r   r  Zexpected_outputÚpr4  r5  rÙ   r   r   r   Útest_recursion_limit{  s    ÿz,ThreadingExceptionTests.test_recursion_limitc                 C   s\   d}t d|ƒ\}}}|  |d¡ | ¡ }|  d|¡ |  d|¡ |  d|¡ |  d|¡ d S )NaÈ  if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            r‹   r9   úException in threadú"Traceback (most recent call last):ÚZeroDivisionErrorúUnhandled exception©r   rF   rÖ   rg   rE   r  r   r   r   Útest_print_exceptionš  s    z,ThreadingExceptionTests.test_print_exceptionc                 C   s\   d}t d|ƒ\}}}|  |d¡ | ¡ }|  d|¡ |  d|¡ |  d|¡ |  d|¡ d S )Naý  if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            r‹   r9   r>  r?  r@  rA  rB  r  r   r   r   Ú%test_print_exception_stderr_is_none_1µ  s    z=ThreadingExceptionTests.test_print_exception_stderr_is_none_1c                 C   s4   d}t d|ƒ\}}}|  |d¡ |  d| ¡ ¡ d S )Naý  if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            r‹   r9   rA  )r   rF   rE   rÖ   r  r   r   r   Ú%test_print_exception_stderr_is_none_2Ò  s    z=ThreadingExceptionTests.test_print_exception_stderr_is_none_2c                    sX   dd„ ‰ G ‡ fdd„dt jƒ}|ƒ }| ¡  | ¡  |  |j¡ |  |jt¡ d |_d S )Nc                    S   s   ‚ d S r   r   r   r   r   r   Ú
bare_raiseì  s    zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.bare_raisec                       s   e Zd ZdZ‡ fdd„ZdS )zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558Nc              
      s:   z
ˆ ƒ  W n* t y4 } z|| _W Y d }~n
d }~0 0 d S r   )rw   Úexc)r   rG  ©rF  r   r   r+   ò  s    
zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread.<locals>.Issue27558.run)r   r   r   rG  r+   r   rH  r   r   Ú
Issue27558ï  s   rI  )r   r   rB   rH   rL   rG  rh   r+  )r   rI  r¢   r   rH  r   Ú#test_bare_raise_in_brand_new_threadë  s    	z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadc                    sL   dd„ ‰ |   ttjj¡ ‡ fdd„tdƒD ƒ}|D ]}| ¡  | ¡  q2d S )Nc                  S   sH   t tjjddd"} |  d¡ t ¡  W d   ƒ n1 s:0    Y  d S )Nr"  zutf-8)Úencodingú )Úopenr-   r	   ÚTESTFNÚwriteÚ	tracebackÚformat_stack)Úfpr   r   r   Úmodify_file  s    
zQThreadingExceptionTests.test_multithread_modify_file_noerror.<locals>.modify_filec                    s   g | ]}t jˆ d ‘qS rì   )r   r   )r6   rN   ©rS  r   r   rð     s   ÿzPThreadingExceptionTests.test_multithread_modify_file_noerror.<locals>.<listcomp>r•   )rÇ   r   r-   r	   rN  r<   rB   rH   )r   rM   r7   r   rT  r   Ú$test_multithread_modify_file_noerror   s    
þz<ThreadingExceptionTests.test_multithread_modify_file_noerrorN)r   r   r   r-  r.  r/  r1  r3  r=  rC  rD  rE  rJ  rU  r   r   r   r   r*  `  s   r*  c                   @   s   e Zd Zdd„ ZdS )ÚThreadRunFailc                 C   s   t dƒ‚d S )Nú
run failed©rñ   r   r   r   r   r+     s    zThreadRunFail.runNrq   r   r   r   r   rV    s   rV  c                   @   s:   e Zd Zdd„ Zejdd„ ƒZdd„ Zdd„ Zd	d
„ Z	dS )ÚExceptHookTestsc                 C   sŽ   t  d¡*}tdd}| ¡  | ¡  W d   ƒ n1 s:0    Y  | ¡  ¡ }|  d|j› d|¡ |  d|¡ |  d|¡ |  d|¡ d S )	Nr5  zexcepthook threadr   úException in thread ú:
ú#Traceback (most recent call last):
z   raise ValueError("run failed")zValueError: run failed)	r	   Úcaptured_outputrV  rB   rH   Úgetvaluer“   rg   r   )r   r5  r¢   r   r   r   Útest_excepthook  s    
&zExceptHookTests.test_excepthookc                 C   sØ   t  d¡r}ztdƒ‚W nT tyl } z<t g t ¡ ¢d ‘¡}zt |¡ W d }nd }0 W Y d }~n
d }~0 0 W d   ƒ n1 s‚0    Y  | 	¡  
¡ }|  dt ¡ › d|¡ |  d|¡ |  d|¡ |  d|¡ d S )Nr5  ZbugrZ  r[  r\  z  raise ValueError("bug")zValueError: bug)r	   r]  rñ   rw   r   ÚExceptHookArgsr—   Úexc_infoÚ
excepthookr^  r“   rg   rm   )r   r5  rG  r‚   r   r   r   Útest_excepthook_thread_None#  s    @z+ExceptHookTests.test_excepthook_thread_Nonec                 C   sf   G dd„ dt jƒ}t d¡&}|ƒ }| ¡  | ¡  W d   ƒ n1 sH0    Y  |  | ¡ d¡ d S )Nc                   @   s   e Zd Zdd„ ZdS )z4ExceptHookTests.test_system_exit.<locals>.ThreadExitc                 S   s   t  d¡ d S r   )r—   Úexitr   r   r   r   r+   :  s    z8ExceptHookTests.test_system_exit.<locals>.ThreadExit.runNrq   r   r   r   r   Ú
ThreadExit9  s   re  r5  rÔ   )r   r   r	   r]  rB   rH   rF   r^  )r   re  r5  r¢   r   r   r   Útest_system_exit8  s    &z ExceptHookTests.test_system_exitc                    s¨   d ‰ ‡ fdd„}zŒt  td|¡& tƒ }| ¡  | ¡  W d   ƒ n1 sL0    Y  |  ˆ jt¡ |  t	ˆ j
ƒd¡ |  ˆ jˆ j
j¡ |  ˆ j|¡ W d ‰ nd ‰ 0 d S )Nc                    s   | ‰ d S r   r   )Z	hook_argsr   r   r   ÚhookH  s    z4ExceptHookTests.test_custom_excepthook.<locals>.hookrb  rW  )r	   Ú	swap_attrr   rV  rB   rH   rF   Úexc_typerñ   ÚstrÚ	exc_valueÚexc_tracebackÚ__traceback__ræ   r¢   )r   rg  r¢   r   r   r   Útest_custom_excepthookE  s    &z&ExceptHookTests.test_custom_excepthookc              
      sÔ   dd„ }d ‰ ‡ fdd„}t  td|¡~ t  td|¡P t  d¡&}tƒ }| ¡  | ¡  W d   ƒ n1 sn0    Y  W d   ƒ n1 sŒ0    Y  W d   ƒ n1 sª0    Y  |  | 	¡ d¡ |  ˆ d¡ d S )	Nc                 S   s   t dƒ‚d S )Núthreading_hook failedrX  r   r   r   r   Úthreading_hook[  s    zCExceptHookTests.test_custom_excepthook_fail.<locals>.threading_hookc                    s   t |ƒ‰ d S r   )rj  )ri  rk  rl  ©Zerr_strr   r   Úsys_hook`  s    z=ExceptHookTests.test_custom_excepthook_fail.<locals>.sys_hookrb  r5  z#Exception in threading.excepthook:
ro  )
r	   rh  r   r—   r]  rV  rB   rH   rF   r^  )r   rp  rr  r5  r¢   r   rq  r   Útest_custom_excepthook_failZ  s     ÿþb
ÿz+ExceptHookTests.test_custom_excepthook_failN)
r   r   r   r_  r	   r   rc  rf  rn  rs  r   r   r   r   rY    s   
rY  c                   @   s$   e Zd Zdd„ Zdd„ Zdd„ ZdS )Ú
TimerTestsc                 C   s   t  | ¡ g | _t ¡ | _d S r   )r,   r/   Úcallback_argsr   rS   Úcallback_eventr   r   r   r   r/   r  s    
zTimerTests.setUpc                 C   s    t  d| j¡}| ¡  | j ¡  |j d¡ d|jd< | j 	¡  t  d| j¡}| ¡  | j ¡  |  
t| jƒd¡ |  
| jdi fdi fg¡ | ¡  | ¡  d S )Nrâ   ZblahZbarZfoorÝ   r   )r   ÚTimerÚ_callback_spyrB   rv  rW   r‚   r=   rŸ   ÚclearrF   rG   ru  rH   )r   Ztimer1Ztimer2r   r   r   Ú test_init_immutable_default_argsw  s    



z+TimerTests.test_init_immutable_default_argsc                 O   s*   | j  |d d … | ¡ f¡ | j ¡  d S r   )ru  r=   Úcopyrv  rD   )r   r‚   rŸ   r   r   r   rx  ˆ  s    zTimerTests._callback_spyN)r   r   r   r/   rz  rx  r   r   r   r   rt  p  s   rt  c                   @   s   e Zd ZeejƒZdS )Ú	LockTestsN)r   r   r   Ústaticmethodr   re   Úlocktyper   r   r   r   r|  Œ  s   r|  c                   @   s   e Zd ZeejƒZdS )ÚPyRLockTestsN)r   r   r   r}  r   Ú_PyRLockr~  r   r   r   r   r    s   r  zRLock not implemented in Cc                   @   s   e Zd ZeejƒZdS )ÚCRLockTestsN)r   r   r   r}  r   Ú_CRLockr~  r   r   r   r   r  ’  s   r  c                   @   s   e Zd ZeejƒZdS )Ú
EventTestsN)r   r   r   r}  r   rS   Z	eventtyper   r   r   r   rƒ  –  s   rƒ  c                   @   s   e Zd ZeejƒZdS )ÚConditionAsRLockTestsN)r   r   r   r}  r   Ú	Conditionr~  r   r   r   r   r„  ™  s   r„  c                   @   s   e Zd ZeejƒZdS )ÚConditionTestsN)r   r   r   r}  r   r…  Zcondtyper   r   r   r   r†    s   r†  c                   @   s   e Zd ZeejƒZdS )ÚSemaphoreTestsN)r   r   r   r}  r   Ú	SemaphoreÚsemtyper   r   r   r   r‡     s   r‡  c                   @   s   e Zd ZeejƒZdS )ÚBoundedSemaphoreTestsN)r   r   r   r}  r   r:   r‰  r   r   r   r   rŠ  £  s   rŠ  c                   @   s   e Zd ZeejƒZdS )ÚBarrierTestsN)r   r   r   r}  r   ÚBarrierZbarriertyper   r   r   r   r‹  ¦  s   r‹  c                   @   s   e Zd Zdd„ ZdS )ÚMiscTestCasec                 C   s&   dh}ddh}t j| td||d d S )Nr€   rP   r°   )r   rU   )ÚextraÚ	blacklist)r	   Zcheck__all__r   )r   rŽ  r  r   r   r   Útest__all__«  s
    
ÿzMiscTestCase.test__all__N)r   r   r   r  r   r   r   r   r  ª  s   r  c                   @   s$   e Zd Zdd„ Zdd„ Zdd„ ZdS )ÚInterruptMainTestsc                 C   sZ   dd„ }t j|d}|  t¡  | ¡  | ¡  W d   ƒ n1 sD0    Y  | ¡  d S )Nc                   S   s   t  ¡  d S r   )rU   Úinterrupt_mainr   r   r   r   Úcall_interrupt¶  s    zHInterruptMainTests.test_interrupt_main_subthread.<locals>.call_interruptr…   )r   r   rˆ   ÚKeyboardInterruptrB   rH   )r   r“  r7   r   r   r   Útest_interrupt_main_subthread³  s    &z0InterruptMainTests.test_interrupt_main_subthreadc                 C   s6   |   t¡ t ¡  W d   ƒ n1 s(0    Y  d S r   )rˆ   r”  rU   r’  r   r   r   r   Útest_interrupt_main_mainthread¾  s    z1InterruptMainTests.test_interrupt_main_mainthreadc              
   C   sd   t  t j¡}zBt   t jt j¡ t ¡  t   t jt j¡ t ¡  W t   t j|¡ nt   t j|¡ 0 d S r   )ÚsignalÚ	getsignalÚSIGINTÚSIG_IGNrU   r’  ÚSIG_DFL)r   Úhandlerr   r   r   Útest_interrupt_main_noerrorÄ  s    
z.InterruptMainTests.test_interrupt_main_noerrorN)r   r   r   r•  r–  r  r   r   r   r   r‘  ²  s   r‘  c                   @   s$   e Zd Zdd„ Zdd„ Zdd„ ZdS )ÚAtexitTestsc                 C   s.   t ddƒ\}}}|  |¡ |  | ¡ d¡ d S )Nr‹   zif True:
            import threading

            def run_last():
                print('parrot')

            threading._register_atexit(run_last)
        s   parrot)r   rI   rF   r“   r   r   r   r   Útest_atexit_outputÔ  s    	
zAtexitTests.test_atexit_outputc                 C   s   t ddƒ\}}}|  |¡ d S )Nr‹   aN  if True:
            import threading
            from unittest.mock import Mock

            mock = Mock()
            threading._register_atexit(mock)
            mock.assert_not_called()
            # force early shutdown to ensure it was called once
            threading._shutdown()
            mock.assert_called_once()
        r  r   r   r   r   Útest_atexit_called_onceá  s    z#AtexitTests.test_atexit_called_oncec                 C   s.   t ddƒ\}}}|  |¡ |  d| ¡ ¡ d S )Nr‹   zÜif True:
            import threading

            def func():
                pass

            def run_last():
                threading._register_atexit(func)

            threading._register_atexit(run_last)
        z2RuntimeError: can't register atexit after shutdown)r   ri   rg   rÖ   r   r   r   r   Útest_atexit_after_shutdownð  s
    
ÿz&AtexitTests.test_atexit_after_shutdownN)r   r   r   rŸ  r   r¡  r   r   r   r   rž  Ò  s   rž  Ú__main__)6Ztest.supportr-   r   r   r   r   Ztest.support.script_helperr   r   r&   r—   rU   r   r(   r_   r§   rÈ   r6  r—  rº   rP  r   r	   r	  r¦   r
   r   r   ZTestCaser,   r2   r
  r  r*  rV  rY  rt  r|  Z
RLockTestsr  r  r‚  r  rƒ  r„  r†  r‡  rŠ  r‹  r  r‘  rž  r   rÏ   r   r   r   r   Ú<module>   sd   
!	       ,b 2Z 2
