import asyncio
import unittest

from test.test_asyncio import functional as func_tests


def tearDownModule():
    asyncio.set_event_loop_policy(None)


class ReceiveStuffProto(asyncio.BufferedProtocol):
    def __init__(self, cb, con_lost_fut):
        self.cb = cb
        self.con_lost_fut = con_lost_fut

    def get_buffer(self, sizehint):
        self.buffer = bytearray(100)
        return self.buffer

    def buffer_updated(self, nbytes):
        self.cb(self.buffer[:nbytes])

    def connection_lost(self, exc):
        if exc is None:
            self.con_lost_fut.set_result(None)
        else:
            self.con_lost_fut.set_exception(exc)


class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin):

    def new_loop(self):
        raise NotImplementedError

    def test_buffered_proto_create_connection(self):

        NOISE = b'12345678+' * 1024

        async def client(addr):
            data = b''

            def on_buf(buf):
                nonlocal data
                data += buf
                if data == NOISE:
                    tr.write(b'1')

            conn_lost_fut = self.loop.create_future()

            tr, pr = await self.loop.create_connection(
                lambda: ReceiveStuffProto(on_buf, conn_lost_fut), *addr)

            await conn_lost_fut

        async def on_server_client(reader, writer):
            writer.write(NOISE)
            await reader.readexactly(1)
            writer.close()
            await writer.wait_closed()

        srv = self.loop.run_until_complete(
            asyncio.start_server(
                on_server_client, '127.0.0.1', 0))

        addr = srv.sockets[0].getsockname()
        self.loop.run_until_complete(
            asyncio.wait_for(client(addr), 5))

        srv.close()
        self.loop.run_until_complete(srv.wait_closed())


class BufferedProtocolSelectorTests(BaseTestBufferedProtocol,
                                    unittest.TestCase):

    def new_loop(self):
        return asyncio.SelectorEventLoop()


@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
class BufferedProtocolProactorTests(BaseTestBufferedProtocol,
                                    unittest.TestCase):

    def new_loop(self):
        return asyncio.ProactorEventLoop()


if __name__ == '__main__':
    unittest.main()