a ze`@sddlZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl m Z ddl mZmZz ddlZWneydZYn0Gddde jZe ejdkdGdd d e jZe ejdkd Gd d d e jZGd dde jZe ejdkdGddde jZe eeddGddde jZe ejdkdGddde jZe ejdkdGddde jZGddde jZGddde jZ Gddde jZ!Gdd d e jZ"d!d"Z#e$d#kre %dS)$N)support)assert_python_ok spawn_pythonc@seZdZddZdS) GenericTestscCsttD]}tt|}|dvr.||tjq|dvrF||tjq|drj|dsj||tjq|dr||tj|t j dqdS)N>SIG_DFLSIG_IGN> SIG_BLOCK SIG_UNBLOCK SIG_SETMASKZSIGZSIG_ZCTRL_win32) dirsignalgetattrassertIsInstanceHandlersSigmasks startswithSignals assertEqualsysplatform)selfnamesigrD/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_signal.py test_enumss   zGenericTests.test_enumsN)__name__ __module__ __qualname__rrrrrrsrr zNot valid on Windowsc@sZeZdZddZddZddZddZd d Zd d Zd dZ e e j dddZdS) PosixTestscGsdSNrrargsrrrtrivial_signal_handler&sz!PosixTests.trivial_signal_handlercCs8|ttjd|ttjd|j|ttjddS)Ni) assertRaises ValueErrorr getsignalr$ strsignalrrrr,test_out_of_range_signal_number_raises_error)s  z7PosixTests.test_out_of_range_signal_number_raises_errorcCs|ttjtjddSr!)r% TypeErrorr SIGUSR1r)rrr0test_setting_signal_handler_to_none_raises_error1s z;PosixTests.test_setting_signal_handler_to_none_raises_errorcCsZttj|j}||tj|ttj|jttj||ttj|dSr!)r SIGHUPr$rrrr')rZhuprrrtest_getsignal5szPosixTests.test_getsignalcCs@|dttj|dttj|dttjdS)NZ InterruptZ TerminatedZHangup)assertInr r(SIGINTSIGTERMr.r)rrrtest_strsignal=szPosixTests.test_strsignalcCs&tjt}tj|d}t|dS)Nzsignalinterproctester.py)ospathdirname__file__joinr)rr6Zscriptrrrtest_interprocess_signalCs z#PosixTests.test_interprocess_signalcCsdt}||t|tjj||tjj||d||tj || t |tj dSNr) r valid_signalsrsetr0rr1SIGALRM assertNotInNSIG assertLesslenrsrrrtest_valid_signalsHs  zPosixTests.test_valid_signalssys.executable required.cCs<tjtjddgtjd}|d|j||jt j dS)z+KeyboardInterrupt triggers exit via SIGINT.-czaimport os, signal, time os.kill(os.getpid(), signal.SIGINT) for _ in range(999): time.sleep(0.01)stderrKeyboardInterruptN) subprocessrunr executablePIPEr0rHr returncoder r1)rprocessrrr!test_keyboard_interrupt_exit_codeQsz,PosixTests.test_keyboard_interrupt_exit_codeN)rrrr$r*r-r/r3r9rDunittest skipUnlessrrLrPrrrrr $s r zWindows specificc@s2eZdZddZddZeejdddZ dS) WindowsSignalTestscCsdt}||t|t|d|tjj|| d|| tj || t|tj dS)Nr) r r;rr<ZassertGreaterEqualrAr0rr1r>r?r@rBrrrrDes  z%WindowsSignalTests.test_valid_signalscCsdd}t}tjtjtjtjtjtjtjfD]0}t |dur.t|t||| |q.| || t td|Wdn1s0Y| t td|Wdn1s0YdS)NcSsdSr!r)xyrrrpz3WindowsSignalTests.test_issue9324..)r<r SIGABRTZSIGBREAKSIGFPESIGILLr1SIGSEGVr2r'add assertTruer%r&)rhandlercheckedrrrrtest_issue9324ns     * z!WindowsSignalTests.test_issue9324rEcCs<tjtjddgtjd}|d|jd}||j|dS)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.rFzraise KeyboardInterruptrGrIl:N) rJrKrrLrMr0rHrrN)rrOZSTATUS_CONTROL_C_EXITrrrrPs z4WindowsSignalTests.test_keyboard_interrupt_exit_codeN) rrrrDrcrQrRrrLrPrrrrrSbs  rSc@sNeZdZddZddZddZddZd d Ze e j d kd d dZ dS) WakeupFDTestscCst|ttjtjdWdn1s.0Y|tttjdWdn1sf0YdS)N)signumF)r%r+r set_wakeup_fdr1r)rrrtest_invalid_calls , zWakeupFDTests.test_invalid_callcCs t}|ttftj|dSr!)rZ make_bad_fdr%r&OSErrorr rf)rfdrrrtest_invalid_fds zWakeupFDTests.test_invalid_fdcCs0t}|}||ttftj|dSr!)socketfilenocloser%r&rhr rf)rsockrirrrtest_invalid_sockets  z!WakeupFDTests.test_invalid_socketcCst\}}|tj||tj|t\}}|tj||tj|ttdrrt|dt|dt||t|||td||tdddS)N set_blockingFrY) r4pipe addCleanuprmhasattrrpr rfr)rZr1Zw1Zr2Zw2rrrtest_set_wakeup_fd_results      z'WakeupFDTests.test_set_wakeup_fd_resultcCst}||j|d|}t}||j|d|}t||t|||td||tdddS)NFrY)rkrrrm setblockingrlr rfr)rZsock1fd1Zsock2fd2rrr test_set_wakeup_fd_socket_results     z.WakeupFDTests.test_set_wakeup_fd_socket_resultr ztests specific to POSIXcCst\}}|tj||tj|t|d|t}t|Wdn1s^0Y| t |j d|t|dt|tddS)NTz&the fd %s must be in non-blocking modeFrY) r4rqrrrmrpr%r&r rfrstr exception)rZrfdZwfdcmrrrtest_set_wakeup_fd_blockings   (   z)WakeupFDTests.test_set_wakeup_fd_blockingN) rrrrgrjrortrxrQskipIfrrr|rrrrrds rdc@steZdZeedudddddZeedudddZd d Zd d Z d dZ e e e ddddZdS)WakeupSignalTestsNneed _testcapiTorderedcGs&dttt|||}td|dS)Naif 1: import _testcapi import os import signal import struct signals = {!r} def handler(signum, frame): pass def check_signum(signals): data = os.read(read, len(signals)+1) raised = struct.unpack('%uB' % len(data), data) if not {!r}: raised = set(raised) signals = set(signals) if raised != signals: raise Exception("%r != %r" % (raised, signals)) {} signal.signal(signal.SIGALRM, handler) read, write = os.pipe() os.set_blocking(write, False) signal.set_wakeup_fd(write) test() check_signum(signals) os.close(read) os.close(write) rF)formattuplemapintr)rZ test_bodyrZsignalscoderrr check_wakeups "zWakeupSignalTests.check_wakeupc Cs|d}t\}}zFzt|dWnty4Yn 0|dWt|t|nt|t|0td|dS)Na&if 1: import _testcapi import errno import os import signal import sys from test.support import captured_stderr def handler(signum, frame): 1/0 signal.signal(signal.SIGALRM, handler) r, w = os.pipe() os.set_blocking(r, False) # Set wakeup_fd a read-only file descriptor to trigger the error signal.set_wakeup_fd(r) try: with captured_stderr() as err: signal.raise_signal(signal.SIGALRM) except ZeroDivisionError: # An ignored exception should have been printed out on stderr err = err.getvalue() if ('Exception ignored when trying to write to the signal wakeup fd' not in err): raise AssertionError(err) if ('OSError: [Errno %d]' % errno.EBADF) not in err: raise AssertionError(err) else: raise AssertionError("ZeroDivisionError not raised") os.close(r) os.close(w) xz9OS doesn't report write() error on the read end of a piperF)r4rqwriterhskipTestrmr)rrrwrrrtest_wakeup_write_errors"      z)WakeupSignalTests.test_wakeup_write_errorcCs|dtjdS)Nadef test(): import select import time TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 class InterruptSelect(Exception): pass def handler(signum, frame): raise InterruptSelect signal.signal(signal.SIGALRM, handler) signal.alarm(1) # We attempt to get a signal during the sleep, # before select is called try: select.select([], [], [], TIMEOUT_FULL) except InterruptSelect: pass else: raise Exception("select() was not interrupted") before_time = time.monotonic() select.select([read], [], [], TIMEOUT_FULL) after_time = time.monotonic() dt = after_time - before_time if dt >= TIMEOUT_HALF: raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) rr r=r)rrrtest_wakeup_fd_early<sz&WakeupSignalTests.test_wakeup_fd_earlycCs|dtjdS)Na`def test(): import select import time TIMEOUT_FULL = 10 TIMEOUT_HALF = 5 class InterruptSelect(Exception): pass def handler(signum, frame): raise InterruptSelect signal.signal(signal.SIGALRM, handler) signal.alarm(1) before_time = time.monotonic() # We attempt to get a signal during the select call try: select.select([read], [], [], TIMEOUT_FULL) except InterruptSelect: pass else: raise Exception("select() was not interrupted") after_time = time.monotonic() dt = after_time - before_time if dt >= TIMEOUT_HALF: raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) rr)rrrtest_wakeup_fd_during^sz'WakeupSignalTests.test_wakeup_fd_duringcCs|dtjtjdS)Nzdef test(): signal.signal(signal.SIGUSR1, handler) signal.raise_signal(signal.SIGUSR1) signal.raise_signal(signal.SIGALRM) )rr r,r=r)rrr test_signum|szWakeupSignalTests.test_signumpthread_sigmaskneed signal.pthread_sigmask()cCs|jdtjtjdddS)Nadef test(): signum1 = signal.SIGUSR1 signum2 = signal.SIGUSR2 signal.signal(signum1, handler) signal.signal(signum2, handler) signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) signal.raise_signal(signum1) signal.raise_signal(signum2) # Unblocking the 2 signals calls the C signal handler twice signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) Fr)rr r,SIGUSR2r)rrr test_pendings zWakeupSignalTests.test_pending)rrrrQr} _testcapirrrrrrRrsr rrrrrr~s& 3" r~ socketpairzneed socket.socketpairc@sTeZdZeedudddZeedudddZeedudddZdS) WakeupSocketSignalTestsNrcCsd}td|dS)Naif 1: import signal import socket import struct import _testcapi signum = signal.SIGINT signals = (signum,) def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() write.setblocking(False) signal.set_wakeup_fd(write.fileno()) signal.raise_signal(signum) data = read.recv(1) if not data: raise Exception("no signum written") raised = struct.unpack('B', data) if raised != signals: raise Exception("%r != %r" % (raised, signals)) read.close() write.close() rFrrrrrr test_socketsz#WakeupSocketSignalTests.test_socketcCs.tjdkrd}nd}dj|d}td|dS)Nntsendra+if 1: import errno import signal import socket import sys import time import _testcapi from test.support import captured_stderr signum = signal.SIGINT def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() read.setblocking(False) write.setblocking(False) signal.set_wakeup_fd(write.fileno()) # Close sockets: send() will fail read.close() write.close() with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if ('Exception ignored when trying to {action} to the signal wakeup fd' not in err): raise AssertionError(err) actionrFr4rrrrrrrrrtest_send_errors !"z'WakeupSocketSignalTests.test_send_errorcCs.tjdkrd}nd}dj|d}td|dS)Nrrra if 1: import errno import signal import socket import sys import time import _testcapi from test.support import captured_stderr signum = signal.SIGINT # This handler will be called, but we intentionally won't read from # the wakeup fd. def handler(signum, frame): pass signal.signal(signum, handler) read, write = socket.socketpair() # Fill the socketpair buffer if sys.platform == 'win32': # bpo-34130: On Windows, sometimes non-blocking send fails to fill # the full socketpair buffer, so use a timeout of 50 ms instead. write.settimeout(0.050) else: write.setblocking(False) # Start with large chunk size to reduce the # number of send needed to fill the buffer. written = 0 for chunk_size in (2 ** 16, 2 ** 8, 1): chunk = b"x" * chunk_size try: while True: write.send(chunk) written += chunk_size except (BlockingIOError, socket.timeout): pass print(f"%s bytes written into the socketpair" % written, flush=True) write.setblocking(False) try: write.send(b"x") except BlockingIOError: # The socketpair buffer seems full pass else: raise AssertionError("%s bytes failed to fill the socketpair " "buffer" % written) # By default, we get a warning when a signal arrives msg = ('Exception ignored when trying to {action} ' 'to the signal wakeup fd') signal.set_wakeup_fd(write.fileno()) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("first set_wakeup_fd() test failed, " "stderr: %r" % err) # And also if warn_on_full_buffer=True signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " "test failed, stderr: %r" % err) # But not if warn_on_full_buffer=False signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if err != "": raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " "test failed, stderr: %r" % err) # And then check the default again, to make sure warn_on_full_buffer # settings don't leak across calls. signal.set_wakeup_fd(write.fileno()) with captured_stderr() as err: signal.raise_signal(signum) err = err.getvalue() if msg not in err: raise AssertionError("second set_wakeup_fd() test failed, " "stderr: %r" % err) rrFrrrrrtest_warn_on_full_buffers cdz0WakeupSocketSignalTests.test_warn_on_full_buffer) rrrrQr}rrrrrrrrrs  # *rc@s,eZdZddZddZddZddZd S) SiginterruptTestc Csd|f}td|}z |j}|jtjd\}}Wn*tjy`|YWddS0||}| }|dvrt d||f|dkWdSWdn1s0YdS) zPerform a read during which a signal will arrive. Return True if the read is interrupted by the signal and raises an exception. Return False if it returns normally. aif 1: import errno import os import signal import sys interrupt = %r r, w = os.pipe() def handler(signum, frame): 1 / 0 signal.signal(signal.SIGALRM, handler) if interrupt is not None: signal.siginterrupt(signal.SIGALRM, interrupt) print("ready") sys.stdout.flush() # run the test twice try: for loop in range(2): # send a SIGALRM in a second (during the read) signal.alarm(1) try: # blocking call: read from a pipe without data os.read(r, 1) except ZeroDivisionError: pass else: sys.exit(2) sys.exit(3) finally: os.close(r) os.close(w) rF)timeoutNF)zChild error (exit code %s): %rr) rstdoutreadline communicater SHORT_TIMEOUTrJTimeoutExpiredkillwait Exception)rZ interruptrrOZ first_linerrHexitcoderrrreadpipe_interruptedXs"#$  z%SiginterruptTest.readpipe_interruptedcCs|d}||dSr!rr`rZ interruptedrrrtest_without_siginterrupts z*SiginterruptTest.test_without_siginterruptcCs|d}||dSNTrrrrrtest_siginterrupt_ons z%SiginterruptTest.test_siginterrupt_oncCs|d}||dS)NF)rZ assertFalserrrrtest_siginterrupt_offs z&SiginterruptTest.test_siginterrupt_offN)rrrrrrrrrrrrUs<rc@sneZdZddZddZddZddZd d Zd d Zd dZ e e j dvdddZddZddZdS) ItimerTestcCs(d|_d|_d|_ttj|j|_dS)NFr) hndl_called hndl_countitimerr r=sig_alrm old_alarmr)rrrsetUpszItimerTest.setUpcCs,ttj|j|jdur(t|jddSr:)r r=rr setitimerr)rrrtearDowns zItimerTest.tearDowncGs d|_dSr)rr"rrrrszItimerTest.sig_alrmcGsFd|_|jdkrtdn|jdkr4ttjd|jd7_dS)NTrz.setitimer didn't disable ITIMER_VIRTUAL timer.r)rrr ItimerErrorrITIMER_VIRTUALr"rrr sig_vtalrms    zItimerTest.sig_vtalrmcGsd|_ttjddS)NTr)rr r ITIMER_PROFr"rrrsig_profszItimerTest.sig_profcCs|tjtjdddS)NrYr)r%r rrr)rrrtest_itimer_excszItimerTest.test_itimer_exccCs0tj|_t|jdt||jddS)Ng?T)r ITIMER_REALrrpauserrr)rrrtest_itimer_realszItimerTest.test_itimer_real)Znetbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.cCstj|_ttj|jt|jddt}t|dkr`tddd}t |jdkr0qjq0| d| t |jd| |j d dS) Ng333333?皙?N@902 铖r8timeout: likely cause: machine too slow or load too highT) r rr SIGVTALRMrrtime monotonicpow getitimerrrrrZ start_time_rrrtest_itimer_virtuals  zItimerTest.test_itimer_virtualcCstj|_ttj|jt|jddt}t|dkr`tddd}t |jdkr0qjq0| d| t |jd| |j ddS) NrrrrrrrT) r rrSIGPROFrrrrrrrrrrrrrtest_itimer_profs  zItimerTest.test_itimer_profcCs2tj|_t|jdtd||jddS)Nư>rT)r rrrrsleeprrr)rrrtest_setitimer_tinys zItimerTest.test_setitimer_tinyN)rrrrrrrrrrrQr}rrrrrrrrrrs    rc@seZdZdZeeeddddZeeeddeeedddd Z eeed d d d Z eeeddddZ eeeddddZ eeeddddZ eeeddddZeeeddddZeeeddddZeeeddd d!Zeeeddeeeddd"d#Zeeeddd$d%Zeeeddd&d'Zeeeddd(d)Zeeed d d*d+Zd,S)-PendingSignalsTestsz[ Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() functions. sigpendingzneed signal.sigpending()cCs|ttdSr!)rr rr<r)rrrtest_sigpending_emptysz)PendingSignalsTests.test_sigpending_emptyrrcCsd}td|dS)Na if 1: import os import signal def handler(signum, frame): 1/0 signum = signal.SIGUSR1 signal.signal(signum, handler) signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() for sig in pending: assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") rFrrrrrtest_sigpendingsz#PendingSignalsTests.test_sigpending pthread_killzneed signal.pthread_kill()cCsd}td|dS)Naif 1: import signal import threading import sys signum = signal.SIGUSR1 def handler(signum, frame): 1/0 signal.signal(signum, handler) tid = threading.get_ident() try: signal.pthread_kill(tid, signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") rFrrrrrtest_pthread_kill9sz%PendingSignalsTests.test_pthread_killcCsd||f}td|dS)zo test: body of the "def test(signum):" function. blocked: number of the blocked signal awif 1: import signal import sys from signal import Signals def handler(signum, frame): 1/0 %s blocked = %s signum = signal.SIGALRM # child: block and wait the signal try: signal.signal(signum, handler) signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) # Do the tests test(signum) # The handler must not be called on unblock try: signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) except ZeroDivisionError: print("the signal handler has been called", file=sys.stderr) sys.exit(1) except BaseException as err: print("error: {}".format(err), file=sys.stderr) sys.stderr.flush() sys.exit(1) rFN)stripr)rZblockedtestrrrr wait_helperRs %zPendingSignalsTests.wait_helpersigwaitzneed signal.sigwait()cCs|tjddS)Na  def test(signum): signal.alarm(1) received = signal.sigwait([signum]) assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) rr r=r)rrr test_sigwaitsz PendingSignalsTests.test_sigwait sigwaitinfozneed signal.sigwaitinfo()cCs|tjddS)Nz def test(signum): signal.alarm(1) info = signal.sigwaitinfo([signum]) if info.si_signo != signum: raise Exception("info.si_signo != %s" % signum) rr)rrrtest_sigwaitinfosz$PendingSignalsTests.test_sigwaitinfo sigtimedwaitzneed signal.sigtimedwait()cCs|tjddS)Nz def test(signum): signal.alarm(1) info = signal.sigtimedwait([signum], 10.1000) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) rr)rrrtest_sigtimedwaitsz%PendingSignalsTests.test_sigtimedwaitcCs|tjddS)Nz def test(signum): import os os.kill(os.getpid(), signum) info = signal.sigtimedwait([signum], 0) if info.si_signo != signum: raise Exception('info.si_signo != %s' % signum) rr)rrrtest_sigtimedwait_pollsz*PendingSignalsTests.test_sigtimedwait_pollcCs|tjddS)Nz def test(signum): received = signal.sigtimedwait([signum], 1.0) if received is not None: raise Exception("received=%r" % (received,)) rr)rrrtest_sigtimedwait_timeoutsz-PendingSignalsTests.test_sigtimedwait_timeoutcCstj}|ttj|gddS)Ng)r r=r%r&r)rrerrr"test_sigtimedwait_negative_timeoutsz6PendingSignalsTests.test_sigtimedwait_negative_timeoutcCstdddS)NrFaif True: import os, threading, sys, time, signal # the default handler terminates the process signum = signal.SIGUSR1 def kill_later(): # wait until the main thread is waiting in sigwait() time.sleep(1) os.kill(os.getpid(), signum) # the signal must be blocked by all the threads signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) killer = threading.Thread(target=kill_later) killer.start() received = signal.sigwait([signum]) if received != signum: print("sigwait() received %s, not %s" % (received, signum), file=sys.stderr) sys.exit(1) killer.join() # unblock the signal, which should have been cleared by sigwait() signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) rr)rrrtest_sigwait_threads z'PendingSignalsTests.test_sigwait_threadcCs|ttj|ttjd|ttjddd|ttjdg|t"ttjtjgWdn1sv0Y|t ttjdgWdn1s0Y|t$ttjdd>gWdn1s0YdS)Nrrriri)r%r+r rrhr&rr?r)rrrtest_pthread_sigmask_argumentss 0 . z2PendingSignalsTests.test_pthread_sigmask_argumentscCsJttjt}|tjtj|ttjt}||tdSr!)r rrr;rrr r ZassertLessEqualrBrrr"test_pthread_sigmask_valid_signalssz6PendingSignalsTests.test_pthread_sigmask_valid_signalscCsd}td|dS)Na- if 1: import signal import os; import threading def handler(signum, frame): 1/0 def kill(signum): os.kill(os.getpid(), signum) def check_mask(mask): for sig in mask: assert isinstance(sig, signal.Signals), repr(sig) def read_sigmask(): sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) check_mask(sigmask) return sigmask signum = signal.SIGUSR1 # Install our signal handler old_handler = signal.signal(signum, handler) # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) check_mask(old_mask) try: kill(signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) # Unblock SIGUSR1 try: # unblock the pending signal calls immediately the signal handler signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") try: kill(signum) except ZeroDivisionError: pass else: raise Exception("ZeroDivisionError not raised") # Check the new mask unblocked = read_sigmask() if signum in unblocked: raise Exception("%s in %s" % (signum, unblocked)) if blocked ^ unblocked != {signum}: raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) if old_mask != unblocked: raise Exception("%s != %s" % (old_mask, unblocked)) rFrrrrrtest_pthread_sigmasksHz(PendingSignalsTests.test_pthread_sigmaskcCs^d}td|<}|\}}|}|dkrfddt tdD}t |}t j rtd |f|S) Ncs,tkr(tttjddS)Nr)rAappendr perf_counterr rrreframeNtimesrrrans z5StressTest.measure_itimer_resolution..handlerrgMbP?cs g|]}|d|qS)rr).0i)rrr |rXz8StressTest.measure_itimer_resolution..rz,detected median itimer() resolution: %.6f s.)NN)rrr rrrr=rArrrange statisticsZmedianrverboseprint)rraZ durationsZmedrrrmeasure_itimer_resolutionjs   z$StressTest.measure_itimer_resolutioncCs4|}|dkrdS|dkr dS|d|fdS)Ng-C6?i'g{Gz?dz^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r r)rZresorrrdecide_itimer_countszStressTest.decide_itimer_countrztest needs setitimer()cs|}gdd}d fdd }|tj||tj||tj|d}ttj }||krt t tj|d7}t |krt|krtdqt t tj|d7}t |kr^t|kr^tdqq^|t |d dS) z; This test uses dependent signal handlers. cSsttjdtddS)Nrh㈵>)r rrrandomrrrr first_handlersz@StressTest.test_stress_delivery_dependent..first_handlerNcs|dSr!rrZsigsrrsecond_handlerszAStressTest.test_stress_delivery_dependent..second_handlerrrrSome signals were lost)NN)r rr rr,r=rrrrr4rgetpidrArr)rrrr expected_sigsdeadlinerrrtest_stress_delivery_dependents& z)StressTest.test_stress_delivery_dependentcs|}gfdd}|tj||tj|d}ttj}||krt tj dt dt t tj|d7}t|krFt|krFtdqqF|t|ddS) z> This test uses simultaneous signal handlers. cs|dSr!rrrrrrasz=StressTest.test_stress_delivery_simultaneous..handlerrrrrrN)r rr r,r=rrrrrrrr4rrrArr)rrrarrrrr!test_stress_delivery_simultaneouss z,StressTest.test_stress_delivery_simultaneousr,ztest needs SIGUSR1cs&tjdddfddfdd}fdd}t}|tj|tj|d }zd}td}||d ||j dur| |j j t | d d t|j j d }Wdn1s0Y|s|d|Wd |nd |0dS) NrFcs d7dSNrrr)num_received_signalsrrcustom_handlerszAStressTest.test_stress_modifying_handlers..custom_handlercsstd7qdSr)r raise_signalr)do_stopnum_sent_signalsrerrset_interruptss zAStressTest.test_stress_modifying_handlers..set_interruptscs8dkr4tdD] }tjfD]}t|qqqdS)Nr i N)rr r)rra)rrrerrcycle_handlerss zAStressTest.test_stress_modifying_handlers..cycle_handlers)targetTzSignal z ignored due to race condition)r r,rr threadingThreadrZcatch_unraisable_exceptionstartr8Z unraisabler exc_valuerhr0ryZ assertGreaterr@)rr r!rtZignoredr{r)rrrrrertest_stress_modifying_handlerss>       "  z)StressTest.test_stress_modifying_handlersN)rrrrrr r rQrRrsr rrr(rrrrr_s   ,   rc@s6eZdZddZeejdkdddZddZ d S) RaiseSignalTestcCs:|tttjWdn1s,0YdSr!)r%KeyboardInterruptr rr1r)rrr test_sigints zRaiseSignalTest.test_sigintr zWindows specific testc CsVzd}t||dWn4tyP}z|jtjkr:nWYd}~n d}~00dS)Nrz#OSError (Invalid argument) expected)r rZfailrherrnoEINVAL)rr.errrtest_invalid_argument s  z%RaiseSignalTest.test_invalid_argumentcsJdfdd}ttj|}|tjtj|ttj|dS)NFcsddSrr)abZis_okrrra.sz-RaiseSignalTest.test_handler..handler)r r1rrrr`)rraZ old_signalrr2r test_handler,s   zRaiseSignalTest.test_handlerN) rrrr+rQr}rrr/r3rrrrr)s r)c@s&eZdZeeeddddZdS)PidfdSignalTestpidfd_send_signalzpidfd support not built incCs |t}tdtjWdn1s.0Y|jjtjkrR|dn|jjtj krj|d| |jjtj t dt t j}|t j||td$t|tjtdWdn1s0Y|tt|tjWdn1s0YdS)Nrzkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)r%rhr r5r1rzr,ZENOSYSrEPERMrEBADFr4openr O_DIRECTORYrrrmZassertRaisesRegexr+objectr*)rr{Zmy_pidfdrrrtest_pidfd_send_signal:s ,  2 z&PidfdSignalTest.test_pidfd_send_signalN)rrrrQrRrsr r;rrrrr48s r4cCs tdSr!)r reap_childrenrrrrtearDownModuleMsr=__main__)&r,r4rr rkrrJrr#rrQrrZtest.support.script_helperrrr ImportErrorZTestCaserr}rr rRrSrdr~rsrrrrrr)r4r=rmainrrrrsT    =/M6@TeQ<