a
    zep                     @   s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZ d dl
mZ d dlZd dlZd dlmZ zd dlZW n ey   dZY n0 dZejdkZdd
dZdd Zedd ZG dd dejZedkre  dS )    )contextmanagerN)support)script_helper
is_android)skip_if_sanitizer)dedentg      ?nt   c                 C   sL   |}|d|  7 }|d| 7 }d|k r<d|d |d   | S d| d S d S )Nz#  File "<string>", line %s in func
z&  File "<string>", line %s in <module>r	   ^
$ )Zlineno1Zlineno2header	min_countregexr   r   ,/usr/lib/python3.9/test/test_faulthandler.pyexpected_traceback   s    r   c                 C   s   t td| S )Nz(raising SIGSEGV on Android is unreliable)unittestskipIfr   )testr   r   r   skip_segfault_on_android!   s
    r   c               	   c   s.   t  } z| V  W t|  nt|  0 d S N)tempfilemktempr   unlinkfilenamer   r   r   temporary_filename&   s    r   c                   @   s2  e Zd ZdddZdddddddddZdd	d
Zdd Zee	j
dddd Zedd Zdd Zdd Zee	j
dkddd Zeedu deeeddedd Zeedu deeed d!ed"d# Zd$d% Zd&d' Zee	j
d(d)eeed* d+d,d- Zed.d/ Zeddd0d1ed2d3 Zee	j
dkd4eddd0d1ed5d6 Zed7d8 Z ed9d: Z!d;d< Z"d=d> Z#d?d@ Z$dAdB Z%dddCdDdEZ&dFdG Z'dHdI Z(ee	j
dkd4dJdK Z)dLdM Z*dNdO Z+dPdQ Z,dRdS Z-ddddCdUdVZ.dWdX Z/dYdZ Z0d[d\ Z1d]d^ Z2ee	j
dkd4d_d` Z3dadb Z4eeedc ddddedfZ5dgdh Z6didj Z7dkdl Z8ee	j
dkd4dmdn Z9dodp Z:dqdr Z;e<dsdt Z=dudv Z>ee?dwdxdy Z@ee?dwdzd{ ZAee?dwd|d} ZBee?dwd~d ZCdd ZDdS )FaultHandlerTestsNc           
   	   C   sd  t | }g }|dur"|| t X tjd||d}|$ | \}}| }W d   n1 sj0    Y  W d   n1 s0    Y  |	dd}|r| 
|d t|d}	|	 }W d   n1 s0    Y  |	dd}nj|durX| 
|d t|tjd t|dd	d
}	|	 }W d   n1 sB0    Y  |	dd}| |fS )a{  
        Run the specified code in Python (in a new child process) and read the
        output from the standard error or from a file (if filename is set).
        Return the output lines as a list.

        Strip the reference count from the standard error for Python debug
        build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
        thread XXX".
        N-c)pass_fdsasciibackslashreplace rbr   F)closefd)r   stripappendr   ZSuppressCrashReportr   Zspawn_pythoncommunicatewaitdecodeassertEqualopenreadoslseekSEEK_SET
splitlines)
selfcoder   fdr    processoutputstderrexitcodefpr   r   r   
get_output/   s,    


D&
(zFaultHandlerTests.get_outputTF)r   all_threadsother_regexr4   know_current_threadpy_fatal_errorc                C   s   |r|rd}
qd}
nd}
d}|	r(|d7 }t |j|||
d }|rP|d| 7 }| j|||d\}}d	|}| || | |d
 dS )z
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        Raise an error if the output doesn't match the expected format.
        zCurrent thread 0x[0-9a-f]+zThread 0x[0-9a-f]+ZStackz
            (?m)^{fatal_error}

            {header} \(most recent call first\):
              File "<string>", line {lineno} in <module>
            z"
Python runtime state: initialized)linenofatal_errorr   |r   r4   r   r   N)r   formatr&   r:   joinassertRegexassertNotEqual)r2   r3   line_numberr@   r   r;   r<   r4   r=   r>   r   r   r6   r8   r   r   r   check_errorP   s&    


zFaultHandlerTests.check_errorc                 K   s2   |rd||f }d| }| j |||fi | d S )Nz%s: %szFatal Python error: %srH   )r2   r3   rG   
name_regexfunckwr@   r   r   r   check_fatal_errort   s    z#FaultHandlerTests.check_fatal_errorc                 K   s"   d| }| j |||fi | d S )NzWindows fatal exception: %srI   )r2   r3   rG   rJ   rL   r@   r   r   r   check_windows_exceptionz   s    z)FaultHandlerTests.check_windows_exceptionZaixz5the first page of memory is a mapped read-only on AIXc                 C   s&   t s| ddd n| ddd d S )Nz
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                   z4(?:Segmentation fault|Bus error|Illegal instruction)access violation)
MS_WINDOWSrM   rN   r2   r   r   r   test_read_null~   s    z FaultHandlerTests.test_read_nullc                 C   s   |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            rO   Segmentation faultrM   rR   r   r   r   test_sigsegv   s    zFaultHandlerTests.test_sigsegvc                 C   s   | j ddddddd d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            rO   zin new threadFZfaulthandler_fatal_error_threadT)r=   rK   r>   rU   rR   r   r   r   test_fatal_error_c_thread   s    z+FaultHandlerTests.test_fatal_error_c_threadc                 C   s   |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            rO   ZAbortedrU   rR   r   r   r   test_sigabrt   s    zFaultHandlerTests.test_sigabrtwin32z"SIGFPE cannot be caught on Windowsc                 C   s   |  ddd d S )Nzr
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            rO   zFloating point exceptionrU   rR   r   r   r   test_sigfpe   s    zFaultHandlerTests.test_sigfpezneed _testcapiSIGBUSzneed signal.SIGBUSc                 C   s   |  ddd d S )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGBUS)
               z	Bus errorrU   rR   r   r   r   test_sigbus   s    zFaultHandlerTests.test_sigbusSIGILLzneed signal.SIGILLc                 C   s   |  ddd d S )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGILL)
            r\   zIllegal instructionrU   rR   r   r   r   test_sigill   s    zFaultHandlerTests.test_sigillc                 C   s   | j dddddd d S )Nz[
            import faulthandler
            faulthandler._fatal_error(b'xyz')
               xyzfaulthandler_fatal_error_pyTrK   r>   rU   rR   r   r   r   test_fatal_error   s    z"FaultHandlerTests.test_fatal_errorc                 C   s   | j dddddd d S )Nza
            import faulthandler
            faulthandler._fatal_error(b'xyz', True)
            r`   ra   rb   Trc   rU   rR   r   r   r   test_fatal_error_without_gil   s    z.FaultHandlerTests.test_fatal_error_without_gilZopenbsdzVIssue #12868: sigaltstack() doesn't work on OpenBSD if Python is compiled with pthreadZ_stack_overflowz#need faulthandler._stack_overflow()c                 C   s   | j ddddd d S )Nzz
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            rO   z (?:Segmentation fault|Bus error)z unable to raise a stack overflow)r<   rU   rR   r   r   r   test_stack_overflow   s
    z%FaultHandlerTests.test_stack_overflowc                 C   s   |  ddd d S )Nzw
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            rO   rT   rU   rR   r   r   r   test_gil_released   s    z#FaultHandlerTests.test_gil_releasedz0sanitizer builds change crashing process output.)ZmemoryZubreasonc                 C   sH   t  .}| jdjt|ddd|d W d    n1 s:0    Y  d S )Nz
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                r      rT   )r   rM   rC   reprr2   r   r   r   r   test_enable_file  s    z"FaultHandlerTests.test_enable_filez.subprocess doesn't support pass_fds on Windowsc                 C   sL   t d.}| }| jd| dd|d W d    n1 s>0    Y  d S )Nwb+z
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                ri   rT   r4   )r   TemporaryFilefilenorM   )r2   r9   r4   r   r   r   test_enable_fd  s    z FaultHandlerTests.test_enable_fdc                 C   s   | j ddddd d S )Nz
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            rO   rT   Fr;   rU   rR   r   r   r   test_enable_single_thread)  s
    z+FaultHandlerTests.test_enable_single_threadc                 C   sH   d}d}|  |\}}d|}| ||vd||f  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            zFatal Python errorr   z%r is present in %rr   )r:   rD   
