o
    >h&                     @   s   d Z ddlZddlZddlmZmZmZmZmZm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ d
Zd
ZzejdrUddlmZmZ dZdZW n	 ey_   Y nw e  ZeeeZ e!  G dd deeZ"dS )z/
Tests for L{twisted.internet.asyncioreactor}.
    N)	AbstractEventLoopAbstractEventLoopPolicyDefaultEventLoopPolicyFutureSelectorEventLoopget_event_loopget_event_loop_policyset_event_loopset_event_loop_policy)skipIf)AsyncioSelectorReactor)platform)SynchronousTestCase   )ReactorBuilderFwin32)WindowsProactorEventLoopPolicyWindowsSelectorEventLoopPolicyTc                	   @   s(  e Zd ZdZdd ZdedefddZee	 d
eeejjejje d	d
 Zee	 d
eeejjejje dd Zee	d
eeejjejje dd Zee ddd Zee ddd Zee ddd Zee ddd Zdd Zdd Zd d! Zd"d# Zd$S )%AsyncioSelectorReactorTestsz*
    L{AsyncioSelectorReactor} tests.
    c                    sX   t  }g  fdd}|| |d | g  | j dd | dg dS )zn
        Ensure that C{reactor} has an event loop that works
        properly with L{asyncio.Future}.
        c                    s    |      d S N)appendresultstop)futurereactorr    /var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/twisted/internet/test/test_asyncioreactor.py	completed=   s   zRAsyncioSelectorReactorTests.assertReactorWorksWithAsyncioFuture.<locals>.completedTr   )timeoutN)r   add_done_callback
set_resultassertEqual
runReactor)selfr   r   r   r   r   r   #assertReactorWorksWithAsyncioFuture5   s   

z?AsyncioSelectorReactorTests.assertReactorWorksWithAsyncioFuturepolicyreturnc                    s.   t   t | | j fdd}S )z
        Make a new asyncio loop from a policy for use with a reactor, and add
        appropriate cleanup to restore any global state.
        c                      s      t  t d S r   )closer	   r
   r   existingLoopexistingPolicyr   r   r   cleanUpQ   s   z4AsyncioSelectorReactorTests.newLoop.<locals>.cleanUp)r   r   new_event_loop
addCleanup)r$   r&   r,   r   r)   r   newLoopH   s   z#AsyncioSelectorReactorTests.newLoopzLdefault event loop: {}
is not of type SelectorEventLoop on Python {}.{} ({})c                 C   s   t  }| | dS )a  
        L{AsyncioSelectorReactor} wraps the global policy's event loop
        by default.  This ensures that L{asyncio.Future}s and
        coroutines created by library code that uses
        L{asyncio.get_event_loop} are bound to the same loop.
        N)r   r%   r$   r   r   r   r   -test_defaultSelectorEventLoopFromGlobalPolicyY   s   zIAsyncioSelectorReactorTests.test_defaultSelectorEventLoopFromGlobalPolicyc                 C   *   |  t }t|}t| | | dS )z
        If we use the L{asyncio.DefaultLoopPolicy} to create a new event loop,
        and then pass that event loop to a new L{AsyncioSelectorReactor},
        this reactor should work properly with L{asyncio.Future}.
        N)r/   r   r   r	   r%   r$   
event_loopr   r   r   r   3test_newSelectorEventLoopFromDefaultEventLoopPolicym   s   zOAsyncioSelectorReactorTests.test_newSelectorEventLoopFromDefaultEventLoopPolicyzHdefault event loop: {}
is of type SelectorEventLoop on Python {}.{} ({})c                 C   s   |  tt dS )ay  
        On Windows Python 3.5 to 3.7, L{get_event_loop()} returns a
        L{WindowsSelectorEventLoop} by default.
        On Windows Python 3.8+, L{get_event_loop()} returns a
        L{WindowsProactorEventLoop} by default.
        L{AsyncioSelectorReactor} should raise a
        L{TypeError} if the default event loop is not a
        L{WindowsSelectorEventLoop}.
        N)assertRaises	TypeErrorr   r$   r   r   r   1test_defaultNotASelectorEventLoopFromGlobalPolicy   s   zMAsyncioSelectorReactorTests.test_defaultNotASelectorEventLoopFromGlobalPolicyz&WindowsProactorEventLoop not availablec                 C   s   |  t }| tt| dS )z
        L{AsyncioSelectorReactor} will raise a L{TypeError}
        if instantiated with a L{asyncio.WindowsProactorEventLoop}
        N)r/   r   r6   r7   r   )r$   r4   r   r   r   test_WindowsProactorEventLoop   s   z9AsyncioSelectorReactorTests.test_WindowsProactorEventLoopz(WindowsSelectorEventLoop only on Windowsc                 C   r2   )zR
        L{WindowsSelectorEventLoop} works with L{AsyncioSelectorReactor}
        N)r/   r   r   r	   r%   r3   r   r   r   test_WindowsSelectorEventLoop   s   z9AsyncioSelectorReactorTests.test_WindowsSelectorEventLoopz.WindowsProactorEventLoopPolicy only on Windowsc                 C   sN   t t  | dd  | t t  W d   dS 1 s w   Y  dS )z
        L{AsyncioSelectorReactor} will raise a L{TypeError}
        if L{asyncio.WindowsProactorEventLoopPolicy} is default.
        c                   S      t d S r   r
   r   r   r   r   <lambda>       zQAsyncioSelectorReactorTests.test_WindowsProactorEventLoopPolicy.<locals>.<lambda>N)r
   r   r.   r6   r7   r   r8   r   r   r   #test_WindowsProactorEventLoopPolicy   s
   
	"z?AsyncioSelectorReactorTests.test_WindowsProactorEventLoopPolicyz.WindowsSelectorEventLoopPolicy only on Windowsc                 C   s,   t t  | dd  t }| | dS )zy
        L{AsyncioSelectorReactor} will work if
        if L{asyncio.WindowsSelectorEventLoopPolicy} is default.
        c                   S   r<   r   r=   r   r   r   r   r>      r?   zQAsyncioSelectorReactorTests.test_WindowsSelectorEventLoopPolicy.<locals>.<lambda>N)r
   r   r.   r   r%   r0   r   r   r   #test_WindowsSelectorEventLoopPolicy   s   
	z?AsyncioSelectorReactorTests.test_WindowsSelectorEventLoopPolicyc                 C   sF   t rtt  | dd  t }| }| |d | |d dS )z0L{seconds} should return a plausible epoch time.c                   S   r<   r   r=   r   r   r   r   r>      r?   z:AsyncioSelectorReactorTests.test_seconds.<locals>.<lambda>i ^l    #G4 N)!hasWindowsSelectorEventLoopPolicyr
   r   r.   r   secondsassertGreater
