o
    >hg                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlm Z d dl	m
Z
 d dlmZ ddlmZ ddlmZmZmZmZmZ eeZG dd	 d	ZG d
d de jZG dd dZG dd deZdS )    N)asyncio)ChannelFull)BaseChannelLayer   )registry)_close_redis_consistent_hash_wrap_closecreate_pooldecode_hostsc                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )ChannelLockz
    Helper class for per-channel locking.

    Once a lock is released and has no waiters, it will also be deleted,
    to mitigate multi-event loop problems.
    c                 C   s   t tj| _t t| _d S N)collectionsdefaultdictr   Locklocksintwait_countsself r   s/var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/channels_redis/core.py__init__"   s   zChannelLock.__init__c                    s(   | j |  d7  < | j|  I dH S )z9
        Acquire the lock for the given channel.
        r   N)r   r   acquirer   channelr   r   r   r   &   s   zChannelLock.acquirec                 C   s   | j |  S )zP
        Return ``True`` if the lock for the given channel is acquired.
        )r   lockedr   r   r   r   r   -   s   zChannelLock.lockedc                 C   sF   | j |   | j|  d8  < | j| dk r!| j |= | j|= dS dS )z9
        Release the lock for the given channel.
        r   N)r   releaser   r   r   r   r   r   3   s   zChannelLock.releaseN)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r   r      s    r   c                       s   e Zd Z fddZ  ZS )BoundedQueuec                    s    |   r|   tt| |S r   )full
get_nowaitsuperr"   
put_nowait)r   item	__class__r   r   r&   ?   s   zBoundedQueue.put_nowait)r   r   r    r&   __classcell__r   r   r(   r   r"   >   s    r"   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )RedisLoopLayerc                 C   s   t  | _|| _i | _d S r   )r   r   _lockchannel_layer_connections)r   r-   r   r   r   r   L   s   

zRedisLoopLayer.__init__c                 C   s2   || j vr| j|}tj|d| j |< | j | S )N)connection_pool)r.   r-   r
   aioredisRedis)r   indexpoolr   r   r   get_connectionQ   s   

zRedisLoopLayer.get_connectionc              	      sl   | j 4 I d H ! t| jD ]}| j|}t|I d H  qW d   I d H  d S 1 I d H s/w   Y  d S r   )r,   listr.   popr   )r   r2   
connectionr   r   r   flushX   s   .zRedisLoopLayer.flushN)r   r   r    r   r4   r8   r   r   r   r   r+   K   s    r+   c                   @   s   e Zd ZdZdZ										d9d
dZdd ZddgZdd Zdd Z	dd Z
dd Zdd Zdd Zd:ddZdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1d2 Zd3d4 Zd5d6 Zd7d8 ZdS );RedisChannelLayerz
    Redis channel layer.

    It routes all messages into remote Redis server. Support for
    sharding among different Redis installations and message
    encryption are provided.
       Nasgi<   Q d      msgpackc
           
      C   s   || _ || _|| _| |pi | _|| _t| jtsJ dt|| _	t
| j	| _tj|	|| j |d| _i | _ttt
| j	| _ttt
| j	| _t j| _d| _d | _d | _ttt | j| _!g | _"t# | _$d S )NzPrefix must be unicode)random_prefix_lengthexpirysymmetric_encryption_keysr   )%rB   group_expirycapacitycompile_capacitieschannel_capacityprefix
isinstancestrr   hostslen	ring_sizer   get_serializer_serializer_layers	itertoolscyclerange_receive_index_generator_send_index_generatoruuiduuid4hexclient_prefixreceive_countreceive_lockreceive_event_loopr   r   	functoolspartialr"   receive_bufferreceive_cleanersr   receive_clean_locks)
r   rK   rH   rB   rD   rE   rG   rC   rA   serializer_formatr   r   r   r   j   s4   
zRedisChannelLayer.__init__c                 C   s   t | j| S r   )r
   rK   )r   r2   r   r   r   r
      s   zRedisChannelLayer.create_poolgroupsr8   c                    s  t |ts
J d| |sJ dd|vsJ |}d|v r.t| }||d< | |}| j| }d|v r=| |}nt| j}| 	|}|j
|dtt t| j dI dH  ||dd	I dH | |krmt ||| |t iI dH  ||t| jI dH  dS )
zF
        Send a message onto a (general or specific) channel.
        zmessage is not a dictChannel name not valid__asgi_channel__!r   minmaxNz-infz+inf)rI   dictvalid_channel_nameitemsnon_local_namerH   consistent_hashnextrU   r7   zremrangebyscorer   timerB   zcountget_capacityr   zadd	serializeexpire)r   r   messagechannel_non_local_namechannel_keyr2   r7   r   r   r   send   s.   



 zRedisChannelLayer.sendc                 C   s   |d S )zQ
        Construct the key used as a backup queue for the given channel.
        z	$inflightr   r   r   r   r   _backup_channel_name   s   z&RedisChannelLayer._backup_channel_namec                    s|   d}|  |}| |}||d||I dH  |j||dI dH }|dur:|\}}	}
