a
    äzeû¿  ã                   @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZ d dlmZmZ zd dlZW n eyž   dZY n0 G dd„ dejƒZe ejdkd¡G dd	„ d	ejƒƒZe ejdkd
¡G dd„ dejƒƒZG dd„ dejƒZe ejdkd¡G dd„ dejƒƒZe eedƒd¡G dd„ dejƒƒZe ejdkd¡G dd„ dejƒƒZe ejdkd¡G dd„ dejƒƒZG dd„ dejƒZ G dd„ dejƒZ!G dd„ dejƒZ"G dd „ d ejƒZ#d!d"„ Z$e%d#krþe &¡  dS )$é    N)Úsupport)Úassert_python_okÚspawn_pythonc                   @   s   e Zd Zdd„ Zdd„ ZdS )ÚGenericTestsc                 C   s–   t tƒD ]ˆ}tt|ƒ}|dv r.|  |tj¡ q|dv rF|  |tj¡ q| d¡rj| d¡sj|  |tj¡ q| d¡r|  |tj¡ |  t	j
d¡ qd S )N>   ÚSIG_IGNÚSIG_DFL>   ÚSIG_SETMASKÚ	SIG_BLOCKÚSIG_UNBLOCKZSIGZSIG_ZCTRL_Úwin32)ÚdirÚsignalÚgetattrÚassertIsInstanceÚHandlersÚSigmasksÚ
startswithÚSignalsÚassertEqualÚsysÚplatform)ÚselfÚnameÚsig© r   ú&/usr/lib/python3.9/test/test_signal.pyÚ
test_enums   s    

zGenericTests.test_enumsc                 C   s>   t tƒD ]0}tt|ƒ}t |¡rt |¡s|  |jd¡ qd S )Nr   )r   r   r   ÚinspectZ	isroutineZ	isbuiltinr   Ú
__module__)r   r   Úvaluer   r   r   Útest_functions_module_attr$   s    
z'GenericTests.test_functions_module_attrN)Ú__name__r   Ú__qualname__r   r    r   r   r   r   r      s   r   r   zNot valid on Windowsc                   @   sZ   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	e
 ejd¡dd„ ƒZdS )Ú
PosixTestsc                 G   s   d S ©Nr   ©r   Úargsr   r   r   Útrivial_signal_handler/   s    z!PosixTests.trivial_signal_handlerc                 C   s8   |   ttjd¡ |   ttjd| j¡ |   ttjd¡ d S )Ni’  )ÚassertRaisesÚ
ValueErrorr   Ú	getsignalr'   Ú	strsignal©r   r   r   r   Ú,test_out_of_range_signal_number_raises_error2   s
    ÿz7PosixTests.test_out_of_range_signal_number_raises_errorc                 C   s   |   ttjtjd ¡ d S r$   )r(   Ú	TypeErrorr   ÚSIGUSR1r,   r   r   r   Ú0test_setting_signal_handler_to_none_raises_error:   s    
ÿz;PosixTests.test_setting_signal_handler_to_none_raises_errorc                 C   sZ   t   t j| j¡}|  |t j¡ |  t  t j¡| j¡ t   t j|¡ |  t  t j¡|¡ d S r$   )r   ÚSIGHUPr'   r   r   r   r*   )r   Zhupr   r   r   Útest_getsignal>   s    ÿzPosixTests.test_getsignalc                 C   s@   |   dt tj¡¡ |   dt tj¡¡ |   dt tj¡¡ d S )NZ	InterruptZ
TerminatedZHangup)ÚassertInr   r+   ÚSIGINTÚSIGTERMr1   r,   r   r   r   Útest_strsignalF   s    zPosixTests.test_strsignalc                 C   s&   t j t¡}t j |d¡}t|ƒ d S )Nzsignalinterproctester.py)ÚosÚpathÚdirnameÚ__file__Újoinr   )r   r9   Zscriptr   r   r   Útest_interprocess_signalL   s    z#PosixTests.test_interprocess_signalc                 C   sd   t  ¡ }|  |t¡ |  t jj|¡ |  t jj|¡ |  d|¡ |  t j	|¡ |  
t|ƒt j	¡ d S ©Nr   )r   Úvalid_signalsr   Úsetr3   r   r4   ÚSIGALRMÚassertNotInÚNSIGÚ
assertLessÚlen©r   Úsr   r   r   Útest_valid_signalsQ   s    zPosixTests.test_valid_signalsúsys.executable required.c                 C   s<   t jtjddgt jd}|  d|j¡ |  |jt	j
 ¡ d S )Nú-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)©Ústderró   KeyboardInterrupt)Ú
