U =_S@srdZddlZddlZddlZddlZddlmZddlmZddlmZddlm Z ddlm Z ddl Z ddl m Z dd l m Z dd l mZdd l mZdd l mZdd l mZddl mZddl mZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddlm!Z!ddlm"Z"ddlm#Z#ddlm$Z$ddlm%Z%ddlm&Z&dd lm'Z'dd!lm(Z(e )Z*e+ed"e,Z-ej.dd#d$kZ/e"Gd%d&d&eZ0Gd'd(d(e0Z1e"Gd)d*d*e0Z2e"Gd+d,d,e0Z3Gd-d.d.e0Z4e&5e$d/Gd0d1d1e0Z6Gd2d3d3eZ7e8d4krndd5l9m:Z:e:e;dS)6z;Tests for net_connections() and Process.connections() APIs.N)closing)AF_INET)AF_INET6) SOCK_DGRAM) SOCK_STREAM)FREEBSD)LINUX)MACOS)NETBSD)OPENBSD)POSIX)SUNOS)WINDOWS) supports_ipv6)PY3)AF_UNIX) bind_socket)bind_unix_socket)check_connection_ntuple)create_sockets)HAS_CONNECTIONS_UNIX)PsutilTestCase) reap_children)retry_on_failure) serialrun)skip_on_access_denied) SKIP_SYSCONS)tcp_socketpair)unittest)unix_socketpair) wait_for_fileSOCK_SEQPACKET) c@s&eZdZddZddZd ddZdS) ConnectionTestCasecCs$ts ts tjdd}|r t|dSNallkind)r rthisproc connectionsAssertionErrorselfconsr0P/opt/alt/python38/lib64/python3.8/site-packages/psutil/tests/test_connections.pysetUp8s zConnectionTestCase.setUpcCs$ts ts tjdd}|r t|dSr&)rr r*r+r,r-r0r0r1tearDown>s zConnectionTestCase.tearDownr'csfztj|d}Wn"tjk r2tr,YdSYnXfdd|D}|||||dS)zGiven a process PID and its list of connections compare those against system-wide connections retrieved via psutil.net_connections. r(Ncs"g|]}|jkr|ddqS)Npid.0cr5r0r1 Ts zBConnectionTestCase.compare_procsys_connections..)psutilnet_connectionsZ AccessDeniedr sort assertEqual)r.r6Z proc_consr)Zsys_consr0r5r1compare_procsys_connectionsEsz.ConnectionTestCase.compare_procsys_connectionsN)r')__name__ __module__ __qualname__r2r3r?r0r0r0r1r%5sr%c@s0eZdZeedddZddZddZdS) TestBasicOperations requires rootc Cs0t tjddD] }t|qW5QRXdSr&)rr;r<rr.connr0r0r1 test_system\szTestBasicOperations.test_systemc Cs4t$tjddD] }t|qW5QRXdSr&)rr;Processr+rrEr0r0r1 test_processbsz TestBasicOperations.test_processcCs(|jttjdd|jttjdddS)Nz???r()Z assertRaises ValueErrorr*r+r;r<r.r0r0r1test_invalid_kindgsz%TestBasicOperations.test_invalid_kindN) r@rArBrskipIfrrGrIrLr0r0r0r1rCZs  rCc@seZdZdZddZddZddZee dd d Z d d Z ee dd dZ ee dddZee dddZdS)TestUnconnectedSocketsz;Tests sockets which are open but not connected to anything.cCsvtjdd}tdd|D}ts&tr2||S|t|d|djdkrj|||j||dSdS)Nr'r(cSsg|]}|j|fqSr0)fdr7r0r0r1r:rsz=TestUnconnectedSockets.get_conn_from_sock..rr4) r*r+dictr rfilenor>lenrO)r.sockr/Zsmapr0r0r1get_conn_from_sockps  z)TestUnconnectedSockets.get_conn_from_sockcCs||}t||jdkr.||j|||j|j||j|tj tj | }|szt rzt |trz|}|jtkr|dd}|jtkrtrn||j||jtkrtrtjdd}|jt|dd|S)zGiven a socket, makes sure it matches the one obtained via psutil. It assumes this process created one connection only (the one supposed to be checked). r4Nr"r'r()rUrrOr>rRfamilytypeZ getsockoptsocketZ SOL_SOCKETZSO_TYPEZ getsocknamer isinstancebytesdecoderrr laddrrr*r+r?osgetpid)r.rTrFr\r/r0r0r1 check_socket}s*     z#TestUnconnectedSockets.check_socketc CsJd}tttt|d*}||}|jr,t||jt j W5QRXdSN 127.0.0.1raddr) rrrrr_raddrr,r>statusr; CONN_LISTENr.rdrTrFr0r0r1 test_tcp_v4s   z"TestUnconnectedSockets.test_tcp_v4zIPv6 not supportedc CsJd}tttt|d*}||}|jr,t||jt j W5QRXdSN)::1rrc) rrrrr_rer,r>rfr;rgrhr0r0r1 test_tcp_v6s   z"TestUnconnectedSockets.test_tcp_v6c CsJd}tttt|d*}||}|jr,t||jt j W5QRXdSr`) rrrrr_rer,r>rfr; CONN_NONErhr0r0r1 test_udp_v4s   z"TestUnconnectedSockets.test_udp_v4c CsJd}tttt|d*}||}|jr,t||jt j W5QRXdSrj) rrrrr_rer,r>rfr;rmrhr0r0r1 test_udp_v6s   z"TestUnconnectedSockets.test_udp_v6 POSIX onlyc CsL|}tt|td*}||}|jr.t||jt j W5QRXdSN)rW get_testfnrrrr_rer,r>rfr;rmr.testfnrTrFr0r0r1 test_unix_tcps   z$TestUnconnectedSockets.test_unix_tcpc CsL|}tt|td*}||}|jr.t||jt j W5QRXdSrqrrrtr0r0r1 test_unix_udps   z$TestUnconnectedSockets.test_unix_udpN)r@rArB__doc__rUr_rirrMrrlrnror rvrwr0r0r0r1rNls #     rNc@s:eZdZdZeedddZee dddZ dS) TestConnectedSocketzJTest socket pairs which are are actually connected to each other. zunreliable on SUONScCsd}tjddrttt|d\}}zHtjdd}|t|d||djt j ||djt j W5||XdS)Nratcp4r(rcr"rrP) r*r+r,rrcloser>rSrfr;ZCONN_ESTABLISHED)r.rdserverclientr/r0r0r1test_tcps zTestConnectedSocket.test_tcprpcCs`|}t|\}}z2tjdd}|djr<|djr.r"msg)rsrr{r*r+r\rer,r rr>rSrr r )r.rur|r}r/rdr0r0r1 test_unixs.   "zTestConnectedSocket.test_unixN) r@rArBrxrrMr r~r rr0r0r0r1rys    ryc@s.eZdZddZeedddZddZdS) TestFiltersc sfdd}t|dtttgtttg|dttgttg|dtgttg|dttgtg|dtgtg|dtgtg|d ttgtg|d tgtg|d tgtgtr|d tgtttgW5QRXdS) Ncsdtj|dD] }|j||j|q ts`tj|dD] }|j||j|q>dS)Nr()r*r+assertInrVrWrr;r<)r)familiestypesrFrKr0r1checksz'TestFilters.test_filters..checkr'inetinet4tcprztcp6udpudp4udp6r)rrrrrrr!r)r.rr0rKr1 test_filterssV zTestFilters.test_filters)Zonly_ifc stfdd}td}td}tjjtd}|jt t d|d}|jt t d|d}|jt t d|d}|jt t d|d} |} t t|d d } |} t t|d d } tr |} t t|d d } |}t t|d d }nd} d}d}d}tD]}|}t|d |D]}|j| jkrb|||t t| d tjd n|j| jkr|||t t| d tjdnZ|jt| ddkr|||t t|d tjdn,|jt|ddkr6|||t t|d tjdq6qdS)Nc sd}t||j||j||j||j||j||D].} |j| d} | |krx| stqV| rVt| qVt r |j |gdS)N) r'rrinet6rrzrrrrr() rr>rVrWr\rerfr+r,rr?r6) procrFrVrWr\rerfZkindsZ all_kindsr)r/rKr0r1 check_conn>s  z+TestFilters.test_combos..check_conna import socket, time s = socket.socket({family}, socket.SOCK_STREAM) s.bind(('{addr}', 0)) s.listen(5) with open('{testfn}', 'w') as f: f.write(str(s.getsockname()[:2])) time.sleep(60) a import socket, time s = socket.socket({family}, socket.SOCK_DGRAM) s.bind(('{addr}', 0)) with open('{testfn}', 'w') as f: f.write(str(s.getsockname()[:2])) time.sleep(60) )dirrb)rVrdrurkT)deleterPr0)r'rrrrz)r'rrrrr6)r'rrrr)r'rrrr)rtextwrapdedentr]pathbasenamersgetcwdformatintrrpyrunevalr rr*Zchildrenr+r>rSr6rr;rgrrmgetattr)r.rZ tcp_templateZ udp_templateZtestfileZ tcp4_templateZ udp4_templateZ tcp6_templateZ udp6_templateZ tcp4_procZ tcp4_addrZ udp4_procZ udp4_addrZ tcp6_procZ tcp6_addrZ udp6_procZ udp6_addrpr/rFr0rKr1 test_combos:s|       zTestFilters.test_combosc Csttjdd}|t|tr*dnd|D]$}||jtt f||j t q4tjdd}|t|d||djt||dj t trtjdd}|t|d||djt ||dj t tjdd}|t|trdnd|D]&}||jtt f||j t q tjd d}|t|d||djt||dj t trtjd d}|t|d||djt ||dj t tjd d}|t|trd nd|D]*}||jtt f||j t t fqtrdtjd d}|t|d|D]&}||jt ||j t t fqrSrrrVrrrWrrrrr r)r.r/rFr0r0r1 test_counts\          zTestFilters.test_countN)r@rArBrrr rrr0r0r0r1r s+ `rrDc@s&eZdZdZddZeddZdS)TestSystemWideConnectionszTests for net_connections().c sfdd}thddlm}|D]L\}}|dkr>ts>q(|\}}t|}t|tt |||||q(W5QRXdS)NcsD|D]:}j|j||d|jtkr6j|j||dt|qdS)Nr)rrVrrWr)r/rtypes_rFrKr0r1rs  z0TestSystemWideConnections.test_it..checkr) conn_tmapr) rpsutil._commonritemsrr;r<r>rSset)r.rrr)groupsrrr/r0rKr1test_its    z!TestSystemWideConnections.test_itc st}t|}W5QRXgd}g}t|D]:}|}||td|}||}|jq.|D] }t |qnfddt j ddD} D]B| tfdd| D|t } | t| d|qdS)N a  import time, os from psutil.tests import create_sockets with create_sockets(): with open(r'%s', 'w') as f: f.write("hello") time.sleep(60) csg|]}|jkr|qSr0r5r8x)pidsr0r1r: s zFTestSystemWideConnections.test_multi_sockets_procs..r'r(csg|]}|jkr|qSr0r5rr5r0r1r:s )rrSrangersappendrrrr6r r;r<r>rHr+) r.ZsocksZexpectedtimesfnamesiZfnamesrcZsprocZsysconsrr0)r6rr1test_multi_sockets_procss,     z2TestSystemWideConnections.test_multi_sockets_procsN)r@rArBrxrrrr0r0r0r1rsrc@seZdZddZdS)TestMisccCsg}g}ttD]\}|drtt|}t|}|s@t||t||||||||qt r~tj tj t rtj dS)NZCONN_)rr; startswithrstrisupperr,Z assertNotInrr Z CONN_IDLEZ CONN_BOUNDrZCONN_DELETE_TCB)r.ZintsZstrsnameZnumZstr_r0r0r1test_connection_constantss        z"TestMisc.test_connection_constantsN)r@rArBrr0r0r0r1rsr__main__) run_from_name)sl                               $b=J >