a zer@s0ddlmZddlZddlZddlZddlZddlZddlZddlZddl m Z ddl m Z m Z ddlZddlZddlmZz ddlZWneydZYn0dZejdkZedpd Zed pd Zd evpd evZd evpdevZdddZddZeddZGdddejZ e!dkr,e"dS))contextmanagerN)support) script_helper is_android)dedentg?ntZCFLAGSZ CONFIG_ARGSz-fsanitize=undefinedz#--with-undefined-behavior-sanitizerz-fsanitize=memoryz--with-memory-sanitizercCsL|}|d|7}|d|7}d|kr", line %s in func z& File "", line %s in r ^ $)Zlineno1Zlineno2header min_countregexr r J/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_faulthandler.pyexpected_traceback"s   rcCsttd|S)Nz(raising SIGSEGV on Android is unreliable)unittestskipIfr)testr r rskip_segfault_on_android+s rc cs.t}z|VWt|n t|0dSN)tempfilemktemprunlinkfilenamer r rtemporary_filename0src@s:eZdZdddZdddddddddZdd d Zd d Zee j d dddZ e ddZddZddZee j dkdddZeedudeeedde ddZeedudeeed d!e d"d#Zd$d%Zd&d'Zee j d(d)eeed* d+d,d-Ze d.d/ZeepBed0e d1d2Zee j dkd3eepped0e d4d5Z e d6d7Z!e d8d9Z"d:d;Z#dd?Z%d@dAZ&dddBdCdDZ'dEdFZ(dGdHZ)ee j dkd3dIdJZ*dKdLZ+dMdNZ,dOdPZ-dQdRZ.ddddBdTdUZ/dVdWZ0dXdYZ1dZd[Z2d\d]Z3ee j dkd3d^d_Z4d`daZ5eeedb dcddddeZ6dfdgZ7dhdiZ8djdkZ9ee j dkd3dldmZ:dndoZ;dpdqZdtduZ?ee@dvdwdxZAee@dvdydzZBee@dvd{d|ZCee@dvd}d~ZDddZEdS)FaultHandlerTestsNc Csdt|}g}|dur"||tXtjd||d}|$|\}}|}Wdn1sj0YWdn1s0Y| dd}|r| |dt |d} | }Wdn1s0Y| dd}nj|durX| |dt |t jdt |dd d } | }Wdn1sB0Y| dd}||fS) a{ Run the specified code in Python (in a new child process) and read the output from the standard error or from a file (if filename is set). Return the output lines as a list. Strip the reference count from the standard error for Python debug build, and replace "Current thread 0x00007f8d8fbd9700" by "Current thread XXX". N-c)pass_fdsasciibackslashreplacerrbrF)closefd)rstripappendrZSuppressCrashReportrZ spawn_python communicatewaitdecode assertEqualopenreadoslseekSEEK_SET splitlines) selfcoderfdr processoutputstderrexitcodefpr r r get_output9s,    D   &  ( zFaultHandlerTests.get_outputTF)r all_threads other_regexr3know_current_threadpy_fatal_errorcCs|r|rd} qd} nd} d} | r(|d7}t| j||| d} |rP| d|7} |j|||d\} } d | } || | || d d S) z Check that the fault handler for fatal errors is enabled and check the traceback from the child process output. Raise an error if the output doesn't match the expected format. zCurrent thread 0x[0-9a-f]+zThread 0x[0-9a-f]+ZStackz (?m)^{fatal_error} {header} \(most recent call first\): File "", line {lineno} in z" Python runtime state: initialized)lineno fatal_errorr|rr3r rN)rformatr%r9join assertRegexassertNotEqual)r1r2 line_numberr?rr:r;r3r<r=rrr5r7r r r check_errorZs&     zFaultHandlerTests.check_errorcKs2|rd||f}d|}|j|||fi|dS)Nz%s: %szFatal Python error: %srG)r1r2rF name_regexfunckwr?r r rcheck_fatal_error~s z#FaultHandlerTests.check_fatal_errorcKs"d|}|j|||fi|dS)NzWindows fatal exception: %srH)r1r2rFrIrKr?r r rcheck_windows_exceptionsz)FaultHandlerTests.check_windows_exceptionZaixz5the first page of memory is a mapped read-only on AIXcCs&ts|dddn|ddddS)Nz import faulthandler faulthandler.enable() faulthandler._read_null() z4(?:Segmentation fault|Bus error|Illegal instruction)access violation) MS_WINDOWSrLrMr1r r rtest_read_nulls z FaultHandlerTests.test_read_nullcCs|ddddS)Nzs import faulthandler faulthandler.enable() faulthandler._sigsegv() rNSegmentation faultrLrQr r r test_sigsegvszFaultHandlerTests.test_sigsegvcCs|jddddddddS)Nz import faulthandler faulthandler.enable() faulthandler._fatal_error_c_thread() rNz in new threadFZfaulthandler_fatal_error_threadT)r<rJr=rTrQr r rtest_fatal_error_c_threadsz+FaultHandlerTests.test_fatal_error_c_threadcCs|ddddS)Nzs import faulthandler faulthandler.enable() faulthandler._sigabrt() rNZAbortedrTrQr r r test_sigabrtszFaultHandlerTests.test_sigabrtwin32z"SIGFPE cannot be caught on WindowscCs|ddddS)Nzr import faulthandler faulthandler.enable() faulthandler._sigfpe() rNzFloating point exceptionrTrQr r r test_sigfpeszFaultHandlerTests.test_sigfpezneed _testcapiSIGBUSzneed signal.SIGBUScCs|ddddS)Nz import faulthandler import signal faulthandler.enable() signal.raise_signal(signal.SIGBUS) z Bus errorrTrQr r r test_sigbusszFaultHandlerTests.test_sigbusSIGILLzneed signal.SIGILLcCs|ddddS)Nz import faulthandler import signal faulthandler.enable() signal.raise_signal(signal.SIGILL) r[zIllegal instructionrTrQr r r test_sigillszFaultHandlerTests.test_sigillcCs|jdddddddS)Nz[ import faulthandler faulthandler._fatal_error(b'xyz') xyzfaulthandler_fatal_error_pyTrJr=rTrQr r rtest_fatal_errors z"FaultHandlerTests.test_fatal_errorcCs|jdddddddS)Nza import faulthandler faulthandler._fatal_error(b'xyz', True) r_r`raTrbrTrQr r rtest_fatal_error_without_gils z.FaultHandlerTests.test_fatal_error_without_gilZopenbsdzVIssue #12868: sigaltstack() doesn't work on OpenBSD if Python is compiled with pthreadZ_stack_overflowz#need faulthandler._stack_overflow()cCs|jddddddS)Nzz import faulthandler faulthandler.enable() faulthandler._stack_overflow() rNz (?:Segmentation fault|Bus error)z unable to raise a stack overflow)r;rTrQr r rtest_stack_overflows z%FaultHandlerTests.test_stack_overflowcCs|ddddS)Nzw import faulthandler faulthandler.enable() faulthandler._sigsegv(True) rNrSrTrQr r rtest_gil_releasedsz#FaultHandlerTests.test_gil_releasedz0sanitizer builds change crashing process output.cCsHt.}|jdjt|ddd|dWdn1s:0YdS)Nz import faulthandler output = open({filename}, 'wb') faulthandler.enable(output) faulthandler._sigsegv() rrS)rrLrBreprr1rr r rtest_enable_filesz"FaultHandlerTests.test_enable_filez.subprocess doesn't support pass_fds on WindowscCsLtd.}|}|jd|dd|dWdn1s>0YdS)Nwb+z import faulthandler import sys faulthandler.enable(%s) faulthandler._sigsegv() rgrSr3)r TemporaryFilefilenorL)r1r8r3r r rtest_enable_fd!s z FaultHandlerTests.test_enable_fdcCs|jddddddS)Nz import faulthandler faulthandler.enable(all_threads=False) faulthandler._sigsegv() rNrSFr:rTrQr r rtest_enable_single_thread3s z+FaultHandlerTests.test_enable_single_threadcCsHd}d}||\}}d|}|||vd||f||ddS)Nz import faulthandler faulthandler.enable() faulthandler.disable() faulthandler._sigsegv() zFatal Python errorr z%r is present in %rr)r9rC assertTruerE)r1r2Z not_expectedr6r7r r r test_disable>s   zFaultHandlerTests.test_disablecCstj}zztjt_t}zFt|tt|tW|rVtqxtn|rntnt0W|t_n|t_0dSr) sysr6 __stderr__ faulthandler is_enabledenablerrdisableZ assertFalse)r1Z orig_stderrZ was_enabledr r rtest_is_enabledMs     z!FaultHandlerTests.test_is_enabledcCs0d}tjdd|f}t|}||ddS)N5import faulthandler; print(faulthandler.is_enabled())-ErFalse)rt executable subprocess check_outputr*rstrip)r1r2argsr5r r rtest_disabled_by_defaultcs z*FaultHandlerTests.test_disabled_by_defaultcCs`d}tdtjtjjrdndddd|f}tj}|ddt j ||d}| | d dS) Nr{r|rz-XrvrPYTHONFAULTHANDLERenvTrue) filterrtr~flagsignore_environmentr-environcopypoprrr*rr1r2rrr5r r rtest_sys_xoptionsks  z#FaultHandlerTests.test_sys_xoptionscCsd}tjd|f}ttj}d|d<d|d<tj||d}||dttj}d|d<d|d<tj||d}||d dS) Nr{rrrZ PYTHONDEVMODErr}1r) rtr~dictr-rrrr*rrr r r test_env_varws   zFaultHandlerTests.test_env_varrAcCsld}|j||d}|rd}n|dur*d}nd}dd|d d g}||||\}}|||||d dS) z Explicitly call dump_traceback() function and check its output. Raise an error if the output doesn't match the expected format. a[ import faulthandler filename = {filename!r} fd = {fd} def funcB(): if filename: with open(filename, "wb") as fp: faulthandler.dump_traceback(fp, all_threads=False) elif fd is not None: faulthandler.dump_traceback(fd, all_threads=False) else: faulthandler.dump_traceback(all_threads=False) def funcA(): funcB() funcA() rA N Stack (most recent call first):z# File "", line %s in funcBz# File "", line 17 in funcAz& File "", line 19 in rrBr9r*)r1rr3r2r>expectedtracer7r r rcheck_dump_tracebacks$ z&FaultHandlerTests.check_dump_tracebackcCs |dSr)rrQr r rtest_dump_tracebacksz%FaultHandlerTests.test_dump_tracebackcCs6t}|j|dWdn1s(0YdSNr)rrrir r rtest_dump_traceback_filesz*FaultHandlerTests.test_dump_traceback_filecCs>td }|j|dWdn1s00YdSNrkrl)rrmrrnr1r8r r rtest_dump_traceback_fds z(FaultHandlerTests.test_dump_traceback_fdcCsdd}d|d}d|d}d}|j|d}dd|d g}||\}}|||||d dS) Nix2z...z import faulthandler def {func_name}(): faulthandler.dump_traceback(all_threads=False) {func_name}() ) func_namerz File "", line 4 in %sz% File "", line 6 in rr)r1maxlenrZ truncatedr2rrr7r r r test_truncates   zFaultHandlerTests.test_truncatecCspd}|jt|d}|||\}}d|}|r8d}nd}d}t|j|d}|||||dd S) z Call explicitly dump_traceback(all_threads=True) and check the output. Raise an error if the output doesn't match the expected format. a import faulthandler from threading import Thread, Event import time def dump(): if {filename}: with open({filename}, "wb") as fp: faulthandler.dump_traceback(fp, all_threads=True) else: faulthandler.dump_traceback(all_threads=True) class Waiter(Thread): # avoid blocking if the main thread raises an exception. daemon = True def __init__(self): Thread.__init__(self) self.running = Event() self.stop = Event() def run(self): self.running.set() self.stop.wait() waiter = Waiter() waiter.start() waiter.running.wait() dump() waiter.stop.set() waiter.join() rr  a ^Thread 0x[0-9a-f]+ \(most recent call first\): (?: File ".*threading.py", line [0-9]+ in [_a-z]+ ){{1,3}} File "", line 23 in run File ".*threading.py", line [0-9]+ in _bootstrap_inner File ".*threading.py", line [0-9]+ in _bootstrap Current thread 0x[0-9a-f]+ \(most recent call first\): File "", line {lineno} in dump File "", line 28 in $ )r>rN)rBrhr9rCrr%rDr*)r1rr2r5r7r>rr r rcheck_dump_traceback_threadss    z.FaultHandlerTests.check_dump_traceback_threadscCs|ddSr)rrQr r rtest_dump_traceback_threadssz-FaultHandlerTests.test_dump_traceback_threadscCs4t}||Wdn1s&0YdSr)rrrir r r test_dump_traceback_threads_filesz2FaultHandlerTests.test_dump_traceback_threads_filer c Csttjtd}d}|jt|||||d}|||\}} d|}|s~|} |rX| d9} d|} tdd| | d } ||| n | |d | | d d S) a Check how many times the traceback is written in timeout x 2.5 seconds, or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending on repeat and cancel options. Raise an error if the output doesn't match the expect format. )Zsecondsa import faulthandler import time import sys timeout = {timeout} repeat = {repeat} cancel = {cancel} loops = {loops} filename = {filename!r} fd = {fd} def func(timeout, repeat, cancel, file, loops): for loop in range(loops): faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) if cancel: faulthandler.cancel_dump_traceback_later() time.sleep(timeout * 5) faulthandler.cancel_dump_traceback_later() if filename: file = open(filename, "wb") elif fd is not None: file = sys.stderr.fileno() else: file = None func(timeout, repeat, cancel, file, loops) if filename: file.close() )timeoutrepeatcancelloopsrr3r r_zATimeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n)rrrN) strdatetimeZ timedeltaTIMEOUTrBr9rCrrDr*) r1rrrrr3Z timeout_strr2rr7countrrr r rcheck_dump_traceback_laters*   z,FaultHandlerTests.check_dump_traceback_latercCs |dSrrrQr r rtest_dump_traceback_later\sz+FaultHandlerTests.test_dump_traceback_latercCs|jdddS)NT)rrrQr r r test_dump_traceback_later_repeat_sz2FaultHandlerTests.test_dump_traceback_later_repeatcCs|jdddS)NT)rrrQr r r test_dump_traceback_later_cancelbsz2FaultHandlerTests.test_dump_traceback_later_cancelcCs6t}|j|dWdn1s(0YdSr)rrrir r rtest_dump_traceback_later_fileesz0FaultHandlerTests.test_dump_traceback_later_filecCs>td }|j|dWdn1s00YdSr)rrmrrnrr r rtest_dump_traceback_later_fdis z.FaultHandlerTests.test_dump_traceback_later_fdcCs|jdddS)Nr_)rrrQr r rtest_dump_traceback_later_twiceosz1FaultHandlerTests.test_dump_traceback_later_twiceregisterzneed faulthandler.registerc Cstj}d}|j||||||d}|||\}} d|}|sf|rHd} nd} tdd| } ||| n ||d|r|| d n || d d S) a Register a handler displaying the traceback on a user signal. Raise the signal and check the written traceback. If chain is True, check that the previous signal handler is called. Raise an error if the output doesn't match the expected format. ax import faulthandler import os import signal import sys all_threads = {all_threads} signum = {signum} unregister = {unregister} chain = {chain} filename = {filename!r} fd = {fd} def func(signum): os.kill(os.getpid(), signum) def handler(signum, frame): handler.called = True handler.called = False if filename: file = open(filename, "wb") elif fd is not None: file = sys.stderr.fileno() else: file = None if chain: signal.signal(signum, handler) faulthandler.register(signum, file=file, all_threads=all_threads, chain={chain}) if unregister: faulthandler.unregister(signum) func(signum) if chain and not handler.called: if file is not None: output = file else: output = sys.stderr print("Error: signal handler not called!", file=output) exitcode = 1 else: exitcode = 0 if filename: file.close() sys.exit(exitcode) )r:signum unregisterchainrr3r z8Current thread 0x[0-9a-f]+ \(most recent call first\):\nz#Stack \(most recent call first\):\nr rrN) signalSIGUSR1rBr9rCrrDr*rE) r1rr:rrr3rr2rr7rr r rcheck_registerrs, .   z FaultHandlerTests.check_registercCs |dSrrrQr r r test_registerszFaultHandlerTests.test_registercCs|jdddS)NT)rrrQr r rtest_unregistersz!FaultHandlerTests.test_unregistercCs6t}|j|dWdn1s(0YdSr)rrrir r rtest_register_filesz$FaultHandlerTests.test_register_filecCs>td }|j|dWdn1s00YdSr)rrmrrnrr r rtest_register_fds z"FaultHandlerTests.test_register_fdcCs|jdddS)NTrprrQr r rtest_register_threadssz'FaultHandlerTests.test_register_threadscCs|jdddS)NT)rrrQr r rtest_register_chainsz%FaultHandlerTests.test_register_chainccsftj}zRdt_|t}dVWdn1s40Y|t|jdW|t_n|t_0dS)Nzsys.stderr is None)rtr6Z assertRaises RuntimeErrorr*r exception)r1r6cmr r rcheck_stderr_nones $z#FaultHandlerTests.check_stderr_nonecCs|tWdn1s&0Y|tWdn1sV0Y|tdWdn1s0Yttdr|ttjWdn1s0YdS)NgMbP?r) rrvrxZdump_tracebackZdump_traceback_laterhasattrrrrrQr r rtest_stderr_Nones & & (  z"FaultHandlerTests.test_stderr_Nonezspecific to WindowscCs(dD]\}}|d|dd|qdS)N))ZEXCEPTION_ACCESS_VIOLATIONrO)ZEXCEPTION_INT_DIVIDE_BY_ZEROzint divide by zero)ZEXCEPTION_STACK_OVERFLOWzstack overflowz import faulthandler faulthandler.enable() faulthandler._raise_exception(faulthandler._) rN)rM)r1excnamer r rtest_raise_exceptions z&FaultHandlerTests.test_raise_exceptioncCsHdD]>}d|d}t|}||\}}||g|||qdS)N)lcs@lRC@z import faulthandler faulthandler.enable() faulthandler._raise_exception(z) rr9r*)r1Zexc_coder2r5r7r r rtest_ignore_exceptions z'FaultHandlerTests.test_ignore_exceptioncCsFdD]<}|d|dd\}}||g||||d@fqdS)N)rixV4i@i@ipiz{ import faulthandler faulthandler.enable() faulthandler._raise_exception(0xrri)r9r*ZassertIn)r1rr5r7r r rtest_raise_nonfatal_exceptions  z/FaultHandlerTests.test_raise_nonfatal_exceptioncCs2td}||\}}||g||ddS)Nz import faulthandler faulthandler.enable() faulthandler.disable() code = faulthandler._EXCEPTION_ACCESS_VIOLATION faulthandler._raise_exception(code) lrr1r2r5r7r r r test_disable_windows_exc_handler.s z2FaultHandlerTests.test_disable_windows_exc_handlercCs2td}||\}}||g||ddS)Nz` import faulthandler faulthandler.cancel_dump_traceback_later() rrrr r r.test_cancel_later_without_dump_traceback_later;s z@FaultHandlerTests.test_cancel_later_without_dump_traceback_later)NN)N)FFr )FFFFN)F__name__ __module__ __qualname__r9rGrLrMrrrtplatform startswithrRrrUrVrWrY _testcapiZ skipUnlessrrr\r^rcrdrvrerf UB_SANITIZERMEMORY_SANITIZERrjrorqrsrzrrrrrrrrrrrrrrrrrrrrrrrrrrrrrPrrrrrr r r rr8s " $                  .  ; >   Q           r__main__)r )# contextlibrrrvr-rrrtZ sysconfigrrZ test.supportrrrrtextwraprr ImportErrorrrrPZget_config_varZ_cflagsZ _config_argsrrrrrZTestCaserrmainr r r rsN