o
    >h21                     @   s   d dl Z d dlZd dlZd dlZd dlm Z ddlmZ ddlm	Z	m
Z
mZmZmZ eeZdd ZG dd	 d	ZG d
d dZG dd dZdS )    N)asyncio   )registry)_close_redis_consistent_hash_wrap_closecreate_pooldecode_hostsc                    s$   |   }t|||i |I d H S N)
_get_layergetattr)objnameargskwargslayer r   u/var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/channels_redis/pubsub.py_async_proxy   s   r   c                   @   s@   e Zd Zddd	dddZdd Zd	d
 Zdd Zdd ZdS )RedisPubSubChannelLayerNmsgpack)symmetric_encryption_keysserializer_formatreturnc                O   s&   || _ || _i | _tj||d| _d S )N)r   )_args_kwargs_layersr   get_serializer_serializer)selfr   r   r   r   r   r   r   __init__   s   z RedisPubSubChannelLayer.__init__c                 C   s$   |dv rt t| |S t|  |S )N)new_channelsendreceive	group_addgroup_discard
group_sendflush)	functoolspartialr   r   r   )r   r   r   r   r   __getattr__,   s   	z#RedisPubSubChannelLayer.__getattr__c                 C      | j |S )z6
        Serializes message to a byte string.
        )r   	serializer   messager   r   r   r,   :      z!RedisPubSubChannelLayer.serializec                 C   r+   )z2
        Deserializes from a byte string.
        )r   deserializer-   r   r   r   r0   @   r/   z#RedisPubSubChannelLayer.deserializec                 C   s\   t  }z| j| }W |S  ty-   t| ji | jd| i}|| j|< t| | Y |S w )Nchannel_layer)r   get_running_loopr   KeyErrorRedisPubSubLoopLayerr   r   r   )r   loopr   r   r   r   r   F   s    

z"RedisPubSubChannelLayer._get_layer)r   N)__name__
__module____qualname__r    r*   r,   r0   r   r   r   r   r   r      s    
r   c                   @   s~   e Zd ZdZ					dddZdd Zdd	 Zd
d ZddgZdd Z	dddZ
dd Zdd Zdd Zdd Zdd ZdS )r4   z@
    Channel Layer that uses Redis's pub/sub functionality.
    Nasgic                    s@   | _ | _| _| _i  _i  _ fddt|D  _d S )Nc                    s   g | ]}t | qS r   )RedisSingleShardConnection).0hostr   r   r   
<listcomp>t   s    
z1RedisPubSubLoopLayer.__init__.<locals>.<listcomp>)prefixon_disconnecton_reconnectr1   channelsgroupsr	   _shards)r   hostsr?   r@   rA   r1   r   r   r=   r   r    \   s   	
zRedisPubSubLoopLayer.__init__c                 C   s   | j t|t| j  S )zV
        Return the shard that is used exclusively for this channel or group.
        )rD   r   len)r   channel_or_group_namer   r   r   
_get_shardx   s   zRedisPubSubLoopLayer._get_shardc                 C   s   | j  d| S )a\  
        Return the channel name used by a group.
        Includes '__group__' in the returned
        string so that these names are distinguished
        from those returned by `new_channel()`.
        Technically collisions are possible, but it
        takes what I believe is intentional abuse in
        order to have colliding names.
        	__group__)r?   )r   groupr   r   r   _get_group_channel_name~   s   
z,RedisPubSubLoopLayer._get_group_channel_namec                    s.   t  | j|< | |}||I d H  d S r
   )r   QueuerB   rH   	subscribe)r   channelshardr   r   r   _subscribe_to_channel   s   
z*RedisPubSubLoopLayer._subscribe_to_channelrC   r'   c                    s*   |  |}||| j|I dH  dS )zF
        Send a message onto a (general or specific) channel.
        N)rH   publishr1   r,   )r   rN   r.   rO   r   r   r   r"      s   
zRedisPubSubLoopLayer.send	specific.c                    s.   | j  | t j }| |I dH  |S )zy
        Returns a new channel name that can be used by a consumer in our
        process as a specific channel.
        N)r?   uuiduuid4hexrP   )r   r?   rN   r   r   r   r!      s   z RedisPubSubLoopLayer.new_channelc                    s   || j vr| |I dH  | j | }z	| I dH }W n4 tjtjtfyP   || j v rO| j |= z| |}||I dH  W   t	yN   t
d Y  w  w | j|S )z
        Receive the first message that arrives on the channel.
        If more than one coroutine waits on the same channel, a random one
        of the waiting coroutines will get the result.
        Nz/Unexpected exception while cleaning-up channel:)rB   rP   getr   CancelledErrorTimeoutErrorGeneratorExitrH   unsubscribeBaseExceptionlogger	exceptionr1   r0   )r   rN   qr.   rO   r   r   r   r#      s(   


	
