Ensure stream messages are always ordered #8059
Conversation
CI doesn't look sadder than usual. I'll follow up with a test and I think we can merge.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files +4, 20 suites +4, 10h 57m 56s ⏱️ +2h 37m 11s

For more details on these failures, see this check. Results for commit 40253fb. ± Comparison against base commit 31af5c0.
distributed/tests/test_core.py
Outdated
@gen_test()
async def test_messages_are_ordered_bsend():
    ledger = []

    async def async_handler(val):
        await asyncio.sleep(0.1)
        ledger.append(val)

    def sync_handler(val):
        ledger.append(val)

    async with Server(
        {},
        stream_handlers={
            "sync_handler": sync_handler,
            "async_handler": async_handler,
        },
    ) as s:
        await s.listen()
        comm = await connect(s.address)
        try:
            b = BatchedSend(interval=10)
            try:
                await comm.write({"op": "connection_stream"})
                b.start(comm)
                n = 100
                for ix in range(n):
                    if ix % 2:
                        b.send({"op": "sync_handler", "val": ix})
                    else:
                        b.send({"op": "async_handler", "val": ix})
                while not len(ledger) == n:
                    await asyncio.sleep(0.01)
                assert ledger == list(range(n))
            finally:
                await b.close()
        finally:
            await comm.close()
Incredible that we didn't have a test for this.
async def __aenter__(self):
    await self
    return self

async def __aexit__(self, *args):
    await self.close()
    return
This is an unrelated but pleasant addition to the CommPool
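For readers unfamiliar with the pattern in the diff above, here is a minimal, self-contained sketch of an object that is both awaitable (to complete its startup) and an async context manager. The `Pool`, `start`, and `close` names here are illustrative only, not dask's actual `CommPool` API; the `__aenter__`/`__aexit__` bodies mirror the diff.

```python
import asyncio

class Pool:
    """Hypothetical stand-in for a pool that must be started and closed."""

    def __init__(self):
        self.started = False
        self.closed = False

    async def start(self):
        await asyncio.sleep(0)  # stand-in for real async startup work
        self.started = True
        return self

    def __await__(self):
        # Makes `await pool` run startup, so `await self` works in __aenter__
        return self.start().__await__()

    async def close(self):
        self.closed = True

    async def __aenter__(self):
        await self  # same shape as the diff above: await startup, return self
        return self

    async def __aexit__(self, *args):
        await self.close()

async def main():
    async with Pool() as p:
        assert p.started  # startup ran on entry
    return p

p = asyncio.run(main())
print(p.closed)  # True: close() ran on exit
```

The payoff is that `async with Pool() as p:` guarantees `close()` runs even if the body raises, which is harder to get right with manual `start()`/`close()` calls.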
I like the reduced complexity this approach introduces. However, I can see this causing issues in real-world use cases because long-running handlers might delay others. Have you run benchmarks on this to see if anything pops up?
The possible benefits warrant giving this a try. So, this looks good to me with one suggestion for improving the tests before giving ✅.
Just to highlight this: this shouldn't cause issues; if anything, it should prevent them. I just wouldn't be surprised if some behavior implicitly relied on async handlers being non-blocking and executed in the background.
No, but reviewing the code base, we actually don't even have any async handlers right now, with the exception of handle_request_refresh_who_has, although this one should not be async...
Well, there is
Ah sorry, the above was wrong.
Thanks for the clarification. Let's drop
Sure... doesn't really matter
LGTM, assuming CI isn't unhappier than usual.
Some motivation for this over here #8049 (comment)
Essentially this breaks ordering of operations as soon as a handler is async. With this change, we'd ensure ordering within a given BatchedStream regardless of whether the handlers are sync or async.
I think this behavior is much easier to reason about (but I have no idea where this will blow up...).
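To make the ordering problem concrete, here is a hypothetical, self-contained sketch (not dask code) contrasting the two dispatch strategies: spawning each handler as a background task lets a fast handler overtake a slow one, while awaiting each handler before dispatching the next message preserves arrival order. The `slow`/`fast` handler names are illustrative only.

```python
import asyncio

async def demo():
    ledger = []

    async def slow(val):
        await asyncio.sleep(0.01)  # an async handler that takes a while
        ledger.append(val)

    async def fast(val):
        ledger.append(val)  # appends immediately, like a sync handler

    msgs = [(slow, 0), (fast, 1), (slow, 2), (fast, 3)]

    # Background-task dispatch: fast handlers can finish before earlier
    # slow ones, so the ledger no longer reflects message order.
    tasks = [asyncio.ensure_future(h(v)) for h, v in msgs]
    await asyncio.gather(*tasks)
    out_of_order = ledger.copy()

    # Sequential dispatch: each handler is fully awaited before the next
    # message is processed, so order is preserved regardless of handler type.
    ledger.clear()
    for h, v in msgs:
        await h(v)
    in_order = ledger.copy()

    return out_of_order, in_order

out_of_order, in_order = asyncio.run(demo())
print(out_of_order)  # e.g. [1, 3, 0, 2]: the fast handlers jumped ahead
print(in_order)      # [0, 1, 2, 3]
```

The trade-off the reviewers discuss above falls out directly: sequential dispatch buys ordering at the cost of a long-running handler blocking everything queued behind it.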