Skip to content

Commit

Permalink
fix(handlers): make AsyncResult call all registered callbacks instant…
Browse files Browse the repository at this point in the history
…ly if the handler has stopped running (#549)

This avoids zombie thread to appear when creating and closing the client right after. A new unit case is added.
  • Loading branch information
laura-surcel authored and StephenSorriaux committed Jan 15, 2019
1 parent 1452a48 commit d9e0e72
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
4 changes: 4 additions & 0 deletions kazoo/handlers/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def __init__(self):
self._state_change = Semaphore()
self._workers = []

@property
def running(self):
return self._running

class timeout_exception(gevent.Timeout):
def __init__(self, msg):
gevent.Timeout.__init__(self, exception=msg)
Expand Down
4 changes: 4 additions & 0 deletions kazoo/handlers/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ def __init__(self):
self._state_change = threading.Lock()
self._workers = []

@property
def running(self):
return self._running

def _create_thread_worker(self, queue):
def _thread_worker(): # pragma: nocover
while True:
Expand Down
33 changes: 18 additions & 15 deletions kazoo/handlers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,14 @@ def set(self, value=None):
with self._condition:
self.value = value
self._exception = None
for callback in self._callbacks:
self._handler.completion_queue.put(
functools.partial(callback, self)
)
self._do_callbacks()
self._condition.notify_all()

def set_exception(self, exception):
"""Store the exception. Wake up the waiters."""
with self._condition:
self._exception = exception
for callback in self._callbacks:
self._handler.completion_queue.put(
functools.partial(callback, self)
)
self._do_callbacks()
self._condition.notify_all()

def get(self, block=True, timeout=None):
Expand Down Expand Up @@ -102,16 +96,13 @@ def rawlink(self, callback):
"""Register a callback to call when a value or an exception is
set"""
with self._condition:
# Are we already set? Dispatch it now
if self.ready():
self._handler.completion_queue.put(
functools.partial(callback, self)
)
return

if callback not in self._callbacks:
self._callbacks.append(callback)

# Are we already set? Dispatch it now
if self.ready():
self._do_callbacks()

def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
with self._condition:
Expand All @@ -122,6 +113,18 @@ def unlink(self, callback):
if callback in self._callbacks:
self._callbacks.remove(callback)

def _do_callbacks(self):
"""Execute the callbacks that were registered by :meth:`rawlink`.
If the handler is in running state this method only schedules
the calls to be performed by the handler. If it's stopped,
the callbacks are called right away."""

for callback in self._callbacks:
if self._handler.running:
self._handler.completion_queue.put(
functools.partial(callback, self))
else:
functools.partial(callback, self)()

def _set_fd_cloexec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
Expand Down
24 changes: 23 additions & 1 deletion kazoo/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ def test_context(self):
eq_(self.client.get('/smith')[0], b'32')


class TestCallbacks(unittest.TestCase):
class TestSessionCallbacks(unittest.TestCase):
def test_session_callback_states(self):
from kazoo.protocol.states import KazooState, KeeperState
from kazoo.client import KazooClient
Expand Down Expand Up @@ -1185,6 +1185,28 @@ def test_session_callback_states(self):
eq_(client.state, KazooState.SUSPENDED)


class TestCallbacks(KazooTestCase):
def test_async_result_callbacks_are_always_called(self):
# create a callback object
callback_mock = mock.Mock()

# simulate waiting for a response
async_result = self.client.handler.async_result()
async_result.rawlink(callback_mock)

# begin the procedure to stop the client
self.client.stop()

# the response has just been received;
# this should be on another thread,
# simultaneously with the stop procedure
async_result.set_exception(
Exception("Anything that throws an exception"))

# with the fix the callback should be called
self.assertGreater(callback_mock.call_count, 0)


class TestNonChrootClient(KazooTestCase):

def test_create(self):
Expand Down

0 comments on commit d9e0e72

Please sign in to comment.