Skip to content

Commit

Permalink
[GROW-2938] add retry to ClusterPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-iee committed Jun 22, 2023
1 parent aaa212b commit bc3192f
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ def __init__(
self.retry = retry
kwargs.update({"retry": self.retry})
else:
kwargs.update({"retry": Retry(default_backoff(), 0)})
self.retry = Retry(default_backoff(), 0)
kwargs['retry'] = self.retry

self.encoder = Encoder(
kwargs.get("encoding", "utf-8"),
Expand Down Expand Up @@ -775,6 +776,7 @@ def pipeline(self, transaction=None, shard_hint=None):
read_from_replicas=self.read_from_replicas,
reinitialize_steps=self.reinitialize_steps,
lock=self._lock,
retry=self.retry,
)

def lock(
Expand Down Expand Up @@ -1794,6 +1796,7 @@ def __init__(
cluster_error_retry_attempts: int = 3,
reinitialize_steps: int = 5,
lock=None,
retry: Optional["Retry"] = None,
**kwargs,
):
""" """
Expand All @@ -1819,6 +1822,7 @@ def __init__(
if lock is None:
lock = threading.Lock()
self._lock = lock
self.retry = retry

def __repr__(self):
""" """
Expand Down Expand Up @@ -1951,8 +1955,9 @@ def send_cluster_commands(
stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
attempts_count=self.cluster_error_retry_attempts - retry_attempts
)
except (ClusterDownError, ConnectionError) as e:
except (ClusterDownError, ConnectionError, TimeoutError) as e:
if retry_attempts > 0:
# Try again with the new cluster setup. All other errors
# should be raised.
Expand All @@ -1962,7 +1967,7 @@ def send_cluster_commands(
raise e

def _send_cluster_commands(
self, stack, raise_on_error=True, allow_redirections=True
self, stack, raise_on_error=True, allow_redirections=True, attempts_count=0
):
"""
Send a bunch of cluster commands to the redis cluster.
Expand Down Expand Up @@ -2017,9 +2022,11 @@ def _send_cluster_commands(
redis_node = self.get_redis_connection(node)
try:
connection = get_connection(redis_node, c.args)
except ConnectionError:
# Connection retries are being handled in the node's
# Retry object. Reinitialize the node -> slot table.
except (ConnectionError, TimeoutError) as e:
if self.retry and isinstance(e, self.retry._supported_errors):
backoff = self.retry._backoff.compute(attempts_count)
if backoff > 0:
time.sleep(backoff)
self.nodes_manager.initialize()
if is_default_node:
self.replace_default_node()
Expand Down

0 comments on commit bc3192f

Please sign in to comment.