a
    zep                     @   s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZ d dl
mZ d dlZd dlZd dlmZ zd dlZW n ey   dZY n0 dZejdkZdd
dZdd Zedd ZG dd dejZedkre  dS )    )contextmanagerN)support)script_helper
is_android)skip_if_sanitizer)dedentg      ?nt   c                 C   sL   |}|d|  7 }|d| 7 }d|k r<d|d |d   | S d| d S d S )Nz#  File "<string>", line %s in func
z&  File "<string>", line %s in <module>r	   ^
$ )Zlineno1Zlineno2header	min_countregexr   r   ,/usr/lib/python3.9/test/test_faulthandler.pyexpected_traceback   s    r   c                 C   s   t td| S )Nz(raising SIGSEGV on Android is unreliable)unittestskipIfr   )testr   r   r   skip_segfault_on_android!   s
    r   c               	   c   s.   t  } z| V  W t|  nt|  0 d S N)tempfilemktempr   unlinkfilenamer   r   r   temporary_filename&   s    r   c                   @   s2  e Zd ZdddZdddddddddZdd	d
Zdd Zee	j
dddd Zedd Zdd Zdd Zee	j
dkddd Zeedu deeeddedd Zeedu deeed d!ed"d# Zd$d% Zd&d' Zee	j
d(d)eeed* d+d,d- Zed.d/ Zeddd0d1ed2d3 Zee	j
dkd4eddd0d1ed5d6 Zed7d8 Z ed9d: Z!d;d< Z"d=d> Z#d?d@ Z$dAdB Z%dddCdDdEZ&dFdG Z'dHdI Z(ee	j
dkd4dJdK Z)dLdM Z*dNdO Z+dPdQ Z,dRdS Z-ddddCdUdVZ.dWdX Z/dYdZ Z0d[d\ Z1d]d^ Z2ee	j
dkd4d_d` Z3dadb Z4eeedc ddddedfZ5dgdh Z6didj Z7dkdl Z8ee	j
dkd4dmdn Z9dodp Z:dqdr Z;e<dsdt Z=dudv Z>ee?dwdxdy Z@ee?dwdzd{ ZAee?dwd|d} ZBee?dwd~d ZCdd ZDdS )FaultHandlerTestsNc           
   	   C   sd  t | }g }|d ur"|| t X tjd||d}|$ | \}}| }W d    n1 sj0    Y  W d    n1 s0    Y  |	dd}|r| 
|d t|d}	|	 }W d    n1 s0    Y  |	dd}nj|d urX| 
|d t|tjd t|ddd	}	|	 }W d    n1 sB0    Y  |	dd}| |fS )
N-c)pass_fdsasciibackslashreplace rbr   F)closefd)r   stripappendr   ZSuppressCrashReportr   Zspawn_pythoncommunicatewaitdecodeassertEqualopenreadoslseekSEEK_SET
splitlines)
selfcoder   fdr    processoutputstderrexitcodefpr   r   r   
get_output/   s,    


D&
(zFaultHandlerTests.get_outputTF)r   all_threadsother_regexr4   know_current_threadpy_fatal_errorc                C   s   |r|rd}
qd}
nd}
d}|	r(|d7 }t |j|||
d }|rP|d| 7 }| j|||d\}}d	|}| || | |d
 d S )NzCurrent thread 0x[0-9a-f]+zThread 0x[0-9a-f]+ZStackz
            (?m)^{fatal_error}

            {header} \(most recent call first\):
              File "<string>", line {lineno} in <module>
            z"
Python runtime state: initialized)linenofatal_errorr   |r   r4   r   r   )r   formatr&   r:   joinassertRegexassertNotEqual)r2   r3   line_numberr@   r   r;   r<   r4   r=   r>   r   r   r6   r8   r   r   r   check_errorP   s&    


zFaultHandlerTests.check_errorc                 K   s2   |rd||f }d| }| j |||fi | d S )Nz%s: %szFatal Python error: %srH   )r2   r3   rG   
name_regexfunckwr@   r   r   r   check_fatal_errort   s    z#FaultHandlerTests.check_fatal_errorc                 K   s"   d| }| j |||fi | d S )NzWindows fatal exception: %srI   )r2   r3   rG   rJ   rL   r@   r   r   r   check_windows_exceptionz   s    z)FaultHandlerTests.check_windows_exceptionZaixz5the first page of memory is a mapped read-only on AIXc                 C   s&   t s| ddd n| ddd d S )Nz
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                   z4(?:Segmentation fault|Bus error|Illegal instruction)access violation)
MS_WINDOWSrM   rN   r2   r   r   r   test_read_null~   s    z FaultHandlerTests.test_read_nullc                 C   s   |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            rO   Segmentation faultrM   rR   r   r   r   test_sigsegv   s    zFaultHandlerTests.test_sigsegvc                 C   s   | j ddddddd d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            rO   zin new threadFZfaulthandler_fatal_error_threadT)r=   rK   r>   rU   rR   r   r   r   test_fatal_error_c_thread   s    z+FaultHandlerTests.test_fatal_error_c_threadc                 C   s   |  ddd d S )Nzs
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            rO   ZAbortedrU   rR   r   r   r   test_sigabrt   s    zFaultHandlerTests.test_sigabrtwin32z"SIGFPE cannot be caught on Windowsc                 C   s   |  ddd d S )Nzr
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            rO   zFloating point exceptionrU   rR   r   r   r   test_sigfpe   s    zFaultHandlerTests.test_sigfpezneed _testcapiSIGBUSzneed signal.SIGBUSc                 C   s   |  ddd d S )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGBUS)
               z	Bus errorrU   rR   r   r   r   test_sigbus   s    zFaultHandlerTests.test_sigbusSIGILLzneed signal.SIGILLc                 C   s   |  ddd d S )Nz
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGILL)
            r\   zIllegal instructionrU   rR   r   r   r   test_sigill   s    zFaultHandlerTests.test_sigillc                 C   s   | j dddddd d S )Nz[
            import faulthandler
            faulthandler._fatal_error(b'xyz')
               xyzfaulthandler_fatal_error_pyTrK   r>   rU   rR   r   r   r   test_fatal_error   s    z"FaultHandlerTests.test_fatal_errorc                 C   s   | j dddddd d S )Nza
            import faulthandler
            faulthandler._fatal_error(b'xyz', True)
            r`   ra   rb   Trc   rU   rR   r   r   r   test_fatal_error_without_gil   s    z.FaultHandlerTests.test_fatal_error_without_gilZopenbsdzVIssue #12868: sigaltstack() doesn't work on OpenBSD if Python is compiled with pthreadZ_stack_overflowz#need faulthandler._stack_overflow()c                 C   s   | j ddddd d S )Nzz
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            rO   z (?:Segmentation fault|Bus error)z unable to raise a stack overflow)r<   rU   rR   r   r   r   test_stack_overflow   s
    z%FaultHandlerTests.test_stack_overflowc                 C   s   |  ddd d S )Nzw
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            rO   rT   rU   rR   r   r   r   test_gil_released   s    z#FaultHandlerTests.test_gil_releasedz0sanitizer builds change crashing process output.)ZmemoryZubreasonc                 C   sH   t  .}| jdjt|ddd|d W d    n1 s:0    Y  d S )Nz
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                r      rT   )r   rM   rC   reprr2   r   r   r   r   test_enable_file  s    z"FaultHandlerTests.test_enable_filez.subprocess doesn't support pass_fds on Windowsc                 C   sL   t d.}| }| jd| dd|d W d    n1 s>0    Y  d S )Nwb+z
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                ri   rT   r4   )r   TemporaryFilefilenorM   )r2   r9   r4   r   r   r   test_enable_fd  s    z FaultHandlerTests.test_enable_fdc                 C   s   | j ddddd d S )Nz
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            rO   rT   Fr;   rU   rR   r   r   r   test_enable_single_thread)  s
    z+FaultHandlerTests.test_enable_single_threadc                 C   sH   d}d}|  |\}}d|}| ||vd||f  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            zFatal Python errorr   z%r is present in %rr   )r:   rD   
