a ze+@sVdZddlZddlmZmZmZmZddlmZm Z ddl Z ddl Z ddl Z ddl Z ddlZddlZddlZddlZddlZddlZddlZddlZddlmZddlmZdZGdd d eZGd d d e jZGd d d ejZGdddeZGdddeZ GdddeZ!GdddeZ"Gddde jZ#GdddeZ$GdddeZ%Gdddej&Z&Gdddej'Z(e)e j*dud Gd!d"d"ej'Z+Gd#d$d$ej,Z,Gd%d&d&ej'Z-Gd'd(d(ej.Z.Gd)d*d*ej/Z/Gd+d,d,ej0Z0Gd-d.d.ej1Z1Gd/d0d0ejZ2Gd1d2d2ejZ3Gd3d4d4ejZ4e5d5krRe6dS)6z! Tests for the threading module. N)verbose import_module cpython_onlyunlink)assert_python_okassert_python_failure) lock_tests)support)Znetbsd5zhp-ux11c@s,eZdZddZddZddZddZd S) CountercCs d|_dS)NrvalueselfrG/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_threading.py__init__#szCounter.__init__cCs|jd7_dSNr r rrrinc%sz Counter.inccCs|jd8_dSrr r rrrdec'sz Counter.deccCs|jSNr r rrrget)sz Counter.getN)__name__ __module__ __qualname__rrrrrrrrr "sr c@seZdZddZddZdS) TestThreadcCs,tjj||d||_||_||_||_dS)Nname) threadingThreadrtestcasesemamutexnrunning)rrr r!r"r#rrrr-s zTestThread.__init__c Cs&td}tr&td|j|df|j|jB|jtrTt|jd|j |jdWdn1s|0Yt |trtd|jd|jJ|j |j |jdtrtd |j|jfWdn1s0YWdn1s0YdS) Ng@ztask %s will run for %.1f usecg.Aztasks are runningZtaskdonerz$%s is finished. %d tasks are running)randomrprintrr!r"r#rrr ZassertLessEqualtimesleeprZassertGreaterEqual)rdelayrrrrun4s*   2  zTestThread.runN)rrrrr+rrrrr,src@seZdZddZddZdS) BaseTestCasecCstj|_dSr)testr Zthreading_setup_threadsr rrrsetUpNszBaseTestCase.setUpcCstjj|jtjdSr)r-r Zthreading_cleanupr. reap_childrenr rrrtearDownQszBaseTestCase.tearDownN)rrrr/r1rrrrr,Msr,c@seZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZddZeeedd d!d"Zeeedd#d$d%Zeeedd d&d'Zd(d)Zeeedd*eeed+d,d-d.Zeejevd/eeedd*eeed+d,d0d1Zd2d3Z d4d5Z!d6d7Z"d8d9Z#d:d;Z$e%dd?Z'd@dAZ(dBdCZ)dDdEZ*dFS)G ThreadTestsc Cs0d}tjdd}t}t}g}t|D]F}td|||||}||||j| t |d| q*t tdrt dd|DthB}|d||t||d trtd |D]D}|||||jd ||j| t |d qtrtd ||d dS)N r$r z z^$ get_native_idcss|] }|jVqdSr) native_id).0trrr nz/ThreadTests.test_various_ops..rz!waiting for all tasks to completerz#^$zall tasks done)rBoundedSemaphoreRLockr rangerappend assertIsNoneident assertRegexreprstarthasattrsetr4 assertNotIn assertEquallenrr'join assertFalseis_aliveassertNotEqualassertIsNotNoner) rZNUMTASKSr!r"Z numrunningthreadsir7Z native_idsrrrtest_various_opsZs4        zThreadTests.test_various_opscs|tjfdd}tgt4t|d} | d|Wdn1sl0Ytj d=dS)NcstjdSr)r=r currentThreadr?rDrr%r?rrfsz9ThreadTests.test_ident_of_no_threading_threads..frr) rLrrPr?Eventr wait_threads_exit_threadstart_new_threadwaitrF_active)rrRtidrrQr"test_ident_of_no_threading_threads~s  .z.ThreadTests.test_ident_of_no_threading_threadscCsPtr tdztdWntjy8tdYn0|tddS)Nz!with 256 KiB thread stack size...i4platform does not support changing thread stack sizer rr'r stack_sizerUerrorunittestZSkipTestrOr rrrtest_various_ops_small_stacks z(ThreadTests.test_various_ops_small_stackcCsPtr tdztdWntjy8tdYn0|tddS)Nzwith 1 MiB thread stack size...ir[rr\r rrrtest_various_ops_large_stacks z(ThreadTests.test_various_ops_large_stackcCsdd}t}|t&t||f}|Wdn1sL0Y||tj| tj|tj | tj| | ttj|dtj|=dS)NcSst|dSr)rcurrent_threadrelease)r"rrrrRsz*ThreadTests.test_foreign_thread..f _DummyThread)rLockacquirer rTrUrVassertInrXassertIsInstancerd assertTruerJr@rA)rrRr"rYrrrtest_foreign_threads &zThreadTests.test_foreign_threadc std}|jj}|j|jf|_Gdddt|}t}| |t | |dz|||}qdWnyzYn 0| dz| |dWntyYn0ttGfdddtj}|}d |_|trtd trtd |d |}| |dtr&td }||trFtd||jtr`td||j|}| |dtrtdjtjd||jtrtd|jr|dS)Nctypesc@s eZdZdS)z.AsyncExcN)rrrrrrrAsyncExcsrlrzAsyncExc not raisedrcseZdZfddZdS)z:ThreadTests.test_PyThreadState_SetAsyncExc..WorkercsNt|_d|_ztdqWn yHd|_Yn0dS)NFg?T)r get_identidfinishedrDr(r)r rlZworker_saw_exceptionZworker_startedrrr+s  z>ThreadTests.test_PyThreadState_SetAsyncExc..Worker.runNrrrr+rrprrWorkersrrTz started worker threadz trying nonsensical thread idz, waiting for worker thread to get startedz" verifying worker hasn't exitedz2 attempting to raise asynch exception in workerz5 waiting for worker to say it caught the exceptiontimeoutz all OK -- joining worker)rZ pythonapiZPyThreadState_SetAsyncExcZc_ulongZ py_objectargtypes ExceptionrrmrhintZ assertGreaterZfailrFUnboundLocalErrorrSrdaemonrBrr'rWrirIrornr SHORT_TIMEOUTrH) rrkZ set_async_exc exceptionrYresultrrr7retrrprtest_PyThreadState_SetAsyncExcsb               z*ThreadTests.test_PyThreadState_SetAsyncExccCs^dd}tj}|t_z.fail_new_threadcSsdSrrrrrrr9z0ThreadTests.test_limbo_cleanup..targetz:Failed to cleanup _limbo map on failure of Thread.start().)r_start_new_threadr assertRaisesrrBrI_limbo)rrrr7rrrtest_limbo_cleanupszThreadTests.test_limbo_cleanupcCs(tdtdd\}}}||ddS)Nrk-caNif 1: import ctypes, sys, time, _thread # This lock is used as a simple event variable. ready = _thread.allocate_lock() ready.acquire() # Module globals are cleared before __del__ is run # So we save the functions in class dict class C: ensure = ctypes.pythonapi.PyGILState_Ensure release = ctypes.pythonapi.PyGILState_Release def __del__(self): state = self.ensure() self.release(state) def waitingThread(): x = C() ready.release() time.sleep(100) _thread.start_new_thread(waitingThread, ()) ready.acquire() # Be sure the other thread is waiting. sys.exit(42) *)rrrFrrcouterrrrrtest_finalize_running_thread#sz(ThreadTests.test_finalize_running_threadcCstdddS)NraPif 1: import sys, threading # A deadlock-killer, to prevent the # testsuite to hang forever def killer(): import os, time time.sleep(2) print('program blocked; aborting') os._exit(2) t = threading.Thread(target=killer) t.daemon = True t.start() # This is the trace function def func(frame, event, arg): threading.current_thread() return func sys.settrace(func) )rr rrrtest_finalize_with_traceDsz$ThreadTests.test_finalize_with_tracecCs0tdd\}}}||d||ddS)Nraif 1: import threading from time import sleep def child(): sleep(1) # As a non-daemon thread we SHOULD wake up and nothing # should be torn down yet print("Woke up, sleep function is:", sleep) threading.Thread(target=child).start() raise SystemExit s5Woke up, sleep function is: r9)rrFstriprrrrtest_join_nondaemon_on_shutdown]s  z+ThreadTests.test_join_nondaemon_on_shutdownc Cstj}t}zhtddD]N}t|dtjddd}|||}| ||d||fqWt|n t|0dS)Nrdg-C6*?cSsdSrrrrrrryr9z7ThreadTests.test_enumerate_after_join..rz� triggered after %d trials: %s) r enumeratesysgetswitchintervalr<setswitchintervalrrBrHrE)renum old_intervalrNr7lrrrtest_enumerate_after_joinqs z%ThreadTests.test_enumerate_after_joincCsGdddt}|dd}t|}|j~|j|dt|d|dd}t|}|j~|j|dt|ddS)Nc@seZdZddZddZdS)zDThreadTests.test_no_refcycle_through_target..RunSelfFunctioncSs.||_tj|j|fd|id|_|jdS)N yet_another)rrkwargs) should_raiserr_runthreadrB)rrrrrrs zMThreadTests.test_no_refcycle_through_target..RunSelfFunction.__init__cSs|jr tdSr)r SystemExit)rZ other_refrrrrrszIThreadTests.test_no_refcycle_through_target..RunSelfFunction._runN)rrrrrrrrrRunSelfFunctions rF)rz%d references still around)msgT)objectweakrefrefrrHr>r getrefcount)rrZ cyclic_objectZweak_cyclic_objectZraising_cyclic_objectZweak_raising_cyclic_objectrrrtest_no_refcycle_through_targets&        z+ThreadTests.test_no_refcycle_through_targetcCsHt}||d||dt}|tdS)NTr) rrisDaemon setDaemongetNamesetNamerSisSet activeCount)rr7errrtest_old_threading_apis  z"ThreadTests.test_old_threading_apicCs2t}|dt|d|_|dt|dSNrzT)rrrErArzrgrr7rrrtest_repr_daemonszThreadTests.test_repr_daemoncCsHt}||jtjdd}||jtjdd}||jdS)NFrzT)rrrIrzrirrrrtest_daemon_params     zThreadTests.test_daemon_paramforkneeds os.fork()cCs:td}td|\}}}||d||ddS)Na import atexit import os import sys from test.support import wait_process # Import the threading module to register its "at fork" callback import threading def exit_handler(): pid = os.fork() if not pid: print("child process ok", file=sys.stderr, flush=True) # child process sys.exit() else: wait_process(pid, exitcode=0) # exit_handler() will be called after threading._shutdown() atexit.register(exit_handler) rr9schild process ok)textwrapdedentrrFrstriprcode_rrrrrtest_fork_at_exits  zThreadTests.test_fork_at_exitztest needs fork()cCs0d}td|\}}}||d||ddS)Naif 1: import _thread, threading, os, time def background_thread(evt): # Creates and registers the _DummyThread instance threading.current_thread() evt.set() time.sleep(10) evt = threading.Event() _thread.start_new_thread(background_thread, (evt,)) evt.wait() assert threading.active_count() == 2, threading.active_count() if os.fork() == 0: assert threading.active_count() == 1, threading.active_count() os._exit(0) else: os.wait() rr9rrFrrrrtest_dummy_thread_after_forks z(ThreadTests.test_dummy_thread_after_forkcCst}|tj|tjdtdD]Z}tjddd}| t }|dkrnt | rfdndq*|tj|dd q*dS) Ngư>cSsdSrrrrrrrr9z6ThreadTests.test_is_alive_after_fork..rr r3exitcode)rr addCleanuprr-r r<rrrBosr_exitrJrH wait_process)rrrNr7pidrrrtest_is_alive_after_forks  z$ThreadTests.test_is_alive_after_forkcsht}|jd|jtj|jtfdd}tj|d}|| dS)N MainThreadcstjtjdSr)rKr main_threadr?rbrr rrrRs z'ThreadTests.test_main_thread..fr) rrrFrr?rbrmrrBrH)rmainrRthrr rtest_main_threads  zThreadTests.test_main_threadztest needs os.fork()waitpidztest needs os.waitpid()cCs@d}td|\}}}|dd}||d||ddS)Naif 1: import os, threading from test import support pid = os.fork() if pid == 0: main = threading.main_thread() print(main.name) print(main.ident == threading.current_thread().ident) print(main.ident == threading.get_ident()) else: support.wait_process(pid, exitcode=0) r r9zMainThread True True rdecodereplacerFrrrrrdatarrrtest_main_thread_after_forks   z'ThreadTests.test_main_thread_after_forkdue to known OS bugcCs@d}td|\}}}|dd}||d||ddS)Naif 1: import os, threading, sys from test import support def f(): pid = os.fork() if pid == 0: main = threading.main_thread() print(main.name) print(main.ident == threading.current_thread().ident) print(main.ident == threading.get_ident()) # stdout is fully buffered because not a tty, # we have to flush before exit. sys.stdout.flush() else: support.wait_process(pid, exitcode=0) th = threading.Thread(target=f) th.start() th.join() rrrr9zThread-1 True True rrrrr/test_main_thread_after_fork_from_nonmain_thread*s  z;ThreadTests.test_main_thread_after_fork_from_nonmain_threadcCsBd}td|\}}}|}||d||dgddS)Naif 1: import gc, threading main_thread = threading.current_thread() assert main_thread is threading.main_thread() # sanity check class RefCycle: def __init__(self): self.cycle = self def __del__(self): print("GC:", threading.current_thread() is main_thread, threading.main_thread() is main_thread, threading.enumerate() == [main_thread]) RefCycle() gc.collect() # sanity check x = RefCycle() rr9zGC: True True True)rrrF splitlinesrrrr test_main_thread_during_shutdownHs  z,ThreadTests.test_main_thread_during_shutdowncCs$d}td|\}}}||ddS)Naif 1: import os import threading import time import random def random_sleep(): seconds = random.random() * 0.010 time.sleep(seconds) class Sleeper: def __del__(self): random_sleep() tls = threading.local() def f(): # Sleep a bit so that the thread is still running when # Py_Finalize() is called. random_sleep() tls.x = Sleeper() random_sleep() threading.Thread(target=f).start() random_sleep() rr9rrrrrrrrrtest_finalization_shutdownesz&ThreadTests.test_finalization_shutdowncsttfdd}tj|d}||jd||| |j}| |jddd ||jt j dd|| | | | ||j|dS)NcstddS)N{Gz?)rcrfr(r)rZfinishstartedrrrRsz'ThreadTests.test_tstate_lock..frrrtF)rU allocate_lockrfrrassertIs _tstate_lockrBrirJrIrcr r{r>rH)rrRr7 tstate_lockrrrtest_tstate_locks&  zThreadTests.test_tstate_lockcsttfdd}tj|d}||dt|d}t dD]}|t|vrqt dqn||t|| dS)NcsdSr)rcrfrrrrrRsz(ThreadTests.test_repr_stopped..frrstoppedir) rUrrfrrrBrgrArcr<r(r)rH)rrRr7Z LOOKING_FORrNrrrtest_repr_stoppeds"    zThreadTests.test_repr_stoppedcstddD]}t|fddt|D}|D] }|q2|D] }|qDfddt|D}|D] }|ql|D] }|q~|tjq dS)Nrr3csg|]}tjjdqSr)rrrfr6rbsrr sz;ThreadTests.test_BoundedSemaphore_limit..csg|]}tjjdqSr)rrrcrrrrrs)r<rr:rBrHr ValueErrorrc)rlimitrMr7rrrtest_BoundedSemaphore_limits"       z'ThreadTests.test_BoundedSemaphore_limitc sfddddfddd_t}tz>tddl}|tdD] }q`Wt|n t|0dS) NcsSrr)frameeventarg) noop_tracerrrsz9ThreadTests.test_frame_tstate_tracing..noop_tracecss dVqdS)N generatorrrrrrrsz8ThreadTests.test_frame_tstate_tracing..generatorcsjdur_tjSr)gennextr)callbackrrrrs z7ThreadTests.test_frame_tstate_tracing..callbackrr$)rrgettracesettracer _testcapiZcall_in_temporary_c_threadr<)rZ old_tracerr-r)rrrrtest_frame_tstate_tracings      z%ThreadTests.test_frame_tstate_tracingc CsdD]}|j|dvt}tj|j|d}||j}|sP||tjn| |tj| | | |tjWdq1s0YqdS)N)FTr)rrz) ZsubTestrrSrrWrBrrg_shutdown_locksrErDrH)rrzrrrrrrtest_shutdown_locksszThreadTests.test_shutdown_lockscCs$tdd\}}}||ddS)Nra(if 1: import threading class Atexit: def __del__(self): print("thread_dict.atexit = %r" % thread_dict.atexit) thread_dict = threading.local() thread_dict.atexit = "value" atexit = Atexit() sthread_dict.atexit = 'value')rrFrrrrrtest_locals_at_exits zThreadTests.test_locals_at_exitcCsDdd}t tj|dWdn1s60YdS)NcSsdSrrrrrrnoop,r9z0ThreadTests.test_leak_without_join..noopr)r rTrrrB)rrrrrtest_leak_without_join)s z"ThreadTests.test_leak_without_joincCs6td}td|\}}}||d||ddS)Na import _thread import sys event = _thread.allocate_lock() event.acquire() def import_threading(): import threading event.release() if 'threading' in sys.modules: raise Exception('threading is already imported') _thread.start_new_thread(import_threading, ()) # wait until the threading module is imported event.acquire() event.release() if 'threading' not in sys.modules: raise Exception('threading is not imported') # don't wait until the thread completes rr9)rrrrFrrrrtest_import_from_another_thread1s  z+ThreadTests.test_import_from_another_threadN)+rrrrOrZr`rarjrrrrrrrrrrr_ skipUnlessrCrrrrrrskipIfrplatformplatforms_to_skiprrrrrrrrrrrrrrrrr2VsR$  X!      ## ' r2c@seZdZddZddZeeedde e j e vddd Z eeedde e j e vdd d Ze e j e vdd d Zeeedde e j e vdddZeeeddddZdS)ThreadJoinOnShutdowncCs8d|}td|\}}}|dd}||ddS)Naif 1: import sys, os, time, threading # a thread, which waits for the main program to terminate def joiningfunc(mainthread): mainthread.join() print('end of thread') # stdout is fully buffered because not a tty, we have to flush # before exit. sys.stdout.flush() rrrzend of main end of thread r)rscriptrrrrrrr _run_and_joinUs   z"ThreadJoinOnShutdown._run_and_joincCsd}||dS)Nzif 1: import os t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() time.sleep(0.1) print('end of main') r rr rrrtest_1_join_on_shutdownfsz,ThreadJoinOnShutdown.test_1_join_on_shutdownrrrcCsd}||dS)Naif 1: from test import support childpid = os.fork() if childpid != 0: # parent process support.wait_process(childpid, exitcode=0) sys.exit(0) # child process t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() print('end of main') r rrrrtest_2_join_in_forked_processrsz2ThreadJoinOnShutdown.test_2_join_in_forked_processcCsd}||dS)Naif 1: from test import support main_thread = threading.current_thread() def worker(): childpid = os.fork() if childpid != 0: # parent process support.wait_process(childpid, exitcode=0) sys.exit(0) # child process t = threading.Thread(target=joiningfunc, args=(main_thread,)) print('end of main') t.start() t.join() # Should not block: main_thread is already stopped w = threading.Thread(target=worker) w.start() r rrrr!test_3_join_in_forked_from_threadsz6ThreadJoinOnShutdown.test_3_join_in_forked_from_threadcCs"d}td|\}}}||dS)Nacif True: import os import random import sys import time import threading thread_has_run = set() def random_io(): '''Loop for a while sleeping random tiny amounts and doing some I/O.''' while True: with open(os.__file__, 'rb') as in_f: stuff = in_f.read(200) with open(os.devnull, 'wb') as null_f: null_f.write(stuff) time.sleep(random.random() / 1995) thread_has_run.add(threading.current_thread()) def main(): count = 0 for _ in range(40): new_thread = threading.Thread(target=random_io) new_thread.daemon = True new_thread.start() count += 1 while len(thread_has_run) < count: time.sleep(0.001) # Trigger process shutdown sys.exit(0) main() rrrIrr rrrrrrtest_4_daemon_threadss!z*ThreadJoinOnShutdown.test_4_daemon_threadscCsNdd}g}tdD]"}tj|d}|||q|D] }|q.do_fork_and_waitr)r<rrr=rBrH)rrrMrNr7rrrtest_reinit_tls_after_forks     z/ThreadJoinOnShutdown.test_reinit_tls_after_forkcCsg}tdD]&}tjddd}|||q t}|dkrltt dkr`t dqzt dnt j |dd |D] }| q~dS) NrcSs tdS)Ng333333?)r(r)rrrrrr9zKThreadJoinOnShutdown.test_clear_threads_states_after_fork..rrr34r)r<rrr=rBrrrGr_current_framesrr rrH)rrMrNr7rrrr$test_clear_threads_states_after_forks     z9ThreadJoinOnShutdown.test_clear_threads_states_after_forkN)rrrr rr_rrCrrrrr rrrrrrrrrr Ss    ( r c@s0eZdZddZddZddZeddZd S) SubinterpThreadingTestscCsFt\}}|tj||tj|ttdr>t|d||fS)N set_blockingF)rpipercloserCr)rrwrrrrs    zSubinterpThreadingTests.pipecCsL|\}}td|f}tj|}||d|t|dddS)Na import os import random import threading import time def random_sleep(): seconds = random.random() * 0.010 time.sleep(seconds) def f(): # Sleep a bit so that the thread is still running when # Py_EndInterpreter is called. random_sleep() os.write(%d, b"x") threading.Thread(target=f).start() random_sleep() rrx rrrr-r Zrun_in_subinterprFrreadrr!r"rr~rrrtest_threads_joins   z)SubinterpThreadingTests.test_threads_joincCsL|\}}td|f}tj|}||d|t|dddS)Na import os import random import threading import time def random_sleep(): seconds = random.random() * 0.010 time.sleep(seconds) class Sleeper: def __del__(self): random_sleep() tls = threading.local() def f(): # Sleep a bit so that the thread is still running when # Py_EndInterpreter is called. random_sleep() tls.x = Sleeper() os.write(%d, b"x") threading.Thread(target=f).start() random_sleep() rrr#r$r&rrrtest_threads_join_2#s   z+SubinterpThreadingTests.test_threads_join_2cCshdtjjd}d|f}tj td|\}}}Wdn1sJ0Y|d|dS)Nzif 1: import os import threading import time def f(): # Make sure the daemon thread is still running when # Py_EndInterpreter is called. time.sleep(zJ) threading.Thread(target=f, daemon=True).start() z[if 1: import _testcapi _testcapi.run_in_subinterp(%r) rz:Fatal Python error: Py_EndInterpreter: not the last thread)r-r r{ZSuppressCrashReportrrgr)rZsubinterp_coder rrrrrrtest_daemon_threads_fatal_errorHs  .z7SubinterpThreadingTests.test_daemon_threads_fatal_errorN)rrrrr'r(rr)rrrrrs %rc@sdeZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ ddZ ddZ dS)ThreadingExceptionTestscCs*t}||t|j|dSr)rrrBr RuntimeErrorrHrrrrrtest_start_thread_againcsz/ThreadingExceptionTests.test_start_thread_againcCst}|t|jdSr)rrbrr+rH)rrbrrrtest_joining_current_threadisz3ThreadingExceptionTests.test_joining_current_threadcCst}|t|jdSr)rrrr+rHr,rrrtest_joining_inactive_threadmsz4ThreadingExceptionTests.test_joining_inactive_threadcCs.t}||tt|dd|dSr)rrrBrr+setattrrHr,rrrtest_daemonize_active_threadqsz4ThreadingExceptionTests.test_daemonize_active_threadcCst}|t|jdSr)rrerr+rc)rlockrrrtest_releasing_unacquired_lockwsz6ThreadingExceptionTests.test_releasing_unacquired_lockcCshd}d}tjtjd|gtjtjd}|\}}|dd}||j dd||||dS) Naif True: import threading def recurse(): return recurse() def outer(): try: recurse() except RecursionError: pass w = threading.Thread(target=outer) w.start() w.join() print('end of main thread') zend of main thread r)stdoutstderrrrrzUnexpected error: ) subprocessPopenr executablePIPE communicaterrrF returncode)rr Zexpected_outputpr4r5rrrrtest_recursion_limit{s z,ThreadingExceptionTests.test_recursion_limitcCs\d}td|\}}}||d|}|d||d||d||d|dS)Naif True: import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) running = False t.join() rr9Exception in thread"Traceback (most recent call last):ZeroDivisionErrorUnhandled exceptionrrFrrgrErrrrtest_print_exceptions    z,ThreadingExceptionTests.test_print_exceptioncCs\d}td|\}}}||d|}|d||d||d||d|dS)Naif True: import sys import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) sys.stderr = None running = False t.join() rr9r>r?r@rArBrrrr%test_print_exception_stderr_is_none_1s    z=ThreadingExceptionTests.test_print_exception_stderr_is_none_1cCs4d}td|\}}}||d|d|dS)Naif True: import sys import threading import time running = False def run(): global running running = True while running: time.sleep(0.01) 1/0 sys.stderr = None t = threading.Thread(target=run) t.start() while not running: time.sleep(0.01) running = False t.join() rr9rA)rrFrErrrrr%test_print_exception_stderr_is_none_2s z=ThreadingExceptionTests.test_print_exception_stderr_is_none_2csXddGfdddtj}|}||||j||jtd|_dS)NcSsdSrrrrrr bare_raiseszOThreadingExceptionTests.test_bare_raise_in_brand_new_thread..bare_raisecseZdZdZfddZdS)zOThreadingExceptionTests.test_bare_raise_in_brand_new_thread..Issue27558Nc s:z Wn*ty4}z||_WYd}~n d}~00dSr)rwexc)rrGrFrrr+s zSThreadingExceptionTests.test_bare_raise_in_brand_new_thread..Issue27558.run)rrrrGr+rrHrr Issue27558srI)rrrBrHrLrGrhr+)rrIrrrHr#test_bare_raise_in_brand_new_threads  z;ThreadingExceptionTests.test_bare_raise_in_brand_new_threadcsLdd|ttjjfddtdD}|D]}||q2dS)NcSsHttjjddd"}|dtWdn1s:0YdS)Nr"zutf-8)encoding )openr-r TESTFNwrite traceback format_stack)fprrr modify_files zQThreadingExceptionTests.test_multithread_modify_file_noerror..modify_filecsg|]}tjdqSr)rr)r6rNrSrrrszPThreadingExceptionTests.test_multithread_modify_file_noerror..r)rrr-r rNr<rBrH)rrMr7rrTr$test_multithread_modify_file_noerrors z.ThreadExitcSstddSr)rexitr rrrr+:sz8ExceptHookTests.test_system_exit..ThreadExit.runNrqrrrr ThreadExit9srer5r)rrr r]rBrHrFr^)rrer5rrrrtest_system_exit8s  &z ExceptHookTests.test_system_exitcsdfdd}zttd|&t}||Wdn1sL0Y|jt|t j d|j j j | j|Wdnd0dS)Ncs|dSrr)Z hook_argsrrrhookHsz4ExceptHookTests.test_custom_excepthook..hookrbrW)r swap_attrrrVrBrHrFexc_typerstr exc_value exc_traceback __traceback__rr)rrgrrrrtest_custom_excepthookEs &z&ExceptHookTests.test_custom_excepthookc sdd}dfdd}ttd|~ttd|Ptd&}t}||Wdn1sn0YWdn1s0YWdn1s0Y|| d|ddS) NcSs tddS)Nthreading_hook failedrXrrrrthreading_hook[szCExceptHookTests.test_custom_excepthook_fail..threading_hookcs t|dSr)rj)rirkrlZerr_strrrsys_hook`sz=ExceptHookTests.test_custom_excepthook_fail..sys_hookrbr5z#Exception in threading.excepthook: ro) r rhrrr]rVrBrHrFr^)rrprrr5rrrqrtest_custom_excepthook_failZs   b z+ExceptHookTests.test_custom_excepthook_failN) rrrr_r rrcrfrnrsrrrrrYs    rYc@s$eZdZddZddZddZdS) TimerTestscCst|g|_t|_dSr)r,r/ callback_argsrrScallback_eventr rrrr/rs zTimerTests.setUpcCstd|j}||j|jdd|jd<|j td|j}||j| t |j d| |j difdifg| | dS)NrZblahZbarZfoorr)rTimer _callback_spyrBrvrWrr=rclearrFrGrurH)rZtimer1Ztimer2rrr test_init_immutable_default_argsws     z+TimerTests.test_init_immutable_default_argscOs*|j|dd|f|jdSr)rur=copyrvrD)rrrrrrrxszTimerTests._callback_spyN)rrrr/rzrxrrrrrtpsrtc@seZdZeejZdS) LockTestsN)rrr staticmethodrrelocktyperrrrr|sr|c@seZdZeejZdS) PyRLockTestsN)rrrr}r_PyRLockr~rrrrrsrzRLock not implemented in Cc@seZdZeejZdS) CRLockTestsN)rrrr}r_CRLockr~rrrrrsrc@seZdZeejZdS) EventTestsN)rrrr}rrSZ eventtyperrrrrsrc@seZdZeejZdS)ConditionAsRLockTestsN)rrrr}r Conditionr~rrrrrsrc@seZdZeejZdS)ConditionTestsN)rrrr}rrZcondtyperrrrrsrc@seZdZeejZdS)SemaphoreTestsN)rrrr}r Semaphoresemtyperrrrrsrc@seZdZeejZdS)BoundedSemaphoreTestsN)rrrr}rr:rrrrrrsrc@seZdZeejZdS) BarrierTestsN)rrrr}rBarrierZ barriertyperrrrrsrc@seZdZddZdS) MiscTestCasecCs&dh}ddh}tj|td||ddS)NrrPr)rrU)extra blacklist)r Z check__all__r)rrrrrr test__all__s  zMiscTestCase.test__all__N)rrrrrrrrrsrc@s$eZdZddZddZddZdS)InterruptMainTestscCsZdd}tj|d}|t ||Wdn1sD0Y|dS)NcSs tdSr)rUinterrupt_mainrrrrcall_interruptszHInterruptMainTests.test_interrupt_main_subthread..call_interruptr)rrrKeyboardInterruptrBrH)rrr7rrrtest_interrupt_main_subthreads   &z0InterruptMainTests.test_interrupt_main_subthreadcCs6|ttWdn1s(0YdSr)rrrUrr rrrtest_interrupt_main_mainthreads z1InterruptMainTests.test_interrupt_main_mainthreadc Csdttj}zBttjtjtttjtjtWttj|nttj|0dSr)signal getsignalSIGINTSIG_IGNrUrSIG_DFL)rhandlerrrrtest_interrupt_main_noerrors  z.InterruptMainTests.test_interrupt_main_noerrorN)rrrrrrrrrrrs rc@s$eZdZddZddZddZdS) AtexitTestscCs.tdd\}}}||||ddS)Nrzif True: import threading def run_last(): print('parrot') threading._register_atexit(run_last) sparrot)rrIrFrrrrrtest_atexit_outputs zAtexitTests.test_atexit_outputcCstdd\}}}||dS)NraNif True: import threading from unittest.mock import Mock mock = Mock() threading._register_atexit(mock) mock.assert_not_called() # force early shutdown to ensure it was called once threading._shutdown() mock.assert_called_once() rrrrrtest_atexit_called_onces z#AtexitTests.test_atexit_called_oncecCs.tdd\}}}|||d|dS)Nrzif True: import threading def func(): pass def run_last(): threading._register_atexit(func) threading._register_atexit(run_last) z2RuntimeError: can't register atexit after shutdown)rrirgrrrrrtest_atexit_after_shutdowns  z&AtexitTests.test_atexit_after_shutdownN)rrrrrrrrrrrs r__main__)7__doc__Z test.supportr-rrrrZtest.support.script_helperrrr&rrUrr(r_rrr6rrrPrr r rr rrZTestCaser,r2r rr*rVrYrtr|Z RLockTestsrrrrrrrrrrrrrrrrrrrsf   ! ,b2Z 2