Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: recv_fifo for asyncfl #449

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 4 additions & 42 deletions lib/python/flame/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,51 +365,13 @@ async def _get_inner(end_id) -> tuple[str, Any]:
logger.debug(f"active task added for {end_id}")
logger.debug(f"{str(self._active_recv_fifo_tasks)}")

# DO NOT CHANGE self.count as a local variable
# with aiostream, local variable update looks incorrect.
# but with an instance variable , the variable update is
# done correctly.
self.count = 0
merged = stream.merge(*runs)
async with merged.stream() as streamer:
logger.debug(f"0) cnt: {self.count}, first_k: {self.first_k}")
async for result in streamer:
(end_id, payload) = result
logger.debug(f"1) end id: {end_id}, cnt: {self.count}")

self.count += 1
logger.debug(f"2) end id: {end_id}, cnt: {self.count}")
if self.count <= self.first_k:
logger.debug(f"3) end id: {end_id}, cnt: {self.count}")
await self._rx_queue.put(result)
self._active_recv_fifo_tasks.remove(end_id)

logger.debug(f"active task removed for {end_id}")
logger.debug(f"{str(self._active_recv_fifo_tasks)}")
else:
logger.debug(f"4) end id: {end_id}, cnt: {self.count}")
if end_id not in self._ends:
logger.debug(f"{end_id} not in _ends")
continue
# We already put the first_k number of messages into
# a queue.
#
# Now we need to save the remaining messages which
# were already taken out from each end's rcv queue.
# In order not to lose those messages, we use peek_buf
# in end object.

# WARNING: peek_buf must be none; if not, we called
# peek() somewhere else and then called recv_fifo()
# before recv() was called.
# To detect this potential issue, assert is given here.
assert self._ends[end_id].peek_buf is None

self._ends[end_id].peek_buf = payload
self._active_recv_fifo_tasks.remove(end_id)

logger.debug(f"active task removed for {end_id}")
logger.debug(f"{str(self._active_recv_fifo_tasks)}")
(end_id, _) = result

await self._rx_queue.put(result)
self._active_recv_fifo_tasks.remove(end_id)

def peek(self, end_id):
"""Peek rxq of end_id and return data if queue is not empty."""
Expand Down
2 changes: 1 addition & 1 deletion lib/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

setup(
name="flame",
version="0.1.0",
version="0.1.1",
author="Flame Maintainers",
author_email="flame-github-owners@cisco.com",
include_package_data=True,
Expand Down