assertTruerF   )r2   r3   Znot_expectedr7   r8   r   r   r   test_disable4  s    


zFaultHandlerTests.test_disablec                 C   s   t j}zzt jt _t }zFt  | t  t  | t  W |rVt  qxt  n|rnt  nt  0 W |t _n|t _0 d S r   )	sysr7   
__stderr__faulthandler
is_enabledenablert   disableZassertFalse)r2   Zorig_stderrZwas_enabledr   r   r   test_is_enabledC  s     


z!FaultHandlerTests.test_is_enabledc                 C   s0   d}t jdd|f}t|}| | d d S )N5import faulthandler; print(faulthandler.is_enabled())-Er      False)rv   
executable
subprocesscheck_outputr+   rstrip)r2   r3   argsr6   r   r   r   test_disabled_by_defaultY  s    
z*FaultHandlerTests.test_disabled_by_defaultc                 C   s`   d}t d tjtjjrdndddd|f}tj }|dd  t	j
||d}| | d	 d S )
Nr}   r~   r#   z-Xrx   r   PYTHONFAULTHANDLERenv   True)filterrv   r   flagsignore_environmentr.   environcopypopr   r   r+   r   r2   r3   r   r   r6   r   r   r   test_sys_xoptionsa  s    
z#FaultHandlerTests.test_sys_xoptionsc                 C   s   d}t jd|f}ttj}d|d< d|d< tj||d}| | d ttj}d|d< d|d< tj||d}| | d	 d S )
Nr}   r   r#   r   ZPYTHONDEVMODEr   r   1r   )	rv   r   dictr.   r   r   r   r+   r   r   r   r   r   test_env_varm  s    

zFaultHandlerTests.test_env_varrB   c                C   sl   d}|j ||d}|rd}n|d ur*d}nd}dd| dd	g}| |||\}}| || | |d
 d S )Na[  
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            rB   	         Stack (most recent call first):z#  File "<string>", line %s in funcBz#  File "<string>", line 17 in funcAz&  File "<string>", line 19 in <module>r   rC   r:   r+   )r2   r   r4   r3   r?   expectedtracer8   r   r   r   check_dump_traceback  s$    z&FaultHandlerTests.check_dump_tracebackc                 C   s   |    d S r   )r   rR   r   r   r   test_dump_traceback  s    z%FaultHandlerTests.test_dump_tracebackc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S Nr   )r   r   rk   r   r   r   test_dump_traceback_file  s    z*FaultHandlerTests.test_dump_traceback_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S Nrm   rn   )r   ro   r   rp   r2   r9   r   r   r   test_dump_traceback_fd  s    z(FaultHandlerTests.test_dump_traceback_fdc                 C   sd   d}d|d  }d| d }d}|j |d}dd| d	g}| |\}}| || | |d
 d S )Ni  x2   z...z
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            )	func_namer   z  File "<string>", line 4 in %sz%  File "<string>", line 6 in <module>r   r   )r2   maxlenr   Z	truncatedr3   r   r   r8   r   r   r   test_truncate  s    zFaultHandlerTests.test_truncatec                 C   sp   d}|j t|d}| ||\}}d|}|r8d}nd}d}t|j |d }| || | |d d S )	Na  
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            r   r      
   a  
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            )r?   r   )rC   rj   r:   rD   r   r&   rE   r+   )r2   r   r3   r6   r8   r?   r   r   r   r   check_dump_traceback_threads  s     
z.FaultHandlerTests.check_dump_traceback_threadsc                 C   s   |  d  d S r   )r   rR   r   r   r   test_dump_traceback_threads  s    z-FaultHandlerTests.test_dump_traceback_threadsc                 C   s4   t  }| | W d    n1 s&0    Y  d S r   )r   r   rk   r   r   r    test_dump_traceback_threads_file  s    z2FaultHandlerTests.test_dump_traceback_threads_filer	   c                C   s   t tjtd}d}|jt|||||d}| ||\}}	d|}|s~|}
|rX|
d9 }
d| }tdd||
d	}| || n| 	|d
 | 	|	d d S )N)Zsecondsa  
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            )timeoutrepeatcancelloopsr   r4   r   r`   zATimeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n      )r   r#   r   )
strdatetimeZ	timedeltaTIMEOUTrC   r:   rD   r   rE   r+   )r2   r   r   r   r   r4   Ztimeout_strr3   r   r8   countr   r   r   r   r   check_dump_traceback_later  s*    	
z,FaultHandlerTests.check_dump_traceback_laterc                 C   s   |    d S r   r   rR   r   r   r   test_dump_traceback_laterR  s    z+FaultHandlerTests.test_dump_traceback_laterc                 C   s   | j dd d S )NT)r   r   rR   r   r   r    test_dump_traceback_later_repeatU  s    z2FaultHandlerTests.test_dump_traceback_later_repeatc                 C   s   | j dd d S )NT)r   r   rR   r   r   r    test_dump_traceback_later_cancelX  s    z2FaultHandlerTests.test_dump_traceback_later_cancelc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S r   )r   r   rk   r   r   r   test_dump_traceback_later_file[  s    z0FaultHandlerTests.test_dump_traceback_later_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S r   )r   ro   r   rp   r   r   r   r   test_dump_traceback_later_fd_  s    z.FaultHandlerTests.test_dump_traceback_later_fdc                 C   s   | j dd d S )Nr`   )r   r   rR   r   r   r   test_dump_traceback_later_twicee  s    z1FaultHandlerTests.test_dump_traceback_later_twiceregisterzneed faulthandler.registerc                 C   s   t j}d}|j||||||d}| ||\}}	d|}|sf|rHd}
nd}
tdd|
}
| ||
 n| |d |r| |	d	 n| |	d	 d S )
Nax  
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            )r;   signum
unregisterchainr   r4   r   z8Current thread 0x[0-9a-f]+ \(most recent call first\):\nz#Stack \(most recent call first\):\nr       r#   r   )	signalSIGUSR1rC   r:   rD   r   rE   r+   rF   )r2   r   r;   r   r   r4   r   r3   r   r8   r   r   r   r   check_registerh  s,    .
z FaultHandlerTests.check_registerc                 C   s   |    d S r   r   rR   r   r   r   test_register  s    zFaultHandlerTests.test_registerc                 C   s   | j dd d S )NT)r   r   rR   r   r   r   test_unregister  s    z!FaultHandlerTests.test_unregisterc                 C   s6   t  }| j|d W d    n1 s(0    Y  d S r   )r   r   rk   r   r   r   test_register_file  s    z$FaultHandlerTests.test_register_filec                 C   s>   t d }| j| d W d    n1 s00    Y  d S r   )r   ro   r   rp   r   r   r   r   test_register_fd  s    z"FaultHandlerTests.test_register_fdc                 C   s   | j dd d S )NTrr   r   rR   r   r   r   test_register_threads  s    z'FaultHandlerTests.test_register_threadsc                 C   s   | j dd d S )NT)r   r   rR   r   r   r   test_register_chain  s    z%FaultHandlerTests.test_register_chainc                 c   sf   t j}zRd t _| t}d V  W d    n1 s40    Y  | t|jd W |t _n|t _0 d S )Nzsys.stderr is None)rv   r7   ZassertRaisesRuntimeErrorr+   r   	exception)r2   r7   cmr   r   r   check_stderr_none  s    $z#FaultHandlerTests.check_stderr_nonec                 C   s   |    t  W d    n1 s&0    Y  |    t  W d    n1 sV0    Y  |    td W d    n1 s0    Y  ttdr|    ttj W d    n1 s0    Y  d S )NgMbP?r   )	r   rx   rz   Zdump_tracebackZdump_traceback_laterhasattrr   r   r   rR   r   r   r   test_stderr_None  s    
&
&
(

z"FaultHandlerTests.test_stderr_Nonezspecific to Windowsc                 C   s(   dD ]\}}|  d| dd| qd S )N))ZEXCEPTION_ACCESS_VIOLATIONrP   )ZEXCEPTION_INT_DIVIDE_BY_ZEROzint divide by zero)ZEXCEPTION_STACK_OVERFLOWzstack overflowz
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._)
                rO   )rN   )r2   excnamer   r   r   test_raise_exception  s    z&FaultHandlerTests.test_raise_exceptionc                 C   sH   dD ]>}d| d}t |}| |\}}| |g  | || qd S )N)l   cs@ l   RC@ z
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception(z)
                    r   r:   r+   )r2   Zexc_coder3   r6   r8   r   r   r   test_ignore_exception  s    z'FaultHandlerTests.test_ignore_exceptionc                 C   sF   dD ]<}|  d|dd\}}| |g  | |||d@ f qd S )N)r   ixV4i   @i  @i   piz{
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0xr   r   i)r:   r+   ZassertIn)r2   r   r6   r8   r   r   r   test_raise_nonfatal_exception  s    
z/FaultHandlerTests.test_raise_nonfatal_exceptionc                 C   s2   t d}| |\}}| |g  | |d d S )Nz
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        l       r   r2   r3   r6   r8   r   r   r    test_disable_windows_exc_handler$  s    z2FaultHandlerTests.test_disable_windows_exc_handlerc                 C   s2   t d}| |\}}| |g  | |d d S )Nz`
            import faulthandler
            faulthandler.cancel_dump_traceback_later()
        r   r   r   r   r   r   .test_cancel_later_without_dump_traceback_later1  s    z@FaultHandlerTests.test_cancel_later_without_dump_traceback_later)NN)N)FFr	   )FFFFN)E__name__
__module____qualname__r:   rH   rM   rN   r   r   rv   platform
startswithrS   r   rV   rW   rX   rZ   	_testcapiZ
skipUnlessr   r   r]   r_   rd   re   rx   rf   rg   r   rl   rq   rs   ru   r|   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rQ   r   r   r   r   r   r   r   r   r   r   .   s   
"$


		
	



	


.
;>
  Q










r   __main__)r	   )
contextlibr   r   rx   r.   r   r   rv   r   r   Ztest.supportr   r   r   r   r   textwrapr   r   ImportErrorr   r   rQ   r   r   r   ZTestCaser   r   mainr   r   r   r   <module>   s>   


	
      