|||	t|
iI dH  |	S d}	|	S )z
        Perform a Redis BRPOP and manage the backup processing queue.
        In case of cancellation, make sure the message is not lost.
        a  
            local backed_up = redis.call('ZRANGE', ARGV[2], 0, -1, 'WITHSCORES')
            for i = #backed_up, 1, -2 do
                redis.call('ZADD', ARGV[1], backed_up[i], backed_up[i - 1])
            end
            redis.call('DEL', ARGV[2])
        r   Ntimeout)r{   r7   evalbzpopminrt   float)r   r2   r   r}   cleanup_scriptbackup_queuer7   result_member	timestampr   r   r   _brpop_with_clean   s   


z#RedisChannelLayer._brpop_with_cleanc                    s&   |  |}|| |I dH  dS )z
        Pop the oldest message off the channel backup queue.
        The result isn't interesting as it was already processed.
        N)r7   zpopminr{   )r   r2   r   r7   r   r   r   _clean_receive_backup   s   
z'RedisChannelLayer._clean_receive_backupc                    s&  |  |sJ d|v r| |}|| jd sJ dt }|  jd7  _zB| jdkr9t | _|| _	n	| j	|krBt
dd}| j|  r6| j | j|  g}dd |D }ztj|tjdI dH \}}|D ]}|  qoW n' tjy   | j|= |D ]}| s| sJ | d	u r| j  q w d } }	}
|D ]-}z| }W n ty } z|}
W Y d}~qd}~ww |d	u r|}	qt|tsJ |}q|s|
r|	r| j  |
r|
nQ|	sJ z?z*| |I dH \}}t|tr	|D ]
}| j| | qn| j| | d}W n ty!   | j|=  w W | j  n| j  w | j|  sL|du rB| j|  }| j|  rN| j|= |W |  jd8  _| jd
krl| j reJ d| _d| _	S S |  jd8  _| jd
kr| j rJ d| _d| _	w | |I dH d S )z
        Receive the first message that arrives on the channel.
        If more than one coroutine waits on the same channel, the first waiter
        will be given the message when it arrives.
        rf   zWrong client prefixr   zETwo event loops are trying to receive() on one channel layer at once!Nc                 S   s   g | ]}t |qS r   )r   ensure_future).0taskr   r   r   
<listcomp>      z-RedisChannelLayer.receive.<locals>.<listcomp>)return_whenTr   )rk   rm   endswithrY   r   get_running_looprZ   r   r[   r\   RuntimeErrorr_   emptyr   getwaitFIRST_COMPLETEDcancelCancelledErrordoner   r   BaseExceptionrI   rj   receive_singler5   r&   	Exceptionr$   r   )r   r   real_channellooprw   tasksr   pendingr   token	exceptionr   errormessage_channelchanr   r   r   receive   s   








KzRedisChannelLayer.receivec                    s  j |ddsJ dd|v r|dsJ |}ntj}j|  d}j I dH  z1|du rGj| j	dI dH }|du s7t
| }j|  fdd}|| W n tyq   j   w |}d	|v r|d	 }|d	= ||fS )
zN
        Receives a single message off of the channel and returns it.
        T)r   zChannel name invalidrf   Nr|   c                    s   j |  j  d S r   )r`   removera   r   )cleanerry   r   r   r   _cleanup_done  s   z7RedisChannelLayer.receive_single.<locals>._cleanup_donere   )rk   r   rn   ro   rT   rH   ra   r   r   brpop_timeoutr   r   r   r`   appendadd_done_callbackr   r   deserialize)r   r   r2   contentr   r   rw   r   r   r   r   w  s<   



z RedisChannelLayer.receive_singlespecificc                    s   | d| j  dt j S )zx
        Returns a new channel name that can be used by something in our
        process as a specific channel.
        .rf   )rY   rV   rW   rX   )r   rH   r   r   r   new_channel  s   zRedisChannelLayer.new_channelc                    sZ   |   I dH  d}t| jD ]}| |}||d| jd I dH  q|  I dH  dS )z@
        Deletes all messages and groups on all shards.
        Nz
            local keys = redis.call('keys', ARGV[1])
            for i=1,#keys,5000 do
                redis.call('del', unpack(keys, i, math.min(i+4999, #keys)))
            end
        r   *)wait_receivedrS   rM   r7   r~   rH   close_pools)r   delete_prefixir7   r   r   r   r8     s   
zRedisChannelLayer.flushc                    s2   |   I dH  | j D ]	}| I dH  qdS )z@
        Close all connections in the event loop pools.
        N)r   rP   valuesr8   )r   layerr   r   r   r     s
   zRedisChannelLayer.close_poolsc                    s*   | j rt| j dd I dH  dS dS )zC
        Wait for all channel cleanup functions to finish.
        N)r`   r   r   r   r   r   r   r     s   zRedisChannelLayer.wait_receivedc                    sr   |  |s
J d| |sJ d| |}| | |}|||t iI dH  ||| jI dH  dS )z3
        Adds the channel name to a group.
        Group name not validrd   N)	valid_group_namerk   
_group_keyr7   rn   rt   rq   rv   rD   )r   groupr   	group_keyr7   r   r   r   	group_add  s   
zRedisChannelLayer.group_addc                    sV   |  |s
J d| |sJ d| |}| | |}|||I dH  dS )z
        Removes the channel from the named group if it is in the group;
        does nothing otherwise (does not error)
        r   rd   N)r   rk   r   r7   rn   zrem)r   r   r   keyr7   r   r   r   group_discard  s   
zRedisChannelLayer.group_discardc                    s`  |  |s
J d| |}| | |}|j|dtt | j dI dH  dd ||ddI dH D }| 	||\} |
 D ]h\}}| }	|D ]}|	j|dtt t| j d qO|	 I dH  d}