subprocessÚrunr   Ú
executableÚPIPEr3   rK   r   Ú
returncoder   r4   )r   Úprocessr   r   r   Ú!test_keyboard_interrupt_exit_codeZ   s    ÿûz,PosixTests.test_keyboard_interrupt_exit_codeN)r!   r   r"   r'   r-   r0   r2   r6   r<   rG   ÚunittestÚ
skipUnlessr   rO   rS   r   r   r   r   r#   -   s   	r#   zWindows specificc                   @   s2   e Zd Zdd„ Zdd„ Ze ejd¡dd„ ƒZ	dS )	ÚWindowsSignalTestsc                 C   sd   t  ¡ }|  |t¡ |  t|ƒd¡ |  t jj|¡ |  	d|¡ |  	t j
|¡ |  t|ƒt j
¡ d S )Né   r   )r   r>   r   r?   ZassertGreaterEqualrD   r3   r   r4   rA   rB   rC   rE   r   r   r   rG   n   s    z%WindowsSignalTests.test_valid_signalsc                 C   sÚ   dd„ }t ƒ }tjtjtjtjtjtjtjfD ]0}t 	|¡d ur.t |t ||¡¡ | 
|¡ q.|  |¡ |  t¡ t d|¡ W d   ƒ n1 s–0    Y  |  t¡ t d|¡ W d   ƒ n1 sÌ0    Y  d S )Nc                 S   s   d S r$   r   )ÚxÚyr   r   r   Ú<lambda>y   ó    z3WindowsSignalTests.test_issue9324.<locals>.<lambda>éÿÿÿÿé   )r?   r   ÚSIGABRTZSIGBREAKÚSIGFPEÚSIGILLr4   ÚSIGSEGVr5   r*   ÚaddÚ
assertTruer(   r)   )r   ÚhandlerÚcheckedr   r   r   r   Útest_issue9324w   s    þ
*z!WindowsSignalTests.test_issue9324rH   c                 C   s<   t jtjddgt jd}|  d|j¡ d}|  |j|¡ d S )NrI   zraise KeyboardInterruptrJ   rL   l   :   )	rM   rN   r   rO   rP   r3   rK   r   rQ   )r   rR   ZSTATUS_CONTROL_C_EXITr   r   r   rS   Œ   s    
þz4WindowsSignalTests.test_keyboard_interrupt_exit_codeN)
r!   r   r"   rG   rf   rT   rU   r   rO   rS   r   r   r   r   rV   k   s   	rV   c                   @   sN   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Ze 	e
jdkd¡dd„ ƒZdS )ÚWakeupFDTestsc                 C   st   |   t¡ tjtjd W d   ƒ n1 s.0    Y  |   t¡ t tjd¡ W d   ƒ n1 sf0    Y  d S )N)ÚsignumF)r(   r.   r   Úset_wakeup_fdr4   r,   r   r   r   Útest_invalid_call   s    ,zWakeupFDTests.test_invalid_callc                 C   s    t  ¡ }|  ttftj|¡ d S r$   )r   Zmake_bad_fdr(   r)   ÚOSErrorr   ri   )r   Úfdr   r   r   Útest_invalid_fd¦   s    
ÿzWakeupFDTests.test_invalid_fdc                 C   s0   t   ¡ }| ¡ }| ¡  |  ttftj|¡ d S r$   )ÚsocketÚfilenoÚcloser(   r)   rk   r   ri   )r   Úsockrl   r   r   r   Útest_invalid_socket«   s    
ÿz!WakeupFDTests.test_invalid_socketc                 C   s¶   t  ¡ \}}|  t j|¡ |  t j|¡ t  ¡ \}}|  t j|¡ |  t j|¡ tt dƒrrt  |d¡ t  |d¡ t |¡ |  t |¡|¡ |  t d¡|¡ |  t d¡d¡ d S )NÚset_blockingFr\   )	r7   ÚpipeÚ
addCleanuprp   Úhasattrrs   r   ri   r   )r   Zr1Zw1Zr2Zw2r   r   r   Útest_set_wakeup_fd_result²   s    

