o
    >hW&                     @  s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZ dd	lmZ G d
d deZeeG dd dZG dd deZG dd deZdS )z"
Tests for L{twisted.test.iosim}.
    )annotations)Literal)implementer)IPushProducer)Protocol)Clock)FakeTransportconnectconnectedServerAndClient)TestCasec                   @  s.   e Zd ZdZdddZdddZddd	Zd
S )FakeTransportTestsz%
    Tests for L{FakeTransport}.
    returnNonec                 C  sH   t t d}t t d}| |jt | |jt | |j|j dS )ze
        Each L{FakeTransport} receives a serial number that uniquely identifies
        it.
        TFN)r   objectassertIsInstanceserialintassertNotEqual)selfab r   w/var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/twisted/test/test_iosim.pytest_connectionSerial   s
   z(FakeTransportTests.test_connectionSerialc                 C  s<   t t d}|d |g d | d|jd dS )zl
        L{FakeTransport.writeSequence} will write a sequence of L{bytes} to the
        transport.
        F   a)   b   c   d    s   abcdN)r   r   writewriteSequenceassertEqualjoinstreamr   r   r   r   r   test_writeSequence$   s   
z%FakeTransportTests.test_writeSequencec                 C  s@   t t d}|d |  |d | d|jd dS )z
        L{FakeTransport.write} will accept writes after transport was closed,
        but the data will be silently discarded.
        Fs   befores   afterr   N)r   r   r   loseConnectionr!   r"   r#   r$   r   r   r   test_writeAfterClose0   s
   

z'FakeTransportTests.test_writeAfterCloseNr   r   )__name__
__module____qualname____doc__r   r%   r'   r   r   r   r   r      s
    

r   c                   @  s2   e Zd ZdZdZdddZdddZdd	d
ZdS )StrictPushProducerz
    An L{IPushProducer} implementation which produces nothing but enforces
    preconditions on its state transition methods.
    runningr   r   c                 C  s   | j dkr	tdd| _ d S )Nstoppedz)Cannot stop already-stopped IPushProducer_state
ValueErrorr   r   r   r   stopProducingF   s   

z StrictPushProducer.stopProducingc                 C  &   | j dkrtd| j  dd| _ d S )Nr.   zCannot pause  IPushProducerpausedr0   r3   r   r   r   pauseProducingK      

z!StrictPushProducer.pauseProducingc                 C  r5   )Nr7   zCannot resume r6   r.   r0   r3   r   r   r   resumeProducingP   r9   z"StrictPushProducer.resumeProducingNr(   )r)   r*   r+   r,   r1   r4   r8   r:   r   r   r   r   r-   =   s    

r-   c                   @  s   e Zd ZdZd-ddZd-ddZd-dd	Zd-d
dZd.ddZd.ddZ	d.ddZ
d/ddZd/ddZd/ddZd/ddZd/ddZd/ddZd/d d!Zd/d"d#Zd/d$d%Zd/d&d'Zd/d(d)Zd/d*d+Zd,S )0StrictPushProducerTestsz*
    Tests for L{StrictPushProducer}.
    r   r-   c                 C  s   t  S )zp
        @return: A new L{StrictPushProducer} which has not been through any state
            changes.
        )r-   r3   r   r   r   _initial[   s   z StrictPushProducerTests._initialc                 C     t  }|  |S )z@
        @return: A new, stopped L{StrictPushProducer}.
        )r-   r4   r   producerr   r   r   _stoppedb      z StrictPushProducerTests._stoppedc                 C  r=   )z?
        @return: A new, paused L{StrictPushProducer}.
        )r-   r8   r>   r   r   r   _pausedj   rA   zStrictPushProducerTests._pausedc                 C  s   t  }|  |  |S )zY
        @return: A new L{StrictPushProducer} which has been paused and resumed.
        )r-   r8   r:   r>   r   r   r   _resumedr   s   z StrictPushProducerTests._resumedr?   r   c                 C     |  |jd dS )z
        Assert that the given producer is in the stopped state.

        @param producer: The producer to verify.
        @type producer: L{StrictPushProducer}
        r/   Nr!   r1   r>   r   r   r   assertStopped{      z%StrictPushProducerTests.assertStoppedc                 C  rD   )z
        Assert that the given producer is in the paused state.

        @param producer: The producer to verify.
        @type producer: L{StrictPushProducer}
        r7   NrE   r>   r   r   r   assertPaused   rG   z$StrictPushProducerTests.assertPausedc                 C  rD   )z
        Assert that the given producer is in the running state.

        @param producer: The producer to verify.
        @type producer: L{StrictPushProducer}
        r.   NrE   r>   r   r   r   assertRunning   rG   z%StrictPushProducerTests.assertRunningc                 C     |  t|  j dS )zz
        L{StrictPushProducer.stopProducing} raises L{ValueError} if called when
        the producer is stopped.
        N)assertRaisesr2   r@   r4   r3   r   r   r   test_stopThenStop      z)StrictPushProducerTests.test_stopThenStopc                 C  rJ   )z{
        L{StrictPushProducer.pauseProducing} raises L{ValueError} if called when
        the producer is stopped.
        N)rK   r2   r@   r8   r3   r   r   r   test_stopThenPause   rM   z*StrictPushProducerTests.test_stopThenPausec                 C  rJ   )z|
        L{StrictPushProducer.resumeProducing} raises L{ValueError} if called when
        the producer is stopped.
        N)rK   r2   r@   r:   r3   r   r   r   test_stopThenResume   rM   z+StrictPushProducerTests.test_stopThenResumec                 C     |   }|  | | dS )zn
        L{StrictPushProducer} is stopped if C{stopProducing} is called on a paused
        producer.
        N)rB   r4   rF   r>   r   r   r   test_pauseThenStop      z*StrictPushProducerTests.test_pauseThenStopc                 C     |   }| t|j dS )zs
        L{StrictPushProducer.pauseProducing} raises L{ValueError} if called on a
        paused producer.
        N)rB   rK   r2   r8   r>   r   r   r   test_pauseThenPause      z+StrictPushProducerTests.test_pauseThenPausec                 C  rP   )zp
        L{StrictPushProducer} is resumed if C{resumeProducing} is called on a
        paused producer.
        N)rB   r:   rI   r>   r   r   r   test_pauseThenResume   rR   z,StrictPushProducerTests.test_pauseThenResumec                 C  rP   )zo
        L{StrictPushProducer} is stopped if C{stopProducing} is called on a
        resumed producer.
        N)rC   r4   rF   r>   r   r   r   test_resumeThenStop   rR   z+StrictPushProducerTests.test_resumeThenStopc                 C  rP   )zo
        L{StrictPushProducer} is paused if C{pauseProducing} is called on a
        resumed producer.
        N)rC   r8   rH   r>   r   r   r   test_resumeThenPause   rR   z,StrictPushProducerTests.test_resumeThenPausec                 C  rS   )zu
        L{StrictPushProducer.resumeProducing} raises L{ValueError} if called on a
        resumed producer.
        N)rC   rK   r2   r:   r>   r   r   r   test_resumeThenResume   rU   z-StrictPushProducerTests.test_resumeThenResumec                 C  rP   )zn
        L{StrictPushProducer} is stopped if C{stopProducing} is called in the
        initial state.
        N)r<   r4   rF   r>   r   r   r   	test_stop   rR   z!StrictPushProducerTests.test_stopc                 C  rP   )zn
        L{StrictPushProducer} is paused if C{pauseProducing} is called in the
        initial state.
        N)r<   r8   rH   r>   r   r   r   
