a  zeY@s0ddlZddlZddlmZGdddeZdS)N)TestCasecseZdZd#fdd ZddZddZdd Zd d Zd d ZddZ ddZ ddZ ddZ ddZ ddZddZd$fdd Zfdd Zd!d"ZZS)%IsolatedAsyncioTestCaserunTestcst|d|_d|_dSN)super__init___asyncioTestLoop_asyncioCallsQueue)selfZ methodName __class__G/opt/bitninja-python-dojo/embedded/lib/python3.9/unittest/async_case.pyr"s z IsolatedAsyncioTestCase.__init__csdSrrr rrr asyncSetUp'sz"IsolatedAsyncioTestCase.asyncSetUpcsdSrrrrrr asyncTearDown*sz%IsolatedAsyncioTestCase.asyncTearDowncOs|j|g|Ri|dSr)Z addCleanup)r funcargskwargsrrraddAsyncCleanup-s z'IsolatedAsyncioTestCase.addAsyncCleanupcCs|||jdSr)ZsetUp _callAsyncrrrrr _callSetUp<sz"IsolatedAsyncioTestCase._callSetUpcCs||dSr_callMaybeAsync)r methodrrr_callTestMethod@sz'IsolatedAsyncioTestCase._callTestMethodcCs||j|dSr)rrZtearDownrrrr _callTearDownCs z%IsolatedAsyncioTestCase._callTearDowncOs|j|g|Ri|dSrr)r functionrrrrr _callCleanupGsz$IsolatedAsyncioTestCase._callCleanupcOs^|jdusJd||i|}t|s8J|d|j}|j||f|j|S)N$asyncio test loop is not initializedz returned non-awaitabler inspectZ isawaitable create_futurer put_nowaitrun_until_completer rrrretfutrrrrJs  z"IsolatedAsyncioTestCase._callAsynccOsX|jdusJd||i|}t|rP|j}|j||f|j|S|SdS)Nr r!r&rrrrRs   z'IsolatedAsyncioTestCase._callMaybeAsyncc st|_}|d|IdH}||dur:dS|\}}z |IdH}|s`||WqttfyzYqt tj fy}z|s| |WYd}~qd}~00qdSr) asyncioQueuer set_resultget task_done cancelled SystemExitKeyboardInterrupt BaseExceptionCancelledError set_exception)r r(queuequeryZ awaitabler'exrrr_asyncioLoopRunner\s   z*IsolatedAsyncioTestCase._asyncioLoopRunnercCs\|jdusJdt}t||d||_|}||||_| |dS)Nz%asyncio test loop already initializedT) r r)Znew_event_loopset_event_loopZ set_debugr#Z create_taskr7Z_asyncioCallsTaskr%)r loopr(rrr_setupAsyncioLoopos  z)IsolatedAsyncioTestCase._setupAsyncioLoopc Cs|jdusJd|j}d|_|jd||jzt|}|sbWtd|dS|D] }| qf|tj ||dd|D]0}| rq| dur| d| |dq||Wtd|ntd|0dS)Nr T)r9Zreturn_exceptionsz(unhandled exception during test shutdown)message exceptiontask)r r r$r%joinr)Z all_tasksr8closecancelZgatherr.r<Zcall_exception_handlerZshutdown_asyncgens)r r9Z to_cancelr=rrr_tearDownAsyncioLoopys>        z,IsolatedAsyncioTestCase._tearDownAsyncioLoopNcs.|zt|W|S|0dSr)r:rrunrA)r resultr rrrBs  zIsolatedAsyncioTestCase.runcs|t|dSr)r:rdebugrArr rrrDs zIsolatedAsyncioTestCase.debugcCs|jdur|dSr)r rArrrr__del__s zIsolatedAsyncioTestCase.__del__)r)N)__name__ __module__ __qualname__rrrrrrrrrrr7r:rArBrDrE __classcell__rrr rrs   " r)r)r"Zcaserrrrrrs