From 4456f180735a0f8520bfc42474de9d27fa01bb2c Mon Sep 17 00:00:00 2001 From: Jiangge Zhang Date: Tue, 31 Oct 2017 15:15:47 +0800 Subject: [PATCH] perf(recipe): Give TreeCache standalone queue This commit lets TreeCache do not use queue of connection routine any more. --- kazoo/recipe/cache.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/kazoo/recipe/cache.py b/kazoo/recipe/cache.py index 1a084af7..65fad854 100644 --- a/kazoo/recipe/cache.py +++ b/kazoo/recipe/cache.py @@ -36,6 +36,8 @@ class TreeCache(object): STATE_STARTED = 1 STATE_CLOSED = 2 + _STOP = object() + def __init__(self, client, path): self._client = client self._root = TreeNode.make_root(self, path) @@ -44,6 +46,8 @@ def __init__(self, client, path): self._is_initialized = False self._error_listeners = [] self._event_listeners = [] + self._task_queue = client.handler.queue_impl() + self._task_thread = None def start(self): """Starts the cache. @@ -66,6 +70,7 @@ def start(self): else: raise KazooException('already started') + self._task_thread = self._client.handler.spawn(self._do_background) self._client.add_listener(self._session_watcher) self._client.ensure_path(self._root._path) @@ -87,6 +92,7 @@ def close(self): """ if self._state == self.STATE_STARTED: self._state = self.STATE_CLOSED + self._task_queue.put(self._STOP) self._client.remove_listener(self._session_watcher) with handle_exception(self._error_listeners): self._root.on_deleted() @@ -168,7 +174,16 @@ def _do_publish_event(self, event): listener(event) def _in_background(self, func, *args, **kwargs): - self._client.handler.callback_queue.put(lambda: func(*args, **kwargs)) + self._task_queue.put((func, args, kwargs)) + + def _do_background(self): + while True: + with handle_exception(self._error_listeners): + cb = self._task_queue.get() + if cb is self._STOP: + break + func, args, kwargs = cb + func(*args, **kwargs) def _session_watcher(self, state): if state == KazooState.SUSPENDED: