o
    >h                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ e	dkr/dd	lmZ ndd
lmZ G dd dZG dd deZG dd dejZdS )zK
Tests for the internal implementation details of L{twisted.internet.udp}.
    )annotationsN)udp)DatagramProtocol)platformType)unittestwin32)WSAEWOULDBLOCK)EWOULDBLOCKc                   @  s.   e Zd ZdZdddZdd
dZdddZdS )StringUDPSocketa  
    A fake UDP socket object, which returns a fixed sequence of strings and/or
    socket errors.  Useful for testing.

    @ivar retvals: A C{list} containing either strings or C{socket.error}s.

    @ivar connectedAddr: The address the socket is connected to.
    retvalslist[bytes | socket.error]returnNonec                 C  s   || _ d | _d S N)r   connectedAddr)selfr    r   /var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/twisted/internet/test/test_udp_internals.py__init__    s   
zStringUDPSocket.__init__addrobjectc                 C  s
   || _ d S r   )r   )r   r   r   r   r   connect$      
zStringUDPSocket.connectsizeinttuple[bytes, None]c                 C  s$   | j d}t|tjr||dfS )zH
        Return (or raise) the next value from C{self.retvals}.
        r   N)r   pop
isinstancesocketerror)r   r   retr   r   r   recvfrom'   s   zStringUDPSocket.recvfromN)r   r   r   r   )r   r   r   r   )r   r   r   r   )__name__
__module____qualname____doc__r   r   r!   r   r   r   r   r
      s
    
	
r
   c                   @  s$   e Zd ZdZdddZdd
dZdS )	KeepReadsz%
    Accumulate reads in a list.
    r   r   c                 C  s
   g | _ d S r   )reads)r   r   r   r   r   6   r   zKeepReads.__init__databytesr   r   c                 C  s   | j | d S r   )r'   append)r   r(   r   r   r   r   datagramReceived9   s   zKeepReads.datagramReceivedNr   r   )r(   r)   r   r   r   r   )r"   r#   r$   r%   r   r+   r   r   r   r   r&   1   s    
r&   c                   @  s8   e Zd ZdZdddZdddZddd	Zdd
dZdS )ErrorsTestsz/
    Error handling tests for C{udp.Port}.
    r   r   c                 C  s   t jd | t jjd t }t d|}tddt	ddt	dg|_|
  | |jddg |
  | |jg d dS )z
        Socket reads with some good data followed by a socket error which can
        be ignored causes reading to stop, and no log messages to be logged.
        iN   result   123   456)r.   r/   r0   )r   _sockErrReadIgnorer*   
addCleanupremover&   Portr
   r   r   doReadassertEqualr'   r   protocolportr   r   r   test_socketReadNormalB   s   z!ErrorsTests.test_socketReadNormalc                 C  s   t jd | t jjd t }dd |_t d|}tdt	
ddt	
tg|_	|  | |jdg |  | |jddg dS )z
        If the socket is unconnected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is not called.
        c                   S  s   dd S )N   r   r   r   r   r   r   <lambda>e   s    z5ErrorsTests.test_readImmediateError.<locals>.<lambda>N   a   b)r   _sockErrReadRefuser*   r2   r3   r&   connectionRefusedr4   r
   r   r   r	   r5   r6   r'   r7   r   r   r   test_readImmediateErrorX   s   
z#ErrorsTests.test_readImmediateErrorc                   s   t jd | t jjd t }g   fdd|_t d|}tdt	
ddt	
tg|_	|dd |  | |jdg |  d	g |  | |jddg |  d	g dS )
z
        If the socket connected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is called.
        r;   c                     s
     dS )NT)r*   r   refusedr   r   r=      s   
 z>ErrorsTests.test_connectedReadImmediateError.<locals>.<lambda>Nr>   r?   z	127.0.0.1i'  T)r   r@   r*   r2   r3   r&   rA   r4   r
   r   r   r	   r   r5   r6   r'   r7   r   rC   r    test_connectedReadImmediateErrort   s    z,ErrorsTests.test_connectedReadImmediateErrorc                 C  sJ   t  }td|}tdtdg|_| tj|j | |j	dg dS )zG
        Socket reads with an unknown socket error are raised.
        Ns   goodi)
r&   r   r4   r
   r   r   assertRaisesr5   r6   r'   r7   r   r   r   test_readUnknownError   s
   z!ErrorsTests.test_readUnknownErrorNr,   )r"   r#   r$   r%   r:   rB   rE   rG   r   r   r   r   r-   =   s    


r-   )r%   
__future__r   r   twisted.internetr   twisted.internet.protocolr   twisted.python.runtimer   twisted.trialr   errnor   r	   r
   r&   SynchronousTestCaser-   r   r   r   r   <module>   s   