test_pause   rR   z"StrictPushProducerTests.test_pausec                 C  rS   )zz
        L{StrictPushProducer} raises L{ValueError} if C{resumeProducing} is called
        in the initial state.
        N)r<   rK   r2   r:   r>   r   r   r   test_resume   rU   z#StrictPushProducerTests.test_resumeN)r   r-   )r?   r-   r   r   r(   )r)   r*   r+   r,   r<   r@   rB   rC   rF   rH   rI   rL   rN   rO   rQ   rT   rV   rW   rX   rY   rZ   r[   r\   r   r   r   r   r;   V   s*    




	
	
	
	



	

	
	
	

		r;   c                   @  s8   e Zd ZdZdddZddd	Zdd
dZdddZdS )IOPumpTestsz
    Tests for L{IOPump}.
    modeLiteral['server', 'client']r   r   c           	      C  sr   t  }t|dd}t  }t|dd}t||||dd}t }||d| }|j|dd |  | d|j dS )	a  
        Connect a couple protocol/transport pairs to an L{IOPump} and then pump
        it.  Verify that a streaming producer registered with one of the
        transports does not receive invalid L{IPushProducer} method calls and
        ends in the right state.

        @param mode: C{u"server"} to test a producer registered with the
            server transport.  C{u"client"} to test a producer registered with
            the client transport.
        T)isServerF)greet)serverclient)	streamingr.   N)r   r   r	   r-   registerProducerpumpr!   r1   )	r   r^   serverProtoserverTransportclientProtoclientTransportrf   r?   victimr   r   r   _testStreamingProducer   s(   z"IOPumpTests._testStreamingProducerc                 C     | j dd dS )z
        L{IOPump.pump} does not call C{resumeProducing} on a L{IPushProducer}
        (stream producer) registered with the server transport.
        rb   r^   Nrl   r3   r   r   r   test_serverStreamingProducer"     z(IOPumpTests.test_serverStreamingProducerc                 C  rm   )z
        L{IOPump.pump} does not call C{resumeProducing} on a L{IPushProducer}
        (stream producer) registered with the client transport.
        rc   rn   Nro   r3   r   r   r   test_clientStreamingProducer)  rq   z(IOPumpTests.test_clientStreamingProducerc                   sR   g  t  }ttt|d\}}}|d fdd |   |  |   dS )zE
        L{IOPump.pump} advances time in the given L{Clock}.
        )clockr   c                     s
     dS )NT)appendr   time_passedr   r   <lambda>7  s   
 z/IOPumpTests.test_timeAdvances.<locals>.<lambda>N)r   r
   r   	callLaterassertFalserf   
assertTrue)r   rs   _rf   r   ru   r   test_timeAdvances0  s   
zIOPumpTests.test_timeAdvancesN)r^   r_   r   r   r(   )r)   r*   r+   r,   rl   rp   rr   r|   r   r   r   r   r]      s    

#
r]   N)r,   
__future__r   typingr   zope.interfacer   twisted.internet.interfacesr   twisted.internet.protocolr   twisted.internet.taskr   twisted.test.iosimr   r	   r
   twisted.trial.unittestr   r   r-   r;   r]   r   r   r   r   <module>   s   ) %