z'WakeupFDTests.test_set_wakeup_fd_resultc                 C   s   t   ¡ }|  |j¡ | d¡ | ¡ }t   ¡ }|  |j¡ | d¡ | ¡ }t |¡ |  t |¡|¡ |  t d¡|¡ |  t d¡d¡ d S )NFr\   )rn   ru   rp   Úsetblockingro   r   ri   r   )r   Zsock1Úfd1Zsock2Úfd2r   r   r   Ú test_set_wakeup_fd_socket_resultÃ   s    


z.WakeupFDTests.test_set_wakeup_fd_socket_resultr   ztests specific to POSIXc                 C   s¢   t  ¡ \}}|  t j|¡ |  t j|¡ t  |d¡ |  t¡}t |¡ W d   ƒ n1 s^0    Y  |  	t
|jƒd| ¡ t  |d¡ t |¡ t d¡ d S )NTz&the fd %s must be in non-blocking modeFr\   )r7   rt   ru   rp   rs   r(   r)   r   ri   r   ÚstrÚ	exception)r   ZrfdZwfdÚcmr   r   r   Útest_set_wakeup_fd_blockingÕ   s    (ÿ
z)WakeupFDTests.test_set_wakeup_fd_blockingN)r!   r   r"   rj   rm   rr   rw   r{   rT   ÚskipIfr   r   r   r   r   r   r   rg   ›   s   	rg   c                   @   st   e Zd Ze edu d¡ddœdd„ƒZe edu d¡dd„ ƒZd	d
„ Zdd„ Z	dd„ Z
e eedƒd¡dd„ ƒZdS )ÚWakeupSignalTestsNúneed _testcapiT©Úorderedc                G   s&   d  ttt|ƒƒ||¡}td|ƒ d S )Na  if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        rI   )ÚformatÚtupleÚmapÚintr   )r   Z	test_bodyr„   ZsignalsÚcoder   r   r   Úcheck_wakeupê   s     à"zWakeupSignalTests.check_wakeupc              	   C   s|   d}t  ¡ \}}zFzt  |d¡ W n ty4   Y n0 |  d¡ W t  |¡ t  |¡ nt  |¡ t  |¡ 0 td|ƒ d S )Na&  if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        ó   xz9OS doesn't report write() error on the read end of a piperI   )r7   rt   Úwriterk   ÚskipTestrp   r   )r   r‰   ÚrÚwr   r   r   Útest_wakeup_write_error  s    "
ÿ
z)WakeupSignalTests.test_wakeup_write_errorc                 C   s   |   dtj¡ d S )Na·  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        ©rŠ   r   r@   r,   r   r   r   Útest_wakeup_fd_earlyE  s    áz&WakeupSignalTests.test_wakeup_fd_earlyc                 C   s   |   dtj¡ d S )Na`  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r‘   r,   r   r   r   Útest_wakeup_fd_duringg  s    åz'WakeupSignalTests.test_wakeup_fd_duringc                 C   s   |   dtjtj¡ d S )Nz§def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )rŠ   r   r/   r@   r,   r   r   r   Útest_signum…  s    üzWakeupSignalTests.test_signumÚpthread_sigmaskúneed signal.pthread_sigmask()c                 C   s   | j dtjtjdd d S )Naæ  def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Frƒ   )rŠ   r   r/   ÚSIGUSR2r,   r   r   r   Útest_pendingŒ  s    
ôzWakeupSignalTests.test_pending)r!   r   r"   rT   r€   Ú	_testcapirŠ   r   r’   r“   r”   rU   rv   r   r˜   r   r   r   r   r   è   s   &
3"ÿr   Ú
socketpairzneed socket.socketpairc                   @   sT   e Zd Ze edu d¡dd„ ƒZe edu d¡dd„ ƒZe edu d¡dd„ ƒZdS )	ÚWakeupSocketSignalTestsNr‚   c                 C   s   d}t d|ƒ d S )Na´  if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        rI   ©r   ©r   r‰   r   r   r   Útest_socket¡  s    z#WakeupSocketSignalTests.test_socketc                 C   s.   t jdkrd}nd}dj|d}td|ƒ d S )NÚntÚsendrŒ   a+  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        ©ÚactionrI   ©r7   r   r…   r   ©r   r¢   r‰   r   r   r   Útest_send_errorÅ  s    
!ß"z'WakeupSocketSignalTests.test_send_errorc                 C   s.   t jdkrd}nd}dj|d}td|ƒ d S )NrŸ   r    rŒ   aË  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        # Start with large chunk size to reduce the
        # number of send needed to fill the buffer.
        written = 0
        for chunk_size in (2 ** 16, 2 ** 8, 1):
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, socket.timeout):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        r¡   rI   r£   r¤   r   r   r   Útest_warn_on_full_bufferð  s    
cdz0WakeupSocketSignalTests.test_warn_on_full_buffer)	r!   r   r"   rT   r€   r™   rž   r¥   r¦   r   r   r   r   r›   ž  s   
#
*r›   c                   @   s,   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	S )
ÚSiginterruptTestc              	   C   sÂ   d|f }t d|ƒš}z |j ¡ }|jtjd\}}W n* tjy`   | ¡  Y W d   ƒ dS 0 || }| 	¡ }|dvrŠt
d||f ƒ‚|dkW  d   ƒ S W d   ƒ n1 s´0    Y  d S )Naò  if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        rI   )ÚtimeoutF)é   é   zChild error (exit code %s): %rrª   )r   ÚstdoutÚreadlineÚcommunicater   ÚSHORT_TIMEOUTrM   ÚTimeoutExpiredÚkillÚwaitÚ	Exception)r   Z	interruptr‰   rR   Z
first_liner«   rK   Úexitcoder   r   r   Úreadpipe_interrupteda  s"    #Ý$
ÿz%SiginterruptTest.readpipe_interruptedc                 C   s   |   d ¡}|  |¡ d S r$   ©r´   rc   ©r   Zinterruptedr   r   r   Útest_without_siginterrupt  s    
z*SiginterruptTest.test_without_siginterruptc                 C   s   |   d¡}|  |¡ d S ©NTrµ   r¶   r   r   r   Útest_siginterrupt_on¤  s    
z%SiginterruptTest.test_siginterrupt_onc                 C   s   |   d¡}|  |¡ d S )NF)r´   ZassertFalser¶   r   r   r   Útest_siginterrupt_off«  s    
z&SiginterruptTest.test_siginterrupt_offN)r!   r   r"   r´   r·   r¹   rº   r   r   r   r   r§   ^  s   <r§   c                   @   sn   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	e
 ejdv d¡dd„ ƒZdd„ Zdd„ ZdS )Ú
