Skip to content

Commit

Permalink
Merge pull request #450 from python-zk/feat/revert-pr-305
Browse files Browse the repository at this point in the history
fix(core): revert PR #305 SetWatches which caused RuntimeError
  • Loading branch information
bbangert committed Jun 12, 2017
2 parents 1956bab + a7b4539 commit b4967d1
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 84 deletions.
3 changes: 1 addition & 2 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,7 @@ def _session_callback(self, state):
self._live.clear()
self._notify_pending(state)
self._make_state_change(KazooState.SUSPENDED)
if state != KeeperState.CONNECTING:
self._reset_watchers()
self._reset_watchers()

def _notify_pending(self, state):
"""Used to clear a pending response queue and request queue
Expand Down
22 changes: 0 additions & 22 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
Ping,
PingInstance,
ReplyHeader,
SetWatches,
Transaction,
Watch,
int_struct
Expand Down Expand Up @@ -60,7 +59,6 @@
WATCH_XID = -1
PING_XID = -2
AUTH_XID = -4
SET_WATCHES_XID = -8

CLOSE_RESPONSE = Close.type

Expand Down Expand Up @@ -410,8 +408,6 @@ def _read_socket(self, read_timeout):
async_object.set(True)
elif header.xid == WATCH_XID:
self._read_watch_event(buffer, offset)
elif header.xid == SET_WATCHES_XID:
self.logger.log(BLATHER, 'Received SetWatches reply')
else:
self.logger.log(BLATHER, 'Reading for header %r', header)

Expand Down Expand Up @@ -444,8 +440,6 @@ def _send_request(self, read_timeout, connect_timeout):
# Special case for auth packets
if request.type == Auth.type:
xid = AUTH_XID
elif request.type == SetWatches.type:
xid = SET_WATCHES_XID
else:
self._xid = (self._xid % 2147483647) + 1
xid = self._xid
Expand Down Expand Up @@ -619,11 +613,6 @@ def _connect(self, host, port):
client._session_id or 0, client._session_passwd,
client.read_only)

# save the client's last_zxid before it gets overwritten by the
# server's.
# we'll need this to reset watches via SetWatches further below.
last_zxid = client.last_zxid

connect_result, zxid = self._invoke(
client._session_timeout / 1000.0, connect)

Expand Down Expand Up @@ -663,15 +652,4 @@ def _connect(self, host, port):
if zxid:
client.last_zxid = zxid

# TODO: separate exist from data watches
if client._data_watchers or client._child_watchers.keys():
sw = SetWatches(last_zxid,
client._data_watchers.keys(),
client._data_watchers.keys(),
client._child_watchers.keys())
zxid = self._invoke(connect_timeout / 1000.0, sw,
xid=SET_WATCHES_XID)
if zxid:
client.last_zxid = zxid

return read_timeout, connect_timeout
23 changes: 0 additions & 23 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# Struct objects with formats compiled
bool_struct = struct.Struct('B')
int_struct = struct.Struct('!i')
long_struct = struct.Struct('!q')
int_int_struct = struct.Struct('!ii')
int_int_long_struct = struct.Struct('!iiq')

Expand Down Expand Up @@ -52,14 +51,6 @@ def write_string(bytes):
return int_struct.pack(len(utf8_str)) + utf8_str


def write_string_vector(v):
b = bytearray()
b.extend(int_struct.pack(len(v)))
for s in v:
b.extend(write_string(s))
return b


def write_buffer(bytes):
if bytes is None:
return int_struct.pack(-1)
Expand Down Expand Up @@ -386,20 +377,6 @@ def serialize(self):
write_string(self.auth))


class SetWatches(
namedtuple('SetWatches',
'relativeZxid, dataWatches, existWatches, childWatches')):
type = 101

def serialize(self):
b = bytearray()
b.extend(long_struct.pack(self.relativeZxid))
b.extend(write_string_vector(self.dataWatches))
b.extend(write_string_vector(self.existWatches))
b.extend(write_string_vector(self.childWatches))
return b


class Watch(namedtuple('Watch', 'type state path')):
@classmethod
def deserialize(cls, bytes, offset):
Expand Down
37 changes: 0 additions & 37 deletions kazoo/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -963,43 +963,6 @@ def test_update_host_list(self):
finally:
self.cluster[0].run()

def test_set_watches_on_reconnect(self):
client = self.client
watch_event = client.handler.event_object()

client.create("/tacos")

# set the watch
def w(we):
eq_(we.path, "/tacos")
watch_event.set()

client.get_children("/tacos", watch=w)

# force a reconnect
states = []
rc = client.handler.event_object()

@client.add_listener
def listener(state):
if state == KazooState.CONNECTED:
states.append(state)
rc.set()

client._connection._socket.shutdown(socket.SHUT_RDWR)

rc.wait(10)
eq_(states, [KazooState.CONNECTED])

# watches should still be there
self.assertTrue(len(client._child_watchers) == 1)

# ... and they should fire
client.create("/tacos/hello_", b"", ephemeral=True, sequence=True)

watch_event.wait(1)
self.assertTrue(watch_event.is_set())


dummy_dict = {
'aversion': 1, 'ctime': 0, 'cversion': 1,
Expand Down

0 comments on commit b4967d1

Please sign in to comment.