o
    >h	`                    @   s,  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
m
Z
 d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZ d dlmZmZ d dlm Z m!Z!m"Z" d dl#m$Z$ d d	l%m&Z&m'Z'm(Z( d d
l)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3m4Z4m5Z5 d dl6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@ d dlAmBZBmCZC d dlDmEZEmFZF d dlGmHZH d dlImJZJmKZK d dlLmMZMmNZNmOZOmPZPmQZQmRZRmSZSmTZTmUZUmVZVmWZWmXZXmYZYmZZZm[Z[m\Z\m]Z]m^Z^ d dl_m`Z`maZambZb d dlcmdZdmeZemfZfmgZgmhZhmiZimjZj edrd dlkmlZlmmZm ndZldZmedended eedf ZoG dd de5e;eCZpG dd dZqG dd dZrG dd  d e5e;eCZse7D ]Ztetud!d"v Ztetd#krcqRewesete>et qRG d$d% d%ZxG d&d' d'eZyG d(d) d)eyZzG d*d+ d+ezZ{G d,d- d-ezZ|dS ).    N)ABCabstractmethod)copy)chain)AnyCallable	CoroutineDequeDict	GeneratorListMappingOptionalSetTupleTypeTypeVarUnion)AsyncCommandsParserEncoder)_RedisCallbacks_RedisCallbacksRESP2_RedisCallbacksRESP3)ResponseCallbackT)
ConnectionSSLConnection	parse_urlLock)Retry)TokenInterface)ExponentialWithJitterBackoff	NoBackoff)EMPTY_RESPONSENEVER_DECODEAbstractRedis)
PIPELINE_BLOCKED_COMMANDSPRIMARYREPLICASLOT_IDAbstractRedisClusterLoadBalancerLoadBalancingStrategyblock_pipeline_commandget_node_nameparse_cluster_slots)READ_COMMANDSAsyncRedisClusterCommands)REDIS_CLUSTER_HASH_SLOTSkey_slot)CredentialProvider)#AfterAsyncClusterInstantiationEventEventDispatcher)AskErrorBusyLoadingErrorClusterDownErrorClusterErrorConnectionErrorCrossSlotTransactionError	DataErrorExecAbortErrorInvalidPipelineStackMaxConnectionsError
MovedErrorRedisClusterException
RedisErrorResponseErrorSlotNotCoveredErrorTimeoutErrorTryAgainError
WatchError)AnyKeyT
EncodableTKeyT)SSL_AVAILABLEdeprecated_argsdeprecated_functionget_lib_versionsafe_strstr_if_bytestruncate_text)
TLSVersion
VerifyModeTargetNodesTClusterNodec                V   @   s  e Zd ZdZedededd fddZdZe	dgd	d
de	dgdddddddddddddddddddddde
 ddddddddddddddddddddf(dee deeef deed  d eded!ee d"ed#eded$ed%ed& d'eeee   d(eeef d)ee d*ee d+ee d,ee d-ee d.ee d/ee d0ed1ed2ed3ed4ee d5ed6eeeeeef f  d7ee d8ed9ee d:ee d;eeef d<ee d=ed>ee d?ee d@ee dAee dBeeeeef geeef f  dCee ddfRdDdEZddFdGZddHdIZedJdKdLdMddNdOZ ddPdQZ!ddUdVZ"de#edd f fdWdXZ$dYZ%e&j'e(j)fdZed[eddfd\d]Z*d^e+ddfd_d`Z,ded fdadbZ-ded fdcddZ.ded fdedfZ/ddgdhZ0ddidjZ1ddldmZ2			ddee dee dnee ded fdodpZ3	ddqedreded fdsdtZ4dqe5defdudvZ6de7fdwdxZ8de9eee f fdydzZ:d%e;ddfd{d|Z<d}ed~e=ddfddZ>ddd}ededee ded fddZ?d}ededefddZ@dedefddZAdeded fddZBde5dedefddZCdddeeDe5f dedefddZE	ddee dee ddfddZF							ddeDdee dededee deeeG  dededeGfddZHdeIddef fddZJdS )RedisClustera  
    Create a new RedisCluster client.

    Pass one of parameters:

      - `host` & `port`
      - `startup_nodes`

    | Use ``await`` :meth:`initialize` to find cluster nodes & create connections.
    | Use ``await`` :meth:`close` to disconnect connections & close client.

    Many commands support the target_nodes kwarg. It can be one of the
    :attr:`NODE_FLAGS`:

      - :attr:`PRIMARIES`
      - :attr:`REPLICAS`
      - :attr:`ALL_NODES`
      - :attr:`RANDOM`
      - :attr:`DEFAULT_NODE`

    Note: This client is not thread/process/fork safe.

    :param host:
        | Can be used to point to a startup node
    :param port:
        | Port used if **host** is provided
    :param startup_nodes:
        | :class:`~.ClusterNode` to used as a startup node
    :param require_full_coverage:
        | When set to ``False``: the client will not require a full coverage of
          the slots. However, if not all slots are covered, and at least one node
          has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
          a :class:`~.ClusterDownError` for some key-based commands.
        | When set to ``True``: all slots must be covered to construct the cluster
          client. If not all slots are covered, :class:`~.RedisClusterException` will be
          thrown.
        | See:
          https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
    :param read_from_replicas:
        | @deprecated - please use load_balancing_strategy instead
        | Enable read from replicas in READONLY mode.
          When set to true, read commands will be assigned between the primary and
          its replications in a Round-Robin manner.
          The data read from replicas is eventually consistent with the data in primary nodes.
    :param load_balancing_strategy:
        | Enable read from replicas in READONLY mode and defines the load balancing
          strategy that will be used for cluster node selection.
          The data read from replicas is eventually consistent with the data in primary nodes.
    :param dynamic_startup_nodes:
        | Set the RedisCluster's startup nodes to all the discovered nodes.
          If true (default value), the cluster's discovered nodes will be used to
          determine the cluster nodes-slots mapping in the next topology refresh.
          It will remove the initial passed startup nodes if their endpoints aren't
          listed in the CLUSTER SLOTS output.
          If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
          specific IP addresses, it is best to set it to false.
    :param reinitialize_steps:
        | Specifies the number of MOVED errors that need to occur before reinitializing
          the whole cluster topology. If a MOVED error occurs and the cluster does not
          need to be reinitialized on this current error handling, only the MOVED slot
          will be patched with the redirected node.
          To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1.
          To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
          0.
    :param cluster_error_retry_attempts:
        | @deprecated - Please configure the 'retry' object instead
          In case 'retry' object is set - this argument is ignored!

          Number of times to retry before raising an error when :class:`~.TimeoutError`,
          :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
          or :class:`~.ClusterDownError` are encountered
    :param retry:
        | A retry object that defines the retry strategy and the number of
          retries for the cluster client.
          In current implementation for the cluster client (starting form redis-py version 6.0.0)
          the retry object is not yet fully utilized, instead it is used just to determine
          the number of retries for the cluster client.
          In the future releases the retry object will be used to handle the cluster client retries!
    :param max_connections:
        | Maximum number of connections per node. If there are no free connections & the
          maximum number of connections are already created, a
          :class:`~.MaxConnectionsError` is raised.
    :param address_remap:
        | An optional callable which, when provided with an internal network
          address of a node, e.g. a `(host, port)` tuple, will return the address
          where the node is reachable.  This can be used to map the addresses at
          which the nodes _think_ they are, to addresses at which a client may
          reach them, such as when they sit behind a proxy.

    | Rest of the arguments will be passed to the
      :class:`~redis.asyncio.connection.Connection` instances when created

    :raises RedisClusterException:
        if any arguments are invalid or unknown. Eg:

        - `db` != 0 or None
        - `path` argument for unix socket connection
        - none of the `host`/`port` & `startup_nodes` were provided

    urlkwargsreturnc                 K   s4   | t| |ddtu rd|d< | di |S )a  
        Return a Redis client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>

        The username, password, hostname, path and all querystring values are passed
        through ``urllib.parse.unquote`` in order to replace any percent-encoded values
        with their corresponding characters.

        All querystring options are cast to their appropriate Python types. Boolean
        arguments can be specified with string values "True"/"False" or "Yes"/"No".
        Values that cannot be properly cast cause a ``ValueError`` to be raised. Once
        parsed, the querystring arguments and keyword arguments are passed to
        :class:`~redis.asyncio.connection.Connection` when created.
        In the case of conflicting arguments, querystring arguments are used.
        connection_classNTssl )updater   popr   )clsrX   rY   r]   r]   u/var/www/vedio/testing/chatpythonscript.ninositsolution.com/env/lib/python3.10/site-packages/redis/asyncio/cluster.pyfrom_url   s   zRedisCluster.from_url)_initialize_lockretrycommand_flagscommands_parserconnection_kwargsencoder
