From a7b45390f3720a33c9ad3896a8a185bfb2628839 Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Mon, 12 Jun 2017 11:00:21 -0700 Subject: [PATCH] fix(core): revert PR #305 SetWatches which caused RuntimeError PR #305 introduced a feature to restore watches on reconnect. Unfortunately this introduced RuntimeError's under various cases, so reverting it is necessary. --- kazoo/client.py | 3 +-- kazoo/protocol/connection.py | 22 -------------------- kazoo/protocol/serialization.py | 23 -------------------- kazoo/tests/test_client.py | 37 --------------------------------- 4 files changed, 1 insertion(+), 84 deletions(-) diff --git a/kazoo/client.py b/kazoo/client.py index b03f750a..8a4c699f 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -477,8 +477,7 @@ def _session_callback(self, state): self._live.clear() self._notify_pending(state) self._make_state_change(KazooState.SUSPENDED) - if state != KeeperState.CONNECTING: - self._reset_watchers() + self._reset_watchers() def _notify_pending(self, state): """Used to clear a pending response queue and request queue diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index 9cfb7a2b..a63f470b 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -26,7 +26,6 @@ Ping, PingInstance, ReplyHeader, - SetWatches, Transaction, Watch, int_struct @@ -60,7 +59,6 @@ WATCH_XID = -1 PING_XID = -2 AUTH_XID = -4 -SET_WATCHES_XID = -8 CLOSE_RESPONSE = Close.type @@ -410,8 +408,6 @@ def _read_socket(self, read_timeout): async_object.set(True) elif header.xid == WATCH_XID: self._read_watch_event(buffer, offset) - elif header.xid == SET_WATCHES_XID: - self.logger.log(BLATHER, 'Received SetWatches reply') else: self.logger.log(BLATHER, 'Reading for header %r', header) @@ -444,8 +440,6 @@ def _send_request(self, read_timeout, connect_timeout): # Special case for auth packets if request.type == Auth.type: xid = AUTH_XID - elif request.type == SetWatches.type: - xid = SET_WATCHES_XID else: self._xid = (self._xid % 2147483647) + 1 xid = self._xid @@ -619,11 +613,6 @@ def _connect(self, host, port): client._session_id or 0, client._session_passwd, client.read_only) - # save the client's last_zxid before it gets overwritten by the - # server's. - # we'll need this to reset watches via SetWatches further below. - last_zxid = client.last_zxid - connect_result, zxid = self._invoke( client._session_timeout / 1000.0, connect) @@ -663,15 +652,4 @@ def _connect(self, host, port): if zxid: client.last_zxid = zxid - # TODO: separate exist from data watches - if client._data_watchers or client._child_watchers.keys(): - sw = SetWatches(last_zxid, - client._data_watchers.keys(), - client._data_watchers.keys(), - client._child_watchers.keys()) - zxid = self._invoke(connect_timeout / 1000.0, sw, - xid=SET_WATCHES_XID) - if zxid: - client.last_zxid = zxid - return read_timeout, connect_timeout diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index e3249f7d..8a9ef0f2 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -12,7 +12,6 @@ # Struct objects with formats compiled bool_struct = struct.Struct('B') int_struct = struct.Struct('!i') -long_struct = struct.Struct('!q') int_int_struct = struct.Struct('!ii') int_int_long_struct = struct.Struct('!iiq') @@ -52,14 +51,6 @@ def write_string(bytes): return int_struct.pack(len(utf8_str)) + utf8_str -def write_string_vector(v): - b = bytearray() - b.extend(int_struct.pack(len(v))) - for s in v: - b.extend(write_string(s)) - return b - - def write_buffer(bytes): if bytes is None: return int_struct.pack(-1) @@ -386,20 +377,6 @@ def serialize(self): write_string(self.auth)) -class SetWatches( - namedtuple('SetWatches', - 'relativeZxid, dataWatches, existWatches, childWatches')): - type = 101 - - def serialize(self): - b = bytearray() - b.extend(long_struct.pack(self.relativeZxid)) - b.extend(write_string_vector(self.dataWatches)) - b.extend(write_string_vector(self.existWatches)) - b.extend(write_string_vector(self.childWatches)) - return b - - class Watch(namedtuple('Watch', 'type state path')): @classmethod def deserialize(cls, bytes, offset): diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index af8ecf5d..5bc79990 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -963,43 +963,6 @@ def test_update_host_list(self): finally: self.cluster[0].run() - def test_set_watches_on_reconnect(self): - client = self.client - watch_event = client.handler.event_object() - - client.create("/tacos") - - # set the watch - def w(we): - eq_(we.path, "/tacos") - watch_event.set() - - client.get_children("/tacos", watch=w) - - # force a reconnect - states = [] - rc = client.handler.event_object() - - @client.add_listener - def listener(state): - if state == KazooState.CONNECTED: - states.append(state) - rc.set() - - client._connection._socket.shutdown(socket.SHUT_RDWR) - - rc.wait(10) - eq_(states, [KazooState.CONNECTED]) - - # watches should still be there - self.assertTrue(len(client._child_watchers) == 1) - - # ... and they should fire - client.create("/tacos/hello_", b"", ephemeral=True, sequence=True) - - watch_event.wait(1) - self.assertTrue(watch_event.is_set()) - dummy_dict = { 'aversion': 1, 'ctime': 0, 'cversion': 1,