a  ze@sddlZddlZddlZddlmZddlZddlmZddZGdddZ Gdd d ej Z Gd d d e Z Gd d d e Z Gddde ZedkredS)N)mock)utilscCstddSN)asyncioZset_event_loop_policyrrQ/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_asyncio/test_pep492.pytearDownModule src@s.eZdZddZd ddZddZdd ZdS) FakeCorocCsdSrr)selfvaluerrrsendsz FakeCoro.sendNcCsdSrr)r typvaltbrrrthrowszFakeCoro.throwcCsdSrrr rrrcloseszFakeCoro.closeccs dVdSrrrrrr __await__szFakeCoro.__await__)NN)__name__ __module__ __qualname__r rrrrrrrr s r cseZdZfddZZS)BaseTestcsHtt|_t|j_t|j_d|jjj _ | |jdS)Nr) supersetUprZ BaseEventLooplooprZMockZ_process_events _selectorselectZ return_valueZset_event_loopr __class__rrr"s      zBaseTest.setUp)rrrr __classcell__rrrrr src@seZdZddZddZdS) LockTestscstDtjjdtjjdtjjdtjjdg}Wdn1sT0Yfdd}|D]"}j|| | qndS)Nrc stdIdH||4IdHN}|d|tdIdH|WdIdHq1IdHs0Y|dS)N{Gz?)rsleep assertFalselockedZassertIs assertTrue)lock_lockrrrtest6s 8z7LockTests.test_context_manager_async_with..test assertWarnsDeprecationWarningrLockr Condition SemaphoreBoundedSemaphorerun_until_completer$r%r Z primitivesr)Z primitiverrrtest_context_manager_async_with-s     " z)LockTests.test_context_manager_async_withcstDtjjdtjjdtjjdtjjdg}Wdn1sT0Yfdd}|D]"}j|| | qndS)Nr!c sxtdIdH|td:|IdHWdn1sL0YWdn1sj0YdS)Nr"z#can't be used in 'await' expression)rr#r$r%ZassertRaisesRegex TypeError)r'rrrr)Ms z7LockTests.test_context_manager_with_await..testr*r2rrrtest_context_manager_with_awaitDs     " z)LockTests.test_context_manager_with_awaitN)rrrr3r5rrrrr +sr c@seZdZddZdS)StreamReaderTestscsRd}tj|jd|fdd}|j|}||gddS)Nsline1 line2 line3r!cs&g}2z3dHW}||q6|Sr)append)datalinestreamrrreaderesz/StreamReaderTests.test_readline..reader)sline1 sline2 sline3)r StreamReaderrZ feed_dataZfeed_eofr1 assertEqual)r ZDATAr<r8rr:r test_readline^s  zStreamReaderTests.test_readlineN)rrrr?rrrrr6\sr6c@sLeZdZddZddZddZddZd d Zd d Zd dZ ddZ dS)CoroutineTestscCsLdd}|}z|t|W|n |0|ttdS)NcsdSrrrrrrfoorz,CoroutineTests.test_iscoroutine..foo)r&rZ iscoroutinerr )r rAfrrrtest_iscoroutineqs zCoroutineTests.test_iscoroutinecCsdd}|t|dS)NcsdSrrrrrrrA}rBz4CoroutineTests.test_iscoroutinefunction..foo)r&rZiscoroutinefunction)r rArrrtest_iscoroutinefunction|sz'CoroutineTests.test_iscoroutinefunctioncsnGddd|t"tjfdd}Wdn1s@0Y|}||dd|dS)Nc@seZdZddZdS)zCCoroutineTests.test_function_returning_awaitable..AwaitablecSsdS)N)spamrrrrrrszMCoroutineTests.test_function_returning_awaitable..Awaitable.__await__N)rrrrrrrr AwaitablesrGcsSrrrrGrrfuncsz>CoroutineTests.test_function_returning_awaitable..funcrF)r+r,r coroutiner>r r)r rIcororrHr!test_function_returning_awaitables ,z0CoroutineTests.test_function_returning_awaitablecsXddfdd}|j|}||d|jd|j|}||ddS)NcsdS)NrFrrrrrbarsz5CoroutineTests.test_async_def_coroutines..barcs IdHSrrrrMrrrAsz5CoroutineTests.test_async_def_coroutines..foorFT)rr1r> set_debug)r rAr8rrNrtest_async_def_coroutiness   z(CoroutineTests.test_async_def_coroutinescsJfdd}tdjdj|tddS)NcstdkdSNr)r&sys#get_coroutine_origin_tracking_depthrrrrstartszOCoroutineTests.test_debug_mode_manages_coroutine_origin_tracking..startrT)r>rRrSrrOr1)r rTrrr1test_debug_mode_manages_coroutine_origin_trackings   z@CoroutineTests.test_debug_mode_manages_coroutine_origin_trackingcsFddtjfddfdd}j|}|ddS)NcssdEdHdS)NrrFrrrrrgens z0CoroutineTests.test_types_coroutine..gencsSrrr)rVrrrIsz1CoroutineTests.test_types_coroutine..funccs}|tj|IdHSr)ZassertIsInstancetypes_GeneratorWrapper)wrapper)rIr rrrKsz1CoroutineTests.test_types_coroutine..cororF)rWrJrr1r>)r rKr8r)rIrVr rtest_types_coroutines z#CoroutineTests.test_types_coroutinecs4dfddfdd}j|dS)Ncs4jdd}z|djjdWd}nd}0dS)N)limitrrA)Z get_stackr>f_codeco_name)rC)Tr rrrAs z1CoroutineTests.test_task_print_stack..foocs tjjdIdHdS)Nr!)rZ ensure_futurerrr_rAr rrrunnersz4CoroutineTests.test_task_print_stack..runner)rr1r rarr`rtest_task_print_stacksz$CoroutineTests.test_task_print_stackcsbddfdd}jdjtddj|Wdn1sT0YdS)NcstdIdHdS)Ng?)rr#rrrrafuncsz/CoroutineTests.test_double_await..afunccsH}j|}z&tdIdH|IdHW|n |0dSrQ)rZ create_taskrr#cancel)rKtrdr rrras   z0CoroutineTests.test_double_await..runnerTz"coroutine is being awaited already)msg)rrOZ assertRaises RuntimeErrorr1rbrrgrtest_double_awaits z CoroutineTests.test_double_awaitN) rrrrDrErLrPrUrZrcrjrrrrr@os  r@__main__)rRrWZunittestrrZtest.test_asynciorZ test_utilsrr ZTestCaserr r6r@rmainrrrrs   1n