node_flagsnodes_managerread_from_replicasreinitialize_counterreinitialize_stepsresponse_callbacksresult_callbacksrl   z6Please configure the 'load_balancing_strategy' insteadz5.3.0)args_to_warnreasonversioncluster_error_retry_attemptsz+Please configure the 'retry' object insteadz6.0.0Ni  TF              r   zredis-pyzutf-8strictrequired   hostportstartup_nodesrV   require_full_coverageload_balancing_strategydynamic_startup_nodesrn   max_connectionsre   r   retry_on_errordbpathcredential_providerusernamepasswordclient_namelib_namelib_versionencodingencoding_errorsdecode_responseshealth_check_intervalsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionssocket_timeoutr\   ssl_ca_certsssl_ca_datassl_cert_reqsssl_certfilessl_check_hostnamessl_keyfilessl_min_versionssl_ciphersprotocoladdress_remapevent_dispatcherc)           ,      C   sF  |rt d|rt d|r|s|st di d|
dtd|d|d|d	|d
|d|d|d|d|d|d|d|d|d|d|&})|r\|)t||| |!|"|#|$|%d	 |s`|re| j|)d< |rk|| _nttddd|	d| _|r~| j| t	
 |)d< |)ddv r|)d t n|)d t |)| _|rg }*|D ]}+|*t|+j|+jfi | j q|*}ng }|r|r|t||fi | j |(d u rt | _n|(| _t|||)||'| jd| _t|||| _|| _|| _|| _d| _t | _| jj 
 | _!| jj"
 | _#|)d | _$| jj%
 | _&dd  | j&d!< d"| _'d | _(d S )#Nz/Argument 'db' must be 0 or None in cluster modez3Unix domain socket is not supported in cluster modea1  RedisCluster requires at least one node to discover the cluster.
Please provide one of the following or use RedisCluster.from_url:
   - host and port: RedisCluster(host="localhost", port=6379)
   - startup_nodes: RedisCluster(startup_nodes=[ClusterNode("localhost", 6379), ClusterNode("localhost", 6380)])r   r[   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )	r[   r   r   r   r   r   r   r   r   redis_connect_func   
   )basecap)backoffretriesro   )3rv   )r   r   r   r   c                 [   s   t t| d fi |S Nr   )r/   listvalues)cmdresrY   r]   r]   ra   <lambda>  s
    z'RedisCluster.__init__.<locals>.<lambda>CLUSTER SLOTST))rB   r   r^   r   
on_connectre   r   r!   update_supported_errorsr   r   getr   r   rh   appendrV   r{   r|   r6   _event_dispatcherNodesManagerrk   r   ri   rl   r   rn   rm   r   rg   	__class__
NODE_FLAGSrj   COMMAND_FLAGSrf   ro   RESULT_CALLBACKSrp   rc   rd   ),selfr{   r|   r}   r~   rl   r   r   rn   rt   r   re   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r\   r   r   r   r   r   r   r   r   r   r   r   rY   passed_nodesnoder]   r]   ra   __init__   s   <	




zRedisCluster.__init__c              	      s   | j rj| jst | _| j4 I dH L | j rIz| j I dH  | j| jjI dH  d| _ W n$ tyH   | j	 I dH  | j	dI dH   w W d  I dH  | S W d  I dH  | S 1 I dH sew   Y  | S )zJGet all nodes from startup nodes & creates connections if not initialized.NFr}   )
rc   rd   asyncior   rk   
initializerg   default_nodeBaseExceptionacloser   r]   r]   ra   r     s2   


