From 6f107a64edc0447312394e174e5f7998ff8b52ab Mon Sep 17 00:00:00 2001 From: Myungjin Lee Date: Fri, 28 Apr 2023 19:40:27 -0700 Subject: [PATCH] fix: slow tx task termination tx task termination is slow because putting EMPTY_PAYLOAD was never executed. This was because channel.remove(end) is called that end is going to be removed. Hence, the length of channel._ends is always zero. To address the issue, in channel.remove(), puting EMPTY_PAYLOAD into the tx queue is implemented. With this, every time channel.remove() is called, the given end's tx task is safely terminated. --- lib/python/flame/backend/p2p.py | 9 ++++----- lib/python/flame/channel.py | 4 ++++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/python/flame/backend/p2p.py b/lib/python/flame/backend/p2p.py index c4ae78f9c..7fee1efcc 100644 --- a/lib/python/flame/backend/p2p.py +++ b/lib/python/flame/backend/p2p.py @@ -194,8 +194,10 @@ def leave(self, channel) -> None: - for each end id in end ids in the channel, call _cleanup_end(end_id), which terminates rx task, releases data in _endpoints, _livecheck, delayed_channel_add and _context_keeper, and calls channel.remove() + Note: channel.remove() terminates tx queue by putting EMPTY_PAYLOAD + in tx queue of the given end - - terminate tx tasks by putting EMPTY_PAYLOAD in tx queue of each task + - terminate broadcast tx task by putting EMPTY_PAYLOAD in tx queue """ async def _leave_inner(): @@ -241,12 +243,9 @@ async def _leave_inner(): # removed this channel from self._channels await channel.remove(end_id) - # we use EMPTY_PAYLOAD as signal to finish tx tasks + # we use EMPTY_PAYLOAD as signal to finish broadcast tx task # put EMPTY_PAYLOAD to broadcast queue await channel._bcast_queue.put(EMPTY_PAYLOAD) - for _, end in channel._ends.items(): - # put EMPTY_PAYLOAD to unicast queue for each end - await end.put(EMPTY_PAYLOAD) await self.await_tx_tasks_done(channel.name()) diff --git a/lib/python/flame/channel.py b/lib/python/flame/channel.py index c30941871..5adb9c2f6 100644 --- a/lib/python/flame/channel.py +++ b/lib/python/flame/channel.py @@ -486,11 +486,15 @@ async def remove(self, end_id): return rxq = self._ends[end_id].get_rxq() + txq = self._ends[end_id].get_txq() del self._ends[end_id] # put bogus data to unblock a get() call await rxq.put(EMPTY_PAYLOAD) + # put bogus data to let tx_task finish + await txq.put(EMPTY_PAYLOAD) + if len(self._ends) == 0: # clear (or unset) the event self.await_join_event.clear()