zRedisPubSubLoopLayer.receivec                    sz   || j vrtdt| d| |}|| jvr t | j|< | j| }||vr.|| | |}||I dH  dS )z3
        Adds the channel name to a group.
        zYou can only call group_add() on channels that exist in-process.
Consumers are encouraged to use the common pattern:
   self.channel_layer.group_add(z, self.channel_name)N)	rB   RuntimeErrorreprrK   rC   setaddrH   rM   r   rJ   rN   group_channelgroup_channelsrO   r   r   r   r$      s   





zRedisPubSubLoopLayer.group_addc                    sh   |  |}| j|t }||vrdS || t|dkr2| j|= | |}||I dH  dS dS )zy
        Removes the channel from a group if it is in the group;
        does nothing otherwise (does not error)
        Nr   )rK   rC   rV   ra   removerF   rH   rZ   rc   r   r   r   r%      s   


z"RedisPubSubLoopLayer.group_discardc                    s4   |  |}| |}||| j|I dH  dS )zC
        Send the message to all subscribers of the group.
        N)rK   rH   rQ   r1   r,   )r   rJ   r.   rd   rO   r   r   r   r&      s   

zRedisPubSubLoopLayer.group_sendc                    s,   i | _ i | _| jD ]	}| I dH  q
dS )z
        Flush the layer, making it like new. It can continue to be used as if it
        was just created. This also closes connections, serving as a clean-up
        method; connections will be re-opened if you continue using this layer.
        N)rB   rC   rD   r'   )r   rO   r   r   r   r'      s   
zRedisPubSubLoopLayer.flush)Nr9   NNN)rR   )r6   r7   r8   __doc__r    rH   rK   rP   
extensionsr"   r!   r#   r$   r%   r&   r'   r   r   r   r   r4   W   s&    

	%r4   c                   @   sT   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )r:   c                 C   s4   || _ || _t | _t | _d | _d | _d | _	d S r
   )
r<   r1   ra   _subscribed_tor   Lock_lock_redis_pubsub_receive_task)r   r<   r1   r   r   r   r      s   

z#RedisSingleShardConnection.__init__c              	      s^   | j 4 I d H  |   | j||I d H  W d   I d H  d S 1 I d H s(w   Y  d S r
   )rk   _ensure_redisrl   rQ   )r   rN   r.   r   r   r   rQ     s
   .z"RedisSingleShardConnection.publishc              	      s   | j 4 I d H 3 || jvr0|   |   | j|I d H  | j| W d   I d H  d S W d   I d H  d S 1 I d H sAw   Y  d S r
   )rk   ri   ro   _ensure_receiverrm   rM   rb   r   rN   r   r   r   rM        
.z$RedisSingleShardConnection.subscribec              	      s   | j 4 I d H 3 || jv r0|   |   | j|I d H  | j| W d   I d H  d S W d   I d H  d S 1 I d H sAw   Y  d S r
   )rk   ri   ro   rp   rm   rZ   rf   rq   r   r   r   rZ     rr   z&RedisSingleShardConnection.unsubscribec              	      s   | j 4 I d H C | jd ur)| j  z| jI d H  W n
 tjy%   Y nw d | _| jd ur<t| jI d H  d | _d | _t | _	W d   I d H  d S 1 I d H sQw   Y  d S r
   )
rk   rn   cancelr   rW   rl   r   rm   ra   ri   r=   r   r   r   r'   $  s    



.z RedisSingleShardConnection.flushc              
      s   	 z"| j r| j jr| j jdddI d H }| | ntdI d H  W n" tjtjtfy1     t	yF   t
d tdI d H  Y nw q)NTg?)ignore_subscribe_messagestimeoutz$Unexpected exception in receive taskr   )rm   
subscribedget_message_receive_messager   sleeprW   rX   rY   r[   r\   r]   r-   r   r   r   _do_receiving6  s*   

z(RedisSingleShardConnection._do_receivingc                 C   s   |d urD|d }|d }t |tr| }|| jjv r&| jj| | d S || jjv rF| jj| D ]}|| jjv rC| jj| | q2d S d S d S )NrN   data)
isinstancebytesdecoder1   rB   
put_nowaitrC   )r   r.   r   r{   channel_namer   r   r   rx   J  s   
z+RedisSingleShardConnection._receive_messagec                 C   s6   | j d u rt| j}tj|d| _ | j  | _d S d S )N)connection_pool)rl   r   r<   aioredisRedispubsubrm   )r   poolr   r   r   ro   W  s
   

z(RedisSingleShardConnection._ensure_redisc                 C   s"   | j d u rt|  | _ d S d S r
   )rn   r   ensure_futurerz   r=   r   r   r   rp   ]  s   
z+RedisSingleShardConnection._ensure_receiverN)r6   r7   r8   r    rQ   rM   rZ   r'   rz   rx   ro   rp   r   r   r   r   r:     s    	r:   )r   r(   loggingrS   redisr   serializersr   utilsr   r   r   r   r	   	getLoggerr6   r\   r   r   r4   r:   r   r   r   r   <module>   s    
< /