From 69987bda1af5c730a07f25b82ef05a2ef495bd21 Mon Sep 17 00:00:00 2001 From: Zach Lee Date: Wed, 28 Jun 2023 15:18:30 +0900 Subject: [PATCH] [GROW-3134] release already acquired connections, when get_connections raised an exception (#3) * [GROW-3134] release already acquired connections, when get_connections raised an exception * fix style (cherry picked from commit 94bb9157a13562404394bd55394dd6e9c0775c85) --- redis/cluster.py | 2 ++ tests/test_cluster.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/redis/cluster.py b/redis/cluster.py index 806cf6da66..22160c7a9c 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1995,6 +1995,8 @@ def _send_cluster_commands( try: connection = get_connection(redis_node, c.args) except (ConnectionError, TimeoutError) as e: + for n in nodes.values(): + n.connection_pool.release(n.connection) if self.retry and isinstance(e, self.retry._supported_errors): backoff = self.retry._backoff.compute(attempts_count) if backoff > 0: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index fc4b99f1ee..3f4d070d43 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -8,6 +8,7 @@ import pytest +import redis.cluster from redis import Redis from redis.backoff import ( ConstantBackoff, @@ -2925,6 +2926,33 @@ def raise_ask_error(): assert ask_node.redis_connection.connection.read_response.called assert res == ["MOCK_OK"] + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) + def test_return_previous_acquired_connections(self, r, error): + # in order to ensure that a pipeline will make use of connections + # from different nodes + assert r.keyslot("a") != r.keyslot("b") + + orig_func = redis.cluster.get_connection + with patch("redis.cluster.get_connection") as get_connection: + + def raise_error(target_node, *args, **kwargs): + if get_connection.call_count == 2: + raise error("mocked error") + else: + return orig_func(target_node, *args, **kwargs) + + get_connection.side_effect = raise_error + + r.pipeline().get("a").get("b").execute() + + # there should have been two get_connections per execution and + # two executions due to exception raised in the first execution + assert get_connection.call_count == 4 + for cluster_node in r.nodes_manager.nodes_cache.values(): + connection_pool = cluster_node.redis_connection.connection_pool + num_of_conns = len(connection_pool._available_connections) + assert num_of_conns == connection_pool._created_connections + def test_empty_stack(self, r): """ If pipeline is executed with no commands it should