a ze$@sddlmZddlmZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl Z ej Z dZGddde jZGdddejZd d ZGd d d e jZGd ddeZGddde jZGddde jZGddde jZedkre dS))support) socket_helperNsQUIT c@s eZdZdZddZddZdS) echo_servercCs<tj|||_ttjtj|_t |j|_ d|_ dSN) threadingThread__init__eventsocketAF_INET SOCK_STREAMsockrZ bind_portportstart_resend_event)selfr rF/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_asynchat.pyr s  zecho_server.__init__cCs|j|j|j\}}d|_t|jvrP|d}|sBqP|j||_q(|jtd|_|j rp|j z<|jr| |jd|j }t d|j|d|_qrWn Yn0||jdS)NrgMbP?)rlistenr setacceptbuffer SERVER_QUITrecvreplacerwaitsend chunk_sizetimesleepclose)rconnclientdatanrrrrun"s*      zecho_server.runN)__name__ __module__ __qualname__rr r&rrrrrs rc@s>eZdZddZddZejdkr*ddZdd Zd d Z d S) echo_clientcCsDtj|g|_|tjtj|t |f| |d|_ dSNr) asynchat async_chatr contentsZ create_socketr r r connectHOSTset_terminatorr)r terminatorZ server_portrrrr Ds   zecho_client.__init__cCsdSrrrrrrhandle_connectLszecho_client.handle_connectdarwincCsdSrrr3rrr handle_exptRszecho_client.handle_exptcCs|j|7_dSr)r)rr$rrrcollect_incoming_dataUsz!echo_client.collect_incoming_datacCs|j|jd|_dSr+)r.appendrr3rrrfound_terminatorXszecho_client.found_terminatorN) r'r(r)r r4sysplatformr6r7r9rrrrr*Bs  r*cCs:t}t|}|||td||fS)N{Gz?)rEventrstartrclearrr )r srrrstart_echo_server\s rAc@seZdZdZddZddZddZdd Zd d Zd d Z ddZ ddZ ddZ ddZ ddZddZddZddZddZd S)! TestAsynchatFcCst|_dSr)rZthreading_setup_threadsr3rrrsetUpiszTestAsynchat.setUpcCstj|jdSr)rZthreading_cleanuprCr3rrrtearDownlszTestAsynchat.tearDowncCst}t|}||_|||tdt ||j }| d| d|| d|| t t j|jdddt|||jddgdS)Nr<shello sworldI'm not dead yet!,Zuse_pollcounttimeout hello world)rr=rrr>rr?rr r*rpushrasyncoreloopusepollr join_thread assertEqualr.)rZtermZ server_chunkr r@crrrline_terminator_checkos     z"TestAsynchat.line_terminator_checkcCsdD]}|d|qdS)Nr rSrlrrrtest_line_terminator1sz"TestAsynchat.test_line_terminator1cCsdD]}|d|qdS)NrTs rXrYrrrtest_line_terminator2sz"TestAsynchat.test_line_terminator2cCsdD]}|d|qdS)NrTsqqqrXrYrrrtest_line_terminator3sz"TestAsynchat.test_line_terminator3cCsft\}}t||j}d}|||ttj|jdddt || |j |d|gdSNshello world, I'm not dead yet! rGr<rH rAr*rrLrrMrNrOrrPrQr.)rZtermlenr@r rRr$rrrnumeric_terminator_checks     z%TestAsynchat.numeric_terminator_checkcCs|ddS)Nrr`r3rrrtest_numeric_terminator1sz%TestAsynchat.test_numeric_terminator1cCs|ddS)Nrar3rrrtest_numeric_terminator2sz%TestAsynchat.test_numeric_terminator2cCsjt\}}td|j}d}|||ttj|jdddt || |j g| |j |dSr^) rAr*rrLrrMrNrOrrPrQr.rrr@r rRr$rrrtest_none_terminators     z!TestAsynchat.test_none_terminatorcCsht\}}td|j}d}tj|tdd}||tj|j dddt || |j dd gdS) NrWhello world I'm not dead yet! )Z buffer_sizerGr<rHrKrF)rAr*rr,Zsimple_producerrpush_with_producerrMrNrOrrPrQr.)rr@r rRr$prrrtest_simple_producers    z!TestAsynchat.test_simple_producercCsZt\}}td|j}d}||ttj|jdddt || |j ddgdS)NrWrgrGr<rHrKrF) rAr*rrirrMrNrOrrPrQr.rerrrtest_string_producers   z!TestAsynchat.test_string_producercCs\t\}}td|j}|d|ttj|jdddt || |j gddS)NrWshello world I'm not dead yet! rGr<rH)rKrrFr_rr@r rRrrrtest_empty_lines     zTestAsynchat.test_empty_linecCst\}}t|_td|j}|d|t|t j |j ddd|j t |||jg|t|jddS)NrWrgrGr<rHr)rArr=rr*rrLrZclose_when_donerMrNrOrrrPrQr.Z assertGreaterlenrrmrrrtest_close_when_dones       z!TestAsynchat.test_close_when_donecCst\}}td|j}d}|||t||t||t|jd|t|jd|tt j |j dddt |||jgddS) NrWsbytes unicoderGr<rH)bytesrsrs)rAr*rrL bytearray memoryview assertRaises TypeErrorrrMrNrOrrPrQr.rerrr test_pushs     zTestAsynchat.test_pushN)r'r(r)rOrDrErSr[r\r]r`rbrdrfrkrlrnrprxrrrrrBfs      rBc@seZdZdZdS)TestAsynchat_WithPollTN)r'r(r)rOrrrrrysryc@seZdZddZdS)TestAsynchatMockedcCs~tj}ttj|j_t }| || |j tjj |d}|Wdn1sd0Y||jdS)NZ handle_error)unittestZmockZMockBlockingIOErrorerrnoEAGAINrZ side_effectr,r-Z set_socketZ addCleanupZ del_channelZpatchobjectZ handle_readZ assertFalseZcalled)rr dispatchererrorrrrtest_blockingioerrors   &z'TestAsynchatMocked.test_blockingioerrorN)r'r(r)rrrrrrzsrzc@seZdZddZdS)TestHelperFunctionscCs,|tddd|tddddS)Nzqwerty z rZ qwertydkjfr)rQr,Zfind_prefix_at_endr3rrrtest_find_prefix_at_endsz+TestHelperFunctions.test_find_prefix_at_endN)r'r(r)rrrrrrsrc@seZdZddZdS)TestNotConnectedcCst}|t|jddS)N)r,r-rv ValueErrorr1)rr#rrr!test_disallow_negative_terminatorsz2TestNotConnected.test_disallow_negative_terminatorN)r'r(r)rrrrrrsr__main__)testrZ test.supportrr,rMr}r r:rrr{Z unittest.mockr0rrrr-r*rAZTestCaserBryrzrrr'mainrrrrs.  .