o
    >hA                     @  s  d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZmZmZmZmZmZmZmZmZ ddlmZmZ dd	lmZmZmZmZ dd
lmZ ddlm Z  G dd de Z!G dd de Z"G dd dZ#G dd de Z$G dd deZ%G dd de Z&dS )z(
Tests for L{twisted.internet.testing}.
    )annotations)Callable)verifyObject)Protocol)IPv4Address)	IAddress
IConnector	IConsumerIListeningPortIPushProducerIReactorSSLIReactorTCPIReactorUNIX
ITransport)ClientFactoryFactory)MemoryReactorNonStreamingProducerRaisingMemoryReactorStringTransport)namedAny)TestCasec                   @  s   e Zd ZdZd+ddZd+ddZd+dd	Zd+d
dZd+ddZd+ddZ	d+ddZ
d+ddZd+ddZd+ddZd+ddZd+ddZd+ddZd+ddZd+d d!Zd+d"d#Zd+d$d%Zd+d&d'Zd+d(d)Zd*S ),StringTransportTestsz@
    Tests for L{twisted.internet.testing.StringTransport}.
    returnNonec                 C  s   t  | _d S N)r   	transportself r   /var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/twisted/internet/test/test_testing.pysetUp+   s   zStringTransportTests.setUpc                 C  s:   |  tt| j |  tt| j |  tt| j dS )zq
        L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
        and L{IConsumer}.
        N)
assertTruer   r   r   r   r	   r   r   r   r    test_interfaces.   s   z$StringTransportTests.test_interfacesc                 C  s>   t  }t  }| j|| | | jj| | | jj| dS )zz
        L{StringTransport.registerProducer} records the arguments supplied to
        it as instance attributes.
        N)objectr   registerProducerassertIsproducer	streamingr   r'   r(   r   r   r    test_registerProducer7   s
   z*StringTransportTests.test_registerProducerc                 C  sL   t  }| j|d | t| jjt  d | | jj| | | jj dS )zy
        L{StringTransport.registerProducer} raises L{RuntimeError} if a
        producer is already registered.
        TFN)	r$   r   r%   assertRaisesRuntimeErrorr&   r'   r"   r(   )r   r'   r   r   r    test_disallowedRegisterProducerB   s   z4StringTransportTests.test_disallowedRegisterProducerc                 C  sb   t  }t  }| j|d | j  | | jj | j|d | | jj| | | jj dS )z
        L{StringTransport.unregisterProducer} causes the transport to forget
        about the registered producer and makes it possible to register a new
        one.
        FTN)	r$   r   r%   unregisterProducerassertIsNoner'   r&   r"   r(   )r   oldProducernewProducerr   r   r    test_unregisterProducerO   s   
z,StringTransportTests.test_unregisterProducerc                 C  s   |  t| jj dS )z
        L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
        when no producer is registered.
        N)r+   r,   r   r.   r   r   r   r    test_invalidUnregisterProducer^   s   z3StringTransportTests.test_invalidUnregisterProducerc                 C  s   |  | jjd dS )zO
        L{StringTransport.producerState} is initially C{'producing'}.
        	producingN)assertEqualr   producerStater   r   r   r    test_initialProducerStatee   s   z.StringTransportTests.test_initialProducerStatec                 C     | j   | | j jd dS )zy
        L{StringTransport.pauseProducing} changes the C{producerState} of the
        transport to C{'paused'}.
        pausedN)r   pauseProducingr5   r6   r   r   r   r    test_pauseProducingk      
z(StringTransportTests.test_pauseProducingc                 C  s(   | j   | j   | | j jd dS )z}
        L{StringTransport.resumeProducing} changes the C{producerState} of the
        transport to C{'producing'}.
        r4   N)r   r:   resumeProducingr5   r6   r   r   r   r    test_resumeProducings   s   