ItimerTestc                 C   s(   d| _ d| _d | _t tj| j¡| _d S )NFr   )Úhndl_calledÚ
hndl_countÚitimerr   r@   Úsig_alrmÚ	old_alarmr,   r   r   r   ÚsetUpµ  s    zItimerTest.setUpc                 C   s,   t   t j| j¡ | jd ur(t  | jd¡ d S r=   )r   r@   rÀ   r¾   Ú	setitimerr,   r   r   r   ÚtearDown»  s    
zItimerTest.tearDownc                 G   s
   d| _ d S r¸   )r¼   r%   r   r   r   r¿   Á  s    zItimerTest.sig_alrmc                 G   sF   d| _ | jdkrt d¡‚n| jdkr4t tjd¡ |  jd7  _d S )NTrª   z.setitimer didn't disable ITIMER_VIRTUAL timer.r   é   )r¼   r½   r   ÚItimerErrorrÂ   ÚITIMER_VIRTUALr%   r   r   r   Ú
sig_vtalrmÄ  s    

zItimerTest.sig_vtalrmc                 G   s   d| _ t tjd¡ d S )NTr   )r¼   r   rÂ   ÚITIMER_PROFr%   r   r   r   Úsig_profÑ  s    zItimerTest.sig_profc                 C   s   |   tjtjdd¡ d S )Nr\   r   )r(   r   rÅ   rÂ   r,   r   r   r   Útest_itimer_excÕ  s    zItimerTest.test_itimer_excc                 C   s0   t j| _t  | jd¡ t  ¡  |  | jd¡ d S )Ng      ð?T)r   ÚITIMER_REALr¾   rÂ   Úpauser   r¼   r,   r   r   r   Útest_itimer_realÞ  s    zItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.c                 C   s   t j| _t   t j| j¡ t  | jdd¡ t ¡ }t ¡ | dk r`tdddƒ}t  	| j¡dkr0qjq0|  
d¡ |  t  	| j¡d¡ |  | jd	¡ d S )
Ng333333Ó?çš™™™™™É?ç      N@é90  é2	 é“–˜ ©ç        rÔ   ú8timeout: likely cause: machine too slow or load too highT)r   rÆ   r¾   Ú	SIGVTALRMrÇ   rÂ   ÚtimeÚ	monotonicÚpowÚ	getitimerr   r   r¼   ©r   Z
start_timeÚ_r   r   r   Útest_itimer_virtualå  s    
zItimerTest.test_itimer_virtualc                 C   s   t j| _t   t j| j¡ t  | jdd¡ t ¡ }t ¡ | dk r`tdddƒ}t  	| j¡dkr0qjq0|  
d¡ |  t  	| j¡d¡ |  | jd¡ d S )	NrÎ   rÏ   rÐ   rÑ   rÒ   rÓ   rÕ   T)r   rÈ   r¾   ÚSIGPROFrÉ   rÂ   r×   rØ   rÙ   rÚ   r   r   r¼   rÛ   r   r   r   Útest_itimer_profû  s    
zItimerTest.test_itimer_profc                 C   s2   t j| _t  | jd¡ t d¡ |  | jd¡ d S )Nçíµ ÷Æ°>rÄ   T)r   rË   r¾   rÂ   r×   Úsleepr   r¼   r,   r   r   r   Útest_setitimer_tiny  s    
zItimerTest.test_setitimer_tinyN)r!   r   r"   rÁ   rÃ   r¿   rÇ   rÉ   rÊ   rÍ   rT   r€   r   r   rÝ   rß   râ   r   r   r   r   r»   ³  s   	ÿ
r»   c                   @   s¶  e Zd Ze eedƒd¡dd„ ƒZe eedƒd¡e eedƒd¡dd„ ƒƒZe eed	ƒd
¡dd„ ƒZ	e eedƒd¡dd„ ƒZ
e eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd„ ƒZe eedƒd¡dd „ ƒZe eedƒd¡e eedƒd¡d!d"„ ƒƒZe eedƒd¡d#d$„ ƒZe eedƒd¡d%d&„ ƒZe eedƒd¡d'd(„ ƒZe eed	ƒd
¡d)d*„ ƒZd+S ),ÚPendingSignalsTestsÚ
sigpendingzneed signal.sigpending()c                 C   s   |   t ¡ tƒ ¡ d S r$   )r   r   rä   r?   r,   r   r   r   Útest_sigpending_empty  s    z)PendingSignalsTests.test_sigpending_emptyr•   r–   c                 C   s   d}t d|ƒ d S )Na  if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rI   rœ   r   r   r   r   Útest_sigpending#  s    z#PendingSignalsTests.test_sigpendingÚpthread_killzneed signal.pthread_kill()c                 C   s   d}t d|ƒ d S )Naâ  if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        rI   rœ   r   r   r   r   Útest_pthread_killB  s    z%PendingSignalsTests.test_pthread_killc                 C   s   d|  ¡ |f }td|ƒ d S )Naw  if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        rI   )Ústripr   )r   ZblockedÚtestr‰   r   r   r   Úwait_helper[  s     
à%zPendingSignalsTests.wait_helperÚsigwaitzneed signal.sigwait()c                 C   s   |   tjd¡ d S )Na   
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ©rë   r   r@   r,   r   r   r   Útest_sigwait‰  s    z PendingSignalsTests.test_sigwaitÚsigwaitinfozneed signal.sigwaitinfo()c                 C   s   |   tjd¡ d S )Nz×
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        rí   r,   r   r   r   Útest_sigwaitinfo•  s    z$PendingSignalsTests.test_sigwaitinfoÚsigtimedwaitzneed signal.sigtimedwait()c                 C   s   |   tjd¡ d S )Nzá
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        rí   r,   r   r   r   Útest_sigtimedwait   s    z%PendingSignalsTests.test_sigtimedwaitc                 C   s   |   tjd¡ d S )Nzþ
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        rí   r,   r   r   r   Útest_sigtimedwait_poll«  s    z*PendingSignalsTests.test_sigtimedwait_pollc                 C   s   |   tjd¡ d S )Nz¿
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        rí   r,   r   r   r   Útest_sigtimedwait_timeout¸  s    z-PendingSignalsTests.test_sigtimedwait_timeoutc                 C   s   t j}|  tt j|gd¡ d S )Ng      ð¿)r   r@   r(   r)   rñ   )r   rh   r   r   r   Ú"test_sigtimedwait_negative_timeoutÂ  s    z6PendingSignalsTests.test_sigtimedwait_negative_timeoutc                 C   s   t ddƒ d S )NrI   a­  if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        rœ   r,   r   r   r   Útest_sigwait_threadÈ  s    	z'PendingSignalsTests.test_sigwait_threadc                 C   sü   |   ttj¡ |   ttjd¡ |   ttjddd¡ |   ttjdg ¡ |   t¡" t tjtjg¡ W d   ƒ n1 sv0    Y  |   t¡  t tjdg¡ W d   ƒ n1 s°0    Y  |   t¡$ t tjdd> g¡ W d   ƒ n1 sî0    Y  d S )NrÄ   r©   rª   i¤  r   iè  )r(   r.   r   r•   rk   r)   r	   rB   r,   r   r   r   Útest_pthread_sigmask_argumentsê  s    0.z2PendingSignalsTests.test_pthread_sigmask_argumentsc                 C   sJ   t  t jt  ¡ ¡}|  t jt j|¡ t  t jt  ¡ ¡}|  |t  ¡ ¡ d S r$   )r   r•   r	   r>   ru   r   r
   ZassertLessEqualrE   r   r   r   Ú"test_pthread_sigmask_valid_signalsø  s    z6PendingSignalsTests.test_pthread_sigmask_valid_signalsc                 C   s   d}t d|ƒ d S )Na-	  if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        rI   rœ   r   r   r   r   Útest_pthread_sigmask  s    Hz(PendingSignalsTests.test_pthread_sigmaskc                 C   s^   d}t d|ƒ<}| ¡ \}}| ¡ }|dkr<td||f ƒ‚W d   ƒ n1 sP0    Y  d S )Na7  if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        rI   rª   zChild error (exit code %s): %s)r   r­   r±   r²   )r   r‰   rR   r«   rK   r³   r   r   r   Útest_pthread_kill_main_threadN  s    ÿz1PendingSignalsTests.test_pthread_kill_main_threadN)r!   r   r"   rT   rU   rv   r   rå   ræ   rè   rë   rî   rð   rò   ró   rô   rõ   rö   r÷   rø   rù   rú   r   r   r   r   rã     s„   ÿ
