B
    u9af*                 @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZe	e j
dkdG dd dZG dd deejZG dd	 d	eejZG d
d deZG dd deejZG dd deejZG dd deZG dd deejZG dd deejZedkre  dS )    Nposixztests requires a posix system.c               @   sR   e Zd Zdd Zdd Zdd Zdd	d
Zdd ZdZdd Z	dd Z
dd ZdS )TestFileIOSignalInterruptc             C   s
   d | _ d S )N)_process)self r   %/usr/lib/python3.7/test_file_eintr.pysetUp   s    zTestFileIOSignalInterrupt.setUpc             C   s<   | j r8| j  d kr8y| j   W n tk
r6   Y nX d S )N)r   pollkillOSError)r   r   r   r   tearDown   s
    z"TestFileIOSignalInterrupt.tearDownc             C   s
   d| j  S )Nz=import %s as io ;infile = io.FileIO(sys.stdin.fileno(), "rb"))modname)r   r   r   r   _generate_infile_setup_code$   s    z5TestFileIOSignalInterrupt._generate_infile_setup_code    Tc             C   s~   | j  d kr<td y| j   W n tk
r:   Y nX |r^| j  \}}||7 }||7 }| d|| | f  d S )Ng?z/Error from IO process %s:
STDOUT:
%sSTDERR:
%s
)	r   r	   timeZsleepZ	terminater   communicatefaildecode)r   Zwhystdoutstderrr   Z
stdout_endZ
stderr_endr   r   r   fail_with_process_info-   s    
z0TestFileIOSignalInterrupt.fail_with_process_infoc             C   s6  |   }tjtjddd| d d | d d gtjtjtjd| _| jjt	d}|dkrl| j
d	|d
 | jj| d}g }xV|st| jjgddd\}}}| jtj |d7 }|dkr| j  | d qW | jj }|dkr| j
d|d
 | jjdd\}	}
| jjr2| j
d| jj |	|
dd d S )Nz-uz-czXimport signal, sys ;signal.signal(signal.SIGINT, lambda s, f: sys.stderr.write("$\n")) ;z ;z"sys.stderr.write("Worm Sign!\n") ;zinfile.close())stdinr   r   s   Worm Sign!
zwhile awaiting a sign)r   r   r   g?      z,reader process failed to handle our signals.s   $
zwhile awaiting signal   
)inputzexited rc=%dF)r   )r   
subprocessPopensys
executablePIPEr   r   readlenr   r   writeselectZsend_signalsignalSIGINTr
   r   readliner   
returncode)r   data_to_writeread_and_verify_codeZinfile_setup_codeZ	worm_signZsignals_sentZrlist_Zsignal_liner   r   r   r   r   _test_readingH   s<    


z'TestFileIOSignalInterrupt._test_readingzgot = infile.{read_method_name}() ;expected = {expected!r} ;assert got == expected, ("{read_method_name} returned wrong data.\n""got data %r\nexpected %r" % (got, expected))c             C   s   | j d| jjdddd d S )Ns   hello, world!r'   s   hello, world!
)read_method_nameexpected)r)   r*   )r,   _READING_CODE_TEMPLATEformat)r   r   r   r   test_readline   s
    z'TestFileIOSignalInterrupt.test_readlinec             C   s"   | j d| jjdddgdd d S )Ns   hello
world!	readliness   hello
s   world!
)r-   r.   )r)   r*   )r,   r/   r0   )r   r   r   r   test_readlines   s
    z(TestFileIOSignalInterrupt.test_readlinesc             C   s8   | j d| jjdddd | j d| jjdddd d S )Ns   hello
world!readalls   hello
world!
)r-   r.   )r)   r*   r!   )r,   r/   r0   )r   r   r   r   test_readall   s    z&TestFileIOSignalInterrupt.test_readallN)r   r   T)__name__
__module____qualname__r   r   r   r   r,   r/   r1   r3   r5   r   r   r   r   r      s   	 
Ir   c               @   s   e Zd ZdZdS )CTestFileIOSignalInterrupt_ioN)r6   r7   r8   r   r   r   r   r   r9      s   r9   c               @   s   e Zd ZdZdS )PyTestFileIOSignalInterrupt_pyioN)r6   r7   r8   r   r   r   r   r   r;      s   r;   c               @   s   e Zd Zdd Zdd ZdS )TestBufferedIOSignalInterruptc             C   s
   d| j  S )Nziimport %s as io ;infile = io.open(sys.stdin.fileno(), "rb") ;assert isinstance(infile, io.BufferedReader))r   )r   r   r   r   r      s    z9TestBufferedIOSignalInterrupt._generate_infile_setup_codec             C   s   | j d| jjdddd d S )Ns   hello
world!r!   s   hello
world!
)r-   r.   )r)   r*   )r,   r/   r0   )r   r   r   r   r5      s
    z*TestBufferedIOSignalInterrupt.test_readallN)r6   r7   r8   r   r5   r   r   r   r   r=      s   r=   c               @   s   e Zd ZdZdS )CTestBufferedIOSignalInterruptr:   N)r6   r7   r8   r   r   r   r   r   r>      s   r>   c               @   s   e Zd ZdZdS )PyTestBufferedIOSignalInterruptr<   N)r6   r7   r8   r   r   r   r   r   r?      s   r?   c               @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
TestTextIOSignalInterruptc             C   s
   d| j  S )Nzvimport %s as io ;infile = io.open(sys.stdin.fileno(), "rt", newline=None) ;assert isinstance(infile, io.TextIOWrapper))r   )r   r   r   r   r      s    z5TestTextIOSignalInterrupt._generate_infile_setup_codec             C   s   | j d| jjdddd d S )Ns   hello, world!r'   zhello, world!
)r-   r.   )r)   r*   )r,   r/   r0   )r   r   r   r   r1      s
    z'TestTextIOSignalInterrupt.test_readlinec             C   s"   | j d| jjdddgdd d S )Ns   hello
world!r2   zhello
zworld!
)r-   r.   )r)   r*   )r,   r/   r0   )r   r   r   r   r3      s
    z(TestTextIOSignalInterrupt.test_readlinesc             C   s   | j d| jjdddd d S )Ns   hello
world!r!   zhello
world!
)r-   r.   )r)   r*   )r,   r/   r0   )r   r   r   r   r5      s
    z&TestTextIOSignalInterrupt.test_readallN)r6   r7   r8   r   r1   r3   r5   r   r   r   r   r@      s   r@   c               @   s   e Zd ZdZdS )CTestTextIOSignalInterruptr:   N)r6   r7   r8   r   r   r   r   r   rA      s   rA   c               @   s   e Zd ZdZdS )PyTestTextIOSignalInterruptr<   N)r6   r7   r8   r   r   r   r   r   rB      s   rB   __main__)osr$   r%   r   r   r   Zunittestr:   r<   Z
skipUnlessnamer   ZTestCaser9   r;   r=   r>   r?   r@   rA   rB   r6   mainr   r   r   r   <module>   s(       
