wo4hb,tddlZddlZddlZddlZddlZddlZddlZddlmZddl m Z ddl m Z m Z ddlmZddlmZmZmZddlmZejeZejdd d gZGd d eZGd deZdZGddeZGddeZ Gddej!Z"dS)N)suppress) attrgetter)MessageReject)BaseMessageProcessor)DAY ServiceBase rate_limit)gProcessingMessagemessage start_timec2eZdZdZdZdZdZdZdZdS)TheSinkct|td|_||_t |t |j|_|t_dS)NPROCESSING_ORDER)key) sortedr_sinks_ordered_loop TaskManagerMessageProcessor _task_managerr sink)self sink_listloops W/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/the_sink.py__init__zTheSink.__init__s`$ :&899    ( "4#677  c8|jjd|jjS)N.) __class__ __module____name__rs r__repr__zTheSink.__repr__#s .333T^5L5LMMr cfd|jD}t|dks Jdtt|dS)ze introspection: decompose a specific role :return classobj: instance or None c4g|]}t||S) isinstance).0rclassobjs r z%TheSink.decompose..+s8   JtX4N4N    r zAmbiguous requestN)rlennextiter)rr-optionss ` r decomposezTheSink.decompose&sf     !0   7||q   "5   DMM4(((r c8|jdS)z Make sure to run message processing bus only when every MessageSource (or MessageSource+MessageSink mix) got initialized N)rstartr&s rr6z TheSink.start1s   """""r cXKtd|jtd|jdd{Vtd|jd{VdS)Nzshutdown the sink startedzwait for current taskstimeoutzfinish wait task)loggerinfor should_stopwait_current_taskswaitr&s rshutdownzTheSink.shutdown9s /000 &&((( ,--- 33A3>>>>>>>>> &''' %%'''''''''''r cJK|j|d{VdSN)rpush_msg)rr s rprocess_messagezTheSink.process_messageAs5 ))'22222222222r N) r%r$ __qualname__rr'r4r6r@rDr*r rrrsqNNN ) ) )###(((33333r rcpeZdZdZdZdZfdZdZedZ d dZ d Z d Z e d ZxZS) rir8c~t|t|j|_|j|_|j|_||_ tj |_ tttj|_|tj|_dS)N)maxsize)periodon_drop)superr MessageQueueMAXSIZE_queue CONCURRENCY _concurrencyTIMEOUT_process_message_timeout_msg_processorweakrefWeakSettasksr rr;warning_throttled_loggererrorthrottled_log_error)rr msg_processorr#s rrzTaskManager.__init__Ms "4<888  ,(, %+_&& !+3!O!O!O#'#9#9&,#G#G   r c,K|js/|jt|d{VdS|jjrd|j|j|f}n"d|j|j|f}|j|dS)z&Push message unless the queue is full.NzNMessage queue is full %s. Current processing messages: %s. Message ignored: %szZMessage queue is full. Queue size: %s Current processing messages: %s. Message ignored: %s) rOfullputMessageComparablerYshould_be_calledcurrent_processing_messagesqsizer[)rmsgargss rrCzTaskManager.push_msgWs{!! ,+//"3C"8"899 9 9 9 9 9 9 9 9 9%6 OK4OK%%''4 %D $d + + + +r c>td|jDS)Nc3K|]R}||jjtt j|jjz dfVSdS)N)doneprocessing_msgr roundtime monotonicr)r,tasks r z:TaskManager.current_processing_messages..ysq  99;;  #+dn&&)<)GGKK       r )tuplerWr&s rrbz'TaskManager.current_processing_messagesrs6         r NcK|jrOd|jD}td|t j|j|d{VdSdS)Ncjg|]0\}}|d|d|f1S)method message_id)get)r,mlastings rr.z2TaskManager.wait_current_tasks..sIAwx!%% "5"5w?r z#Waiting for %r processing to finishr9)rWrbr;r<asyncior?)rr:msg_to_processs rr>zTaskManager.wait_current_taskss : <"&"BN KK5   ,tz7;;; ; ; ; ; ; ; ; ; ; < z"TaskManager._run..si.?.?.A.Ar z3There is still %s unprocessed messages in the queue Error during message processing:)rxBoundedSemaphorerQ _should_stopr;debugrOrc_TaskManager__limit_concurrencyruCancelledErrorr create_taskrTrdr rlrmrjadd_done_callback_on_msg_processedrWaddrX exception)rmsg_comparablet unprocessedr~s @r_runzTaskManager._runs,T->??  A' " 5t{7H7H7J7JKKK229=========+/;??+<+<%<%<%<%<%<%rr staticmethodr __classcell__r#s@rrrEsGKGHHHHH,,,6   X   < < < <AAA8   MM\MMMMMr rcK|sP|ttj5|d{VddddS#1swxYwYdSdSrB)ricancelrrxr)rns r cancel_taskrs 99;; g, - -  JJJJJJJ                  s AA!Ac$eZdZdZdZdZdZdS)rrGc||_tj|_t dt j|_dS)NrG)rJ)sinksrUWeakValueDictionarylocksr r;rZr[)rrs rrzMessageProcessor.__init__sC 022 #=:W#=#=#= L$ $    r cZK|d}|rv|j|tj}|4d{V||d{Vdddd{VdS#1d{VswxYwYdS||d{VdS)N attackers_ip)rur setdefaultrxLock_call_unlocked)rrdiplocks r__call__zMessageProcessor.__call__sT WW^ $ $  +:((W\^^< %r)filerzCMessage %r was not processed in the %r plugin in %ss; Traceback: %szError processing %r in %rz%s processed in %.4f secondszE%s message took longer to process than expected (%.4f sec > %.4f sec))rlrmrrxrrDrshieldTIMEOUT_TO_SINK_PROCESSr+rrrrr;r<strrioStringIO print_stackseekrZread ExceptionrPROCESSING_TIME_THRESHOLDr[) rrdr6rprocess_message_task processedrstackprocessing_times rrzMessageProcessor._call_unlockeds3  J* 8* 8D) 8'.':((--(($#*"2 N#788 8 ### Di11$#C7)   8""6777777777777    0#a&&#>>>2""67777777777771'     $00e0<<< 1  $0JJLL ""6777777777777     !)klassrdr)formatr#r%rdrr&s rr'zMessageComparable.__repr__$s28??.)]@   r ) r%r$rE__doc__rrrrr'rrs@rr`r`sm-- E\444       r r`c4eZdZfdZdeffd ZdZxZS)rMctj|i|tj|_d|j_d|j_dS)N2i)rLrreprlibRepr_repr maxstringmaxtuple)rrekwargsr#s rrzMessageQueue.__init__-sF$)&)))\^^ ! " r itemcVKt|d{VSrB)rLr_)rrr#s rr_zMessageQueue.put3s/WW[[&&&&&&&&&r cttjd|jDdd}d|jd|d|j|dS) Nc0g|]}|jjjSr*)rdr#rE)r,rs rr.z(MessageQueue.__str__..:s IIIT#0IIIr c|dSrr*)rs rrz&MessageQueue.__str__..<s T!Wr T)rreversez) r collectionsCounterrOitemsrIrcrrepr)r msg_countss r__str__zMessageQueue.__str__6s  IIT[III  egg$$     rs  >>>>>>>>BBBBBBEEEEEEEEEE444444  8 $ $*K*)\2 *3*3*3*3*3"*3*3*3ZwMwMwMwMwM+wMwMwMtJJJJJvJJJZ        2     7(     r