diff --git a/redis/cluster.py b/redis/cluster.py index 487e3d7f85..ef961dd91f 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -2026,6 +2026,8 @@ def _send_cluster_commands( try: connection = get_connection(redis_node, c.args) except (ConnectionError, TimeoutError) as e: + for n in nodes.values(): + n.connection_pool.release(n.connection) if self.retry and isinstance(e, self.retry._supported_errors): backoff = self.retry._backoff.compute(attempts_count) if backoff > 0: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index fd8a4d208e..f393463e3c 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -12,6 +12,7 @@ import pytest +import redis.cluster from redis import Redis from redis.backoff import ( ConstantBackoff, @@ -3048,6 +3049,33 @@ def raise_ask_error(): assert ask_node.redis_connection.connection.read_response.called assert res == ["MOCK_OK"] + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) + def test_return_previous_acquired_connections(self, r, error): + # in order to ensure that a pipeline will make use of connections + # from different nodes + assert r.keyslot("a") != r.keyslot("b") + + orig_func = redis.cluster.get_connection + with patch("redis.cluster.get_connection") as get_connection: + + def raise_error(target_node, *args, **kwargs): + if get_connection.call_count == 2: + raise error("mocked error") + else: + return orig_func(target_node, *args, **kwargs) + + get_connection.side_effect = raise_error + + r.pipeline().get("a").get("b").execute() + + # there should have been two get_connections per execution and + # two executions due to exception raised in the first execution + assert get_connection.call_count == 4 + for cluster_node in r.nodes_manager.nodes_cache.values(): + connection_pool = cluster_node.redis_connection.connection_pool + num_of_conns = len(connection_pool._available_connections) + assert num_of_conns == connection_pool._created_connections + def test_empty_stack(self, r): """ If pipeline is executed with no commands it should