wo4h&"ddlZddlZddlZddlZddlZddlZddlZddlZddl m Z ddl m Z m Z mZmZddlmZddlmZddlmZmZddlmZmZddlmZdd lmZmZdd lm Z ej!e"Z#Gd d Z$Gd de$eZ%dS)N) Generator)APIErrorAPIErrorTooManyRequests APITokenError send_message)license)Core)Message MessageType) MessageSinkexpect)PersistentMessagesQueue)recurring_checkScope)ServerJSONEncoderceZdZdZeejddZdZ dZ de j fdZ dd Zd Zed Zejdeejd d ffd Zeejdedd fdZee dZeddZdede fdZ!de defdZ"ddZ#d S)SendToServerClientaSend messages to server. * process Reportable messages; * add them to a pending messages list; * send all pending messages to server when list is full (contains _PENDING_MESSAGES_LIMIT items or more); * send all pending messages on plugin shutdown. IMUNIFYAV_MESSAGES_COUNT_TO_SENDi,2loopcNK||_t|_tj|_tj|_|| |_ || |_ dSN) _loopr_pendingasyncioEvent _try_sendLock_lock create_task_send _sender_task_invoke_send_message_invoke_send_message_task)selfrs S/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/client.py create_sinkzSendToServerClient.create_sink0s /11  \^^  ,,TZZ\\::)-)9)9  % % ' '* * &&&returnNcK tj||jd{Vn#tj$rt d|j|js\|j tj tj 5|jd{Vdddn #1swxYwYYnwxYw|j jdkrrt d|j j|j t d|j dSdS)a When shutdown begins it gives 50 seconds to send _pending messages to the server (after 60 seconds process will bi killed by systemd) If stop() isn't done in 50 second it terminates process of sending messages and logs error Nz5Timeout (%ds) sending messages to server on shutdown.rz&Save %s messages to persistent storagezStored queue %r)rwait_forstop_SHUTDOWN_SEND_TIMEOUT TimeoutErrorloggererrorr# cancelledcancel contextlibsuppressCancelledErrorr buffer_sizewarningpush_buffer_to_storageqsizer&s r'shutdownzSendToServerClient.shutdown:s ,"499;;0KLL L L L L L L L L# , , , LLG+   $..00 ,!((***()?@@,,++++++++,,,,,,,,,,,,,,, , = $q ( ( NN8 )    M 0 0 2 2 2 NN,dm.A.A.C.C D D D D D ) (s427A?C6C CC CC CCcKtd|jt jt j5|jd{Vdddn #1swxYwYtd|j4d{Vtd|j t jt j5|j d{Vdddn #1swxYwY| d{Vdddd{VdS#1d{VswxYwYdS)aq Stop sending. 1. wait for the lock being available i.e., while _sender_task finishes the current round of sending message (if it takes too long, then the timeout in shutdown() is triggered 2. once the sending round complete (we got the lock), cancel the next iteration of the _sender_task (it exits) 3. send _pending messages (again, if it takes too long, the timeout in shutdown() is triggered and the coroutine is cancelled That method makes sure that the coroutine that was started in it has ended. It excludes a situation when: -> The result of a coroutine that started BEFORE shutdown() is started. -> And the process of sending messages from _pending is interrupted because of it z2SendToServer.stop cancel _invoke_send_message_taskNzSendToServer.stop wait lockz4SendToServer.stop lock acquired, cancel _sender_task) r0infor%r3r4r5rr6r r#_send_pending_messagesr;s r'r-zSendToServerClient.stopVsa0  HIII &--///  !7 8 8 1 10 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1222: 0 0 0 0 0 0 0 0 KKN O O O   $ $ & & &$W%;<< ( ('''''''' ( ( ( ( ( ( ( ( ( ( ( ( ( ( (--// / / / / / / / 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0sIA..A25A2 AE2D  E D ED E EEc,|tj|tj|tj|Sr)set_product_namer LicenseCLNget_product_name set_server_id get_server_id set_license get_token)apis r'_set_api_attrsz!SendToServerClient._set_api_attrs}sn W/@@BBCCC ',::<<=== *4466777 r)c#*Ktjd}tjd5}t jtj ||}| |VddddS#1swxYwYdS)NIMUNIFYAV_API_BASE) max_workers)executor) osenvironget concurrentfuturesThreadPoolExecutorrSendMessageAPIr VERSIONrI)r&base_urlrNrHs r'_get_apizSendToServerClient._get_apis:>>"677   2 2q 2 A A +X- hC%%c** * * *  + + + + + + + + + + + + + + + + + +s9BB B messagecKd|vrtj|d<d|vrtjj|d<|j|||jdS)N timestamp message_id) timeuuiduuid4hexrput_encode_data_to_put_in_queuersetr&rYs r'send_to_serverz!SendToServerClient.send_to_servers g % %#'9;;GK w & &$(JLL$4GL ! $;;GDDEEE r)c<K|jdSr)rrcr;s r'r$z'SendToServerClient._invoke_send_messages  r)rctK|jd{V|j|j|jkrt dd}|j4d{Vt d | d{Vn8#tj $r&}t d|}Yd}~nd}~wwxYwdddd{Vn#1d{VswxYwYt d|r|dSdS)NzSendToServer._send wait lockz SendToServer._send lock acquiredz&SendToServer._send cancelled unlockingz SendToServer._send lock released) rwaitclearrr:_PENDING_MESSAGES_LIMITr0r>r r?rr6)r&need_to_canceles r'r"zSendToServerClient._sendsn!!#########  =   D$@ @ @ KK6 7 7 7!Nz ' ' ' ' ' ' ' ' >???'557777777777-'''KK HIII%&NNNNNN'  ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' ' KK: ; ; ; %$$ A @ % %s<D"B=<D=C2 C-(D-C22D DDdatacftj|tdz}|S)N)cls )jsondumpsrencode)r&rmmsgs r'rbz/SendToServerClient._encode_data_to_put_in_queues*j#4555<zz||r)cDttj|Sr)r rqloadsrds r'_decode_messagez"SendToServerClient._decode_messagestz'**+++r)c `K|j}tdt ||5}|jd}|D]\}}|r|j||% ||}| |d{Vtd| d| dd#t$rn}t d| d| dd||j||d }Yd}~d}~wt$rn}t d | d| dd||j||d }Yd}~d}~wt$rl}t d | d| dd||j||Yd}~d}~wwxYwdddn #1swxYwYtd |jdS) NzSending %s messagesF)r[zmessage sent %smethodr\)ryr\z2Too many requests on send message %s to server: %sTz,Token error on send message %s to server: %sz'Failed to send message %s to server: %sz Unsuccessful to send %s messages)rpop_allr0r>lenrX server_idrarwrrQrr8rrr:)r&messagesrH stop_attemptr[_messagerYexcs r'r?z)SendToServerClient._send_pending_messagessJ=((** )3x==999 ]]__1 I}($ +3.I.I'Ix#! ))(i)HHH *I"&"6"6x"@"@!..w777777777 -*1++h*?*?.5kk,.G.G3 , , ,*+2++h*?*?.5kk,.G.G  ))(i)HHH'+ ( , , ,J*1++h*?*?.5kk,.G.G  ))(i)HHH'+ # I I IE*1++h*?*?.5kk,.G.G  ))(i)HHHHHHHH IQ1 I1 I1 I1 I1 I1 I1 I1 I1 I1 I1 I1 I1 I1 I1 Id  6 8K8K8M8MNNNNNsd2I1 A4D?I1 I" A#E3-I13 I"A#G)#I1) I"6A!II1I""I11I58I5)r*N)$__name__ __module__ __qualname____doc__intrOrPrQrj_SEND_MESSAGE_RECURRING_TIMEr.rAbstractEventLoopr(r<r- staticmethodrIr4contextmanagerrrrUrXr r Reportabler rerr$r"bytesrbrwr?r)r'rrs77"c 92>>$*  g&?    EEEE8%0%0%0N\ +)L$?t$KL++++ VK "##G$#_12232_Q%%%$U,u,,,,,5O5O5O5O5O5Or)rceZdZejZdS) SendToServerN)rrrrAVSCOPErr)r'rrs HEEEr)r)&rconcurrent.futuresrRr4rqloggingrOr]r^typingrdefence360agent.api.serverrrrrdefence360agent.contractsr defence360agent.contracts.configr "defence360agent.contracts.messagesr r !defence360agent.contracts.pluginsr r ,defence360agent.internals.persistent_messagerdefence360agent.utilsrrdefence360agent.utils.jsonr getLoggerrr0rrrr)r'rs   .-----111111CCCCCCCCAAAAAAAA98888888888888  8 $ $OOOOOOOOOOOOOOOOd%{r)