assertLess)r$   r   r   r   r   r   test_seconds   s   
z(AsyncioSelectorReactorTests.test_secondsc                    s   t rtt  | dd  t  dg fdd}  } d|}|d  d j  	  | 
d  | d | d	 dS )
zQ
        L{DelayedCall.reset()} properly reschedules timer to later time
        c                   S   r<   r   r=   r   r   r   r   r>      r?   zJAsyncioSelectorReactorTests.test_delayedCallResetToLater.<locals>.<lambda>Nc                            d< d S Nr   rC   r   r   timer_called_atr   r   on_timer      zJAsyncioSelectorReactorTests.test_delayedCallResetToLater.<locals>.on_timerr         ?r   皙?)rB   r
   r   r.   r   rC   	callLaterresetr   runassertIsNotNonerD   )r$   rL   
start_timedcr   rJ   r   test_delayedCallResetToLater   s   

z8AsyncioSelectorReactorTests.test_delayedCallResetToLaterc                    s   t rtt  t  dg fdd}  } d|}|d  d j ddl}ddl	m
} | }||    W d   n1 sKw   Y  | | d | d  | d | d	 t rqtd dS dS )
zS
        L{DelayedCall.reset()} properly reschedules timer to earlier time
        Nc                      rG   rH   rI   r   rJ   r   r   rL      rM   zLAsyncioSelectorReactorTests.test_delayedCallResetToEarlier.<locals>.on_timerrN   r   r   )redirect_stderr rO   )rB   r
   r   r   rC   rP   rQ   r   io
contextlibrW   StringIOrR   r"   getvaluerS   rE   )r$   rL   rT   rU   rY   rW   stderrr   rJ   r   test_delayedCallResetToEarlier   s*   



z:AsyncioSelectorReactorTests.test_delayedCallResetToEarlierc                 C   s   t rtt  t }t  z6tt }d}t }t	|D ]
}|
ddd  q|  tt }| || | d W |rEt  n|rMt  w w t rVtd dS dS )zW
        L{AsyncioSelectorReactor.callLater()} doesn't leave cyclic references
        i  r   c                   S   s   d S r   r   r   r   r   r   r>     s    zOAsyncioSelectorReactorTests.test_noCycleReferencesInCallLater.<locals>.<lambda>r   N)rB   r
   r   gc	isenableddisablelenget_objectsr   rangerP   runUntilCurrentrE   enable)r$   gc_was_enabledobjects_beforetimer_countr   _objects_afterr   r   r   !test_noCycleReferencesInCallLater  s,   

z=AsyncioSelectorReactorTests.test_noCycleReferencesInCallLaterN) __name__
__module____qualname____doc__r%   r   r   r/   r   _defaultEventLoopIsSelectorformattype_defaultEventLoopsysversion_infomajorminorr   getTyper1   r5   r9   !hasWindowsProactorEventLoopPolicyr:   rB   r;   r@   rA   rF   rV   r^   rl   r   r   r   r   r   0   sx    








	



r   )#rp   r_   ru   asyncior   r   r   r   r   r   r   r	   r
   unittestr   twisted.internet.asyncioreactorr   twisted.python.runtimer   twisted.trial.unittestr   reactormixinsr   rz   rB   
startswithr   r   ImportErrorr-   rt   
isinstancerq   r(   r   r   r   r   r   <module>   s0   ,

