a  ze%@sddlZddlZddlZddlZddlZddlZddlZddlZddlm Z ddl m Z ddl m Z mZmZmZmZmZmZddZddZd d d d d ZGdddZGdddZGdddejZddZedkredS)N)mock)support)verboseTESTFNforgetunlinkrmtree start_threads script_helperc Cszz>t|dr"ddl}ddl}nddl}ddl}|dd}Wn4tyt}z||dWYd}~n d}~00W|tt||k}|r| n(|tt||k}|r| 0dS)Nr) len modulefinderrandom randrange Exceptionappendwith_traceback threading get_identset) Ndone done_taskserrorsrrxefinishedr\/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_importlib/test_threaded_import.pytasks"  (   r!cCstjddd|S)Nzos.register_at_forkT)Zcreate)rZpatch)funcrrr mock_register_at_fork*sr#zaif 1: import time time.sleep(%(delay)s) x = 'a' import C zaif 1: import time time.sleep(%(delay)s) x = 'b' import D zimport Bzimport A)ABCDc@seZdZddZdddZdS)FindercCsd|_d|_t|_dSNr)numcallsrrLocklockselfrrr __init__HszFinder.__init__NcCsR|j|jd7_Wdn1s*0Y|j}td|d|_dS)Nr g{Gz?)r,r*rtimesleep)r.namepathtargetrrrr find_specMs , zFinder.find_spec)NN)__name__ __module__ __qualname__r/r5rrrr r(Dsr(c@seZdZdddZdS)FlushingFinderNcCstjdSN)syspath_importer_cacheclear)r.r2r3r4rrr r5\szFlushingFinder.find_spec)NN)r6r7r8r5rrrr r9Xsr9c@sleZdZddZddZeddZddZd d Zd d Z d dZ ddZ eddZ ddZ ddZdS)ThreadedImportTestscCstjdd|_dSNr)r;modulespop old_randomr-rrr setUpbszThreadedImportTests.setUpcCs|jdur|jtjd<dSr?)rBr;r@r-rrr tearDownes zThreadedImportTests.tearDownc s$trtdtdD]tr8tdddddD]$}z tj |=Wq<t y^Yq<0qs zAThreadedImportTests.check_parallel_module_init..iXz%.1f msg@@T)flushrHz done: %s/%szOK.)imp lock_heldunittestZSkipTestrEventrprintr;r@KeyErrorr=r0 monotonicr rangewaitr assertFalseZ assertTrue)r.mock_osmodnamet0Z completedZdtZdbg_inforrMr check_parallel_module_initls8         z.ThreadedImportTests.check_parallel_module_initcCs |dSr:)r]r-rrr test_parallel_module_initsz-ThreadedImportTests.test_parallel_module_initc Cs^t}tjd|z6|||jd||j|jWtj |ntj |0dSr)) r(r; meta_pathinsertr] assertGreaterr* assertEqualrremove)r.finderrrr test_parallel_meta_pathsz+ThreadedImportTests.test_parallel_meta_pathc stt}fdd}tjd|tj|zL|d|}| j d| j j Wtj |tj |ntj |tj |0dS)NcsdtdS)N)r5 ImportError)r3rdrr path_hooks z?ThreadedImportTests.test_parallel_path_hooks..path_hookrrf)r(r9r; path_hooksr`r_rr5r]rar*rbrrc)r.Zflushing_finderriZnumtestsrrhr test_parallel_path_hookss     z,ThreadedImportTests.test_parallel_path_hookscCs<z tjd=WntyYn0ddl}||jjjdS)Nz+test.test_importlib.threaded_import_hangersr)r;r@rUZ+test.test_importlib.threaded_import_hangersrYZtest_importlibZthreaded_import_hangersr)r.testrrr test_import_hangerss   z'ThreadedImportTests.test_import_hangersc s d}tt|tjttjdt|tjj tt D]h\}}|d|i}t tj t|dd }||dWdn1s0Y|t|qBtgfdd}fd d }tj|d }tj|d }||| | |td d hdS)Ng?rdelay.pywbutf-8csddl}t|dddSNrr)r$rgetattr)r$resultsrr import_absz.import_abcsddl}t|dddSrr)r%rrs)r%rtrr import_basz.import_ba)r4ab)osmkdirr addCleanupshutilrr;r3r`rccircular_imports_modulesitemsopenjoinwriteencoder importlibinvalidate_cachesrrJstartrbr) r.rnr2contentsfrvrwt1t2rrtr test_circular_importss*   .    z)ThreadedImportTests.test_circular_importscCsd}tjdtj|tjjtjtd}t|d }| | dWdn1s^0Y|t ||t t|t dttttjt=dS)Nzif 1: import threading def target(): import random t = threading.Thread(target=target) t.start() t.join() t = Nonerrorprq __pycache__)r;r3r`rzcurdirr|rcrrrrrrrrr __import__r@)r.rZcodefilenamerrrr test_side_effect_imports .   z+ThreadedImportTests.test_side_effect_importcCs&tjtjtdd}t|dS)Npartialz cfimport.pyrzr3rdirname__file__r Zassert_python_okr.fnrrr 'test_concurrent_futures_circular_importsz;ThreadedImportTests.test_concurrent_futures_circular_importcCs&tjtjtdd}t|dS)Nrzpool_in_threads.pyrrrrr )test_multiprocessing_pool_circular_importsz=ThreadedImportTests.test_multiprocessing_pool_circular_importN)r6r7r8rCrDr#r]r^rerkrmrrrrrrrr r>`s !  & r>cCsXt}tjtjg|Rz$t}ttj|tdWntyRYn0dS)Ngh㈵>) rZthreading_setuprRZaddModuleCleanupZthreading_cleanupr;getswitchintervalsetswitchintervalAttributeError) thread_infoZold_switchintervalrrr setUpModules r__main__)_imprPrzrr;r0r}rrRrrlrZ test.supportrrrrrr r r!r#r~r(r9ZTestCaser>rr6Zunittetsmainrrrr s0  $ &