From 3b6cdb5067554dc793e1446d6e8b4df07e19cf14 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 1 Aug 2023 16:17:30 +0200 Subject: [PATCH 1/5] Await async handlers --- distributed/core.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index accc976f72..554e8ef109 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -1006,10 +1006,7 @@ async def handle_stream(self, comm, extra=None): break handler = self.stream_handlers[op] if iscoroutinefunction(handler): - self._ongoing_background_tasks.call_soon( - handler, **merge(extra, msg) - ) - await asyncio.sleep(0) + await handler(**merge(extra, msg)) else: handler(**merge(extra, msg)) else: From d06cb5c6ab31d5f53f573bff4a3a08d0f3eb1549 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 1 Aug 2023 18:35:33 +0200 Subject: [PATCH 2/5] add test for message ordering --- distributed/core.py | 8 ++++ distributed/tests/test_core.py | 76 ++++++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/distributed/core.py b/distributed/core.py index 554e8ef109..6c86db301b 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -1518,6 +1518,14 @@ async def _() -> Self: return _().__await__() + async def __aenter__(self): + await self + return self + + async def __aexit__(self, *args): + await self.close() + return + async def start(self) -> None: # Invariant: semaphore._value == limit - open - _n_connecting self.semaphore = asyncio.Semaphore(self.limit) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 193c1554f0..a1800bed57 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -15,6 +15,7 @@ import dask +from distributed.batched import BatchedSend from distributed.comm.core import CommClosedError from distributed.comm.registry import backends from distributed.comm.tcp import TCPBackend, TCPListener @@ -1380,3 +1381,78 @@ async def test_async_listener_stop(monkeypatch): async with Server({}) as s: await s.listen(0) assert s.listeners + + +@gen_test() +async def test_messages_are_ordered_bsend(): + ledger = [] + + async def async_handler(val): + await asyncio.sleep(0.1) + ledger.append(val) + + def sync_handler(val): + ledger.append(val) + + async with Server( + {}, + stream_handlers={ + "sync_handler": sync_handler, + "async_handler": async_handler, + }, + ) as s: + await s.listen() + comm = await connect(s.address) + try: + b = BatchedSend(interval=10) + try: + await comm.write({"op": "connection_stream"}) + b.start(comm) + n = 100 + for ix in range(n): + if ix % 2: + b.send({"op": "sync_handler", "val": ix}) + else: + b.send({"op": "async_handler", "val": ix}) + while not len(ledger) == n: + await asyncio.sleep(0.01) + assert ledger == list(range(n)) + finally: + await b.close() + finally: + await comm.close() + + +@gen_test() +async def test_messages_are_ordered_raw(): + ledger = [] + + async def async_handler(val): + await asyncio.sleep(0.01) + ledger.append(val) + + def sync_handler(val): + ledger.append(val) + + async with Server( + {}, + stream_handlers={ + "sync_handler": sync_handler, + "async_handler": async_handler, + }, + ) as s: + await s.listen() + comm = await connect(s.address) + try: + await comm.write({"op": "connection_stream"}) + n = 100 + for ix in range(n): + if ix % 2: + await comm.write({"op": "sync_handler", "val": ix}) + else: + await comm.write({"op": "async_handler", "val": ix}) + while not len(ledger) == n: + await asyncio.sleep(0.01) + assert ledger == list(range(n)) + finally: + await comm.close() From 7f268a1b125e1bc24e1432560ec152e654759a8e Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 1 Aug 2023 18:56:05 +0200 Subject: [PATCH 3/5] reduce sleep time --- distributed/tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index a1800bed57..006fcfe9cb 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -1388,7 +1388,7 @@ async def test_messages_are_ordered_bsend(): ledger = [] async def async_handler(val): - await asyncio.sleep(0.1) + await asyncio.sleep(0.01) ledger.append(val) def sync_handler(val): From d73c081e1603378145e8afee969c4b8d1d841adf Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 2 Aug 2023 09:39:47 +0200 Subject: [PATCH 4/5] randomize sleep --- distributed/tests/test_core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 006fcfe9cb..5eb05dcb1f 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -3,6 +3,7 @@ import asyncio import contextlib import os +import random import socket import sys import threading @@ -1388,7 +1389,7 @@ async def test_messages_are_ordered_bsend(): ledger = [] async def async_handler(val): - await asyncio.sleep(0.01) + await asyncio.sleep(0.01 * random.random()) ledger.append(val) def sync_handler(val): @@ -1428,7 +1429,7 @@ async def test_messages_are_ordered_raw(): ledger = [] async def async_handler(val): - await asyncio.sleep(0.01) + await asyncio.sleep(0.01 * random.random()) ledger.append(val) def sync_handler(val): From 40253fb5c92fa6f8afb20bf364519bcbce556e2a Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 2 Aug 2023 09:56:07 +0200 Subject: [PATCH 5/5] handle_request_refresh_who_has sync --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 57cb266674..0f96df074f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5650,12 +5650,12 @@ def handle_worker_status_change( self.idle_task_count.discard(ws) self.saturated.discard(ws) - async def handle_request_refresh_who_has( + def handle_request_refresh_who_has( self, keys: Iterable[str], worker: str, stimulus_id: str ) -> None: - """Asynchronous request (through bulk comms) from a Worker to refresh the - who_has for some keys. Not to be confused with scheduler.who_has, which is a - synchronous RPC request from a Client. + """Request from a Worker to refresh the + who_has for some keys. Not to be confused with scheduler.who_has, which + is a dedicated comm RPC request from a Client. """ who_has = {} free_keys = []