zRedisCluster.initializec              	      s   | j sN| jst | _| j4 I dH . | j s6d| _ | j I dH  | jdI dH  W d  I dH  dS W d  I dH  dS 1 I dH sGw   Y  dS dS )z.Close all connections & client if initialized.NTr}   )rc   rd   r   r   rk   r   r   r]   r]   ra   r     s   
.zRedisCluster.aclosez5.0.0zUse aclose() insteadclose)rs   rr   namec                    s   |   I dH  dS )z.alias for aclose() for backwards compatibilityNr   r   r]   r]   ra   r     s   zRedisCluster.closec                       |   I d H S Nr   r   r]   r]   ra   
__aenter__     zRedisCluster.__aenter__exc_type	exc_value	tracebackc                       |   I d H  d S r   r   r   r   r   r   r]   r]   ra   	__aexit__     zRedisCluster.__aexit__c                 C      |    S r   r   	__await__r   r]   r]   ra   r        zRedisCluster.__await__zUnclosed RedisCluster client_warn_grlc                 C   sf   t | dr/| js1|| j d| t| d z| | jd}| | W d S  ty.   Y d S w d S d S )Nrc    sourceclientmessage)hasattrrc   _DEL_MESSAGEResourceWarningcall_exception_handlerRuntimeError)r   r   r   contextr]   r]   ra   __del__  s   zRedisCluster.__del__
connectionc                    sB   |  I d H  |dI d H  t| I d H dkrtdd S )NREADONLYOKzREADONLY command failed)r   send_commandrQ   read_responser;   r   r   r]   r]   ra   r     s   zRedisCluster.on_connectc                 C   s   t | jj S )zGet all nodes of the cluster.)r   rk   nodes_cacher   r   r]   r]   ra   	get_nodes  s   zRedisCluster.get_nodesc                 C      | j tS )z%Get the primary nodes of the cluster.)rk   get_nodes_by_server_typer'   r   r]   r]   ra   get_primaries     zRedisCluster.get_primariesc                 C   r   )z%Get the replica nodes of the cluster.)rk   r   r(   r   r]   r]   ra   get_replicas  r   zRedisCluster.get_replicasc                 C   s   t t| jj S )z!Get a random node of the cluster.)randomchoicer   rk   r   r   r   r]   r]   ra   get_random_node  s   zRedisCluster.get_random_nodec                 C   s   | j jS )z#Get the default node of the client.)rk   r   r   r]   r]   ra   get_default_node	  s   zRedisCluster.get_default_noder   c                 C   s&   |r	| j |jdstd|| j_dS )z
        Set the default node of the client.

        :raises DataError: if None is passed or node does not exist in cluster.
        	node_namez1The requested node does not exist in the cluster.N)get_noder   r=   rk   r   )r   r   r]   r]   ra   set_default_node  s   zRedisCluster.set_default_noder   c                 C   s   | j |||S )z&Get node by (host, port) or node_name.)rk   r   r   r{   r|   r   r]   r]   ra   r     s   zRedisCluster.get_nodekeyreplicac                 C   s`   |  |}| jj|}|std| d|r*t| jj| dk r$dS d}|| S d}|| S )aG  
        Get the cluster node corresponding to the provided key.

        :param key:
        :param replica:
            | Indicates if a replica should be returned
            |
              None will returned if no replica holds this key

        :raises SlotNotCoveredError: if the key is not covered by any slot.
        Slot "z " is not covered by the cluster.rz   Nr   r   )keyslotrk   slots_cacher   rE   len)r   r   r   slot
slot_cachenode_idxr]   r]   ra   get_node_from_key!  s   
zRedisCluster.get_node_from_keyc                 C   s   t | j|S )z
        Find the keyslot for a given key.

        See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
        )r3   ri   encode)r   r   r]   r]   ra   r   =  s   zRedisCluster.keyslotc                 C      | j S )z%Get the encoder object of the client.)ri   r   r]   r]   ra   get_encoderE     zRedisCluster.get_encoderc                 C   r  )zGGet the kwargs passed to :class:`~redis.asyncio.connection.Connection`.)rh   r   r]   r]   ra   get_connection_kwargsI  r  z"RedisCluster.get_connection_kwargsc                 C   
   || _ d S r   )re   )r   re   r]   r]   ra   	set_retryM     
zRedisCluster.set_retrycommandcallbackc                 C   s   || j |< dS )zSet a custom response callback.N)ro   )r   r  r  r]   r]   ra   set_response_callbackP  s   z"RedisCluster.set_response_callback)	node_flagargsr  c                   s   |s	| j |}|| jv rQ|| jjkr| jjgS || jjkr%| jt	S || jj
kr1| jtS || jjkr?t| jj S || jjkrQtt| jj gS | j| j|g|R  I d H | joe|tv |tv rn| jgS d gS r   )rf   r   rj   r   DEFAULT_NODErk   r   	PRIMARIESr   r'   REPLICASr(   	ALL_NODESr   r   r   RANDOMr   r   get_node_from_slot_determine_slotrl   r0   r   )r   r  r  r  r]   r]   ra   _determine_nodesT  s.   

zRedisCluster._determine_nodesc                    s   j |tkrt|d S | dv r;t|dk r&td|g|R  |ddt|d   }|s:tdt	S n" j
j|g|R  I d H }|s]| dv rVtdt	S td| t|dkrj |d S  fdd	|D }t|dkrt| d
| S )Nr   )EVALEVALSHArz   zInvalid args in command: r   )FCALLFCALL_ROzNo way to dispatch this command to Redis Cluster. Missing key.
You can execute the command by specifying target nodes.
Command: c                    s   h | ]}  |qS r]   )r   ).0r   r   r]   ra   	<setcomp>      z/RedisCluster._determine_slot.<locals>.<setcomp>z) - all keys must map to the same key slot)rf   r   r)   intupperr   rB   r   	randranger2   rg   get_keysr   r_   )r   r  r  keysslotsr]   r   ra   r  w  s:   
zRedisCluster._determine_slottarget_nodesc                 C   s   t |to	|| jv S r   )
isinstancestrrj   )r   r%  r]   r]   ra   _is_node_flag  s   zRedisCluster._is_node_flagc                 C   sR   t |tr	|}|S t |tr|g}|S t |tr t| }|S tdt| )Nztarget_nodes type can be one of the following: node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. The passed type is )r&  r   rV   dictr   	TypeErrortype)r   r%  nodesr]   r]   ra   _parse_target_nodes  s   


z RedisCluster._parse_target_nodesc                    s   d }g }d}j  }dd}|r$|s$|}d}d}d| }t|D ]}	jrJ I dH  t|dkrJ|d 	 krJ
  z|sbj d|iI dH }|sbtd  d	t|dkrj|d g R i I dH }
|jv rj| ||d j|
ifi W   S |
W   S d
d |D }tj fdd|D  I dH }|jv rȈj| |tt||fi W   S tt||W   S  ty } z|dkrt|jjv r|d8 }W Y d}~q,|d}~ww dS )a  
        Execute a raw command on the appropriate cluster node or target_nodes.

        It will retry the command as specified by the retries property of
        the :attr:`retry` & then raise an exception.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        r   Fr%  NTr   r  !No targets were found to execute  command onc                 S      g | ]}|j qS r]   r   r  r   r]   r]   ra   
<listcomp>      z0RedisCluster.execute_command.<locals>.<listcomp>c                 3   s.    | ]}t j|g R i V  qd S r   )r   create_task_execute_commandr2  r  rY   r   r]   ra   	<genexpr>  s    
z/RedisCluster.execute_command.<locals>.<genexpr>)re   get_retriesr_   r(  r-  rangerc   r   r   r   replace_default_noder  rB   r6  rp   r   r   gatherr)  zip	Exceptionr+  r   ERRORS_ALLOW_RETRY)r   r  rY   r  r%  target_nodes_specifiedretry_attemptspassed_targetsexecute_attempts_retr#  r   er]   r7  ra   execute_command  st   


"



zRedisCluster.execute_commandtarget_nodec           
   
      s  d }}d }| j }|dkr|d8 }zC|r&| j|d}|dI d H  d}n$|rJ| j| I d H }| j|| jo<|d tv |d tv rE| jnd }d}|j|i |I d H W S  t	t
fy_     ttfyx   | jj|jd  |  I d H    ttfy   |  I d H  tdI d H    ty }	 z)|  jd7  _| jr| j| j dkr|  I d H  d| _n|	| j_d}W Y d }	~	n7d }	~	w ty }	 zt|	j|	jd}d}W Y d }	~	nd }	~	w ty   || j d	 k rtd
I d H  Y nw |dkstd)NFr   r   r   ASKING      ?Tr{   r|   rz   g?zTTL exhausted.)RedisClusterRequestTTLr   rG  r  rk   r  rl   r0   r   r8   r@   r;   rF   r}   r_   r   r   r9   rE   r   sleeprA   rm   rn   _moved_exceptionr7   r.   r{   r|   rG   r:   )
r   rH  r  rY   askingmovedredirect_addrttlr   rF  r]   r]   ra   r6    sj   
	IzRedisCluster._execute_commandtransaction
shard_hintClusterPipelinec                 C   s   |rt dt| |S )z
        Create & return a new :class:`~.ClusterPipeline` object.

        Cluster implementation of pipeline does not support transaction or shard_hint.

        :raises RedisClusterException: if transaction or shard_hint are truthy values
        z(shard_hint is deprecated in cluster mode)rB   rU  )r   rS  rT  r]   r]   ra   pipelineg  s   

zRedisCluster.pipeline皙?r   timeoutrM  blockingblocking_timeout
lock_classthread_localraise_on_release_errorc	           	   
   C   s$   |du rt }|| |||||||dS )a}  
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release the thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.N)rX  rM  rY  rZ  r\  r]  r   )	r   r   rX  rM  rY  rZ  r[  r\  r]  r]   r]   ra   lockv  s   DzRedisCluster.lockfuncc           
   	      s   | dd}| dd}| dd}| d|4 I dH B}	 z)|r*|j| I dH  ||I dH }| I dH }	|r<|n|	W W  d  I dH  S  ty^   |dur\|dkr\t| Y qw 1 I dH sew   Y  dS )z
        Convenience method for executing the callable `func` as a transaction
        while watching all keys specified in `watches`. The 'func' callable
        should expect a single argument which is a Pipeline object.
        rT  Nvalue_from_callableFwatch_delayTr   )r_   rV  watchexecuterH   timerM  )
r   r_  watchesrY   rT  r`  ra  pipe
func_value
exec_valuer]   r]   ra   rS    s&   
zRedisCluster.transaction)rZ   rW   rZ   Nr   Nr   Nr   NrZ   N)rZ   rV   )r   rV   rZ   NNNNF)NN)NrW  TNNTT)K__name__
__module____qualname____doc__classmethodr'  r   rb   	__slots__rM   rO   r   r   r  r   boolr,   r   r>  r4   floatr   bytesrT   rS   r   r   r6   r   r   r   rN   r   r   r   r   r   r   warningswarnr   get_running_loopr   r   r   r   r   r   r   r   r   r   r  rJ   r   r   r  r
   r  r   r	  r   r  r  r  r(  r-  rG  rK   r6  rV  r   r^  r   rS  r]   r]   r]   ra   rW   c   s   e 	

	

 !#$%
