a  ze#@sddlZddlZddlZddlZddlZddlZddlZddlmZejdkrXe dddl Z ddl Z ddl Z ddl m Z ddlmZddZGd d d e jZGd d d ejZGd ddejZGdddejZGdddejZedkredS)N)mockwin32z Windows only)windows_events)utilscCstddSN)asyncioset_event_loop_policyr r Y/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_asyncio/test_windows_events.pytearDownModulesr c@s$eZdZddZddZddZdS) UpperProtocCs g|_dSr)bufselfr r r __init__szUpperProto.__init__cCs ||_dSr)trans)rrr r r connection_madeszUpperProto.connection_madecCs:|j|d|vr6|jd|j|jdS)N )r appendrwritejoinupperclose)rdatar r r data_received s zUpperProto.data_receivedN)__name__ __module__ __qualname__rrrr r r r r sr c@seZdZddZdS)ProactorLoopCtrlCc Cszdd}tj|d}t}zDz"||j||dWntyRYn0W| |n | |0| dS)NcSstdttjdS)N皙?)timesleepsignal raise_signalSIGINTr r r r SIGINT_after_delay+s z9ProactorLoopCtrlC.test_ctrl_c..SIGINT_after_delaytargetz%should not fall through 'run_forever') threadingThreadrZget_event_loopZ call_soonstart run_foreverZfailKeyboardInterrupt close_loopr)rr&threadloopr r r test_ctrl_c)s   zProactorLoopCtrlC.test_ctrl_cN)rrrr1r r r r r'src@seZdZddZdS)ProactorMultithreadingcsDdddfdd}tj|d}|||dS)NFcstdIdHdS)Nr)rr"r r r r coroAszAProactorMultithreading.test_run_from_nonmain_thread..corocs$t}||ddS)NT)rZnew_event_looprun_until_completerr0r3finishedr r funcDs zAProactorMultithreading.test_run_from_nonmain_thread..funcr')r)r*r+r assertTrue)rr8r/r r6r test_run_from_nonmain_thread>s z3ProactorMultithreading.test_run_from_nonmain_threadN)rrrr:r r r r r2=sr2cs\eZdZfddZddZddZddZd d Zd d Zd dZ ddZ ddZ Z S) ProactorTestscs$tt|_||jdSr)supersetUprProactorEventLoopr0Zset_event_loopr __class__r r r=Ts  zProactorTests.setUpcCsht\}}|j|t}tj|j|d|jd}||j || | d|dS)Ndr5r) socket socketpairr0Z_make_socket_transportrProtocolZ ensure_futureZ sock_recvrr4 assertEqualresult)rabrfr r r test_closeYs  zProactorTests.test_closecCsVdt}t|}|tt|Wdn1s@0Y|dS)Nz\\.\pipe\test_double_bind-%s)osgetpidr PipeServer assertRaisesPermissionErrorr)rADDRESSZserver1r r r test_double_bindbs    (zProactorTests.test_double_bindcCs |j|}||ddS)Ndone)r0r4 _test_piperE)rresr r r test_pipeiszProactorTests.test_pipec sdt}|t&|jtj|IdHWdn1sB0Y|jt |IdH\}| |t j g}t dD]j}tj|jd}tj||jd|jfdd|IdH\}}| |tj|||||fq|t|D] \}\}} | d|qt|D]:\}\}} |IdH} || d|| q||t&|jtj|IdHWdn1s0YdS) Nz\\.\pipe\_test_pipe-%sr5csSrr r protocolr r ~rz*ProactorTests._test_pipe..z lower-{} z LOWER-{} rR)rKrLrNFileNotFoundErrorr0Zcreate_pipe_connectionrrDZstart_serving_piper assertIsInstancerrMrange StreamReaderZStreamReaderProtocolZ TransportrEr enumeraterformatencodereadliner) rrPZserverZclientsiZ stream_readerrprotorwresponser rWr rSmsB  (      *zProactorTests._test_pipec Cst}tj|_tjjtd|dj}|jj d}|j |}| | t j|j|Wdn1sr0YWdn1s0YdS)NZ ConnectPipe)Z side_effectZ pipe_address)OSError _overlappedZERROR_PIPE_BUSYwinerrorrZpatchobjectr0 _proactorZ connect_pipeZ create_taskcancelrNrCancelledErrorr4)rexcconnectr3Ztaskr r r test_connect_pipe_cancels  z&ProactorTests.test_connect_pipe_cancelcCstdddd}|tj||jj|d}|j}|j |}|j|}| |d| | | d|kodkn|t||jj|d}|j}|j |}|j|}| |d| | | d|kodkn||dS) NTFg?g?g? rg333333?)rh CreateEvent addCleanup_winapi CloseHandler0rkwait_for_handler!r4rE assertFalserFr9ZSetEventrl)reventfutr+rRelapsedr r r test_wait_for_handles$         z"ProactorTests.test_wait_for_handlecCstdddd}|tj||jj|d}||j }| t j |j |Wdn1sn0Y|j |}|d|kodkn||jj|}||dS)NTFrqrr )rhrrrsrtrur0rkrvrlr!rNrrmr4r9)rrxryr+rzr r r test_wait_for_handle_cancels * z)ProactorTests.test_wait_for_handle_cancelcCsrt|j_|jddd}|j|j|j|j|j|||j| |jjj dS)NcSsdSrr r r r r rYrz;ProactorTests.test_read_self_pipe_restart..) rZMockr0Zcall_exception_handlerZrun_in_executorstopr,r4r.rwZcalled)rrIr r r test_read_self_pipe_restarts      z)ProactorTests.test_read_self_pipe_restart) rrrr=rJrQrUrSrpr{r|r~ __classcell__r r r?r r;Rs  & "r;c@seZdZddZddZdS)WinPolicyTestsc sNfdd}t}z(ttt|Wt|n t|0dS)NcsttjdSr)r[rget_running_loopZSelectorEventLoopr rr r mainsz5WinPolicyTests.test_selector_win_policy..main)rget_event_loop_policyrZWindowsSelectorEventLoopPolicyrunrrZ old_policyr rr test_selector_win_policys z'WinPolicyTests.test_selector_win_policyc sNfdd}t}z(ttt|Wt|n t|0dS)NcsttjdSr)r[rrr>r rr r rsz5WinPolicyTests.test_proactor_win_policy..main)rrrZWindowsProactorEventLoopPolicyrrr rr test_proactor_win_policys z'WinPolicyTests.test_proactor_win_policyN)rrrrrr r r r rsr__main__)rKr#rBsysr!r)ZunittestrplatformZSkipTestrhrtrrZtest.test_asynciorZ test_utilsr rDr ZTestCaserr2r;rrrr r r r s.     "