a ze,@sdZddlZddlZddlmZddlmZed Zddl Z ddl Z ddl Z e ddZ Gdd d ejZGd d d ejZGd d d eZGdddeZddZedkredS)z&Unit tests for socket timeout feature.N)support) socket_helpernetworkcCsJt|,t||tjtjddWdS1s<0YdS)zResolve an (host, port) to an address. We must perform name resolution before timeout tests, otherwise it will be performed by connect(). rN)rtransient_internetsocket getaddrinfoAF_INET SOCK_STREAM)hostportr E/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_timeout.pyresolve_addresss  rc@sXeZdZdZddZddZddZdd Zd d Zd d Z ddZ ddZ ddZ dS)CreationTestCasez9Test case for socket.gettimeout() and socket.settimeout()cCsttjtj|_dSN)rr r sockselfr r rsetUpszCreationTestCase.setUpcCs|jdSrrcloserr r rtearDown"szCreationTestCase.tearDowncCs||jdddS)Nztimeout not disabled by default) assertEqualr gettimeoutrr r rtestObjectCreation%sz#CreationTestCase.testObjectCreationcCs^|jd||jd|jd||jd|jd||jddS)NgzGa@)r settimeoutrrrr r rtestFloatReturnValue*s    z%CreationTestCase.testFloatReturnValuecCsP|jd|t|jtd|jd|t|jtddS)N?g333333@)rrrtyperrr r rtestReturnType5s  zCreationTestCase.testReturnTypecCs|jd|jd|jd|jd|t|jjd|t|jjd|t|jjd|t|jjg|t|jji|t|jjddS)Nrr y)rr assertRaises TypeErrorrr r r testTypeCheck=s    zCreationTestCase.testTypeCheckcCs:|t|jjd|t|jjd|t|jjddS)Ng)r% ValueErrorrrrr r rtestRangeCheckJszCreationTestCase.testRangeCheckcCs|jd|jd||jd|jd||jd|jd|jd||jd|jd||jddS)N TFr#)rr setblockingrrrr r rtestTimeoutThenBlockingPs      z(CreationTestCase.testTimeoutThenBlockingcCsX|jd|jd||jd|jd|jd||jddS)NFrT)rr,rrrrr r rtestBlockingThenTimeout^s     z(CreationTestCase.testBlockingThenTimeoutN) __name__ __module__ __qualname____doc__rrrrr"r'r*r-r.r r r rrs  rc@s*eZdZdZejZddZeZddZ dS)TimeoutTestCaseg@cCs tdSr)NotImplementedErrorrr r rrtszTimeoutTestCase.setUpc Gs|j|t|j|}t|D]Z}t}z ||Wq tjyx}z&t|}WYd}~qWYd}~q d}~00q |d| |||j | ||ddS)z Test the specified socket method. The method is run at most `count` times and must raise a socket.timeout within `timeout` + self.fuzz seconds. Nzsocket.timeout was not raisedr ) rrgetattrrangetime monotonicrtimeoutZfailZ assertLessfuzzZ assertGreater) rcountr9methodargsit1eZdeltar r r_sock_operationys     & zTimeoutTestCase._sock_operationN) r/r0r1r:rZHOST localhostrrrAr r r rr3is r3c@s\eZdZdZddZddZedddd Zd d Z d d Z ddZ ddZ ddZ dS)TCPTimeoutTestCasez3TCP test case for socket.socket() timeout functionscCs"ttjtj|_tdd|_dS)Nzwww.python.org.P)rr r rr addr_remoterr r rrszTCPTimeoutTestCase.setUpcCs|jdSrrrr r rrszTCPTimeoutTestCase.tearDownTz*need to replace these hosts; see bpo-35518c Cs$tdd}tdd}d}ttjtj}tj}||zdz||WnFtjy\Yn4t y}z|j t j krzd}WYd}~n d}~00W| ~n | ~0|r| d|d|d ||d|d ||_t|jd"|d d d |jWdn1s0YdS) Nzblackhole.snakebite.netiZzwhitehole.snakebite.neti[TFzWe didn't receive a connection reset (RST) packet from {}:{} within {} seconds, so we're unable to test connect timeout against the corresponding {}:{} (which is configured to silently drop packets).rrgMbP?connect)rrr r rZLOOPBACK_TIMEOUTrrFr9OSErrorerrnoZ ECONNREFUSEDrZskipTestformatrErrrA)rZ blackholeZ whiteholeskiprr9errr r rtestConnectTimeouts<   z%TCPTimeoutTestCase.testConnectTimeoutcCsRt|jd.|j|j|ddddWdn1sD0YdS)Nrr?recv)rrrErrFrArr r rtestRecvTimeoutsz"TCPTimeoutTestCase.testRecvTimeoutcCs,t|j|j|j|ddddS)NrrMaccept)r bind_portrrBlistenrArr r rtestAcceptTimeouts z$TCPTimeoutTestCase.testAcceptTimeoutcCsnttjtjJ}t||j||j| | dddddWdn1s`0YdS)NdrMsendX@  rr r rrRrBrSrrF getsocknamerArZservr r rtestSends zTCPTimeoutTestCase.testSendc CstttjtjP}t||j||j| | ddddd| Wdn1sf0YdS)NrUrMsendtorWrXrYr[r r r testSendtoszTCPTimeoutTestCase.testSendtocCsnttjtjJ}t||j||j| | dddddWdn1s`0YdS)NrUrMsendallrWrXrYr[r r r testSendall s zTCPTimeoutTestCase.testSendallN)r/r0r1r2rrunittestZskipIfrLrPrTr\r^r`r r r rrCs  Q  rCc@s(eZdZdZddZddZddZdS) UDPTimeoutTestCasez3UDP test case for socket.socket() timeout functionscCsttjtj|_dSr)rr SOCK_DGRAMrrr r rrszUDPTimeoutTestCase.setUpcCs|jdSrrrr r rrszUDPTimeoutTestCase.tearDowncCs$t|j|j|dddddS)NrrMrecvfromrO)rrRrrBrArr r rtestRecvfromTimeoutsz&UDPTimeoutTestCase.testRecvfromTimeoutN)r/r0r1r2rrrer r r rrbsrbcCstddS)Nr)rZrequiresr r r r setUpModule%srf__main__)r2 functoolsratestrZ test.supportrZis_resource_enabledZ skip_expectedr7rHr lru_cacherZTestCaserr3rCrbrfr/mainr r r rs$    M'