fd	d|D }| fd
d|D 7 }|t | jg7 }| |}|j|
t|g||R  I dH }|dkrtd|t|| qEdS )z6
        Sends a message to the entire group.
        r   r   rg   Nc                 S   s   g | ]}| d qS )utf8)decode)r   xr   r   r   r     r   z0RedisChannelLayer.group_send.<locals>.<listcomp>aI  
                local over_capacity = 0
                local current_time = ARGV[#ARGV - 1]
                local expiry = ARGV[#ARGV]
                for i=1,#KEYS do
                    if redis.call('ZCOUNT', KEYS[i], '-inf', '+inf') < tonumber(ARGV[i + #KEYS]) then
                        redis.call('ZADD', KEYS[i], current_time, ARGV[i])
                        redis.call('EXPIRE', KEYS[i], expiry)
                    else
                        over_capacity = over_capacity + 1
                    end
                end
                return over_capacity
            c                       g | ]} | qS r   r   r   ry   )channel_keys_to_messager   r   r   .      c                    r   r   r   r   )channel_keys_to_capacityr   r   r   4  r   z+%s of %s channels over capacity in group %s)r   r   r7   rn   rp   r   rq   rD   zrange_map_channel_keys_to_connectionrl   pipelinerB   executer~   rL   loggerinfo)r   r   rw   r   r7   channel_namesconnection_to_channel_keysconnection_indexchannel_redis_keyspipegroup_send_luaargschannels_over_capacityr   )r   r   r   
group_send  sZ   



zRedisChannelLayer.group_sendc                 C   s   t t}t }t }|D ]B}|}d|v r| |}| j| }||vrFt| }|g|d< |||< | |||< | |}	||	 	| q|| d 	| q| D ]\}
}| 
|||
< qT|||fS )a  
        For a list of channel names, GET

        1. list of their redis keys bucket each one to a dict keyed by the connection index

        2. for each unique channel redis key create a serialized message specific to that redis key, by adding
           the list of channels mapped to that redis key in __asgi_channel__ key to the message

        3. returns a mapping of redis channels keys to their capacity
        rf   re   )r   r   r5   rj   rm   rH   rl   rs   rn   r   ru   )r   r   rw   r   channel_key_to_messagechannel_key_to_capacityr   rx   ry   idxr   valuer   r   r   r   H  s,   




z1RedisChannelLayer._map_channel_keys_to_connectionc                 C   s   | j  d| dS )zH
        Common function to make the storage key for the group.
        z:group:r   )rH   encode)r   r   r   r   r   r   z  s   zRedisChannelLayer._group_keyc                 C      | j |S )z6
        Serializes message to a byte string.
        )rO   ru   r   rw   r   r   r   ru        zRedisChannelLayer.serializec                 C   r   )z2
        Deserializes from a byte string.
        )rO   r   r   r   r   r   r     r   zRedisChannelLayer.deserializec                 C   s   t || jS r   )r   rM   )r   r   r   r   r   rn     s   z!RedisChannelLayer.consistent_hashc                 C   s   | j j d| j dS )Nz(hosts=))r)   r   rK   r   r   r   r   __str__  s   zRedisChannelLayer.__str__c                 C   s   d|  kr| j k sn td| j  d| dt }z| j| }W n ty:   t| | t|  }| j|< Y nw ||S )zh
        Returns the correct connection for the index given.
        Lazily instantiates pools.
        r   zThere are only z hosts - you asked for rf   )	rM   
ValueErrorr   r   rP   KeyErrorr	   r+   r4   )r   r2   r   r   r   r   r   r7     s   

zRedisChannelLayer.connection)	Nr;   r<   r=   r>   NNr?   r@   )r   )r   r   r    r!   r   r   r
   
extensionsrz   r{   r   r   r   r   r   r8   r   r   r   r   r   r   r   ru   r   rn   r   r7   r   r   r   r   r9   _   sF    
5(|
7	
	M2r9   )r   r   r]   rQ   loggingrq   rV   redisr0   channels.exceptionsr   channels.layersr   serializersr   utilsr   r   r	   r
   r   	getLoggerr   r   r   Queuer"   r+   r9   r   r   r   r   <module>   s"    
$