a  ze%@sddlZddlZddlZddlZddlZddlZddlZddlZddlm Z ddl m Z ddl m Z mZmZmZmZmZmZddZddZd d d d d ZGdddZGdddZGdddejZddZedkredS)N)mock)support)verboseTESTFNforgetunlinkrmtree start_threads script_helperc Cszz>t|dr"ddl}ddl}nddl}ddl}|dd}Wn4tyt}z||dWYd}~n d}~00W|tt||k}|r| n(|tt||k}|r| 0dS)Nr) len modulefinderrandom randrange Exceptionappendwith_traceback threading get_identset) Ndone done_taskserrorsrrxefinishedr\/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_importlib/test_threaded_import.pytasks"  (   r!cCstjddd|S)Nzos.register_at_forkT)Zcreate)rZpatch)funcrrr mock_register_at_fork*sr#zaif 1: import time time.sleep(%(delay)s) x = 'a' import C zaif 1: import time time.sleep(%(delay)s) x = 'b' import D zimport Bzimport A)ABCDc@s"eZdZdZddZdddZdS)FinderzIA dummy finder to detect concurrent access to its find_spec() method.cCsd|_d|_t|_dSNr)numcallsrrLocklockselfrrr __init__HszFinder.__init__NcCsR|j|jd7_Wdn1s*0Y|j}td|d|_dS)Nr g{Gz?)r,r*rtimesleep)r.namepathtargetrrrr find_specMs , zFinder.find_spec)NN)__name__ __module__ __qualname____doc__r/r5rrrr r(Dsr(c@seZdZdZdddZdS)FlushingFinderzMA dummy finder which flushes sys.path_importer_cache when it gets called.NcCstjdSN)syspath_importer_cacheclear)r.r2r3r4rrr r5\szFlushingFinder.find_spec)NN)r6r7r8r9r5rrrr r:Xsr:c@sleZdZddZddZeddZddZd d Zd d Z d dZ ddZ eddZ ddZ ddZdS)ThreadedImportTestscCstjdd|_dSNr)r<modulespop old_randomr-rrr setUpbszThreadedImportTests.setUpcCs|jdur|jtjd<dSr@)rCr<rAr-rrr tearDownes zThreadedImportTests.tearDownc s$trtdtdD]tr8tdddddD]$}z tj |=Wq<t y^Yq<0qs zAThreadedImportTests.check_parallel_module_init..iXz%.1f msg@@T)flushrIz done: %s/%szOK.)imp lock_heldunittestZSkipTestrEventrprintr<rAKeyErrorr>r0 monotonicr rangewaitr assertFalseZ assertTrue)r.mock_osmodnamet0Z completedZdtZdbg_inforrNr check_parallel_module_initls8         z.ThreadedImportTests.check_parallel_module_initcCs |dSr;)r^r-rrr test_parallel_module_initsz-ThreadedImportTests.test_parallel_module_initc Cs^t}tjd|z6|||jd||j|jWtj |ntj |0dSr)) r(r< meta_pathinsertr^ assertGreaterr* assertEqualrremove)r.finderrrr test_parallel_meta_pathsz+ThreadedImportTests.test_parallel_meta_pathc stt}fdd}tjd|tj|zL|d|}| j d| j j Wtj |tj |ntj |tj |0dS)NcsdtdS)N)r5 ImportError)r3rerr path_hooks z?ThreadedImportTests.test_parallel_path_hooks..path_hookrrg)r(r:r< path_hooksrar`rr5r^rbr*rcrrd)r.Zflushing_finderrjZnumtestsrrir test_parallel_path_hookss     z,ThreadedImportTests.test_parallel_path_hookscCs<z tjd=WntyYn0ddl}||jjjdS)Nz+test.test_importlib.threaded_import_hangersr)r<rArVZ+test.test_importlib.threaded_import_hangersrZZtest_importlibZthreaded_import_hangersr)r.testrrr test_import_hangerss   z'ThreadedImportTests.test_import_hangersc s d}tt|tjttjdt|tjj tt D]h\}}|d|i}t tj t|dd }||dWdn1s0Y|t|qBtgfdd}fd d }tj|d }tj|d }||| | |td d hdS)Ng?rdelay.pywbutf-8csddl}t|dddSNrr)r$rgetattr)r$resultsrr import_absz.import_abcsddl}t|dddSrs)r%rrt)r%rurr import_basz.import_ba)r4ab)osmkdirr addCleanupshutilrr<r3rardcircular_imports_modulesitemsopenjoinwriteencoder importlibinvalidate_cachesrrKstartrcr) r.ror2contentsfrwrxt1t2rrur test_circular_importss*   .    z)ThreadedImportTests.test_circular_importscCsd}tjdtj|tjjtjtd}t|d }| | dWdn1s^0Y|t ||t t|t dttttjt=dS)Nzif 1: import threading def target(): import random t = threading.Thread(target=target) t.start() t.join() t = Nonerrprqrr __pycache__)r<r3rar{curdirr}rdrrrrrrrrr __import__rA)r.r[codefilenamerrrr test_side_effect_imports .   z+ThreadedImportTests.test_side_effect_importcCs&tjtjtdd}t|dS)Npartialz cfimport.pyr{r3rdirname__file__r Zassert_python_okr.fnrrr 'test_concurrent_futures_circular_importsz;ThreadedImportTests.test_concurrent_futures_circular_importcCs&tjtjtdd}t|dS)Nrzpool_in_threads.pyrrrrr )test_multiprocessing_pool_circular_importsz=ThreadedImportTests.test_multiprocessing_pool_circular_importN)r6r7r8rDrEr#r^r_rfrlrnrrrrrrrr r?`s !  & r?cCsXt}tjtjg|Rz$t}ttj|tdWntyRYn0dS)Ngh㈵>) rZthreading_setuprSZaddModuleCleanupZthreading_cleanupr<getswitchintervalsetswitchintervalAttributeError) thread_infoZold_switchintervalrrr setUpModules r__main__)_imprQr{rr<r0r~rrSrrmrZ test.supportrrrrrr r r!r#rr(r:ZTestCaser?rr6Zunittetsmainrrrr s0  $ &