ÿÿÿ
ÿ
,ÿ

ÿ
	ÿ
	ÿ
ÿ
ÿ
ÿÿÿ
ÿ
ÿ
Kÿrã   c                   @   sr   e Zd Zdd„ Zdd„ Zdd„ Ze ee	dƒd¡d	d
„ ƒZ
e ee	dƒd¡dd„ ƒZe ee	dƒd¡dd„ ƒZdS )Ú
StressTestc                 C   s    t   ||¡}|  t j ||¡ d S r$   )r   ru   )r   rh   rd   Úold_handlerr   r   r   Úsetsigo  s    zStressTest.setsigc                    s–   d‰ g ‰d
‡ ‡fdd„	}|   tjtjd¡ |  tj|¡ |ƒ  tˆƒˆ k rVt d¡ q>‡fdd„t	tˆƒd ƒD ƒ}t
 |¡}tjr’td	|f ƒ |S )Né   c                    s,   t ˆƒˆ k r(ˆ t ¡ ¡ t tjd¡ d S )Nrà   )rD   Úappendr×   Úperf_counterr   rÂ   rË   ©rh   Úframe©ÚNÚtimesr   r   rd   w  s    z5StressTest.measure_itimer_resolution.<locals>.handlerr   gü©ñÒMbP?c                    s    g | ]}ˆ |d   ˆ |  ‘qS )rÄ   r   )Ú.0Úi)r  r   r   Ú