&'()*+, -./ 
,










#3T

S

	

QrW   c                   @   s&  e Zd ZdZdZ	d1deddedeeef de	e d	ed
e
e deddfddZdefddZdedefddZdZejejfdededdfddZd2ddZdefddZdeddfddZded ed!edefd"d#Zd$ed!edefd%d&Zd'ed( defd)d*Zd+efd,d-Zd.efd/d0Z dS )3rV   z
    Create a new ClusterNode.

    Each ClusterNode manages multiple :class:`~redis.asyncio.connection.Connection`
    objects for the (host, port).
    )_connections_freerd   r   r[   rh   r{   r   r   r|   ro   server_typeNrw   )r   r[   r{   r|   r{  r   r[   rh   rZ   c                K   s   |dkr	t |}||d< ||d< || _|| _t||| _|| _|| _|| _|| _	|
di | _g | _tj| jd| _| j	dd | _| jd u rNt | _d S d S )N	localhostr{   r|   ro   )maxlenr   )socketgethostbynamer{   r|   r.   r   r{  r   r[   rh   r_   ro   ry  collectionsdequerz  r   r   r6   )r   r{   r|   r{  r   r[   rh   r]   r]   ra   r     s$   


zClusterNode.__init__c              	   C   s&   d| j  d| j d| j d| j d	S )Nz[host=z, port=z, name=z, server_type=])r{   r|   r   r{  r   r]   r]   ra   __repr__  s   zClusterNode.__repr__objc                 C   s   t |to
|j| jkS r   )r&  rV   r   )r   r  r]   r]   ra   __eq__  s   zClusterNode.__eq__zUnclosed ClusterNode objectr   r   c              	   C   sh   | j D ].}|jr1|| j d| t| d z| | jd}| | W  d S  ty0   Y  d S w qd S )Nr   r   r   )ry  is_connectedr   r   r   r   )r   r   r   r   r   r]   r]   ra   r   !  s   
zClusterNode.__del__c                    sD   t jdd | jD ddiI d H }tdd |D d }|r |d S )Nc                 s       | ]
}t | V  qd S r   r   r5  
disconnect)r  r   r]   r]   ra   r8  3  
    
z)ClusterNode.disconnect.<locals>.<genexpr>return_exceptionsTc                 s   s    | ]
}t |tr|V  qd S r   )r&  r>  )r  r   r]   r]   ra   r8  9  s    )r   r<  ry  next)r   rE  excr]   r]   ra   r  1  s   zClusterNode.disconnectc                 C   sz   z| j  W S  ty<   t| j| jk r9tt dtfd}| j	
 }||d< | jdi |}| j| | Y S t w )Nr   )r   r   supported_errorsre   r]   )rz  popleft
IndexErrorr   ry  r   r   r"   r;   rh   r   r[   r   r@   )r   re   rh   r   r]   r]   ra   acquire_connection=  s    

zClusterNode.acquire_connectionr   c                 C   s   | j | dS )z8
        Release connection back to free queue.
        N)rz  r   r   r]   r]   ra   releaseX  s   zClusterNode.releaser  rY   c                    s   zt |v r|jddI d H }|t  n| I d H }W n ty/   t|v r.|t  Y S  w t|v r9|t |dd  || jv rO| j| |fi |S |S )NT)disable_decodingr#  )r$   r   r_   rD   r#   ro   )r   r   r  rY   responser]   r]   ra   parse_response^  s$   

zClusterNode.parse_responser  c              	      s^   |   }||j| dI d H  z| j||d fi |I d H W | j| S | j| w )NFr   )r  send_packed_commandpack_commandr  rz  r   )r   r  rY   r   r]   r]   ra   rG  x  s   zClusterNode.execute_commandcommandsPipelineCommandc                    s   |   }||dd |D dI d H  d}|D ]-}z| j||jd fi |jI d H |_W q tyG } z||_d}W Y d }~qd }~ww | j	| |S )Nc                 s   s    | ]}|j V  qd S r   r  r  r   r]   r]   ra   r8    s    z/ClusterNode.execute_pipeline.<locals>.<genexpr>Fr   T)
r  r  pack_commandsr  r  rY   resultr>  rz  r   )r   r  r   rE  r   rF  r]   r]   ra   execute_pipeline  s(   

zClusterNode.execute_pipelinetokenc                    s   t  }jr:j   j fddfddI d H   j fddfddI d H  |  js|rJ|  j  |s<d S d S )Nc                      s     dd S )NAUTHoid)r   try_get	get_valuer]   )connr  r]   ra   r     s    z.ClusterNode.re_auth_callback.<locals>.<lambda>c                    
     | S r   _mockerrorr   r]   ra   r        
 c                      s      S r   )r   r]   )r  r]   ra   r     s    c                    r  r   r  r  r   r]   ra   r     r  )r  r  rz  r  re   call_with_retryr   )r   r  	tmp_queuer]   )r  r   r  ra   re_auth_callback  s"   




zClusterNode.re_auth_callbackr  c                       dS )z
        Dummy functions, needs to be passed as error callback to retry object.
        :param error:
        :return:
        Nr]   r   r  r]   r]   ra   r    s   zClusterNode._mockr   ri  )!rm  rn  ro  rp  rr  r   r'  r   r  r   r   r   r   r  rs  r  r   rv  rw  r   rx  r   r  r  r  r  rG  r   r  r    r  rC   r  r]   r]   r]   ra   rV     sd    
	



