U =_S@srdZddlZddlZddlZddlZddlmZddlmZddlmZddlm Z ddlm Z ddl Z ddl m Z dd l m Z dd l mZdd l mZdd l mZdd l mZddl mZddl mZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddlm!Z!ddlm"Z"ddlm#Z#ddlm$Z$ddlm%Z%ddlm&Z&dd lm'Z'dd!lm(Z(e )Z*e+ed"e,Z-ej.dd#d$kZ/e"Gd%d&d&eZ0Gd'd(d(e0Z1e"Gd)d*d*e0Z2e"Gd+d,d,e0Z3Gd-d.d.e0Z4e&5e$d/Gd0d1d1e0Z6Gd2d3d3eZ7e8d4krndd5l9m:Z:e:e;dS)6z;Tests for net_connections() and Process.connections() APIs.N)closing)AF_INET)AF_INET6) SOCK_DGRAM) SOCK_STREAM)FREEBSD)LINUX)MACOS)NETBSD)OPENBSD)POSIX)SUNOS)WINDOWS) supports_ipv6)PY3)AF_UNIX) bind_socket)bind_unix_socket)check_connection_ntuple)create_sockets)HAS_CONNECTIONS_UNIX)PsutilTestCase) reap_children)retry_on_failure) serialrun)skip_on_access_denied) SKIP_SYSCONS)tcp_socketpair)unittest)unix_socketpair) wait_for_fileSOCK_SEQPACKET) c@s&eZdZddZddZd ddZdS) ConnectionTestCasecCstststjdd}dSNallkind)r rthisproc connectionsselfconsr/P/opt/alt/python38/lib64/python3.8/site-packages/psutil/tests/test_connections.pysetUp8s zConnectionTestCase.setUpcCstststjdd}dSr&)rr r*r+r,r/r/r0tearDown>s zConnectionTestCase.tearDownr'csfztj|d}Wn"tjk r2tr,YdSYnXfdd|D}|||||dS)zGiven a process PID and its list of connections compare those against system-wide connections retrieved via psutil.net_connections. r(Ncs"g|]}|jkr|ddqS)Npid.0cr4r/r0 Ts zBConnectionTestCase.compare_procsys_connections..)psutilnet_connectionsZ AccessDeniedr sort assertEqual)r-r5Z proc_consr)Zsys_consr/r4r0compare_procsys_connectionsEsz.ConnectionTestCase.compare_procsys_connectionsN)r')__name__ __module__ __qualname__r1r2r>r/r/r/r0r%5sr%c@s0eZdZeedddZddZddZdS) TestBasicOperations requires rootc Cs0t tjddD] }t|qW5QRXdSr&)rr:r;rr-connr/r/r0 test_system\szTestBasicOperations.test_systemc Cs4t$tjddD] }t|qW5QRXdSr&)rr:Processr+rrDr/r/r0 test_processbsz TestBasicOperations.test_processcCs(|jttjdd|jttjdddS)Nz???r()Z assertRaises ValueErrorr*r+r:r;r-r/r/r0test_invalid_kindgsz%TestBasicOperations.test_invalid_kindN) r?r@rArskipIfrrFrHrKr/r/r/r0rBZs  rBc@seZdZdZddZddZddZee dd d Z d d Z ee dd dZ ee dddZee dddZdS)TestUnconnectedSocketsz;Tests sockets which are open but not connected to anything.cCsvtjdd}tdd|D}ts&tr2||S|t|d|djdkrj|||j||dSdS)Nr'r(cSsg|]}|j|fqSr/)fdr6r/r/r0r9rsz=TestUnconnectedSockets.get_conn_from_sock..rr3) r*r+dictr rfilenor=lenrN)r-sockr.Zsmapr/r/r0get_conn_from_sockps  z)TestUnconnectedSockets.get_conn_from_sockcCs||}t||jdkr.||j|||j|j||j|tj tj | }|szt rzt |trz|}|jtkr|dd}|jtkrtrn||j||jtkrtrtjdd}|jt|dd|S)zGiven a socket, makes sure it matches the one obtained via psutil. It assumes this process created one connection only (the one supposed to be checked). r3Nr"r'r()rTrrNr=rQfamilytypeZ getsockoptsocketZ SOL_SOCKETZSO_TYPEZ getsocknamer isinstancebytesdecoderrr laddrrr*r+r>osgetpid)r-rSrEr[r.r/r/r0 check_socket}s*     z#TestUnconnectedSockets.check_socketc Cs@d}tttt|d }||}||jtjW5QRXdSN 127.0.0.1raddr) rrrrr^r=statusr: CONN_LISTENr-rcrSrEr/r/r0 test_tcp_v4s z"TestUnconnectedSockets.test_tcp_v4zIPv6 not supportedc Cs@d}tttt|d }||}||jtjW5QRXdSN)::1rrb) rrrrr^r=rdr:rerfr/r/r0 test_tcp_v6s z"TestUnconnectedSockets.test_tcp_v6c Cs@d}tttt|d }||}||jtjW5QRXdSr_) rrrrr^r=rdr: CONN_NONErfr/r/r0 test_udp_v4s z"TestUnconnectedSockets.test_udp_v4c Cs@d}tttt|d }||}||jtjW5QRXdSrh) rrrrr^r=rdr:rkrfr/r/r0 test_udp_v6s z"TestUnconnectedSockets.test_udp_v6 POSIX onlyc CsB|}tt|td }||}||jtjW5QRXdSN)rV get_testfnrrrr^r=rdr:rkr-testfnrSrEr/r/r0 test_unix_tcps z$TestUnconnectedSockets.test_unix_tcpc CsB|}tt|td }||}||jtjW5QRXdSrorprrr/r/r0 test_unix_udps z$TestUnconnectedSockets.test_unix_udpN)r?r@rA__doc__rTr^rgrrLrrjrlrmr rtrur/r/r/r0rMls #     rMc@s:eZdZdZeedddZee dddZ dS) TestConnectedSocketzJTest socket pairs which are are actually connected to each other. zunreliable on SUONScCstd}tt|d\}}zHtjdd}|t|d||djtj ||djtj W5||XdS)Nr`rbtcp4r(r"rrO) rrcloser*r+r=rRrdr:ZCONN_ESTABLISHED)r-rcserverclientr.r/r/r0test_tcps zTestConnectedSocket.test_tcprncCs*|}t|\}}ztjdd}ts*tr8dd|D}|jt|d|dt sXtsXt r||dj d||d j d|||dj p|d j nvt r|dj |dj |d j |d j fD]}||dqn:||dj p|d j |||dj p |d j |W5||XdS) Nunixr(cSsg|]}|jdkr|qS)z /var/run/log)raddrr6r/r/r0r9s z1TestConnectedSocket.test_unix..r"msgrrO)rqrryr*r+r rr=rRrr r~r[r )r-rsrzr{r.rcr/r/r0 test_unixs*   "zTestConnectedSocket.test_unixN) r?r@rArvrrLr r|r rr/r/r/r0rws    rwc@s.eZdZddZeedddZddZdS) TestFiltersc sfdd}t|dtttgtttg|dttgttg|dtgttg|dttgtg|dtgtg|dtgtg|d ttgtg|d tgtg|d tgtgtr|d tgtttgW5QRXdS) Ncsdtj|dD] }|j||j|q ts`tj|dD] }|j||j|q>dS)Nr()r*r+assertInrUrVrr:r;)r)familiestypesrErJr/r0checksz'TestFilters.test_filters..checkr'inetinet4tcprxtcp6udpudp4udp6r})rrrrrrr!r)r-rr/rJr0 test_filterssV zTestFilters.test_filters)Zonly_ifc stfdd}td}td}tjjtd}|jt t d|d}|jt t d|d}|jt t d|d}|jt t d|d} |} t t|d d } |} t t|d d } tr |} t t|d d } |}t t|d d }nd} d}d}d}tD]}|}t|d |D]}|j| jkrb|||t t| d tjd n|j| jkr|||t t| d tjdnZ|jt| ddkr|||t t|d tjdn,|jt|ddkr6|||t t|d tjdq6qdS)Nc sd}t||j||j||j||j||j||D]} |j| d} | |krVqVqVtr |j |gdS)N) r'rrinet6rrxrrrrr() rr=rUrVr[r~rdr+rr>r5) procrErUrVr[r~rdZkindsZ all_kindsr)r.rJr/r0 check_conn>s z+TestFilters.test_combos..check_conna import socket, time s = socket.socket({family}, socket.SOCK_STREAM) s.bind(('{addr}', 0)) s.listen(5) with open('{testfn}', 'w') as f: f.write(str(s.getsockname()[:2])) time.sleep(60) a import socket, time s = socket.socket({family}, socket.SOCK_DGRAM) s.bind(('{addr}', 0)) with open('{testfn}', 'w') as f: f.write(str(s.getsockname()[:2])) time.sleep(60) )dirra)rUrcrsriT)deleterOr/)r'rrrrx)r'rrrrr5)r'rrrr)r'rrrr)rtextwrapdedentr\pathbasenamerqgetcwdformatintrrpyrunevalr rr*Zchildrenr+r=rRr5rr:rerrkgetattr)r-rZ tcp_templateZ udp_templateZtestfileZ tcp4_templateZ udp4_templateZ tcp6_templateZ udp6_templateZ tcp4_procZ tcp4_addrZ udp4_procZ udp4_addrZ tcp6_procZ tcp6_addrZ udp6_procZ udp6_addrpr.rEr/rJr0 test_combos:s|       zTestFilters.test_combosc Csttjdd}|t|tr*dnd|D]$}||jtt f||j t q4tjdd}|t|d||djt||dj t trtjdd}|t|d||djt ||dj t tjdd}|t|trdnd|D]&}||jtt f||j t q tjd d}|t|d||djt||dj t trtjd d}|t|d||djt ||dj t tjd d}|t|trd nd|D]*}||jtt f||j t t fqtrdtjd d}|t|d|D]&}||jt ||j t t fqts>q(|\}}t|}t|tt |||||q(W5QRXdS)NcsD|D]:}j|j||d|jtkr6j|j||dt|qdS)Nr)rrUrrVr)r.rtypes_rErJr/r0rs  z0TestSystemWideConnections.test_it..checkr) conn_tmapr}) rpsutil._commonritemsrr:r;r=rRset)r-rrr)groupsrrr.r/rJr0test_its    z!TestSystemWideConnections.test_itc st}t|}W5QRXgd}g}t|D]:}|}||td|}||}|jq.|D] }t |qnfddt j ddD} D]B| tfdd| D|t } | t| d|qdS)N a  import time, os from psutil.tests import create_sockets with create_sockets(): with open(r'%s', 'w') as f: f.write("hello") time.sleep(60) csg|]}|jkr|qSr/r4r7x)pidsr/r0r9 s zFTestSystemWideConnections.test_multi_sockets_procs..r'r(csg|]}|jkr|qSr/r4rr4r/r0r9s )rrRrangerqappendrrrr5r r:r;r=rGr+) r-ZsocksZexpectedtimesfnamesiZfnamesrcZsprocZsysconsrr/)r5rr0test_multi_sockets_procss,     z2TestSystemWideConnections.test_multi_sockets_procsN)r?r@rArvrrrr/r/r/r0rsrc@seZdZddZdS)TestMisccCs|g}g}ttD]L}|drtt|}t|}|t||||||||qtrntjtj t rxtj dS)NZCONN_) rr: startswithrstrZ assertNotInrr Z CONN_IDLEZ CONN_BOUNDrZCONN_DELETE_TCB)r-ZintsZstrsnameZnumZstr_r/r/r0test_connection_constantss       z"TestMisc.test_connection_constantsN)r?r@rArr/r/r/r0rsr__main__) run_from_name)sl                               $b=J >