z)StringTransportTests.test_resumeProducingc                 C  r8   )z{
        L{StringTransport.stopProducing} changes the C{'producerState'} of the
        transport to C{'stopped'}.
        stoppedN)r   stopProducingr5   r6   r   r   r   r    test_stopProducing|   r<   z'StringTransportTests.test_stopProducingc                 C     | j   | t| j j dS )zu
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport has been stopped.
        N)r   r@   r+   r,   r:   r   r   r   r     test_stoppedTransportCannotPause   r<   z5StringTransportTests.test_stoppedTransportCannotPausec                 C  rB   )zv
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport has been stopped.
        N)r   r@   r+   r,   r=   r   r   r   r    !test_stoppedTransportCannotResume   r<   z6StringTransportTests.test_stoppedTransportCannotResumec                 C  rB   )zz
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        N)r   loseConnectionr+   r,   r:   r   r   r   r    &test_disconnectingTransportCannotPause   r<   z;StringTransportTests.test_disconnectingTransportCannotPausec                 C  rB   )z{
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        N)r   rE   r+   r,   r=   r   r   r   r    'test_disconnectingTransportCannotResume   r<   z<StringTransportTests.test_disconnectingTransportCannotResumec                 C  s*   |  | jj | j  | | jj dS )zv
        L{StringTransport.loseConnection} toggles the C{disconnecting} instance
        variable to C{True}.
        N)assertFalser   disconnectingrE   r"   r   r   r   r    $test_loseConnectionSetsDisconnecting   s   
z9StringTransportTests.test_loseConnectionSetsDisconnectingc                 C  s   t  }| t| | dS )z
        If a host address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getHost}.
        N)r$   r&   r   getHostr   addressr   r   r    test_specifiedHostAddress   s   z.StringTransportTests.test_specifiedHostAddressc                 C  s    t  }| t|d | dS )z
        If a peer address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getPeer}.
        )peerAddressN)r$   r&   r   getPeerrL   r   r   r    test_specifiedPeerAddress   s   z.StringTransportTests.test_specifiedPeerAddressc                 C     t   }| |t dS )z
        If no host address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getHost}.
        N)r   rK   assertIsInstancer   rL   r   r   r    test_defaultHostAddress      
z,StringTransportTests.test_defaultHostAddressc                 C  rR   )z
        If no peer address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getPeer}.
        N)r   rP   rS   r   rL   r   r   r    test_defaultPeerAddress   rU   z,StringTransportTests.test_defaultPeerAddressNr   r   )__name__
__module____qualname____doc__r!   r#   r*   r-   r2   r3   r7   r;   r>   rA   rC   rD   rF   rG   rJ   rN   rQ   rT   rV   r   r   r   r    r   &   s*    


	






	





	

r   c                   @  sL   e Zd ZdZdddZdddZddd	Zdd
dZdddZdddZ	dS )ReactorTestszA
    Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
    r   r   c                 C  (   t  }tt| tt| tt| dS )zt
        L{MemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        N)r   r   r   r   r   )r   memoryReactorr   r   r    test_memoryReactorProvides      

