a zef*@sddlZddlZddlZddlZddlZddlZddlZddlZddlZe ej dkdGdddZ Gddde ej Z Gdd d e ej ZGd d d e ZGd d d eej ZGdddeej ZGddde ZGdddeej ZGdddeej ZedkredS)Nposixztests requires a posix system.c@sReZdZddZddZddZdd d Zd d Zd ZddZ ddZ ddZ dS)TestFileIOSignalInterruptcCs d|_dSN)_processselfrH/opt/bitninja-python-dojo/embedded/lib/python3.9/test/test_file_eintr.pysetUpszTestFileIOSignalInterrupt.setUpcCs:|jr6|jdur6z|jWnty4Yn0dSr)rpollkillOSErrorrrrr tearDowns  z"TestFileIOSignalInterrupt.tearDowncCs d|jS)Nz=import %s as io ;infile = io.FileIO(sys.stdin.fileno(), "rb")modnamerrrr _generate_infile_setup_code$sz5TestFileIOSignalInterrupt._generate_infile_setup_codeTcCs||jdur:tdz|jWnty8Yn0|r\|j\}}||7}||7}|d|||fdS)Ng?z/Error from IO process %s: STDOUT: %sSTDERR: %s ) rr timesleep terminater communicatefaildecode)rwhystdoutstderrrZ stdout_endZ stderr_endrrr fail_with_process_info-s  z0TestFileIOSignalInterrupt.fail_with_process_infoc Cs2|}tjtjddd|dd|ddgtjtjtjd|_|jjt d}|dkrl|j d |d |jj |d }g}|st |jjgd d d \}}}|jtj|d7}|dkr|j|dq|jj}|dkr|j d|d |jjdd\} } |jjr.|j d|jj| | dddS)Nz-uz-czXimport signal, sys ;signal.signal(signal.SIGINT, lambda s, f: sys.stderr.write("$\n")) ;z ;z"sys.stderr.write("Worm Sign!\n") ;zinfile.close())stdinrrs Worm Sign! zwhile awaiting a sign)rrrg?z,reader process failed to handle our signals.s$ zwhile awaiting signal )inputz exited rc=%dF)r)r subprocessPopensys executablePIPErrreadlenrrwriteselect send_signalsignalSIGINTr rreadliner returncode) r data_to_writeread_and_verify_codeZinfile_setup_codeZ worm_signZ signals_sentZrlist_Z signal_linerrrrr _test_readingHs\       z'TestFileIOSignalInterrupt._test_readingzgot = infile.{read_method_name}() ;expected = {expected!r} ;assert got == expected, ("{read_method_name} returned wrong data.\n""got data %r\nexpected %r" % (got, expected))cCs|jd|jjdddddS)N hello, world!r.shello, world! Zread_method_nameZexpectedr0r1r3_READING_CODE_TEMPLATEformatrrrr test_readlinesz'TestFileIOSignalInterrupt.test_readlinecCs"|jd|jjdddgdddS)N hello world! readlinesshello sworld! r5r6r7rrrr test_readlinessz(TestFileIOSignalInterrupt.test_readlinescCs8|jd|jjdddd|jd|jjdddddS)Nr;readall hello world! r5r6r'r7rrrr test_readallsz&TestFileIOSignalInterrupt.test_readallN)rrT) __name__ __module__ __qualname__r rrrr3r8r:r=r@rrrr rs  Irc@seZdZdZdS)CTestFileIOSignalInterrupt_ioNrArBrCrrrrr rDsrDc@seZdZdZdS)PyTestFileIOSignalInterrupt_pyioNrFrrrr rGsrGc@seZdZddZddZdS)TestBufferedIOSignalInterruptcCs d|jS)Nziimport %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;assert isinstance(infile, io.BufferedReader)rrrrr rsz9TestBufferedIOSignalInterrupt._generate_infile_setup_codecCs|jd|jjdddddS)Nr;r'r?r5r6r7rrrr r@sz*TestBufferedIOSignalInterrupt.test_readallN)rArBrCrr@rrrr rIsrIc@seZdZdZdS)CTestBufferedIOSignalInterruptrENrFrrrr rJsrJc@seZdZdZdS)PyTestBufferedIOSignalInterruptrHNrFrrrr rKsrKc@s,eZdZddZddZddZddZd S) TestTextIOSignalInterruptcCs d|jS)Nzvimport %s as io ;infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;assert isinstance(infile, io.TextIOWrapper)rrrrr rsz5TestTextIOSignalInterrupt._generate_infile_setup_codecCs|jd|jjdddddS)Nr4r.zhello, world! r5r6r7rrrr r:sz'TestTextIOSignalInterrupt.test_readlinecCs"|jd|jjdddgdddS)Ns hello world!r<zhello zworld! r5r6r7rrrr r=sz(TestTextIOSignalInterrupt.test_readlinescCs|jd|jjdddddS)Nr;r'z hello world! r5r6r7rrrr r@sz&TestTextIOSignalInterrupt.test_readallN)rArBrCrr:r=r@rrrr rLsrLc@seZdZdZdS)CTestTextIOSignalInterruptrENrFrrrr rMsrMc@seZdZdZdS)PyTestTextIOSignalInterruptrHNrFrrrr rNsrN__main__)osr*r,r"r$rZunittestrErHZ skipUnlessnamerZTestCaserDrGrIrJrKrLrMrNrAmainrrrr  s*