c                   @   sL  e Zd ZdZ			d,ded dedeeef ded	e	e
eeef geeef f  d
e	e ddfddZ			d-de	e de	e de	e de	d fddZ	d.deedf deedf deddfddZdd Zd/ddZ		d0dededdfdd Zd!eded fd"d#Zd/d$d%Zd1d'eddfd(d)Zdededeeef fd*d+ZdS )2r   )_dynamic_startup_nodesrN  r   rh   r   r   read_load_balancerr~   r   r}   r   TNr}   rV   r~   rh   r   r   r   rZ   c                 C   sf   dd |D | _ || _|| _|| _d | _i | _i | _t | _|| _	d | _
|d u r.t | _d S || _d S )Nc                 S   s   i | ]}|j |qS r]   r1  r2  r]   r]   ra   
<dictcomp>      z)NodesManager.__init__.<locals>.<dictcomp>)r}   r~   rh   r   r   r   r   r+   r  r  rN  r6   r   )r   r}   r~   rh   r   r   r   r]   r]   ra   r     s   	
zNodesManager.__init__r{   r|   r   c                 C   sF   |r|r|dkrt |}| jt||dS |r| j|S td)Nr|  rK  zEget_node requires one of the following: 1. node name 2. host and port)r~  r  r   r   r.   r=   r   r]   r]   ra   r     s   
zNodesManager.get_nodeFoldnew
remove_oldc                 C   sx   |rt | D ]}||vrt|| }q| D ]\}}||v r5|| |u r,qt||  }|||< qd S r   )r   r#  r   r5  r_   r  items)r   r  r  r  r   taskr   r]   r]   ra   	set_nodes  s   
zNodesManager.set_nodesc                 C   r  r   )rN  )r   	exceptionr]   r]   ra   update_moved_exception  r
  z#NodesManager.update_moved_exceptionc                 C   s   | j }| j|j|jd}|r|jtkrt|_nt|j|jtfi | j}| | j	|j
|i || j|j v rd| j|j d }t|_| j|j | | j|j | || j|j d< | j|krc|| _n|g| j|j< d | _ d S )NrK  r   )rN  r   r{   r|   r{  r'   rV   rh   r  r   r   r   slot_idr(   r   remover   )r   rF  redirected_nodeold_primaryr]   r]   ra   _update_moved_slots  s.   



z NodesManager._update_moved_slotsr   rl   c              	   C   s   | j r|   |du r|d u rtj}z0t| j| dkr;|r;| j| d j}| j|t| j| |}| j| | W S | j| d W S  t	t
fyW   td| d| j dw )NTr   r   r   z5" not covered by the cluster. "require_full_coverage=")rN  r  r,   ROUND_ROBINr   r   r   r  get_server_indexr  r*  rE   r~   )r   r   rl   r   primary_namer  r]   r]   ra   r  1  s&   zNodesManager.get_node_from_slotr{  c                    s    fdd| j  D S )Nc                    s   g | ]	}|j  kr|qS r]   r{  r2  r  r]   ra   r3  M  s
    
z9NodesManager.get_nodes_by_server_type.<locals>.<listcomp>)r   r   )r   r{  r]   r  ra   r   L  s   
z%NodesManager.get_nodes_by_server_typec                    sl  | j   i }i }g }d}d}d }t| j D ]S}z)z| jt| j| j	
dd  |dI d H }W n tyA   tdw d}W n tyY }	 z|	}W Y d }	~	qd }	~	ww t|dkrx|d d d sxt| jdkrx|j|d d d< |D ]}
tdt|
D ]}d	d
 |
| D |
|< q|
d }|d }|dkr|j}t|d }| ||\}}g }|
t||}|st||tfi | j	}|||j< || |
dd  }|D ]1}|d }|d }| ||\}}|
t||}|st||tfi | j	}|||j< || qtt|
d t|
d d D ];}||vr%|||< q|| d }|j|jkrR||j d|j d|  t|dkrRtdd| qqzd}ttD ]}||vrfd} nq[|rm nq|s{tdt| ||s| jrtdt| dt d|| _| j| j|dd | j r| j| j| jdd | !td | _"d | _#d S )NFr   r   z(Cluster mode is not enabled on this nodeTr   r   rz   c                 S   s   g | ]}t |qS r]   )rQ   )r  valr]   r]   ra   r3    r  z+NodesManager.initialize.<locals>.<listcomp> rv   z vs z
 on slot: ru   z6startup_nodes could not agree on a valid slots cache: z, zORedis Cluster cannot be connected. Please provide at least one reachable node: z9All slots are not covered after query all startup_nodes. z of z covered...)r  )$r  resettupler}   r   r   dispatchr5   r   rh   r   rG  rD   rB   r>  r   r{   r:  r  remap_host_portr.   rV   r'   r   r   r(   joinr2   r'  r~   r   r  r  r   r   rN  )r   tmp_nodes_cache	tmp_slotsdisagreementsstartup_nodes_reachablefully_coveredr  startup_nodecluster_slotsrF  r   iprimary_noder{   r|   nodes_for_slotrH  replica_nodesreplica_nodetarget_replica_nodetmp_slotr]   r]   ra   r   S  s   



"


zNodesManager.initializer   attrc                    s0   d | _ tjdd t| | D  I d H  d S )Nc                 s   r  r   r  r2  r]   r]   ra   r8    r  z&NodesManager.aclose.<locals>.<genexpr>)r   r   r<  getattrr   )r   r  r]   r]   ra   r     s   zNodesManager.aclosec                 C   s   | j r
|  ||fS ||fS )z
        Remap the host and port returned from the cluster to a different
        internal value.  Useful if the client is not connecting directly
        to the cluster.
        )r   )r   r{   r|   r]   r]   ra   r    s   zNodesManager.remap_host_port)TNNrk  rl  ri  )FN)r   )rm  rn  ro  rr  r   rs  r
   r'  r   r   r   r   r  r6   r   r   r  r  r  r  r   r   r   r  r]   r]   r]   ra   r     st    
 





)

 "	r   c                   @   s4  e Zd ZdZdZ	d8dedee ddfddZd9d	d
