a ze2@s8ddlZddlZddlZddlZddlZddlmZddlmZddl Z ej Z ddZ Gddde j Z Gdd d eZGd d d ejZGd d d ejZejddZdefddZGddde j ZGdddeZGdddeZeZGddde j ZGddde j ZGdddeZedkr4e dS) N)support) socket_helpercCsZ||z:z|\}}|Wntjy>Yn0W|n |0dSN)listensetacceptclosesockettimeout)evtZservconnaddrrG/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_telnetlib.pyserver s  rc@sTeZdZddZddZddZddZd d Zd d Zd dZ ddZ ddZ dS) GeneralTestscCsrt|_ttjtj|_|jdt |j|_ tj t |j|jfd|_ |j d|j |jdS)N<)targetargsT) threadingEventr r AF_INET SOCK_STREAMsock settimeoutrZ bind_portportThreadrthread setDaemonstartwaitselfrrrsetUps    zGeneralTests.setUpcCs|j|`dSr)rjoinr!rrrtearDown$s zGeneralTests.tearDowncCstt|j}|jdSr) telnetlibTelnetHOSTrrrr"telnetrrr testBasic(szGeneralTests.testBasiccCsNtt|j}||Wdn1s20Y||dSr)r&r'r(rZassertIsNotNone get_socketZ assertIsNone)r"tnrrrtestContextManager-s,zGeneralTests.testContextManagerc Csf|tdutdztt|j}Wtdn td0||j d|j dS)N) assertTruer getdefaulttimeoutsetdefaulttimeoutr&r'r(r assertEqualr gettimeoutrr)rrrtestTimeoutDefault2s zGeneralTests.testTimeoutDefaultc Csl|tdutdz tjt|jdd}Wtdn td0||j du|j dSNr/)r ) r0r r1r2r&r'r(rrr4rr)rrrtestTimeoutNone<s zGeneralTests.testTimeoutNonecCs2tjt|jdd}||jd|jdSr6)r&r'r(rr3rr4rr)rrrtestTimeoutValueGszGeneralTests.testTimeoutValuecCs:t}|jt|jdd||jd|jdSr6) r&r'openr(rr3rr4rr)rrrtestTimeoutOpenLszGeneralTests.testTimeoutOpencCsJtjt|jdd}|j}|||||||jdSr6) r&r'r(rrr3r,filenor)r"r*Zt_sockrrr testGettersRs zGeneralTests.testGettersN) __name__ __module__ __qualname__r#r%r+r.r5r7r8r:r<rrrrrs   rc@s*eZdZdZd ddZddZddZd S) SocketStubz* a socket proxy that re-defines sendall() rcCst||_g|_d|_dS)NF)listreadswritesblock)r"rBrrr__init__\s zSocketStub.__init__cCs|j|dSr)rCappend)r"datarrrsendall`szSocketStub.sendallcCsZd}|jr(t||kr(||jd7}qt||krV|jd||d|d|}|S)Nr)rBlenpopinsert)r"sizeoutrrrrecvbs  zSocketStub.recvN)r)r=r>r?__doc__rErHrOrrrrr@Zs r@c@s,eZdZddZddZddZddZd S) TelnetAlikecCs tdSr)NotImplementedErrorr!rrrr;lszTelnetAlike.filenocCsdSrrr!rrrrnrIzTelnetAlike.closecCs |jj Sr)rrDr!rrr sock_availoszTelnetAlike.sock_availcGsTt&}tjj||g|RWdn1s40Y|j|7_dSr)rZcaptured_stdoutr&r'msg _messagesgetvalue)r"rTrrNrrrrTqs 4zTelnetAlike.msgN)r=r>r?r;rrSrTrrrrrQksrQc@sDeZdZddZeddZdddZdd Zdd d Zd d Z dS) MockSelectorcCs i|_dSrkeysr!rrrrEyszMockSelector.__init__cCsdS)NgMbP?rr!rrr resolution|szMockSelector.resolutionNcCst|d||}||j|<|S)Nr) selectors SelectorKeyrY)r"fileobjeventsrGkeyrrrregisters zMockSelector.registercCs |j|Sr)rYrK)r"r]rrr unregisterszMockSelector.unregistercCsFd}|jD]}t|tr |jj}q&q |r.gSdd|jDSdS)NFcSsg|]}||jfqSr)r^).0r_rrr rIz'MockSelector.select..)rY isinstancerQrrDvalues)r"r rDr]rrrselects  zMockSelector.selectcCs|jSrrXr!rrrget_mapszMockSelector.get_map)N)N) r=r>r?rEpropertyrZr`rarfrgrrrrrWws   rWc#s6fdd}ztj}|t_dVW|t_n|t_0dS)NcstSr)r@)ZignoredrBrrnew_connsztest_socket..new_conn)r create_connection)rBrjZold_connrrir test_sockets rlrcCsZ|D]}t|tusJ|qt| |dd}d|_Wdn1sL0Y|S)za return a telnetlib.Telnet object that uses a SocketStub with reads queued up to be read dummyrN)typebytesrlrU)rBclsxr*rrr test_telnets   $rsc@seZdZddZddZdS)ExpectAndReadTestCasecCstj|_tt_dSr)r&_TelnetSelector old_selectorrWr!rrrr#szExpectAndReadTestCase.setUpcCs |jt_dSr)rvr&rur!rrrr%szExpectAndReadTestCase.tearDownN)r=r>r?r#r%rrrrrtsrtc@sDeZdZddZddZddZddZd d Zd d Zd dZ dS) ReadTestscCsrdg}t|}|d}|j|d|j|j|jjfdgd}d|dd}t|}|d}|||dS) zc read_until(expected, timeout=None) test the blocking version of read_util s xxxmatchyyymatchsxxxmatch)rT)s2xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxrxs2yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyrIN)rsZ read_untilr3ZcookedqZrawqrrBr$)r"wantr*rGrBexpectrrrtest_read_untils  zReadTests.test_read_untilcCs2gd}d|}t|}|}|||dS)zJ read_all() Read all data until EOF; may block. )xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxsyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyszzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzrIN)r$rsread_allr3)r"rBr{r*rGrrr test_read_alls   zReadTests.test_read_allcCsBtdg}|}|t|dkt}|}|d|dS)zQ read_some() Read at least one byte or EOF; may block. r}rIN)rsZ read_somer0rJr3)r"r*rGrrrtest_read_somes  zReadTests.test_read_somecCsrd}t|g}t||}d|j_|d|d|j_d}z||7}Wq:ty^YqbYq:0q:|||dS)z read_*_eager() Read all data available already queued or on the socket, without blocking. dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxTrIFN)rsgetattrrrDr3EOFError)r" func_namerzr*funcrGrrr _read_eagers    zReadTests._read_eagercCs|d|ddS)NZ read_eagerZread_very_eager)rr!rrrtest_read_eagers zReadTests.test_read_eagercCsVd}t|g}|d||jjr0|q|}||||t|jdSNrrI)rsr3read_very_lazyrrB fill_rawqZ assertRaisesr)r"rzr*rGrrrrs   zReadTests.read_very_lazycCs|d}t|g}|d|d}z |}||7}|s@|WntyXYqlYn0|||q"|||dSr)rsr3Z read_lazyrrr0 startswith)r"rzr*rGZ read_datarrrtest_read_lazys    zReadTests.test_read_lazyN) r=r>r?r|rrrrrrrrrrrws  rwc@seZdZdddZddZdS)nego_collectorNcCsd|_||_d|_dS)NrI)seen sb_gettersb_seen)r"rrrrrEsznego_collector.__init__cCs<|j||7_|tjkr8|jr8|}|j|7_dSr)rtlSErr)r"rcmdoptZsb_datarrrdo_negosznego_collector.do_nego)N)r=r>r?rErrrrrrs rc@seZdZdZddZdS) WriteTestszKThe only thing that write does is replace each tl.IAC for tl.IAC+tl.IACcCszddtjddtjtjdtjtjdg}|D]@}t}||d|jj}||tjtjtj|q4dS)Nsdata sample without IACsdata sample withs one IACsa fews iacsrI) rIACrswriter$rrCr3replace)r"Z data_samplerGr*Zwrittenrrr test_write(s  zWriteTests.test_writeN)r=r>r?rPrrrrrr$src@s`eZdZejejejejejej ej ej gZ ddZ ddZddZddZd d Zd d Zd S) OptionTestscCst|}td|}t}||j|}|j}|t|dk| |dd|j | |ddt j | |t||d|_dS)z helper for testing IAC + cmd rIrNr)rsrJr$rset_option_negotiation_callbackrr~rr0assertIncmdsr3rZNOOPTr)r"rGr*Zdata_lennegotxtrrrr _test_command8s zOptionTests._test_commandcCs^|jD]<}|tj|g|dtj|dg|dtj|dgq|dd|jDdS)Nrsdyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy xxxxxxxxxx yyyyyyyyyycSsg|]}tj|qSr)rr)rbrrrrrcLrIz1OptionTests.test_IAC_commands..)rrrr)r"rrrrtest_IAC_commandsFs  zOptionTests.test_IAC_commandscCs0tjtjtjtjtjtjtjtjtjtjtjtjtjtjdtjtjtjtjdtjtjtjtjtjtjdtjtjdtjtjg}t|}t|j}||j| }| |dtjtjdtjdtjd}| |j || d|d|_ dS)NsaasbbsccsddrIsaabb) rrZSBrrsrZ read_sb_datarrr~r3rr)r"sendr*rrZ want_sb_datarrrtest_SB_commandsNs"&&*   $zOptionTests.test_SB_commandscCsdtjtdgdftjtjtdgdftjtjtdgdftjtjtdgdftjtjtdgdfg}|D]2\}}t|g}|d| }| ||j q|dS) N)az : recv b'' Xz: IAC 88 not recognized rz : IAC DO 1 z : IAC DONT 1 z : IAC WILL 1 z : IAC WONT 1 ) rrrpZDOZDONTZWILLZWONTrsset_debuglevelr~rrU)r"Zgiven_a_expect_babr*rrrrtest_debuglevel_reads`s   z!OptionTests.test_debuglevel_readscCs0t}|d|dd}|||jdS)Nrsxxxz send b'xxx' )rsrrrrU)r"r*Zexpectedrrrtest_debuglevel_writess   z!OptionTests.test_debuglevel_writecCs^tg tdd}d|_Wdn1s.0Y|d|d||jddS)Nrm0rnrtestz0.*test)rlrQrUrrTZ assertRegexr)rrrtest_debug_accepts_str_portzs   $  z'OptionTests.test_debug_accepts_str_portN)r=r>r?rZAOZAYTZBRKZECZELZGAZIPZNOPrrrrrrrrrrrr4s$rc@seZdZddZdS) ExpectTestscCs@gd}t|}|dg\}}}||d|dddS)z expect(expected, [timeout]) Read until the expected string has been seen, or a timeout is hit (default is no timeout); may block. )rrxrrxrINry)rsr{r3r$)r"rzr*_rGrrr test_expectszExpectTests.test_expectN)r=r>r?rrrrrrsr__main__) r r[r&r contextlibrrZ test.supportrZunittestr(rZTestCaserobjectr@r'rQ BaseSelectorrWcontextmanagerrlrsrtrwrrrrrr=mainrrrrs2   B    b P