<listcomp>…  r[   z8StressTest.measure_itimer_resolution.<locals>.<listcomp>rÄ   z,detected median itimer() resolution: %.6f s.)NN)ru   r   rÂ   rË   rý   r@   rD   r×   rá   ÚrangeÚ
statisticsZmedianr   ÚverboseÚprint)r   rd   Z	durationsZmedr   r  r   Úmeasure_itimer_resolutions  s    
z$StressTest.measure_itimer_resolutionc                 C   s4   |   ¡ }|dkrdS |dkr dS |  d|f ¡ d S )Ng-Cëâ6?i'  g{®Gáz„?éd   z^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r  r   )r   Zresor   r   r   Údecide_itimer_count‹  s    þzStressTest.decide_itimer_countrÂ   ztest needs setitimer()c                    sú   |   ¡ }g ‰ dd„ }d	‡ fdd„	}|  tj|¡ |  tj|¡ |  tj|¡ d}t ¡ tj	 }||k rät
 t
 ¡ tj¡ |d7 }tˆ ƒ|k r¤t ¡ |k r¤t d¡ q€t
 t
 ¡ tj¡ |d7 }tˆ ƒ|k r^t ¡ |k r^t d¡ q¾q^|  tˆ ƒ|d¡ d S )
Nc                 S   s   t  t jdt ¡ d  ¡ d S )Nrà   çñhãˆµøä>)r   rÂ   rË   Úrandomr  r   r   r   Úfirst_handler¢  s    z@StressTest.test_stress_delivery_dependent.<locals>.first_handlerc                    s   ˆ   | ¡ d S r$   ©rÿ   r  ©Zsigsr   r   Úsecond_handlerª  s    zAStressTest.test_stress_delivery_dependent.<locals>.second_handlerr   rÄ   r  úSome signals were lost)NN)r  rý   r   rÞ   r/   r@   r×   rØ   r   r®   r7   r°   ÚgetpidrD   rá   r   )r   r  r  r  Úexpected_sigsÚdeadliner   r  r   Útest_stress_delivery_dependent™  s&    z)StressTest.test_stress_delivery_dependentc                    s¾   |   ¡ }g ‰ ‡ fdd„}|  tj|¡ |  tj|¡ d}t ¡ tj }||k r¨t 	tj
dt ¡ d  ¡ t t ¡ tj¡ |d7 }tˆ ƒ|k rFt ¡ |k rFt d¡ q‚qF|  tˆ ƒ|d¡ d S )Nc                    s   ˆ   | ¡ d S r$   r  r  r  r   r   rd   Ð  s    z=StressTest.test_stress_delivery_simultaneous.<locals>.handlerr   rà   r  r©   r  )r  rý   r   r/   r@   r×   rØ   r   r®   rÂ   rË   r  r7   r°   r  rD   rá   r   )r   r  rd   r  r  r   r  r   Ú!test_stress_delivery_simultaneousÇ  s    z,StressTest.test_stress_delivery_simultaneousr/   ztest needs SIGUSR1c                    s&  t j‰d‰d‰d‰‡fdd„‰ ‡‡‡fdd„}‡ ‡‡fdd„}t   ˆˆ ¡}|  t j ˆ|¡ tj|d	}z¬d}t ¡ d}| ¡  |ƒ  d
‰| ¡  |j	d urÊ|  
|j	jt¡ |  dˆ› dt|j	jƒ¡ d
}W d   ƒ n1 sÞ0    Y  |sø|  ˆd¡ |  ˆˆ¡ W d
‰| ¡  nd
‰| ¡  0 d S )Nr   Fc                    s   ˆ d7 ‰ d S ©NrÄ   r   r  )Únum_received_signalsr   r   Úcustom_handlerñ  s    zAStressTest.test_stress_modifying_handlers.<locals>.custom_handlerc                      s   ˆ st  ˆ¡ ˆd7 ‰q d S r  )r   Úraise_signalr   )Údo_stopÚnum_sent_signalsrh   r   r   Úset_interruptsõ  s    
zAStressTest.test_stress_modifying_handlers.<locals>.set_interruptsc                     s8   ˆdk r4t dƒD ] } ˆ tjfD ]}t ˆ|¡ qqq d S )Nr  i N  )r	  r   r   )r  rd   )r  r!  rh   r   r   Úcycle_handlersû  s    zAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlers)ÚtargetTzSignal z ignored due to race condition)r   r/   ru   Ú	threadingÚThreadr   Zcatch_unraisable_exceptionÚstartr;   Z
unraisabler   Ú	exc_valuerk   r3   r|   ZassertGreaterrC   )r   r"  r#  rü   ÚtZignoredr~   r   )r  r   r  r!  rh   r   Útest_stress_modifying_handlersè  s>    



