diff --git a/redis/cluster.py b/redis/cluster.py index 78d09310fd..2471ebe76f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -576,7 +576,8 @@ def __init__( self.retry = retry kwargs.update({"retry": self.retry}) else: - kwargs.update({"retry": Retry(default_backoff(), 0)}) + self.retry = Retry(default_backoff(), 0) + kwargs['retry'] = self.retry self.encoder = Encoder( kwargs.get("encoding", "utf-8"), @@ -759,6 +760,7 @@ def pipeline(self, transaction=None, shard_hint=None): read_from_replicas=self.read_from_replicas, reinitialize_steps=self.reinitialize_steps, lock=self._lock, + retry=self.retry, ) def lock( @@ -1764,6 +1766,7 @@ def __init__( cluster_error_retry_attempts: int = 3, reinitialize_steps: int = 5, lock=None, + retry: Optional["Retry"] = None, **kwargs, ): """ """ @@ -1789,6 +1792,7 @@ def __init__( if lock is None: lock = threading.Lock() self._lock = lock + self.retry = retry def __repr__(self): """ """ @@ -1921,8 +1925,9 @@ def send_cluster_commands( stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, + attempts_count=self.cluster_error_retry_attempts - retry_attempts ) - except (ClusterDownError, ConnectionError) as e: + except (ClusterDownError, ConnectionError, TimeoutError) as e: if retry_attempts > 0: # Try again with the new cluster setup. All other errors # should be raised. @@ -1932,7 +1937,7 @@ def send_cluster_commands( raise e def _send_cluster_commands( - self, stack, raise_on_error=True, allow_redirections=True + self, stack, raise_on_error=True, allow_redirections=True, attempts_count=0 ): """ Send a bunch of cluster commands to the redis cluster. @@ -1987,9 +1992,11 @@ def _send_cluster_commands( redis_node = self.get_redis_connection(node) try: connection = get_connection(redis_node, c.args) - except ConnectionError: - # Connection retries are being handled in the node's - # Retry object. Reinitialize the node -> slot table. + except (ConnectionError, TimeoutError) as e: + if self.retry and isinstance(e, self.retry._supported_errors): + backoff = self.retry._backoff.compute(attempts_count) + if backoff > 0: + time.sleep(backoff) self.nodes_manager.initialize() if is_default_node: self.replace_default_node()