From d464d68f3ac459ff55a63562aa5818ebcfbabec7 Mon Sep 17 00:00:00 2001 From: "zach.lee" Date: Tue, 27 Jun 2023 22:16:33 +0900 Subject: [PATCH 1/2] [GROW-3134] release already acquired connections, when get_connections raised an exception --- redis/cluster.py | 2 ++ tests/test_cluster.py | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/redis/cluster.py b/redis/cluster.py index 487e3d7f85..ef961dd91f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -2026,6 +2026,8 @@ def _send_cluster_commands( try: connection = get_connection(redis_node, c.args) except (ConnectionError, TimeoutError) as e: + for n in nodes.values(): + n.connection_pool.release(n.connection) if self.retry and isinstance(e, self.retry._supported_errors): backoff = self.retry._backoff.compute(attempts_count) if backoff > 0: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index fd8a4d208e..856a7b9169 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -12,6 +12,7 @@ import pytest +import redis.cluster from redis import Redis from redis.backoff import ( ConstantBackoff, @@ -3048,6 +3049,32 @@ def raise_ask_error(): assert ask_node.redis_connection.connection.read_response.called assert res == ["MOCK_OK"] + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) + def test_return_previous_acquired_connections(self, r, error): + # in order to ensure that a pipeline will make use of connections + # from different nodes + assert r.keyslot('a') != r.keyslot('b') + + orig_func = redis.cluster.get_connection + with patch("redis.cluster.get_connection") as get_connection: + def raise_error(target_node, *args, **kwargs): + if get_connection.call_count == 2: + raise error("mocked error") + else: + return orig_func(target_node, *args, **kwargs) + + get_connection.side_effect = raise_error + + r.pipeline().get('a').get('b').execute() + + # there should have been two get_connections per execution and + # two executions due to exception raised in the first execution + assert get_connection.call_count == 4 + for cluster_node in r.nodes_manager.nodes_cache.values(): + connection_pool = cluster_node.redis_connection.connection_pool + num_of_conns = len(connection_pool._available_connections) + assert num_of_conns == connection_pool._created_connections + def test_empty_stack(self, r): """ If pipeline is executed with no commands it should From 433e48176a3c1eabf1c7ff73deb4878d5ff3416e Mon Sep 17 00:00:00 2001 From: "zach.lee" Date: Wed, 28 Jun 2023 00:19:29 +0900 Subject: [PATCH 2/2] fix style --- tests/test_cluster.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 856a7b9169..f393463e3c 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -3053,10 +3053,11 @@ def raise_ask_error(): def test_return_previous_acquired_connections(self, r, error): # in order to ensure that a pipeline will make use of connections # from different nodes - assert r.keyslot('a') != r.keyslot('b') + assert r.keyslot("a") != r.keyslot("b") orig_func = redis.cluster.get_connection with patch("redis.cluster.get_connection") as get_connection: + def raise_error(target_node, *args, **kwargs): if get_connection.call_count == 2: raise error("mocked error") @@ -3065,7 +3066,7 @@ def raise_error(target_node, *args, **kwargs): get_connection.side_effect = raise_error - r.pipeline().get('a').get('b').execute() + r.pipeline().get("a").get("b").execute() # there should have been two get_connections per execution and # two executions due to exception raised in the first execution