Z	d9ddZ
d:ddZdeedd f fddZd9ddZd:ddZdefddZdefddZdeeef dedd fddZ	 d;d!ed"edee fd#d$Zd%ed&edd fd'd(Zd)d* Zd+d, Zd-d. Zd/d0 Zd1d2 Z d3d4 Z!d5e"e#ef dd fd6d7Z$dS )<rU  a  
    Create a new ClusterPipeline object.

    Usage::

        result = await (
            rc.pipeline()
            .set("A", 1)
            .get("A")
            .hset("K", "F", "V")
            .hgetall("K")
            .mset_nonatomic({"A": 2, "B": 3})
            .get("A")
            .get("B")
            .delete("A", "B", "K")
            .execute()
        )
        # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1]

    Note: For commands `DELETE`, `EXISTS`, `TOUCH`, `UNLINK`, `mset_nonatomic`, which
    are split across multiple nodes, you'll get multiple results for them in the array.

    Retryable errors:
        - :class:`~.ClusterDownError`
        - :class:`~.ConnectionError`
        - :class:`~.TimeoutError`

    Redirection errors:
        - :class:`~.TryAgainError`
        - :class:`~.MovedError`
        - :class:`~.AskError`

    :param client:
        | Existing :class:`~.RedisCluster` client
    )cluster_client_transaction_execution_strategyNr   rS  rZ   c                 C   s.   || _ || _| jst| | _d S t| | _d S r   )r  r  PipelineStrategyTransactionStrategyr  )r   r   rS  r]   r]   ra   r     s   zClusterPipeline.__init__c                    s   | j  I d H  | S r   )r  r   r   r]   r]   ra   r   #  s   zClusterPipeline.initializec                    r   r   r   r   r]   r]   ra   r   '  r   zClusterPipeline.__aenter__r   r   r   c                    r   r   r  r   r]   r]   ra   r   *  r   zClusterPipeline.__aexit__c                 C   r   r   r   r   r]   r]   ra   r   -  r   zClusterPipeline.__await__c                 C   s   g | j _| S r   r  _command_queuer   r]   r]   ra   	__enter__0  s   zClusterPipeline.__enter__c                 C   s   g | j _d S r   r  r   r]   r]   ra   __exit__5  r   zClusterPipeline.__exit__c                 C      dS )z?Pipeline instances should  always evaluate to True on Python 3+Tr]   r   r]   r]   ra   __bool__9     zClusterPipeline.__bool__c                 C   
   t | jS r   )r   r  r   r]   r]   ra   __len__=  r
  zClusterPipeline.__len__r  rY   c                 O   s   | j j|i |S )ad  
        Append a raw command to the pipeline.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection
        )r  rG  r   r  rY   r]   r]   ra   rG  @  s   zClusterPipeline.execute_commandTraise_on_errorallow_redirectionsc                    s8   z| j ||I dH W |  I dH  S |  I dH  w )a  
        Execute the pipeline.

        It will retry the commands as specified by retries specified in :attr:`retry`
        & then raise an exception.

        :param raise_on_error:
            | Raise the first error if there are any errors
        :param allow_redirections:
            | Whether to retry each failed command individually in case of redirection
              errors

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        N)r  rc  r  r   r  r  r]   r]   ra   rc  P  s   
 zClusterPipeline.executer  r#  c                 G   s,   | j | D ]}| j|g|R   q| S r   )r  _partition_keys_by_slotr   rG  )r   r  r#  	slot_keysr]   r]   ra   _split_command_across_slotsi  s   z+ClusterPipeline._split_command_across_slotsc                       | j  I dH  dS z/
        Reset back to empty pipeline.
        N)r  r  r   r]   r]   ra   r  q  s   zClusterPipeline.resetc                 C   s   | j   dS )z
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        N)r  multir   r]   r]   ra   r  w  s   zClusterPipeline.multic                    r  )r   N)r  discardr   r]   r]   ra   r  ~     zClusterPipeline.discardc                    s   | j j| I dH  dS )z$Watches the values at keys ``names``N)r  rb  r   namesr]   r]   ra   rb    s   zClusterPipeline.watchc                    r  )z'Unwatches all previously specified keysN)r  unwatchr   r]   r]   ra   r    r  zClusterPipeline.unwatchc                    s   | j j| I d H  d S r   )r  unlinkr  r]   r]   ra   r    s   zClusterPipeline.unlinkmappingc                 C   s   | j |S r   )r  mset_nonatomicr   r  r]   r]   ra   r    s   zClusterPipeline.mset_nonatomicr   rZ   rU  rj  TT)%rm  rn  ro  rp  rr  rW   r   rs  r   r   r   r   r   r   r   r  r  r  r  r  r   rK   rJ   rG  r   rc  r'  r  r  r  r  rb  r  r  r   rI   r  r]   r]   r]   ra   rU    sh    $










rU  r   rD  r  c                   @   s4   e Zd ZdedededdfddZdefdd	ZdS )
r  positionr  rY   rZ   Nc                 O   s   || _ || _|| _d | _d S r   )r  rY   r  r  )r   r  r  rY   r]   r]   ra   r     s   
zPipelineCommand.__init__c                 C   s   d| j  d| j d| j dS )N[z]  ())r  r  rY   r   r]   r]   ra   r    s   zPipelineCommand.__repr__)rm  rn  ro  r  r   r   r'  r  r]   r]   r]   ra   r    s    r  c                	   @   s   e Zd Zed ddZedeeef deddfddZ	e		d!d
e
de
dee fddZedeeef ddfddZedd Zedd Zedd Zedd Zedd Zedd ZedefddZdS )"ExecutionStrategyrZ   rU  c                    r  )z^
        Initialize the execution strategy.

        See ClusterPipeline.initialize()
        Nr]   r   r]   r]   ra   r        zExecutionStrategy.initializer  rY   c                 O   r  )zf
        Append a raw command to the pipeline.

        See ClusterPipeline.execute_command()
        Nr]   r  r]   r]   ra   rG       	z!ExecutionStrategy.execute_commandTr  r  c                    r  )z
        Execute the pipeline.

        It will retry the commands as specified by retries specified in :attr:`retry`
        & then raise an exception.

        See ClusterPipeline.execute()
        Nr]   r  r]   r]   ra   rc    s   zExecutionStrategy.executer  c                 C   r  )z
        Executes multiple MSET commands according to the provided slot/pairs mapping.

        See ClusterPipeline.mset_nonatomic()
        Nr]   r  r]   r]   ra   r    r  z ExecutionStrategy.mset_nonatomicc                    r  )zZ
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        Nr]   r   r]   r]   ra   r    r  zExecutionStrategy.resetc                 C   r  )zU
        Starts transactional context.

        See: ClusterPipeline.multi()
        Nr]   r   r]   r]   ra   r    s   zExecutionStrategy.multic                    r  )zI
        Watch given keys.

        See: ClusterPipeline.watch()
        Nr]   r  r]   r]   ra   rb    r  zExecutionStrategy.watchc                    r  )za
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        Nr]   r   r]   r]   ra   r    r  zExecutionStrategy.unwatchc                       d S r   r]   r   r]   r]   ra   r       zExecutionStrategy.discardc                    r  )z^
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        Nr]   r  r]   r]   ra   r    r  zExecutionStrategy.unlinkc                 C      d S r   r]   r   r]   r]   ra   r    r  zExecutionStrategy.__len__Nr  r   )rm  rn  ro  r   r   r   rK   rJ   r   rG  rs  r   rc  r   rI   r  r  r  rb  r  r  r  r  r  r]   r]   r]   ra   r    sP    









r  c                	   @   s   e Zd ZdeddfddZd%ddZd	eeef d
e	ddfddZ
dd Zedeeef ddfddZe	d&dededee	 fddZedd Zedd Zedd Zedd Zedd  Zed!d" Zdefd#d$ZdS )'AbstractStrategyrf  rZ   Nc                 C   s   || _ g | _d S r   )_piper  r   rf  r]   r]   ra   r     s   
zAbstractStrategy.__init__rU  c                    s*   | j jjr| j j I d H  g | _| j S r   )r  r  rc   r   r  r   r]   r]   ra   r     s
   
zAbstractStrategy.initializer  rY   c                 O   s*   | j tt| j g|R i | | jS r   )r  r   r  r   r  r  r]   r]   ra   rG    s   z AbstractStrategy.execute_commandc                 C   sJ   d tt|}d| dt| d|jd  }|f|jdd  |_dS )zS
        Provides extra context to the exception prior to it being handled
        r   