z'ReactorTests.test_memoryReactorProvidesc                 C  r]   )z{
        L{RaisingMemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        N)r   r   r   r   r   )r   raisingReactorr   r   r    test_raisingReactorProvides   r`   z(ReactorTests.test_raisingReactorProvidesc                 C  s   t  }|ddt |ddt dfD ]}tt| | }tt| | |j	d | |j
d q|dt }tt| | }tt| | |jd dS )a  
        L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
        L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
        C{getDestination} method returns an L{IAddress} with attributes which
        reflect the values passed.
        ztest.example.comi   Ns
   /fake/path)r   
connectTCPr   
connectSSLr   r   getDestinationr   r5   hostportconnectUNIXname)r   r^   	connectorrM   r   r   r    test_connectDestination   s   



z$ReactorTests.test_connectDestinationc                 C  s   t  }|dt |dt dfD ]}tt| | }tt| | |j	d | |j
d q|dt }tt| | }tt| | |jd dS )a  
        L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
        L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
        C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
        will have a default host of C{'0.0.0.0'}, and a port that reflects the
        value passed, and C{listenUNIX} will have a name that reflects the path
        passed.
        i2   Nz0.0.0.0s   /path/to/socket)r   	listenTCPr   	listenSSLr   r
   rK   r   r5   rf   rg   
listenUNIXri   )r   r^   rg   rM   r   r   r    test_listenDefaultHost   s   	



z#ReactorTests.test_listenDefaultHostc                 C  P   t  }t }|| || | | |g || | | g  dS )z>
        Adding, removing, and listing readers works.
        N)r$   r   	addReaderr5   
getReadersremoveReader)r   readerreactorr   r   r    test_readers     


zReactorTests.test_readersc                 C  rp   )z>
        Adding, removing, and listing writers works.
        N)r$   r   	addWriterr5   
getWritersremoveWriter)r   writerru   r   r   r    test_writers'  rw   zReactorTests.test_writersNrW   )
rX   rY   rZ   r[   r_   rb   rk   ro   rv   r|   r   r   r   r    r\      s    






r\   c                   @  s8   e Zd ZdZdddZdd
dZdddZdddZdS )TestConsumerzP
    A very basic test consumer for use with the NonStreamingProducerTests.
    r   r   c                 C  s   g | _ d | _d | _d S r   )writesr'   producerStreamingr   r   r   r    __init__=  s   
zTestConsumer.__init__r'   r$   r(   boolc                 C  s   || _ || _dS )z
        Registers a single producer with this consumer. Just keeps track of it.

        @param producer: The producer to register.
        @param streaming: Whether the producer is a streaming one or not.
        Nr'   r   r)   r   r   r    r%   B  s   
zTestConsumer.registerProducerc                 C  s   d| _ d| _dS )zC
        Forget the producer we had previously registered.
        Nr   r   r   r   r    r.   L  s   
zTestConsumer.unregisterProducerdatabytesc                 C  s   | j | dS )zz
        Some data was written to the consumer: stores it for later use.

        @param data: The data to write.
        N)r~   append)r   r   r   r   r    writeS  s   zTestConsumer.writeNrW   )r'   r$   r(   r   r   r   )r   r   r   r   )rX   rY   rZ   r[   r   r%   r.   r   r   r   r   r    r}   8  s    



r}   c                   @  s$   e Zd ZdZd	ddZd	ddZdS )
NonStreamingProducerTestszF
    Tests for the L{NonStreamingProducer} to validate behaviour.
    r   r   c                 C  s   t  }t|}||d | |j| | |j| | |j tdD ]}|	  q%g d}| 
|j | 
|j | 
|j | |j| | t|j	 dS )z
        When the L{NonStreamingProducer} has resumeProducing called 10 times,
        it writes the counter each time and then fails.
        F
   )
   0   1   2   3   4   5   6   7   8   9N)r}   r   r%   r&   r'   consumerrH   r   ranger=   r/   r5   r~   r+   r,   )r   r   r'   _expectedWritesr   r   r    test_producesOnly10Timesa  s   
z2NonStreamingProducerTests.test_producesOnly10Timesc                 C  s4   t  }t|}||d |  | t|j dS )zb
        When the L{NonStreamingProducer} is paused, it raises a
        L{RuntimeError}.
        FN)r}   r   r%   r=   r+   r,   r:   )r   r   r'   r   r   r    test_cannotPauseProduction{  s
   z4NonStreamingProducerTests.test_cannotPauseProductionNrW   )rX   rY   rZ   r[   r   r   r   r   r   r    r   \  s    
r   c                   @  s   e Zd ZedddZ dS )_SupportsNamer   strc                 C  s   d S r   r   r   r   r   r    rX     s   z_SupportsName.__name__N)r   r   )rX   rY   rZ   propertyr   r   r   r    r     s    r   c                   @  s   e Zd ZdZd'dd	Zd(d
dZd(ddZd(ddZd(ddZd(ddZ	d(ddZ
d(ddZd(ddZd(ddZd(ddZd(ddZd(d d!Zd(d"d#Zd(d$d%Zd&S ))DeprecationTestsz8
    Deprecations in L{twisted.test.proto_helpers}.
    testCallable[..., object]objr   r   r   c                 C  sd   d|j  }| |g}| t|d d  | dt| | ||d d  | |t| d S )Nztwisted.internet.testing.r   category   message)rX   flushWarningsr5   DeprecationWarninglenassertInr&   r   )r   r   r   new_pathwarningsr   r   r    helper  s   zDeprecationTests.helperc                 C     ddl m} | | j| d S )Nr   )AccumulatingProtocol)twisted.test.proto_helpersr   r   test_accumulatingProtocol)r   r   r   r   r    r        z*DeprecationTests.test_accumulatingProtocolc                 C  r   )Nr   )LineSendingProtocol)r   r   r   test_lineSendingProtocol)r   r   r   r   r    r     r   z)DeprecationTests.test_lineSendingProtocolc                 C  r   )Nr   )FakeDatagramTransport)r   r   r   test_fakeDatagramTransport)r   r   r   r   r    r     r   z+DeprecationTests.test_fakeDatagramTransportc                 C  r   )Nr   )r   )r   r   r   test_stringTransport)r   r   r   r   r    r     r   z%DeprecationTests.test_stringTransportc                 C  r   )Nr   ) StringTransportWithDisconnection)r   r   r   %test_stringTransportWithDisconnection)r   r   r   r   r    r     s   z6DeprecationTests.test_stringTransportWithDisconnectionc                 C  r   )Nr   )StringIOWithoutClosing)r   r   r   test_stringIOWithoutClosing)r   r   r   r   r    r     r   z,DeprecationTests.test_stringIOWithoutClosingc                 C  r   )Nr   )_FakeConnector)r   r   r   test__fakeConnector)r   r   r   r   r    r     r   z$DeprecationTests.test__fakeConnectorc                 C  r   )Nr   )	_FakePort)r   r   r   test__fakePort)r   r   r   r   r    r     r   zDeprecationTests.test__fakePortc                 C  r   )Nr   )r   )r   r   r   test_memoryReactor)r   r   r   r   r    r     r   z#DeprecationTests.test_memoryReactorc                 C  r   )Nr   )MemoryReactorClock)r   r   r   test_memoryReactorClock)r   r   r   r   r    r     r   z(DeprecationTests.test_memoryReactorClockc                 C  r   )Nr   )r   )r   r   r   test_raisingMemoryReactor)r   r   r   r   r    r     r   z*DeprecationTests.test_raisingMemoryReactorc                 C  r   )Nr   )r   )r   r   r   test_nonStreamingProducer)r   r   r   r   r    r     r   z*DeprecationTests.test_nonStreamingProducerc                 C  r   )Nr   )waitUntilAllDisconnected)r   r   r   test_waitUntilAllDisconnected)r   r   r   r   r    r     r   z.DeprecationTests.test_waitUntilAllDisconnectedc                 C  r   )Nr   )EventLoggingObserver)r   r   r   test_eventLoggingObserver)r   r   r   r   r    r     r   z*DeprecationTests.test_eventLoggingObserverN)r   r   r   r   r   r   rW   )rX   rY   rZ   r[   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r     s"    













r   N)'r[   
__future__r   typingr   zope.interface.verifyr   typing_extensionsr   twisted.internet.addressr   twisted.internet.interfacesr   r   r	   r
   r   r   r   r   r   twisted.internet.protocolr   r   twisted.internet.testingr   r   r   r   twisted.python.reflectr   twisted.trial.unittestr   r   r\   r}   r   r   r   r   r   r   r    <module>   s$   , )j$.