assertTruerF   )r2   r3   Znot_expectedr7   r8   r   r   r   test_disable4  s    


zFaultHandlerTests.test_disablec                 C   s   t j}zzt jt _t }zFt  | t  t  | t  W |rVt  qxt  n|rnt  nt  0 W |t _n|t _0 d S r   )	sysr7   
__stderr__faulthandler
is_enabledenablert   disableZassertFalse)r2   Zorig_stderrZwas_enabledr   r   r   test_is_enabledC  s     


z!FaultHandlerTests.test_is_enabledc                 C   s0   d}t jdd|f}t|}| | d d S )N5import faulthandler; print(faulthandler.is_enabled())-Er      False)rv   
executable
subprocesscheck_outputr+   rstrip)r2   r3   argsr6   r   r   r   test_disabled_by_defaultY  s    
z*FaultHandlerTests.test_disabled_by_defaultc                 C   s`   d}t d tjtjjrdndddd|f}tj }|dd  t	j
||d}| | d	 d S )
Nr}   r~   r#   z-Xrx   r   PYTHONFAULTHANDLERenv   True)filterrv   r   flagsignore_environmentr.   environcopypopr   r   r+   r   r2   r3   r   r   r6   r   r   r   test_sys_xoptionsa  s    
z#FaultHandlerTests.test_sys_xoptionsc                 C   s   d}t jd|f}ttj}d|d< d|d< tj||d}| | d ttj}d|d< d|d< tj||d}| | d	 d S )
Nr}   r   r#   r   ZPYTHONDEVMODEr   r   1r   )	rv   r   dictr.   r   r   r   r+   r   r   r   r   r   test_env_varm  s    

zFaultHandlerTests.test_env_varrB   c                C   sl   d}|j ||d}|rd}n|dur*d}nd}dd| d	d
g}| |||\}}| || | |d dS )z
        Explicitly call dump_traceback() function and check its output.
        Raise an error if the output doesn't match the expected format.
        a[  
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            rB   	   N      Stack (most recent call first):z#  File "<string>", line %s in funcBz#  File "<string>", line 17 in funcAz&  File "<string>", line 19 in <module>r   rC   r:   r+   )r2   r   r4   r3   r?   expectedtracer8   r   r   r   check_dump_traceback  s$    z&FaultHandlerTests.check_dump_tracebackc                 C   s   |    d S r   )r   rR   r   r   r   test_dump_traceback  s    z%FaultHandlerTests.test_dump_tracebackc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S Nr   )r   r   rk   r   r   r   test_dump_traceback_file  s    z*FaultHandlerTests.test_dump_traceback_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S Nrm   rn   )r   ro   r   rp   r2   r9   r   r   r   test_dump_traceback_fd  s    z(FaultHandlerTests.test_dump_traceback_fdc                 C   sd   d}d|d  }d| d }d}|j |d}dd| d	g}| |\}}| || | |d
 d S )Ni  x2   z...z
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            )	func_namer   z  File "<string>", line 4 in %sz%  File "<string>", line 6 in <module>r   r   )r2   maxlenr   Z	truncatedr3   r   r   r8   r   r   r   test_truncate  s    zFaultHandlerTests.test_truncatec                 C   sp   d}|j t|d}| ||\}}d|}|r8d}nd}d}t|j |d }| || | |d d	S )
z
        Call explicitly dump_traceback(all_threads=True) and check the output.
        Raise an error if the output doesn't match the expected format.
        a  
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            r   r      
   a  
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            )r?   r   N)rC   rj   r:   rD   r   r&   rE   r+   )r2   r   r3   r6   r8   r?   r   r   r   r   check_dump_traceback_threads  s     
z.FaultHandlerTests.check_dump_traceback_threadsc                 C   s   |  d  d S r   )r   rR   r   r   r   test_dump_traceback_threads  s    z-FaultHandlerTests.test_dump_traceback_threadsc                 C   s4   t  }| | W d    n1 s&0    Y  d S r   )r   r   rk   r   r   r    test_dump_traceback_threads_file  s    z2FaultHandlerTests.test_dump_traceback_threads_filer	   c                C   s   t tjtd}d}|jt|||||d}| ||\}}	d|}|s~|}
|rX|
d9 }
d| }tdd||
d	}| || n| 	|d
 | 	|	d dS )a  
        Check how many times the traceback is written in timeout x 2.5 seconds,
        or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
        on repeat and cancel options.

        Raise an error if the output doesn't match the expect format.
        )Zsecondsa  
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            )timeoutrepeatcancelloopsr   r4   r   r`   zATimeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n      )r   r#   r   N)
strdatetimeZ	timedeltaTIMEOUTrC   r:   rD   r   rE   r+   )r2   r   r   r   r   r4   Ztimeout_strr3   r   r8   countr   r   r   r   r   check_dump_traceback_later  s*    	
z,FaultHandlerTests.check_dump_traceback_laterc                 C   s   |    d S r   r   rR   r   r   r   test_dump_traceback_laterR  s    z+FaultHandlerTests.test_dump_traceback_laterc                 C   s   | j dd d S )NT)r   r   rR   r   r   r    test_dump_traceback_later_repeatU  s    z2FaultHandlerTests.test_dump_traceback_later_repeatc                 C   s   | j dd d S )NT)r   r   rR   r   r   r    test_dump_traceback_later_cancelX  s    z2FaultHandlerTests.test_dump_traceback_later_cancelc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S r   )r   r   rk   r   r   r   test_dump_traceback_later_file[  s    z0FaultHandlerTests.test_dump_traceback_later_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S r   )r   ro   r   rp   r   r   r   r   test_dump_traceback_later_fd_  s    z.FaultHandlerTests.test_dump_traceback_later_fdc                 C   s   | j dd d S )Nr`   )r   r   rR   r   r   r   test_dump_traceback_later_twicee  s    z1FaultHandlerTests.test_dump_traceback_later_twiceregisterzneed faulthandler.registerc                 C   s   t j}d}|j||||||d}| ||\}}	d|}|sf|rHd}
nd}
tdd|
}
| ||
 n| |d |r| |	d	 n| |	d	 d
S )a  
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        If chain is True, check that the previous signal handler is called.

        Raise an error if the output doesn't match the expected format.
        ax  
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            )r;   signum
unregisterchainr   r4   r   z8Current thread 0x[0-9a-f]+ \(most recent call first\):\nz#Stack \(most recent call first\):\nr       r#   r   N)	signalSIGUSR1rC   r:   rD   r   rE   r+   rF   )r2   r   r;   r   r   r4   r   r3   r   r8   r   r   r   r   check_registerh  s,    .
z FaultHandlerTests.check_registerc                 C   s   |    d S r   r   rR   r   r   r   test_register  s    zFaultHandlerTests.test_registerc                 C   s   | j dd d S )NT)r   r   rR   r   r   r   test_unregister  s    z!FaultHandlerTests.test_unregisterc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S r   )r   r   rk   r   r   r   test_register_file  s    z$FaultHandlerTests.test_register_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S r   )r   ro   r   rp   r   r   r   r   test_register_fd  s    z"FaultHandlerTests.test_register_fdc                 C   s   | j dd d S )NTrr   r   rR   r   r   r   test_register_threads  s    z'FaultHandlerTests.test_register_threadsc                 C   s   | j dd d S )NT)r   r   rR   r   r   r   test_register_chain  s    z%FaultHandlerTests.test_register_chainc                 c   sf   t j}zRd t _| t}d V  W d    n1 s40    Y  | t|jd W |t _n|t _0 d S )Nzsys.stderr is None)rv   r7   ZassertRaisesRuntimeErrorr+   r   	exception)r2   r7   cmr   r   r   check_stderr_none  s    $z#FaultHandlerTests.check_stderr_nonec                 C   s   |    t  W d    n1 s&0    Y  |    t  W d    n1 sV0    Y  |    td W d    n1 s0    Y  ttdr|    ttj W d    n1 s0    Y  d S )NgMbP?r   )	r   rx   rz   Zdump_tracebackZdump_traceback_laterhasattrr   r   r   rR   r   r   r   test_stderr_None  s    
&
&
(

z"FaultHandlerTests.test_stderr_Nonezspecific to Windowsc                 C   s(   dD ]\}}|  d| dd| qd S )N))ZEXCEPTION_ACCESS_VIOLATIONrP   )ZEXCEPTION_INT_DIVIDE_BY_ZEROzint divide by zero)ZEXCEPTION_STACK_OVERFLOWzstack overflowz
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._)
                rO   )rN   )r2   excnamer   r   r   test_raise_exception  s    z&FaultHandlerTests.test_raise_exceptionc                 C   sH   dD ]>}d| d}t |}| |\}}| |g  | || qd S )N)l   cs@ l   RC@ z
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception(z)
                    r   r:   r+   )r2   Zexc_coder3   r6   r8   r   r   r   test_ignore_exception  s    z'FaultHandlerTests.test_ignore_exceptionc                 C   sF   dD ]<}|  d|dd\}}| |g  | |||d@ f qd S )N)r   ixV4i   @i  @i   piz{
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0xr   r   i)r:   r+   ZassertIn)r2   r   r6   r8   r   r   r   test_raise_nonfatal_exception  s    
z/FaultHandlerTests.test_raise_nonfatal_exceptionc                 C   s2   t d}| |\}}| |g  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        l       r   r2   r3   r6   r8   r   r   r    test_disable_windows_exc_handler$  s    z2FaultHandlerTests.test_disable_windows_exc_handlerc                 C   s2   t d}| |\}}| |g  | |d d S )Nz`
            import faulthandler
            faulthandler.cancel_dump_traceback_later()
        r   r   r   r   r   r   .test_cancel_later_without_dump_traceback_later1  s    z@FaultHandlerTests.test_cancel_later_without_dump_traceback_later)NN)N)FFr	   )FFFFN)E__name__
__module____qualname__r:   rH   rM   rN   r   r   rv   platform
startswithrS   r   rV   rW   rX   rZ   	_testcapiZ
skipUnlessr   r   r]   r_   rd   re   rx   rf   rg   r   rl   rq   rs   ru   r|   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rQ   r   r   r   r   r   r   r   r   r   r   .   s   
"$


		
	



	


.
;>
  Q










r   __main__)r	   )
contextlibr   r   rx   r.   r   r   rv   r   r   Ztest.supportr   r   r   r   r   textwrapr   r   ImportErrorr   r   rQ   r   r   r   ZTestCaser   r   mainr   r   r   r   <module>   s>   


	
      