Skip to content

Commit

Permalink
[GROW-3134] release already acquired connections, when get_connection…
Browse files Browse the repository at this point in the history
…s raised an exception (#3)

* [GROW-3134] release already acquired connections, when get_connections raised an exception

* fix style

(cherry picked from commit 94bb915)
  • Loading branch information
zach-iee committed Jul 7, 2023
1 parent b1b91f2 commit 98d2346
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
2 changes: 2 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,8 @@ def _send_cluster_commands(
try:
connection = get_connection(redis_node, c.args)
except (ConnectionError, TimeoutError) as e:
for n in nodes.values():
n.connection_pool.release(n.connection)
if self.retry and isinstance(e, self.retry._supported_errors):
backoff = self.retry._backoff.compute(attempts_count)
if backoff > 0:
Expand Down
28 changes: 28 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pytest

import redis.cluster
from redis import Redis
from redis.backoff import (
ConstantBackoff,
Expand Down Expand Up @@ -3048,6 +3049,33 @@ def raise_ask_error():
assert ask_node.redis_connection.connection.read_response.called
assert res == ["MOCK_OK"]

@pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
def test_return_previous_acquired_connections(self, r, error):
# in order to ensure that a pipeline will make use of connections
# from different nodes
assert r.keyslot("a") != r.keyslot("b")

orig_func = redis.cluster.get_connection
with patch("redis.cluster.get_connection") as get_connection:

def raise_error(target_node, *args, **kwargs):
if get_connection.call_count == 2:
raise error("mocked error")
else:
return orig_func(target_node, *args, **kwargs)

get_connection.side_effect = raise_error

r.pipeline().get("a").get("b").execute()

# there should have been two get_connections per execution and
# two executions due to exception raised in the first execution
assert get_connection.call_count == 4
for cluster_node in r.nodes_manager.nodes_cache.values():
connection_pool = cluster_node.redis_connection.connection_pool
num_of_conns = len(connection_pool._available_connections)
assert num_of_conns == connection_pool._created_connections

def test_empty_stack(self, r):
"""
If pipeline is executed with no commands it should
Expand Down

0 comments on commit 98d2346

Please sign in to comment.