Command # r  ) of pipeline caused error: r   r   N)r  maprP   rR   r  )r   r  numberr  r   msgr]   r]   ra   _annotate_exception  s   z$AbstractStrategy._annotate_exceptionr  c                 C   r
  r   r]   r  r]   r]   ra   r  (  s   zAbstractStrategy.mset_nonatomicTr  r  c                    r  r   r]   r  r]   r]   ra   rc  .  s   zAbstractStrategy.executec                    r  r   r]   r   r]   r]   ra   r  4  r	  zAbstractStrategy.resetc                 C   r
  r   r]   r   r]   r]   ra   r  8  r  zAbstractStrategy.multic                    r  r   r]   r  r]   r]   ra   rb  <  r	  zAbstractStrategy.watchc                    r  r   r]   r   r]   r]   ra   r  @  r	  zAbstractStrategy.unwatchc                    r  r   r]   r   r]   r]   ra   r  D  r	  zAbstractStrategy.discardc                    r  r   r]   r  r]   r]   ra   r  H  r	  zAbstractStrategy.unlinkc                 C   r  r   )r   r  r   r]   r]   ra   r  L  r
  zAbstractStrategy.__len__r  r   )rm  rn  ro  rU  r   r   r   rK   rJ   r   rG  r  r   r   rI   r  rs  r   rc  r  r  rb  r  r  r  r  r  r]   r]   r]   ra   r  
  sN    









r  c                       s   e Zd Zdeddf fddZdeeef ddfdd	Z	
d!de	de	de
e fddZ	
	
d!ddde
d de	de	de
e f
ddZdd Zdd Zdd Zdd Zdd Zdd  Z  ZS )"r  rf  rZ   Nc                    s   t  | d S r   )superr   r  r   r]   ra   r   Q  s   zPipelineStrategy.__init__r  rU  c                 C   sf   | j jj}i }| D ]}t||d }||g | q| D ]}| j	dg|R   q$| j S )Nr   MSET)
r  r  ri   r  r3   r  
setdefaultextendr   rG  )r   r  ri   slots_pairspairr   pairsr]   r]   ra   r  T  s   
zPipelineStrategy.mset_nonatomicTr  r  c              
      s   | j sg S z_| jjj }	 z&| jjjr| jj I d H  | j| jj| j ||dI d H W W |  I d H  S  t	j
yd } z"|dkrX|d8 }| jj I d H  tdI d H  n|W Y d }~nd }~ww q|  I d H  w )NT)r  r  r   r   rJ  )r  r  r  re   r9  rc   r   _executer  rW   r?  r   r   rM  )r   r  r  rA  rF  r]   r]   ra   rc  c  s6   
zPipelineStrategy.executer   rW   stackr  c                    s&  dd |D }i }|D ]U}|j dd }|r"||s"||}	n|j|jd|iI d H }	|	s9td|j dt|	dkrGtd|j |	d	 }
|
j|vrW|
g f||
j< ||
j d 	| qt
jd
d | D  I d H }t|r|r|D ]1}t|jtttfrz|j|ji |j I d H |_W qz ty } z	||_W Y d }~qzd }~ww qz|r|D ]1}|j}t|trdtt|j}d|jd  dt| d|j }|f|jdd   |_|q| }|d ur||j}|d ur|d D ]}t|jtjv r|   nqdd |D S )Nc                 S   s"   g | ]}|j rt|j tr|qS r]   )r  r&  r>  r  r]   r]   ra   r3    s    
z-PipelineStrategy._execute.<locals>.<listcomp>r%  r  r.  r/  r   zToo many targets for command r   c                 s   s(    | ]}t |d  |d V  qdS )r   r   N)r   r5  r  r2  r]   r]   ra   r8    s
    
z,PipelineStrategy._execute.<locals>.<genexpr>r   r  r  r  c                 S   r0  r]   )r  r  r]   r]   ra   r3    r4  ) rY   r_   r(  r-  r  r  rB   r   r   r   r   r<  r   anyr&  r  rG   rA   r7   rG  r>  r  r  rP   r  rR   r   r   r+  rW   r?  r;  )r   r   r  r  r  todor,  r   rB  r%  r   errorsrF  r  r  r  default_cluster_noder   r]   r]   ra   r    s   






zPipelineStrategy._executec                    s   g | _ dS r  )r  r   r]   r]   ra   r    s   
zPipelineStrategy.resetc                 C      t d)Nz@method multi() is not supported outside of transactional contextrB   r   r]   r]   ra   r    s   zPipelineStrategy.multic                    
   t d)Nz@method watch() is not supported outside of transactional contextr#  r  r]   r]   ra   rb       zPipelineStrategy.watchc                    r$  )NzBmethod unwatch() is not supported outside of transactional contextr#  r   r]   r]   ra   r    r%  zPipelineStrategy.unwatchc                    r$  )NzBmethod discard() is not supported outside of transactional contextr#  r   r]   r]   ra   r    r%  zPipelineStrategy.discardc                    s&   t |dkrtd| d|d S )Nr   z>unlinking multiple keys is not implemented in pipeline commandUNLINKr   )r   rB   rG  r  r]   r]   ra   r    s   zPipelineStrategy.unlinkr   )rm  rn  ro  rU  r   r   rI   rJ   r  rs  r   r   rc  r  r  r  rb  r  r  r  __classcell__r]   r]   r  ra   r  P  sD    


$
Tr  c                       st  e Zd ZdhZddhZh dZeefZe	e
eefZdeddf fddZdeeef fd	d
Zdeeef deddfddZdeeef dedef fddZdd Zdd Zdd ZdedefddZdd Zdd Zd e e!ef dd!fd"d#Z"	$d;d%e#d&e#de$e fd'd(Z%d)e$d* d%e#fd+d,Z&d)e$d* d%e#fd-d.Z'd/d0 Z(d1d2 Z)d3d4 Z*d5d6 Z+d7d8 Z,d9d: Z-  Z.S )<r  UNWATCHWATCH>   EXECDISCARDr(  rf  rZ   Nc                    sZ   t  | d| _d| _t | _d | _d | _d| _t	| j
jj| _| jtj| j  d S )NF)r  r   _explicit_transaction	_watchingset_pipeline_slots_transaction_node_transaction_connection
_executingr   r  r  re   _retryr   rW   r?  SLOT_REDIRECT_ERRORSr  r  r]   ra   r     s   
zTransactionStrategy.__init__c                 C   sR   | j std| jjjt| j d d}|| _| js#| j	 }|| _| j| jfS )ao  
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.
        z:At least a command with a key is needed to identify a noder   F)
r/  rB   r  r  rk   r  r   r0  r1  r  )r   r   r   r]   r]   ra   *_get_client_and_connection_for_transaction  s   

z>TransactionStrategy._get_client_and_connection_for_transactionr  rY   r   c                    sD   d d  fdd}t j|d}|  |  r S )Nc               
      sH   zt j i W d S  ty# }  z	| W Y d } ~ d S d } ~ ww r   )r   runr6  r>  )rF  r  r  rY   r  r   r]   ra   runner3  s   z3TransactionStrategy.execute_command.<locals>.runner)target)	threadingThreadstartr  )r   r  rY   r8  threadr]   r7  ra   rG  .  s   z#TransactionStrategy.execute_commandc                    s   | j jjr| j j I d H  d }|d | jvr"| j jj| I d H }| js,|d | jv ri| jsi|d dkr9| 	  |d urP| j
