a  ze D @sdZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl Z ddl Z ddlZddlZddlZddlmZddlmZddlmZmZz ddlZWneydZYn0ddlmZddlmZddlmZdd lmZdd lmZdd lmZdd l m!Z!d dZ"e"dZ#e"dZ$e"dZ%e"dZ&ddddddddddd Z'ddZ(d d!d"d#Z)d$d%Z*d&d'Z+e!j,fd(d)Z-d*d+Z.Gd,d-d-eZ/Gd.d/d/eZ0Gd0d1d1Z1Gd2d3d3e1e0Z2d4d5d6d7Z3e4e d8rnGd9d:d:e j5eZ6Gd;d<dd>e7Z8Gd?d@d@e1e8Z9dAdBZ:ej;dCdDZdKdLZ?GdMdNdNej@ZAGdOdPdPejBZCdQdRZDGdSdTdTeEZFGdUdVdVZGdWdXZHGdYdZdZejIZIej;d[d\ZJe jKe jLe jMfd]d^ZNdS)_zUtilities shared by tests.N)mock) HTTPServer)WSGIRequestHandler WSGIServer) base_events)events)format_helpers)futures)tasks)logger)supportcCs^ttdr*tjtj|}tj|r*|Stjtjtd|}tj|rR|St |dS)N TEST_HOME_DIRz..) hasattrr ospathjoinr isfiledirname__file__FileNotFoundError)filenamefullnamerK/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_asyncio/utils.py data_file's   rz ssl_cert.pemz ssl_key.pemz keycert3.pemz pycacert.pem)z)http://testca.pythontest.net/testca/ocsp/)z0http://testca.pythontest.net/testca/pycacert.cer)z2http://testca.pythontest.net/testca/revocation.crl))Z countryNameZXY))organizationNamezPython Software Foundation CA)) commonNamez our-ca-serverzOct 28 14:23:16 2037 GMTzAug 29 14:23:16 2018 GMTZCB2D80995A69525C)r))Z localityNamezCastle Anthrax))rzPython Software Foundation))r localhost))ZDNSr) ZOCSPZ caIssuersZcrlDistributionPointsZissuerZnotAfterZ notBeforeZ serialNumberZsubjectZsubjectAltNameversioncCs*ttj}|ttd|_tj|_|SNF) ssl SSLContextZPROTOCOL_TLS_SERVERload_cert_chainONLYCERTONLYKEYcheck_hostname CERT_NONE verify_mode)Zserver_contextrrrsimple_server_sslcontextIs   r*T)disable_verifycCs"ttj}d|_|rtj|_|Sr!)r"r#ZPROTOCOL_TLS_CLIENTr'r(r))r+Zclient_contextrrrsimple_client_sslcontextQs  r,cCstdur dSttjSdSN)r"r#Z PROTOCOL_TLSrrrrdummy_ssl_contextYsr.cCsDdd}|}||}d|_z||W|n |0dS)NcsdSr-rrrrronceaszrun_briefly..onceF)Z create_taskZ_log_destroy_pendingrun_until_completeclose)loopr/gentrrr run_briefly`s  r5cCsLt|}|sH|dur6|t}|dkr6t|tdq dS)NrgMbP?)time monotonicr TimeoutErrorr0r sleep)r2Zpredtimeoutdeadlinerrr run_untilns  r<cCs||j|dS)zLegacy API to run once through the event loop. This is the recommended pattern for test code. It will poll the selector once and run all callbacks scheduled in response to I/O events. N)Z call_soonstopZ run_foreverr2rrrrun_oncexs r?c@seZdZddZddZdS)SilentWSGIRequestHandlercCstSr-)ioStringIOselfrrr get_stderrsz#SilentWSGIRequestHandler.get_stderrcGsdSr-r)rDformatargsrrr log_messagesz$SilentWSGIRequestHandler.log_messageN)__name__ __module__ __qualname__rErHrrrrr@sr@cs*eZdZejZfddZddZZS)SilentWSGIServercs"t\}}||j||fSr-super get_request settimeoutrequest_timeoutrDrequestZ client_addr __class__rrrOs zSilentWSGIServer.get_requestcCsdSr-rrDrSclient_addressrrr handle_errorszSilentWSGIServer.handle_error) rIrJrKr LOOPBACK_TIMEOUTrQrOrX __classcell__rrrTrrLs rLc@seZdZddZdS)SSLWSGIServerMixincCsTt}|tt|j|dd}z|||||WntyNYn0dS)NT)Z server_side) r"r#r$r%r&Z wrap_socketZRequestHandlerClassr1OSError)rDrSrWcontextZssockrrrfinish_requests   z!SSLWSGIServerMixin.finish_requestN)rIrJrKr^rrrrr[sr[c@s eZdZdS) SSLWSGIServerNrIrJrKrrrrr_sr_F)use_sslc#sddfdd}|r|n|}||t|j_tjfddd}|z"VW| n| 0dS)Ncss<t|d}|r8|dt|d}|V|t|8}q dS)NZCONTENT_LENGTHz wsgi.inputi)intreadminlen)environsizedatarrrr2s  z_run_test_server..loopcs2d}dg}||||ddkr(|SdgSdS)Nz200 OK)z Content-typez text/plainZ PATH_INFOz/loops Test messager)rfZstart_responsestatusZheadersr>rrapps   z_run_test_server..appcs jddS)Ng?)Z poll_interval)Z serve_foreverr)httpdrrz"_run_test_server..)target) r@Zset_appZserver_addressaddress threadingThreadstartshutdownZ server_closer)rora server_clsserver_ssl_clsrjZ server_classZ server_threadr)rkr2r_run_test_servers$     rvAF_UNIXc@seZdZddZdS)UnixHTTPServercCstj|d|_d|_dS)N 127.0.0.1P) socketserverUnixStreamServer server_bindZ server_nameZ server_portrCrrrr}s zUnixHTTPServer.server_bindN)rIrJrKr}rrrrrxsrxcs*eZdZejZddZfddZZS)UnixWSGIServercCst||dSr-)rxr}Z setup_environrCrrrr}s zUnixWSGIServer.server_bindcs"t\}}||j|dfS)N)ryrMrRrTrrrOs zUnixWSGIServer.get_request) rIrJrKr rYrQr}rOrZrrrTrr~sr~c@seZdZddZdS)SilentUnixWSGIServercCsdSr-rrVrrrrXsz!SilentUnixWSGIServer.handle_errorN)rIrJrKrXrrrrrsrc@s eZdZdS)UnixSSLWSGIServerNr`rrrrrsrcCs2t}|jWdS1s$0YdSr-)tempfileNamedTemporaryFilename)filerrrgen_unix_socket_paths rccs\t}z,|VWzt|WqXty0YqX0n$zt|WntyTYn00dSr-)rrunlinkr\)rrrrunix_socket_paths  rccs@t&}t||ttdEdHWdn1s20YdSN)rorartru)rrvrr)rarrrrrun_test_unix_server s rryhostportraccst||f|ttdEdHdSr)rvrLr_rrrrrun_test_servers rcCsLi}t|D](}|dr&|dr&q tdd||<q td|f|j|S)N__) return_valueZ TestProtocol)dir startswithendswith MockCallbacktype __bases__)baseZdctrrrrmake_test_protocols  rc@s6eZdZddZd ddZddZdd Zd d ZdS) TestSelectorcCs i|_dSr-keysrCrrr__init__'szTestSelector.__init__NcCst|d||}||j|<|S)Nr) selectors SelectorKeyr)rDfileobjrrhkeyrrrregister*s zTestSelector.registercCs |j|Sr-)rpop)rDrrrr unregister/szTestSelector.unregistercCsgSr-r)rDr:rrrselect2szTestSelector.selectcCs|jSr-rrCrrrget_map5szTestSelector.get_map)N)rIrJrKrrrrrrrrrr%s  rcseZdZdZd.fdd ZddZddZfd d Zd d Zd dZ ddZ ddZ ddZ ddZ ddZddZddZddZdd Zd!d"Zd#d$Zfd%d&Zdd'fd(d) Zd*d+Zd,d-ZZS)/TestLoopaLoop for unittests. It manages self time directly. If something scheduled to be executed later then on next loop iteration after all ready handlers done generator passed to __init__ is calling. Generator should be like this: def gen(): ... when = yield ... ... = yield time_advance Value returned by yield is absolute time of next scheduled handler. Value passed to yield is time advance to move loop's time forward. Ncsvt|dur"dd}d|_nd|_||_t|jd|_d|_g|_t|_ i|_ i|_ | t |_dS)Ncss dVdSr-rrrrrr3PszTestLoop.__init__..genFTrg& .>)rNr_check_on_close_gennext_timeZ_clock_resolution_timersr _selectorreaderswritersreset_countersweakrefWeakValueDictionary _transports)rDr3rTrrrLs  zTestLoop.__init__cCs|jSr-rrCrrrr6csz TestLoop.timecCs|r|j|7_dS)zMove test time forward.Nr)rDadvancerrr advance_timefszTestLoop.advance_timecs@t|jrJd|j||j|ksZJd|j|dS)Nzfd {} is not registeredz {!r} != {!r})rrFrrrrrr assert_writers zTestLoop.assert_writerc Cszt|tsDzt|}Wn(tttfyBtd|dYn0z|j|}WntydYn0t d||dS)NzInvalid file object: {!r}z.File descriptor {!r} is used by transport {!r}) isinstancerbfilenoAttributeError TypeError ValueErrorrFrKeyError RuntimeError)rDrZ transportrrr_ensure_fd_no_transports$  z TestLoop._ensure_fd_no_transportcGs|||j||g|RS)zAdd a reader callback.)rrrrrr add_readers zTestLoop.add_readercCs||||S)zRemove a reader callback.)rrrrrr remove_readers zTestLoop.remove_readercGs|||j||g|RS)zAdd a writer callback..)rrrrrr add_writers zTestLoop.add_writercCs||||S)zRemove a writer callback.)rrrrrr remove_writers zTestLoop.remove_writercCstt|_tt|_dSr-) collections defaultdictrbrrrCrrrrs zTestLoop.reset_counterscs6t|jD]}|j|}||qg|_dSr-)rN _run_oncerrrr)rDwhenrrTrrrs     zTestLoop._run_once)r]cs(|j|tj||g|Rd|iS)Nr])rappendrNcall_at)rDrrr]rGrTrrrs zTestLoop.call_atcCsdSr-r)rDZ event_listrrr_process_eventsszTestLoop._process_eventscCsdSr-rrCrrr_write_to_selfszTestLoop._write_to_self)N)rIrJrK__doc__rr6rr1rrrrrrrrrrrrrrrrrrZrrrTrr9s,   rcKstjfddgi|S)Nspec__call__)rZMock)kwargsrrrrsrc@seZdZdZddZdS) MockPatternzA regex based str with a fuzzy __eq__. Use this helper with 'mock.assert_called_with', or anywhere where a regex comparison between strings is needed. For instance: mock_call.assert_called_with(MockPattern('spam.*ham')) cCsttt||tjSr-)boolresearchstrSrDotherrrr__eq__szMockPattern.__eq__N)rIrJrKrrrrrrrsrc@seZdZddZddZdS)MockInstanceOfcCs ||_dSr-)_type)rDrrrrrszMockInstanceOf.__init__cCs t||jSr-)rrrrrrrszMockInstanceOf.__eq__N)rIrJrKrrrrrrrsrcCs$t|}|dur td|f|S)Nzunable to get the source of %r)rZ_get_function_sourcer)funcsourcerrrget_function_sources rc@sHeZdZeddZddddZddd Zd d Zd d ZddZ dS)TestCasecCs|jdur0|s"||n|jjdd|t}|durz |}Wnt yfYn.0t |t j rt |j}|D] }|qdS)NT)wait)Z_default_executorZ is_closedr0Zshutdown_default_executorrsr1r Zmaybe_get_event_loop_policyZget_child_watcherNotImplementedErrorrasyncioZThreadedChildWatcherlistZ_threadsvaluesr)r2ZpolicyZwatcherthreadsthreadrrr close_loops    zTestCase.close_loopT)cleanupcCs,|dus Jtd|r(||j|dSr-)rset_event_loopZ addCleanupr)rDr2rrrrrs  zTestCase.set_event_loopNcCst|}|||Sr-)rr)rDr3r2rrr new_test_loops zTestCase.new_test_loopcCs |jt_dSr-)_get_running_looprrCrrrunpatch_get_running_loopsz!TestCase.unpatch_get_running_loopcCs tj|_ddt_t|_dS)NcSsdSr-rrrrrrl$rmz TestCase.setUp..)rrr Zthreading_setup_thread_cleanuprCrrrsetUp"s zTestCase.setUpcCsB|td|td|tj|j t dS)N)NNN) rrrZ assertEqualsysexc_infoZ doCleanupsr Zthreading_cleanupr reap_childrenrCrrrtearDown's   zTestCase.tearDown)N) rIrJrK staticmethodrrrrrrrrrrrs  rc cs<tj}z$ttjddVWt|n t|0dS)zrContext manager to disable asyncio logger. For example, it can be used to ignore warnings in debug mode. rN)r levelsetLevelloggingCRITICAL)Z old_levelrrrdisable_logger5s r cCs*ttj}||_||_||_d|j_|S)z'Create a mock of a non-blocking socket.g)rZ MagicMocksocketprotorfamily gettimeoutr)rrrsockrrrmock_nonblocking_socketCs  r)Orrr contextlibrAr rrrrr{rrrpr6ZunittestrrZ http.serverrZwsgiref.simple_serverrrr" ImportErrorrrrr r Z asyncio.logr testr rr%r&ZSIGNED_CERTFILEZ SIGNING_CAZPEERCERTr*r,r.r5Z SHORT_TIMEOUTr<r?r@rLr[r_rvrr|rxr~rrrcontextmanagerrrrr BaseSelectorrZ BaseEventLooprrrrrrrr  IPPROTO_TCP SOCK_STREAMAF_INETrrrrrs                 #   % 8