fix: recv_fifo for asyncfl
recv_fifo() takes first_k as one of its arguments so that only the
first first_k messages are received and returned.

To implement this functionality, the asynchronous call
_streamer_for_recv_fifo() was introduced. However, the function has a
bug that is triggered in asyncfl. The function adds the first first_k
messages to an rx queue and re-adds the remaining messages to the
head of each corresponding end id's rx queue. The remaining messages
are then processed in the next recv_fifo() call. However, since
_streamer_for_recv_fifo() is an asynchronous call, there is no
guarantee that the messages saved in the previous call will be
processed first. This caused some trainers to become blocked and made
them fail to participate in future rounds.
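
A toy sketch of the ordering hazard (not flame code; the end ids and
delays below are made up): with concurrent coroutines, completion
order follows timing, not the order in which the work was queued, so
a message stashed by a previous recv_fifo() call may not be consumed
first.

    import asyncio

    async def fetch(end_id: str, delay: float) -> str:
        # Simulates receiving from one end; completion time depends on
        # the peer, not on when the coroutine was created.
        await asyncio.sleep(delay)
        return end_id

    async def main() -> None:
        # "stashed" was queued first, but "fresh" completes first anyway:
        # concurrent receives give no FIFO guarantee across calls.
        tasks = [fetch("stashed", 0.2), fetch("fresh", 0.1)]
        order = [await t for t in asyncio.as_completed(tasks)]
        print(order)  # ['fresh', 'stashed']

    asyncio.run(main())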

The issue is fixed as follows. Since recv_fifo() always returns the
first first_k messages from the rx queue, there is no harm in adding
the remaining messages to the queue as well. By removing all the code
that handled the remaining messages, the implementation is now clean
and easy to understand.
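
A minimal sketch of the fixed behavior (illustrative only;
recv_first_k and the queue setup below are hypothetical stand-ins,
not the actual channel.py API): every receive result is simply put on
the shared rx queue, and the consumer takes exactly first_k items, so
leftover items are harmless and are returned by the next call.

    import asyncio
    from typing import Any

    async def recv_first_k(rx_queue: asyncio.Queue, first_k: int) -> list[Any]:
        # Take exactly first_k items; anything beyond first_k stays in
        # the queue and is picked up by the next call.
        return [await rx_queue.get() for _ in range(first_k)]

    async def main() -> None:
        q: asyncio.Queue = asyncio.Queue()
        for msg in ("m1", "m2", "m3"):   # three ends finished receiving
            q.put_nowait(msg)
        print(await recv_first_k(q, 2))  # ['m1', 'm2'] -- first call
        print(await recv_first_k(q, 1))  # ['m3'] -- leftover, next call

    asyncio.run(main())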
myungjin committed Sep 18, 2023
1 parent e175d42 commit fa494d7
Showing 2 changed files with 5 additions and 43 deletions.
46 changes: 4 additions & 42 deletions lib/python/flame/channel.py
@@ -365,51 +365,13 @@ async def _get_inner(end_id) -> tuple[str, Any]:
             logger.debug(f"active task added for {end_id}")
             logger.debug(f"{str(self._active_recv_fifo_tasks)}")
 
-        # DO NOT CHANGE self.count as a local variable
-        # with aiostream, local variable update looks incorrect.
-        # but with an instance variable , the variable update is
-        # done correctly.
-        self.count = 0
         merged = stream.merge(*runs)
         async with merged.stream() as streamer:
-            logger.debug(f"0) cnt: {self.count}, first_k: {self.first_k}")
             async for result in streamer:
-                (end_id, payload) = result
-                logger.debug(f"1) end id: {end_id}, cnt: {self.count}")
-
-                self.count += 1
-                logger.debug(f"2) end id: {end_id}, cnt: {self.count}")
-                if self.count <= self.first_k:
-                    logger.debug(f"3) end id: {end_id}, cnt: {self.count}")
-                    await self._rx_queue.put(result)
-                    self._active_recv_fifo_tasks.remove(end_id)
-
-                    logger.debug(f"active task removed for {end_id}")
-                    logger.debug(f"{str(self._active_recv_fifo_tasks)}")
-                else:
-                    logger.debug(f"4) end id: {end_id}, cnt: {self.count}")
-                    if end_id not in self._ends:
-                        logger.debug(f"{end_id} not in _ends")
-                        continue
-                    # We already put the first_k number of messages into
-                    # a queue.
-                    #
-                    # Now we need to save the remaining messages which
-                    # were already taken out from each end's rcv queue.
-                    # In order not to lose those messages, we use peek_buf
-                    # in end object.
-
-                    # WARNING: peek_buf must be none; if not, we called
-                    # peek() somewhere else and then called recv_fifo()
-                    # before recv() was called.
-                    # To detect this potential issue, assert is given here.
-                    assert self._ends[end_id].peek_buf is None
-
-                    self._ends[end_id].peek_buf = payload
-                    self._active_recv_fifo_tasks.remove(end_id)
-
-                    logger.debug(f"active task removed for {end_id}")
-                    logger.debug(f"{str(self._active_recv_fifo_tasks)}")
+                (end_id, _) = result
+
+                await self._rx_queue.put(result)
+                self._active_recv_fifo_tasks.remove(end_id)
 
     def peek(self, end_id):
         """Peek rxq of end_id and return data if queue is not empty."""
2 changes: 1 addition & 1 deletion lib/python/setup.py
@@ -19,7 +19,7 @@
 
 setup(
     name="flame",
-    version="0.1.0",
+    version="0.1.1",
     author="Flame Maintainers",
     author_email="flame-github-owners@cisco.com",
     include_package_data=True,
