Ensure stream messages are always ordered #8059

Merged: 5 commits merged on Aug 2, 2023

Conversation

fjetter (Member) commented Aug 1, 2023

Some motivation for this over here #8049 (comment)

Essentially, the current behavior breaks the ordering of operations as soon as a handler is async. With this change, we ensure ordering within a given BatchedStream regardless of whether the handlers are sync or async.
I think this behavior is much easier to reason about (but I have no idea where this will blow up...)
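
To illustrate the ordering problem described above, here is a minimal sketch in plain asyncio. It does not use distributed's actual dispatch code: `run_stream` and its `await_async` flag are hypothetical stand-ins for a stream loop that either runs async handlers as background tasks (`await_async=False`) or awaits them in place (`await_async=True`).

```python
import asyncio
import inspect


async def run_stream(messages, handlers, await_async):
    """Hypothetical stream loop: process messages in arrival order."""
    background = set()
    for msg in messages:
        result = handlers[msg["op"]](msg["val"])
        if inspect.isawaitable(result):
            if await_async:
                # Finish the async handler before looking at the next message.
                await result
            else:
                # Fire and forget: later messages may overtake this one.
                background.add(asyncio.ensure_future(result))
    if background:
        await asyncio.gather(*background)


def demo(await_async):
    ledger = []

    async def async_handler(val):
        await asyncio.sleep(0.01)
        ledger.append(val)

    def sync_handler(val):
        ledger.append(val)

    handlers = {"sync_handler": sync_handler, "async_handler": async_handler}
    messages = [
        {"op": "sync_handler" if ix % 2 else "async_handler", "val": ix}
        for ix in range(6)
    ]
    asyncio.run(run_stream(messages, handlers, await_async))
    return ledger
```

With `demo(True)` the ledger matches the send order. With `demo(False)` the values handled by the sync handler land first, because the async handlers are still sleeping in the background when the sync ones run.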

fjetter (Member, Author) commented Aug 1, 2023

CI doesn't look sadder than usual. I'll follow up with a test, and then I think we can merge.

github-actions bot (Contributor) commented Aug 1, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files (+4), 20 suites (+4), 10h 57m 56s ⏱️ (+2h 37m 11s)
3 748 tests (+2): 3 637 ✔️ (+13), 106 💤 (-10), 5 failed (-1)
36 254 runs (+7 453): 34 498 ✔️ (+7 148), 1 748 💤 (+304), 8 failed (+1)

For more details on these failures, see this check.

Results for commit 40253fb. ± Comparison against base commit 31af5c0.

♻️ This comment has been updated with latest results.

fjetter marked this pull request as ready for review August 1, 2023 16:35
Comment on lines 1386 to 1423
@gen_test()
async def test_messages_are_ordered_bsend():
    ledger = []

    async def async_handler(val):
        await asyncio.sleep(0.1)
        ledger.append(val)

    def sync_handler(val):
        ledger.append(val)

    async with Server(
        {},
        stream_handlers={
            "sync_handler": sync_handler,
            "async_handler": async_handler,
        },
    ) as s:
        await s.listen()
        comm = await connect(s.address)
        try:
            b = BatchedSend(interval=10)
            try:
                await comm.write({"op": "connection_stream"})
                b.start(comm)
                n = 100
                for ix in range(n):
                    if ix % 2:
                        b.send({"op": "sync_handler", "val": ix})
                    else:
                        b.send({"op": "async_handler", "val": ix})
                while not len(ledger) == n:
                    await asyncio.sleep(0.01)
                assert ledger == list(range(n))
            finally:
                await b.close()
        finally:
            await comm.close()
Member Author

Incredible that we didn't have a test for this.

Comment on lines +1521 to +1527
    async def __aenter__(self):
        await self
        return self

    async def __aexit__(self, *args):
        await self.close()
        return
Member Author

This is an unrelated but pleasant addition to the CommPool
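
As a rough illustration of what these two methods enable, here is a self-contained sketch of an awaitable object that also works as an async context manager. The `Pool` class, its `start` method, and its `status` attribute are hypothetical and only mirror the pattern in the diff above; they are not the actual class from distributed.

```python
import asyncio


class Pool:
    """Hypothetical awaitable resource, used only to show the pattern."""

    def __init__(self):
        self.status = "init"

    async def start(self):
        self.status = "running"
        return self

    def __await__(self):
        # `await pool` delegates to start().
        return self.start().__await__()

    async def close(self):
        self.status = "closed"

    async def __aenter__(self):
        await self
        return self

    async def __aexit__(self, *args):
        await self.close()


async def main():
    async with Pool() as pool:
        inside = pool.status
    # The pool is started on entry and closed on exit, even if the body raises.
    return inside, pool.status
```

Running `asyncio.run(main())` returns the status observed inside the block and the status after leaving it, which replaces a manual `try`/`finally` around `close()`.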

fjetter changed the title from "Await async handlers" to "Ensure stream messages are always ordered" Aug 1, 2023
hendrikmakait (Member) left a comment

I like the reduced complexity this approach introduces. However, I can see this causing issues in real-world use cases because long-running handlers might delay others. Have you run benchmarks on this to see if anything pops up?

The possible benefits warrant giving this a try. So, this looks good to me with one suggestion for improving the tests before giving ✅.

Two review threads on distributed/tests/test_core.py (outdated, resolved)
hendrikmakait (Member) commented Aug 1, 2023

However, I can see this causing issues in real-world use cases because long-running handlers might delay others.

Just to highlight this: This shouldn't cause issues, if anything, it should prevent them. I just wouldn't be surprised if some behavior implicitly relied on async handlers being non-blocking and executed in the background.

fjetter (Member, Author) commented Aug 2, 2023

I like the reduced complexity this approach introduces. However, I can see this causing issues in real-world use cases because long-running handlers might delay others. Have you run benchmarks on this to see if anything pops up?

No, but reviewing the code base, we actually don't have any async handlers right now, with the exception of handle_request_refresh_who_has, although this one should not be async...

fjetter (Member, Author) commented Aug 2, 2023

Well, there is Worker.close, which is an async handler for the "terminate" operation. AFAICT this op is never used, since the workers are more or less closed forcefully by terminating the stream connection. I don't think terminating the connection is a very clean way to do this (and we're seeing tons of logs). However, I'll leave this to a follow-up. If the close actually blocked, there are ways to handle it without blocking the stream.
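
One possible way to keep a slow handler from delaying later messages, sketched here under assumptions, is to have the handler record the message in order and then hand the slow part to a background task. This is not necessarily the approach the author has in mind, and all names (`close_handler`, `slow_work`, `background`) are hypothetical.

```python
import asyncio
import inspect


async def demo():
    ledger = []
    background = set()

    async def slow_work(val):
        await asyncio.sleep(0.05)
        ledger.append(("slow-done", val))

    async def close_handler(val):
        # The ordered part runs immediately; the slow part is offloaded.
        ledger.append(("close-received", val))
        task = asyncio.create_task(slow_work(val))
        background.add(task)  # keep a reference so the task is not lost
        task.add_done_callback(background.discard)

    def sync_handler(val):
        ledger.append(("sync", val))

    handlers = {"close": close_handler, "sync": sync_handler}
    messages = [{"op": "close", "val": 0}, {"op": "sync", "val": 1}]

    # Stream loop that awaits every async handler, as in this change.
    for msg in messages:
        result = handlers[msg["op"]](msg["val"])
        if inspect.isawaitable(result):
            await result

    await asyncio.gather(*background)
    return ledger
```

Here the stream loop still awaits `close_handler`, so the receipt of each message stays ordered, but the sync message is handled before the slow work finishes.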

fjetter (Member, Author) commented Aug 2, 2023

Ah sorry, the above was wrong. terminate is actually a dedicated-comm RPC call that is not being used. The close stream op is in fact used

hendrikmakait (Member) commented

Ah sorry, the above was wrong. terminate is actually a dedicated-comm RPC call that is not being used. The close stream op is in fact used

Thanks for the clarification. Let's drop terminate in a dedicated PR then?

fjetter (Member, Author) commented Aug 2, 2023

Thanks for the clarification. Let's drop terminate in a dedicated PR then?

Sure... doesn't really matter

hendrikmakait (Member) left a comment

LGTM, assuming CI isn't unhappier than usual.

fjetter merged commit 7ba2dea into dask:main Aug 2, 2023
17 of 25 checks passed
fjetter deleted the await_async_handlers branch August 2, 2023 09:21
2 participants