rI|| j
vrItd| j
| n|d | jvratd|d  d| j|i |S |d urs| j
| t j|i |S )Nr   r)  z0Cannot watch or send commands on different slotsz)Cannot identify slot number for command: z(,it cannot be triggered in a transaction)r  r  rc   r   NO_SLOTS_COMMANDSr  r-  IMMEDIATE_EXECUTE_COMMANDSr,  _validate_watchr/  r<   addrB   _immediate_execute_commandr  rG  )r   r  rY   slot_numberr  r]   ra   r6  D  s6   
z$TransactionStrategy._execute_commandc                 C   s   | j rtdd| _d S )N"Cannot issue a WATCH after a MULTIT)r,  rC   r-  r   r]   r]   ra   r@  h  s   
z#TransactionStrategy._validate_watchc                    s$   j  fddjI d H S )Nc                      s   j  i S r   ) _get_connection_and_send_commandr]   r  optionsr   r]   ra   r   p  s    z@TransactionStrategy._immediate_execute_command.<locals>.<lambda>r3  r  _reinitialize_on_error)r   r  rG  r]   rF  ra   rB  n  s
   
z.TransactionStrategy._immediate_execute_commandc                    s4   |   \}}| j|||d g|R i |I d H S r   )r5  _send_command_parse_response)r   r  rG  
redis_noder   r]   r]   ra   rE  t  s   
z4TransactionStrategy._get_connection_and_send_commandr   rK  c                    s@   |j | I dH  |j||fi |I dH }|| jv rd| _|S )z7
        Send a command and parse the response
        NF)r   r  UNWATCH_COMMANDSr-  )r   r   rK  command_namer  rG  outputr]   r]   ra   rJ  z  s   
z0TransactionStrategy._send_command_parse_responsec                    s   | j rt|| jv r| jrtdt|| jv s t|| jv rV| jr&d | _| jj j	d7  _	| jjj
rN| jjj	| jjj
 dkrN| jjj I d H  d| _	n| jjj| d| _d S )Nz-Slot rebalancing occurred while watching keysr   r   F)r-  r+  r4  r2  rH   CONNECTION_ERRORSr1  r  r  rm   rn   rk   r   r  r  r]   r]   ra   rI    s(   
z*TransactionStrategy._reinitialize_on_errorc                 C   s<   t ||D ]\}}t|tr| ||jd |j |qdS )z8
        Raise the first exception on the stack
        r   N)r=  r&  r>  r  r  r  )r   	responsesr  rr   r]   r]   ra   _raise_first_error  s   
z&TransactionStrategy._raise_first_errorr  rU  c                 C   r"  )Nz1Method is not supported in transactional context.)NotImplementedErrorr  r]   r]   ra   r    s   z"TransactionStrategy.mset_nonatomicTr  r  c                    s.   | j }|s| jr| jsg S | ||I d H S r   )r  r-  r/  !_execute_transaction_with_retries)r   r  r  r  r]   r]   ra   rc    s
   zTransactionStrategy.executer  r  c                    s$   j  fddjI d H S )Nc                      s     S r   )_execute_transactionr]   r  r   r  r]   ra   r     s    zGTransactionStrategy._execute_transaction_with_retries.<locals>.<lambda>rH  )r   r  r  r]   rV  ra   rT    s
   
z5TransactionStrategy._execute_transaction_with_retriesc                    s"  t | jdkrtdd| _|  \}}ttddg|tddg}dd |D }||}||I d H  g }z|	|dI d H  W n2 t
yb } z| |dd || W Y d }~nd }~w | jyv }	 z| |	dd  d }	~	ww t| jD ]x\}
}t|jv r||
|jt f q|z|	|d	I d H }W q| | jy } z| ||
d |j || W Y d }~q|d }~w | jy }	 z| |	|
d |j  d }	~	w t
y } z| ||
d |j || W Y d }~q|d }~ww d }z|	|dI d H }W n ty   |r|d  w d
| _d
| _|d u r"td|D ]\}
}||
| q$t |t | jkrJtddd | jD t ||sTt |dkr[| || j g }t|| jD ]+\}}t|ts|jd }|| jjj v r| jjj | |fi |j}|| qc|S )Nr   zDAll keys involved in a cluster transaction must map to the same slotTr   MULTIr*  c                 S   s   g | ]
}t |jvr|jqS r]   )r#   rY   r  r  cr]   r]   ra   r3    s    z<TransactionStrategy._execute_transaction.<locals>.<listcomp>rD  FzWatched variable changed.zeUnexpected response length for cluster pipeline EXEC. Command stack was {} but response had length {}c                 S   s   g | ]}|j d  qS )r   r  rX  r]   r]   ra   r3  	  r  )!r   r/  r<   r2  r5  r   r  r  r  r  rD   r  r   rO  	enumerater  r#   rY   r4  r  r>   r-  rH   insertr?   formatrR  r=  r&  r>  r  r  ro   )r   r  r  rK  r   r  packed_commandsr   rF  cluster_errorr  r  rD  
slot_errorr  datarQ  r   rM  r]   r]   ra   rU    s   





z(TransactionStrategy._execute_transactionc                    s   g | _ | jr:z| jdI d H  | j I d H  | j| j d | _W n | jy9   | jr7| j I d H  Y nw d | _d| _d| _	t
 | _d| _d S )Nr(  F)r  r1  r   r   r0  r  rO  r  r-  r,  r.  r/  r2  r   r]   r]   ra   r  *	  s$   

zTransactionStrategy.resetc                 C   s&   | j rtd| jrtdd| _ d S )Nz"Cannot issue nested calls to MULTIz:Commands without an initial WATCH have already been issuedT)r,  rC   r  r   r]   r]   ra   r  E	  s   
zTransactionStrategy.multic                    s(   | j rtd| jdg|R  I d H S )NrD  r)  )r,  rC   rG  r  r]   r]   ra   rb  N	  s   zTransactionStrategy.watchc                    s   | j r| dI d H S dS )Nr(  T)r-  rG  r   r]   r]   ra   r  T	  s   zTransactionStrategy.unwatchc                    r   r   r  r   r]   r]   ra   r  Z	  r   zTransactionStrategy.discardc                    s   | j dg|R  S )Nr&  )rG  r  r]   r]   ra   r  ]	  r   zTransactionStrategy.unlinkr   )/rm  rn  ro  r>  r?  rL  r7   rA   r4  r;   OSErrorr9   rE   rO  rU  r   r   rV   r   r5  r   rK   rJ   r   rG  r6  r@  rB  rE  rJ  rI  rR  r   rI   r  rs  r   rc  rT  rU  r  r  rb  r  r  r  r'  r]   r]   r  ra   r    sz    


$
	


	

d	r  )}r   r  r   r~  r:  rd  rv  abcr   r   r   	itertoolsr   typingr   r   r   r	   r
   r   r   r   r   r   r   r   r   r   redis._parsersr   r   redis._parsers.helpersr   r   r   redis.asyncio.clientr   redis.asyncio.connectionr   r   r   redis.asyncio.lockr   redis.asyncio.retryr   redis.auth.tokenr    redis.backoffr!   r"   redis.clientr#   r$   r%   redis.clusterr&   r'   r(   r)   r*   r+   r,   r-   r.   r/   redis.commandsr0   r1   	redis.crcr2   r3   redis.credentialsr4   redis.eventr5   r6   redis.exceptionsr7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   redis.typingrI   rJ   rK   redis.utilsrL   rM   rN   rO   rP   rQ   rR   r\   rS   rT   r'  rU   rW   rV   r   rU  r  replacelowersetattrr  r  r  r  r  r]   r]   r]   ra   <module>   sz    @0P$
        \  8 #
dF +