þ"
ÿz)StressTest.test_stress_modifying_handlersN)r!   r   r"   rý   r  r  rT   rU   rv   r   r  r  r*  r   r   r   r   rû   h  s   ÿ
,ÿ
ÿrû   c                   @   s6   e Zd Zdd„ Ze ejdkd¡dd„ ƒZdd„ Z	d	S )
ÚRaiseSignalTestc                 C   s:   |   t¡ t tj¡ W d   ƒ n1 s,0    Y  d S r$   )r(   ÚKeyboardInterruptr   r  r4   r,   r   r   r   Útest_sigint%  s    zRaiseSignalTest.test_sigintr   zWindows specific testc              
   C   sV   zd}t  |¡ |  d¡ W n4 tyP } z|jtjkr:n‚ W Y d }~n
d }~0 0 d S )NrÄ   z#OSError (Invalid argument) expected)r   r  Zfailrk   ÚerrnoÚEINVAL)r   r1   Úer   r   r   Útest_invalid_argument)  s    
z%RaiseSignalTest.test_invalid_argumentc                    sJ   d‰ ‡ fdd„}t   t j|¡}|  t j t j|¡ t  t j¡ |  ˆ ¡ d S )NFc                    s   d‰ d S r¸   r   )ÚaÚb©Zis_okr   r   rd   7  s    z-RaiseSignalTest.test_handler.<locals>.handler)r   r4   ru   r  rc   )r   rd   Z
old_signalr   r4  r   Útest_handler5  s    zRaiseSignalTest.test_handlerN)
r!   r   r"   r-  rT   r€   r   r   r1  r5  r   r   r   r   r+  #  s   
r+  c                   @   s&   e Zd Ze eedƒd¡dd„ ƒZdS )ÚPidfdSignalTestÚpidfd_send_signalzpidfd support not built inc                 C   s   |   t¡}t dtj¡ W d   ƒ n1 s.0    Y  |jjtjkrR|  d¡ n|jjtj	krj|  d¡ |  
|jjtj¡ t dt ¡ › tj¡}|  tj|¡ |  td¡$ t |tjtƒ d¡ W d   ƒ n1 sØ0    Y  |   t¡ t |tj¡ W d   ƒ n1 s0    Y  d S )Nr   zkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)r(   rk   r   r7  r4   r}   r.  ZENOSYSr   ÚEPERMr   ÚEBADFr7   Úopenr  ÚO_DIRECTORYru   rp   ZassertRaisesRegexr.   Úobjectr,  )r   r~   Zmy_pidfdr   r   r   Útest_pidfd_send_signalC  s    ,
2z&PidfdSignalTest.test_pidfd_send_signalN)r!   r   r"   rT   rU   rv   r   r=  r   r   r   r   r6  A  s
   þr6  c                   C   s   t  ¡  d S r$   )r   Úreap_childrenr   r   r   r   ÚtearDownModuleV  s    r?  Ú__main__)'r.  r   r7   r  r   rn   r
  rM   r   r%  r×   rT   rê   r   Ztest.support.script_helperr   r   r™   ÚImportErrorZTestCaser   r€   r   r#   rU   rV   rg   r   rv   r›   r§   r»   rã   rû   r+  r6  r?  r!   Úmainr   r   r   r   Ú<module>   sV   
=/M 6 @Te  Q <
