a ze:(@sdZddlZddlZddlZddlZddlmZddlZddl Z ej dddkrbe dej e Z eZejjdkoejjdkZd d Zd d Zd dZGdddejZddZedkredS)z6PyUnit testing that threads honor our signal semanticsN)supportwinzCan't test signal on %sZpthreadz mutex+condcCs4ttj|}ttj|}ttj|}|||fSN)signalSIGUSR1SIGUSR2SIGALRM)Zfor_usr1Zfor_usr2Zfor_alrmZusr1Zusr2Zalrmr K/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_threadsignals.pyregisterSignalssr cCs(t|dd7<tt|d<dS)Ntripped tripped_by)signal_blackboardthread get_ident)sigframer r r handle_signalssrcCs(tttjtttjtdSr)oskill process_pidrrr signalled_allreleaser r r r send_signals"src@seZdZddZddZddZeedee j doBe j j d ee j d d d d Zeedee j doe j j d ee j d d ddZddZddZddZddZdS) ThreadSignalsc Cst(t|tWdn1s60Yttjddksdttjddkrz t dt Wt dn t d0| ttjdd| ttjdt | ttjdd| ttjdt tdS)Nr rrr)rwait_threads_exitracquirespawnSignallingThreadrrrralarmpauseZ assertEqualrrrselfr r r test_signals)s( &  zThreadSignals.test_signalscCsttddS)Nr )rstart_new_threadrr"r r r rIsz#ThreadSignals.spawnSignallingThreadcCstdSr)KeyboardInterrupt)r#rrr r r alarm_interruptLszThreadSignals.alarm_interruptz/POSIX condition variables cannot be interruptedlinuxzBIssue 34004: musl does not allow interruption of locks by signals.Zopenbsdz%lock cannot be interrupted on OpenBSDc Csttj|j}zht}|tdt}|j t |jddt|}| |dWtdttj|ntdttj|0dS)Nrtimeout@r) rr r'r allocate_lockrr time monotonic assertRaisesr& assertLess)r#oldalrmlockt1dtr r r test_lock_acquire_interruptionOs    z,ThreadSignals.test_lock_acquire_interruptionc sttj|j}ztfdd}txt|djddr\ t dq.other_threadr Fblocking{Gz?rr)r*r,r)rr r'rRLockrrr%rrr.sleepr r/r0r&r1)r#r2r8r4r5r r7r test_rlock_acquire_interruptionns$       ,  z-ThreadSignals.test_rlock_acquire_interruptionc sd_fdd}ttj|}zfdd}tZt|djddrdt dqD} j |Wdn1s0YWttj|nttj|0dS) NFcs d_dS)NT) sig_recvd)rrr"r r my_handlersz9ThreadSignals.acquire_retries_on_intr..my_handlercs6tdtttjtddS)N?) rr.r=rrrrrrr )r3r r r8s   z;ThreadSignals.acquire_retries_on_intr..other_threadr r9r;) r?rrrrrr%rrr.r=Z assertTrue)r#r3r@ old_handlerr8resultr r3r#r acquire_retries_on_intrs      *z%ThreadSignals.acquire_retries_on_intrcCs|tdSr)rErr-r"r r r !test_lock_acquire_retries_on_intrsz/ThreadSignals.test_lock_acquire_retries_on_intrcCs|tdSr)rErr<r"r r r "test_rlock_acquire_retries_on_intrsz0ThreadSignals.test_rlock_acquire_retries_on_intrc sd_d_d_ttfdd}ttj|}zfdd}fdd}t `t |d| jjd  jjd  jdWdn1s0YWttj|nttj|0dS) Nrcsjd7_dS)Nr) sigs_recvd)signumrr"r r r@sz@ThreadSignals.test_interrupted_timed_acquire..my_handlercs$t_jddt_dS)NrAr*)r.r/startrendr rDr r timed_acquires  zCThreadSignals.test_interrupted_timed_acquire..timed_acquirecs2tdD]}tdtttjqdS)N(g{Gz?) ranger.r=rrrrrr)_)doner r rs  zBThreadSignals.test_interrupted_timed_acquire..send_signalsr g@g333333?) rJrKrHrr-rrrrrr%r1Z assertGreater)r#r@rBrLrr )rPr3r#r test_interrupted_timed_acquires(    .z,ThreadSignals.test_interrupted_timed_acquireN)__name__ __module__ __qualname__r$rr'unittestZskipIfUSING_PTHREAD_CONDsysplatform startswith thread_infoversionr6r>rErFrGrQr r r r r's:  rcCsJtjdddtjdddtjdddiatttt}tjtg|RdS)Nr)r r) rrrr rr rrUZaddModuleCleanup)Zoldsigsr r r setUpModules     r\__main__)__doc__rUrrrWtestr_threadrr.rXZSkipTestgetpidrr-rrZnamer3rVr rrZTestCaserr\rRmainr r r r s,    C