diff --git a/distributed/dashboard/components/worker.py b/distributed/dashboard/components/worker.py
index 20d55b7c9d..c5ba33bdc3 100644
--- a/distributed/dashboard/components/worker.py
+++ b/distributed/dashboard/components/worker.py
@@ -91,7 +91,7 @@ def update(self):
             "Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
             "Ready": [len(w.state.ready)],
             "Waiting": [w.state.waiting_for_data_count],
-            "Connections": [len(w.state.in_flight_workers)],
+            "Connections": [w.state.transfer_incoming_count],
             "Serving": [len(w._comms)],
         }
         update(self.source, d)
@@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
             "total",
         ]

-        self.incoming = ColumnDataSource({name: [] for name in names})
-        self.outgoing = ColumnDataSource({name: [] for name in names})
+        self.transfer_incoming = ColumnDataSource({name: [] for name in names})
+        self.transfer_outgoing = ColumnDataSource({name: [] for name in names})

         x_range = DataRange1d(range_padding=0)
         y_range = DataRange1d(range_padding=0)
@@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
         )

         fig.rect(
-            source=self.incoming,
+            source=self.transfer_incoming,
             x="middle",
             y="y",
             width="duration",
@@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
             alpha="alpha",
         )
         fig.rect(
-            source=self.outgoing,
+            source=self.transfer_outgoing,
             x="middle",
             y="y",
             width="duration",
@@ -159,26 +159,40 @@ def __init__(self, worker, height=300, **kwargs):

         self.root = fig

-        self.last_incoming = 0
-        self.last_outgoing = 0
+        self.last_transfer_incoming_count_total = 0
+        self.last_transfer_outgoing_count_total = 0
         self.who = dict()

     @without_property_validation
     @log_errors
     def update(self):
-        outgoing = self.worker.outgoing_transfer_log
-        n = self.worker.outgoing_count - self.last_outgoing
-        outgoing = [outgoing[-i].copy() for i in range(1, n + 1)]
-        self.last_outgoing = self.worker.outgoing_count
+        transfer_outgoing_log = self.worker.transfer_outgoing_log
+        n = (
+            self.worker.transfer_outgoing_count_total
+            - self.last_transfer_outgoing_count_total
+        )
+        transfer_outgoing_log = [
+            transfer_outgoing_log[-i].copy() for i in range(1, n + 1)
+        ]
+        self.last_transfer_outgoing_count_total = (
+            self.worker.transfer_outgoing_count_total
+        )

-        incoming = self.worker.incoming_transfer_log
-        n = self.worker.incoming_count - self.last_incoming
-        incoming = [incoming[-i].copy() for i in range(1, n + 1)]
-        self.last_incoming = self.worker.incoming_count
+        transfer_incoming_log = self.worker.transfer_incoming_log
+        n = (
+            self.worker.state.transfer_incoming_count_total
+            - self.last_transfer_incoming_count_total
+        )
+        transfer_incoming_log = [
+            transfer_incoming_log[-i].copy() for i in range(1, n + 1)
+        ]
+        self.last_transfer_incoming_count_total = (
+            self.worker.state.transfer_incoming_count_total
+        )

         for [msgs, source] in [
-            [incoming, self.incoming],
-            [outgoing, self.outgoing],
+            [transfer_incoming_log, self.transfer_incoming],
+            [transfer_outgoing_log, self.transfer_outgoing],
         ]:

             for msg in msgs:
@@ -225,7 +239,7 @@ def __init__(self, worker, **kwargs):
         fig = figure(
             title="Communication History",
             x_axis_type="datetime",
-            y_range=[-0.1, worker.state.total_out_connections + 0.5],
+            y_range=[-0.1, worker.state.transfer_incoming_count_limit + 0.5],
             height=150,
             tools="",
             x_range=x_range,
@@ -247,7 +261,7 @@ def update(self):
             {
                 "x": [time() * 1000],
                 "out": [len(self.worker._comms)],
-                "in": [len(self.worker.state.in_flight_workers)],
+                "in": [self.worker.state.transfer_incoming_count],
             },
             10000,
         )
diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py
index 04b4778e4d..6b86cbf6f3 100644
--- a/distributed/dashboard/tests/test_worker_bokeh.py
+++ b/distributed/dashboard/tests/test_worker_bokeh.py
@@ -142,11 +142,11 @@ async def test_CommunicatingStream(c, s, a, b):
     aa.update()
     bb.update()

-    assert len(first(aa.outgoing.data.values())) and len(
-        first(bb.outgoing.data.values())
+    assert len(first(aa.transfer_outgoing.data.values())) and len(
+        first(bb.transfer_outgoing.data.values())
     )
-    assert len(first(aa.incoming.data.values())) and len(
-        first(bb.incoming.data.values())
+    assert len(first(aa.transfer_incoming.data.values())) and len(
+        first(bb.transfer_incoming.data.values())
     )
diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py
index 79b85b8a61..f95cf415b9 100644
--- a/distributed/http/worker/prometheus/core.py
+++ b/distributed/http/worker/prometheus/core.py
@@ -37,7 +37,7 @@ def collect(self):
         yield GaugeMetricFamily(
             self.build_name("concurrent_fetch_requests"),
             "Number of open fetch requests to other workers.",
-            value=len(self.server.state.in_flight_workers),
+            value=self.server.state.transfer_incoming_count,
         )

         yield GaugeMetricFamily(
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index ef2d1e79c6..43da018b84 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -647,7 +647,7 @@ async def test_gather_skip(c, s, a):
 async def test_limit_concurrent_gathering(c, s, a, b):
     futures = c.map(inc, range(100))
     await c.gather(futures)
-    assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100
+    assert len(a.transfer_outgoing_log) + len(b.transfer_outgoing_log) < 100


 @gen_cluster(client=True)
diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py
index 41b59b7df5..17773918c5 100644
--- a/distributed/tests/test_nanny.py
+++ b/distributed/tests/test_nanny.py
@@ -232,7 +232,7 @@ async def test_nanny_timeout(c, s, a):
     clean_kwargs={"threads": False},
     config={"distributed.worker.memory.pause": False},
 )
-async def test_throttle_outgoing_connections(c, s, a, *other_workers):
+async def test_throttle_outgoing_transfers(c, s, a, *other_workers):
     # Put a bunch of small data on worker a
     logging.getLogger("distributed.worker").setLevel(logging.DEBUG)
     remote_data = c.map(
@@ -241,7 +241,7 @@ async def test_throttle_outgoing_connections(c, s, a, *other_workers):
     await wait(remote_data)

     a.status = Status.paused
-    a.outgoing_current_count = 2
+    a.transfer_outgoing_count = 2

     requests = [
         await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 2c8fef3e3f..05850924f8 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -227,7 +227,7 @@ def random(**kwargs):
     # Check that there were few transfers
     unexpected_transfers = []
     for worker in workers:
-        for log in worker.incoming_transfer_log:
+        for log in worker.transfer_incoming_log:
             keys = log["keys"]
             # The root-ish tasks should never be transferred
             assert not any(k.startswith("random") for k in keys), keys
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index f9f990677d..0103808cf5 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -673,19 +673,19 @@ async def test_clean(c, s, a, b):
 @gen_cluster(client=True)
 async def test_message_breakup(c, s, a, b):
     n = 100_000
-    a.state.target_message_size = 10 * n
-    b.state.target_message_size = 10 * n
+    a.state.transfer_message_target_bytes = 10 * n
+    b.state.transfer_message_target_bytes = 10 * n
     xs = [
         c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
     ]
     y = c.submit(lambda _: None, xs, key="y", workers=[b.address])
     await y

-    assert 2 <= len(b.incoming_transfer_log) <= 20
-    assert 2 <= len(a.outgoing_transfer_log) <= 20
+    assert 2 <= len(b.transfer_incoming_log) <= 20
+    assert 2 <= len(a.transfer_outgoing_log) <= 20

-    assert all(msg["who"] == b.address for msg in a.outgoing_transfer_log)
-    assert all(msg["who"] == a.address for msg in a.incoming_transfer_log)
+    assert all(msg["who"] == b.address for msg in a.transfer_outgoing_log)
+    assert all(msg["who"] == a.address for msg in a.transfer_incoming_log)


 @gen_cluster(client=True)
@@ -764,7 +764,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
     concurrent outgoing connections. If multiple small fetches from the same worker are
     scheduled all at once, they will result in a single call to gather_dep.
     """
-    a.state.total_out_connections = 2
+    a.state.transfer_incoming_count_limit = 2
     futures = await c.scatter(
         {f"x{i}": i for i in range(100)},
         workers=[w.address for w in snd_workers],
@@ -779,7 +779,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
     while len(a.data) < 100:
         await asyncio.sleep(0.01)

-    assert a.state.comm_nbytes == 0
+    assert a.state.transfer_incoming_bytes == 0

     story = a.state.story("request-dep", "receive-dep")
     assert len(story) == 40  # 1 request-dep + 1 receive-dep per sender worker
@@ -808,27 +808,31 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
 @pytest.mark.xfail(reason="very high flakiness")
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
 async def test_share_communication(c, s, w1, w2, w3):
-    x = c.submit(mul, b"1", int(w3.target_message_size + 1), workers=w1.address)
-    y = c.submit(mul, b"2", int(w3.target_message_size + 1), workers=w2.address)
+    x = c.submit(
+        mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
+    )
+    y = c.submit(
+        mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
+    )
     await wait([x, y])
     await c._replicate([x, y], workers=[w1.address, w2.address])
     z = c.submit(add, x, y, workers=w3.address)
     await wait(z)
-    assert len(w3.incoming_transfer_log) == 2
-    assert w1.outgoing_transfer_log
-    assert w2.outgoing_transfer_log
+    assert len(w3.transfer_incoming_log) == 2
+    assert w1.transfer_outgoing_log
+    assert w2.transfer_outgoing_log


 @pytest.mark.xfail(reason="very high flakiness")
 @gen_cluster(client=True)
 async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
-    x = c.submit(mul, b"1", int(b.target_message_size + 1), workers=a.address)
-    y = c.submit(mul, b"2", int(b.target_message_size + 1), workers=a.address)
+    x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
+    y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
     await wait([x, y])
     z = c.submit(add, x, y, workers=b.address)
     await wait(z)
-    assert len(b.incoming_transfer_log) == 2
-    l1, l2 = b.incoming_transfer_log
+    assert len(b.transfer_incoming_log) == 2
+    l1, l2 = b.transfer_incoming_log
     assert l1["stop"] < l2["start"]
@@ -1244,8 +1248,8 @@ async def test_wait_for_outgoing(c, s, a, b):
     y = c.submit(inc, future, workers=b.address)
     await wait(y)

-    assert len(b.incoming_transfer_log) == len(a.outgoing_transfer_log) == 1
-    bb = b.incoming_transfer_log[0]["duration"]
+    assert len(b.transfer_incoming_log) == len(a.transfer_outgoing_log) == 1
+    bb = b.transfer_incoming_log[0]["duration"]
     aa = a.outgoing_transfer_log[0]["duration"]

     ratio = aa / bb
@@ -1262,8 +1266,8 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
     y = c.submit(inc, x, workers=[w2.address])
     await wait(y)

-    assert any(d["who"] == w2.address for d in w1.outgoing_transfer_log)
-    assert not any(d["who"] == w2.address for d in w3.outgoing_transfer_log)
+    assert any(d["who"] == w2.address for d in w1.transfer_outgoing_log)
+    assert not any(d["who"] == w2.address for d in w3.transfer_outgoing_log)


 @gen_cluster(
@@ -1281,10 +1285,10 @@ async def test_avoid_oversubscription(c, s, *workers):
     await wait(futures)

     # Original worker not responsible for all transfers
-    assert len(workers[0].outgoing_transfer_log) < len(workers) - 2
+    assert len(workers[0].transfer_outgoing_log) < len(workers) - 2

     # Some other workers did some work
-    assert len([w for w in workers if len(w.outgoing_transfer_log) > 0]) >= 3
+    assert len([w for w in workers if len(w.transfer_outgoing_log) > 0]) >= 3


 @gen_cluster(client=True, worker_kwargs={"metrics": {"my_port": lambda w: w.port}})
@@ -1959,7 +1963,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b):
     # We will block A for any outgoing communication. This simulates an
     # overloaded worker which will always return "busy" for get_data requests,
     # effectively blocking H indefinitely
-    a.outgoing_current_count = 10000000
+    a.transfer_outgoing_count = 10000000

     h = c.submit(add, f, g, key="h", workers=[b.address])

@@ -2021,7 +2025,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy(
         )
     )["f"]
     for w in lws:
-        w.outgoing_current_count = 10000000
+        w.transfer_outgoing_count = 10000000

     g = c.submit(inc, f, key="g", workers=[a.address])
     assert await g == 2
@@ -2718,7 +2722,7 @@ async def test_acquire_replicas_same_channel(c, s, a, b):
             ("request-dep", a.address, {fut.key}),
         ],
     )
-    assert any(fut.key in msg["keys"] for msg in b.incoming_transfer_log)
+    assert any(fut.key in msg["keys"] for msg in b.transfer_incoming_log)


 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
@@ -2997,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_acquire_replicas_large_data(c, s, a):
     """When acquire-replicas is used to acquire multiple sizeable tasks, it respects
-    target_message_size and acquires them over multiple iterations.
+    transfer_message_target_bytes and acquires them over multiple iterations.
     """
-    size = a.state.target_message_size // 5 - 10_000
+    size = a.state.transfer_message_target_bytes // 5 - 10_000

     class C:
         def __sizeof__(self):
@@ -3026,7 +3030,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b):
     Ensure that no fetch/flight tasks are left in the task dict of a worker after
     everything was released
     """
-    a.total_in_connections = 0
+    a.transfer_outgoing_count_limit = 0
    f1 = c.submit(inc, 1, key="f1", workers=[a.address])
     f2 = c.submit(inc, f1, key="f2", workers=[b.address])
     key = f1.key
@@ -3310,8 +3314,8 @@ async def test_Worker__to_dict(c, s, a):
         "thread_id",
         "logs",
         "config",
-        "incoming_transfer_log",
-        "outgoing_transfer_log",
+        "transfer_incoming_log",
+        "transfer_outgoing_log",
         # Attributes of WorkerMemoryManager
         "data",
         "max_spill",
@@ -3551,3 +3555,20 @@ async def test_execute_preamble_abort_retirement(c, s):

     # Test that y does not get stuck.
     assert await y == 2
+
+
+@gen_cluster()
+async def test_deprecation_of_renamed_worker_attributes(s, a, b):
+    msg = (
+        "The `Worker.outgoing_count` attribute has been renamed to "
+        "`Worker.transfer_outgoing_count_total`"
+    )
+    with pytest.warns(DeprecationWarning, match=msg):
+        assert a.outgoing_count == a.transfer_outgoing_count_total
+
+    msg = (
+        "The `Worker.outgoing_current_count` attribute has been renamed to "
+        "`Worker.transfer_outgoing_count`"
+    )
+    with pytest.warns(DeprecationWarning, match=msg):
+        assert a.outgoing_current_count == a.transfer_outgoing_count
diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py
index bc6f4b8689..a1db60cc5e 100644
--- a/distributed/tests/test_worker_state_machine.py
+++ b/distributed/tests/test_worker_state_machine.py
@@ -673,7 +673,7 @@ async def test_fetch_to_missing_on_busy(c, s, a, b):
     x = c.submit(inc, 1, key="x", workers=[b.address])
     await x

-    b.total_in_connections = 0
+    b.transfer_outgoing_count_limit = 0
     # Crucially, unlike with `c.submit(inc, x, workers=[a.address])`, the scheduler
     # doesn't keep track of acquire-replicas requests, so it won't proactively inform a
     # when we call remove_worker later on
@@ -923,7 +923,7 @@ async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
     x = c.submit(inc, 1, key="x", workers=[w1.address])
     y = c.submit(inc, 2, key="y", workers=[w1.address])
     await wait([x, y])
-    w1.total_in_connections = 0
+    w1.transfer_outgoing_count_limit = 0
     s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")

     # The tasks will now flip-flop between fetch and flight every 150ms
@@ -985,17 +985,17 @@ async def get_data(self, comm, *args, **kwargs):

 @gen_cluster()
 async def test_deprecated_worker_attributes(s, a, b):
-    n = a.state.comm_threshold_bytes
+    n = a.state.generation
     msg = (
-        "The `Worker.comm_threshold_bytes` attribute has been moved to "
-        "`Worker.state.comm_threshold_bytes`"
+        "The `Worker.generation` attribute has been moved to "
+        "`Worker.state.generation`"
     )
     with pytest.warns(FutureWarning, match=msg):
-        assert a.comm_threshold_bytes == n
+        assert a.generation == n
     with pytest.warns(FutureWarning, match=msg):
-        a.comm_threshold_bytes += 1
-    assert a.comm_threshold_bytes == n + 1
-    assert a.state.comm_threshold_bytes == n + 1
+        a.generation -= 1
+    assert a.generation == n - 1
+    assert a.state.generation == n - 1

     # Old and new names differ
     msg = (
@@ -1012,7 +1012,7 @@ async def test_deprecated_worker_attributes(s, a, b):
 @pytest.mark.parametrize(
     "nbytes,n_in_flight",
     [
-        # Note: target_message_size = 50e6 bytes
+        # Note: transfer_message_target_bytes = 50e6 bytes
         (int(10e6), 3),
         (int(20e6), 2),
         (int(30e6), 1),
@@ -1040,7 +1040,7 @@ def test_gather_priority(ws):
     3. in case of tie, from the worker with the most tasks queued
     4. in case of tie, from a random worker (which is actually deterministic).
     """
-    ws.total_out_connections = 4
+    ws.transfer_incoming_count_limit = 4

     instructions = ws.handle_stimulus(
         PauseEvent(stimulus_id="pause"),
@@ -1060,8 +1060,9 @@ def test_gather_priority(ws):
                 # This will be fetched first because it's on the same worker as y
                 "x8": ["127.0.0.7:1"],
             },
-            # Substantial nbytes prevents total_out_connections to be overridden by
-            # comm_threshold_bytes, but it's less than target_message_size
+            # Substantial nbytes prevents transfer_incoming_count_limit to be
+            # overridden by transfer_incoming_bytes_throttle_threshold,
+            # but it's less than transfer_message_target_bytes
             nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)},
             stimulus_id="compute1",
         ),
diff --git a/distributed/utils_test.py b/distributed/utils_test.py
index baa1b6751d..450290a0b8 100644
--- a/distributed/utils_test.py
+++ b/distributed/utils_test.py
@@ -2281,13 +2281,13 @@ def freeze_data_fetching(w: Worker, *, jump_start: bool = False) -> Iterator[Non
         If True, trigger ensure_communicating on exit; this simulates e.g. an unrelated
         worker moving out of in_flight_workers.
     """
-    old_out_connections = w.state.total_out_connections
-    old_comm_threshold = w.state.comm_threshold_bytes
-    w.state.total_out_connections = 0
-    w.state.comm_threshold_bytes = 0
+    old_count_limit = w.state.transfer_incoming_count_limit
+    old_threshold = w.state.transfer_incoming_bytes_throttle_threshold
+    w.state.transfer_incoming_count_limit = 0
+    w.state.transfer_incoming_bytes_throttle_threshold = 0
     yield
-    w.state.total_out_connections = old_out_connections
-    w.state.comm_threshold_bytes = old_comm_threshold
+    w.state.transfer_incoming_count_limit = old_count_limit
+    w.state.transfer_incoming_bytes_throttle_threshold = old_threshold
     if jump_start:
         w.status = Status.paused
         w.status = Status.running
diff --git a/distributed/worker.py b/distributed/worker.py
index 86e1fba531..b863f319d7 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -271,10 +271,10 @@ class Worker(BaseWorker, ServerNode):
     * **services:** ``{str: Server}``:
         Auxiliary web servers running on this worker
     * **service_ports:** ``{str: port}``:
-    * **total_in_connections**: ``int``
-        The maximum number of concurrent incoming requests for data.
+    * **transfer_outgoing_count_limit**: ``int``
+        The maximum number of concurrent outgoing data transfers.
         See also
-        :attr:`distributed.worker_state_machine.WorkerState.total_out_connections`.
+        :attr:`distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit`.
     * **batched_stream**: ``BatchedSend``
         A batched stream along which we communicate to the scheduler
     * **log**: ``[(message)]``
@@ -374,7 +374,7 @@ class Worker(BaseWorker, ServerNode):
     nanny: Nanny | None
     _lock: threading.Lock
-    total_in_connections: int
+    transfer_outgoing_count_limit: int
     threads: dict[str, int]  # {ts.key: thread ID}
     active_threads_lock: threading.Lock
     active_threads: dict[int, str]  # {thread ID: ts.key}
@@ -383,11 +383,12 @@ class Worker(BaseWorker, ServerNode):
     profile_keys_history: deque[tuple[float, dict[str, dict[str, Any]]]]
     profile_recent: dict[str, Any]
     profile_history: deque[tuple[float, dict[str, Any]]]
-    incoming_transfer_log: deque[dict[str, Any]]
-    outgoing_transfer_log: deque[dict[str, Any]]
-    incoming_count: int
-    outgoing_count: int
-    outgoing_current_count: int
+    transfer_incoming_log: deque[dict[str, Any]]
+    transfer_outgoing_log: deque[dict[str, Any]]
+    #: Number of total data transfers to other workers.
+    transfer_outgoing_count_total: int
+    #: Number of open data transfers to other workers.
+    transfer_outgoing_count: int
     bandwidth: float
     latency: float
     profile_cycle_interval: float
@@ -519,10 +520,10 @@ def __init__(
         self.nanny = nanny
         self._lock = threading.Lock()

-        total_out_connections = dask.config.get(
+        transfer_incoming_count_limit = dask.config.get(
             "distributed.worker.connections.outgoing"
         )
-        self.total_in_connections = dask.config.get(
+        self.transfer_outgoing_count_limit = dask.config.get(
             "distributed.worker.connections.incoming"
         )
@@ -539,11 +540,10 @@ def __init__(
         if validate is None:
             validate = dask.config.get("distributed.scheduler.validate")

-        self.incoming_transfer_log = deque(maxlen=100000)
-        self.incoming_count = 0
-        self.outgoing_transfer_log = deque(maxlen=100000)
-        self.outgoing_count = 0
-        self.outgoing_current_count = 0
+        self.transfer_incoming_log = deque(maxlen=100000)
+        self.transfer_outgoing_log = deque(maxlen=100000)
+        self.transfer_outgoing_count_total = 0
+        self.transfer_outgoing_count = 0
         self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
         self.bandwidth_workers = defaultdict(
             lambda: (0, 0)
@@ -748,7 +748,7 @@ def __init__(
             threads=self.threads,
             plugins=self.plugins,
             resources=resources,
-            total_out_connections=total_out_connections,
+            transfer_incoming_count_limit=transfer_incoming_count_limit,
             validate=validate,
             transition_counter_max=transition_counter_max,
         )
@@ -849,14 +849,19 @@ def data(self) -> MutableMapping[str, Any]:
     actors = DeprecatedWorkerStateAttribute()
     available_resources = DeprecatedWorkerStateAttribute()
     busy_workers = DeprecatedWorkerStateAttribute()
-    comm_nbytes = DeprecatedWorkerStateAttribute()
-    comm_threshold_bytes = DeprecatedWorkerStateAttribute()
+    comm_nbytes = DeprecatedWorkerStateAttribute(target="transfer_incoming_bytes")
+    comm_threshold_bytes = DeprecatedWorkerStateAttribute(
+        target="transfer_incoming_bytes_throttle_threshold"
+    )
     constrained = DeprecatedWorkerStateAttribute()
     data_needed_per_worker = DeprecatedWorkerStateAttribute(target="data_needed")
     executed_count = DeprecatedWorkerStateAttribute()
     executing_count = DeprecatedWorkerStateAttribute()
     generation = DeprecatedWorkerStateAttribute()
     has_what = DeprecatedWorkerStateAttribute()
+    incoming_count = DeprecatedWorkerStateAttribute(
+        target="transfer_incoming_count_total"
+    )
     in_flight_tasks = DeprecatedWorkerStateAttribute(target="in_flight_tasks_count")
     in_flight_workers = DeprecatedWorkerStateAttribute()
     log = DeprecatedWorkerStateAttribute()
@@ -867,8 +872,12 @@ def data(self) -> MutableMapping[str, Any]:
     story = DeprecatedWorkerStateAttribute()
     ready = DeprecatedWorkerStateAttribute()
     tasks = DeprecatedWorkerStateAttribute()
-    target_message_size = DeprecatedWorkerStateAttribute()
-    total_out_connections = DeprecatedWorkerStateAttribute()
+    target_message_size = DeprecatedWorkerStateAttribute(
+        target="transfer_message_target_bytes"
+    )
+    total_out_connections = DeprecatedWorkerStateAttribute(
+        target="transfer_incoming_count_limit"
+    )
     total_resources = DeprecatedWorkerStateAttribute()
     transition_counter = DeprecatedWorkerStateAttribute()
     transition_counter_max = DeprecatedWorkerStateAttribute()
@@ -1027,8 +1036,8 @@ def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
             "status": self.status,
             "logs": self.get_logs(),
             "config": dask.config.config,
-            "incoming_transfer_log": self.incoming_transfer_log,
-            "outgoing_transfer_log": self.outgoing_transfer_log,
+            "transfer_incoming_log": self.transfer_incoming_log,
+            "transfer_outgoing_log": self.transfer_outgoing_log,
         }
         extra = {k: v for k, v in extra.items() if k not in exclude}
         info.update(extra)
@@ -1641,7 +1650,7 @@ async def get_data(
         start = time()

         if max_connections is None:
-            max_connections = self.total_in_connections
+            max_connections = self.transfer_outgoing_count_limit

         # Allow same-host connections more liberally
         if (
@@ -1653,26 +1662,28 @@ async def get_data(

         if self.status == Status.paused:
             max_connections = 1
-            throttle_msg = " Throttling outgoing connections because worker is paused."
+            throttle_msg = (
+                " Throttling outgoing data transfers because worker is paused."
+            )
         else:
             throttle_msg = ""

         if (
             max_connections is not False
-            and self.outgoing_current_count >= max_connections
+            and self.transfer_outgoing_count >= max_connections
         ):
             logger.debug(
                 "Worker %s has too many open connections to respond to data request "
                 "from %s (%d/%d).%s",
                 self.address,
                 who,
-                self.outgoing_current_count,
+                self.transfer_outgoing_count,
                 max_connections,
                 throttle_msg,
             )
             return {"status": "busy"}

-        self.outgoing_current_count += 1
+        self.transfer_outgoing_count += 1
         data = {k: self.data[k] for k in keys if k in self.data}

         if len(data) < len(keys):
@@ -1704,16 +1715,16 @@ async def get_data(
                 comm.abort()
                 raise
         finally:
-            self.outgoing_current_count -= 1
+            self.transfer_outgoing_count -= 1
         stop = time()
         if self.digests is not None:
             self.digests["get-data-send-duration"].add(stop - start)

         total_bytes = sum(filter(None, nbytes.values()))

-        self.outgoing_count += 1
+        self.transfer_outgoing_count_total += 1
         duration = (stop - start) or 0.5  # windows
-        self.outgoing_transfer_log.append(
+        self.transfer_outgoing_log.append(
             {
                 "start": start + self.scheduler_delay,
                 "stop": stop + self.scheduler_delay,
@@ -1928,7 +1939,7 @@ def _update_metrics_received_data(
         )
         duration = (stop - start) or 0.010
         bandwidth = total_bytes / duration
-        self.incoming_transfer_log.append(
+        self.transfer_incoming_log.append(
             {
                 "start": start + self.scheduler_delay,
                 "stop": stop + self.scheduler_delay,
@@ -1955,7 +1966,6 @@ def _update_metrics_received_data(
             self.digests["transfer-bandwidth"].add(total_bytes / duration)
             self.digests["transfer-duration"].add(duration)
         self.counters["transfer-count"].add(len(data))
-        self.incoming_count += 1

     @fail_hard
     async def gather_dep(
@@ -2533,6 +2543,56 @@ def validate_state(self) -> None:
             raise

+    @property
+    def incoming_transfer_log(self):
+        warnings.warn(
+            "The `Worker.incoming_transfer_log` attribute has been renamed to "
+            "`Worker.transfer_incoming_log`",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.transfer_incoming_log
+
+    @property
+    def outgoing_count(self):
+        warnings.warn(
+            "The `Worker.outgoing_count` attribute has been renamed to "
+            "`Worker.transfer_outgoing_count_total`",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.transfer_outgoing_count_total
+
+    @property
+    def outgoing_current_count(self):
+        warnings.warn(
+            "The `Worker.outgoing_current_count` attribute has been renamed to "
+            "`Worker.transfer_outgoing_count`",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.transfer_outgoing_count
+
+    @property
+    def outgoing_transfer_log(self):
+        warnings.warn(
+            "The `Worker.outgoing_transfer_log` attribute has been renamed to "
+            "`Worker.transfer_outgoing_log`",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.transfer_outgoing_log
+
+    @property
+    def total_in_connections(self):
+        warnings.warn(
+            "The `Worker.total_in_connections` attribute has been renamed to "
+            "`Worker.transfer_outgoing_count_limit`",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.transfer_outgoing_count_limit
+

 def get_worker() -> Worker:
     """Get the worker currently running this task
diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py
index c63bd86850..88d2e93692 100644
--- a/distributed/worker_state_machine.py
+++ b/distributed/worker_state_machine.py
@@ -1122,7 +1122,7 @@ class WorkerState:
     #: :meth:`BaseWorker.gather_dep`. Multiple small tasks that can be fetched from the
     #: same worker will be clustered in a single instruction as long as their combined
     #: size doesn't exceed this value.
-    target_message_size: int
+    transfer_message_target_bytes: int

     #: All and only tasks with ``TaskState.state == 'missing'``.
     missing_dep_flight: set[TaskState]
@@ -1142,16 +1142,19 @@ class WorkerState:
     #: dependencies until the current query returns.
     in_flight_workers: dict[str, set[str]]

-    #: The total number of bytes in flight
-    comm_nbytes: int
+    #: The total size of incoming data transfers for in-flight tasks.
+    transfer_incoming_bytes: int

-    #: The maximum number of concurrent incoming requests for data.
-    #: See also :attr:`distributed.worker.Worker.total_in_connections`.
-    total_out_connections: int
+    #: The maximum number of concurrent incoming data transfers from other workers.
+    #: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`.
+    transfer_incoming_count_limit: int

-    #: Ignore :attr:`total_out_connections` as long as :attr:`comm_nbytes` is
+    #: Number of total data transfers from other workers since the worker was started.
+    transfer_incoming_count_total: int
+
+    #: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is
     #: less than this value.
-    comm_threshold_bytes: int
+    transfer_incoming_bytes_throttle_threshold: int

     #: Peer workers that recently returned a busy status. Workers in this set won't be
     #: asked for additional dependencies for some time.
@@ -1237,7 +1240,7 @@ def __init__(
         threads: dict[str, int] | None = None,
         plugins: dict[str, WorkerPlugin] | None = None,
         resources: Mapping[str, float] | None = None,
-        total_out_connections: int = 9999,
+        transfer_incoming_count_limit: int = 9999,
         validate: bool = True,
         transition_counter_max: int | Literal[False] = False,
     ):
@@ -1266,9 +1269,10 @@ def __init__(
         )
         self.in_flight_workers = {}
         self.busy_workers = set()
-        self.total_out_connections = total_out_connections
-        self.comm_threshold_bytes = int(10e6)
-        self.comm_nbytes = 0
+        self.transfer_incoming_count_limit = transfer_incoming_count_limit
+        self.transfer_incoming_count_total = 0
+        self.transfer_incoming_bytes_throttle_threshold = int(10e6)
+        self.transfer_incoming_bytes = 0
         self.missing_dep_flight = set()
         self.generation = 0
         self.ready = HeapSet(key=operator.attrgetter("priority"))
@@ -1277,7 +1281,7 @@ def __init__(
         self.in_flight_tasks = set()
         self.executed_count = 0
         self.long_running = set()
-        self.target_message_size = int(50e6)  # 50 MB
+        self.transfer_message_target_bytes = int(50e6)  # 50 MB
         self.log = deque(maxlen=100_000)
         self.stimulus_log = deque(maxlen=10_000)
         self.transition_counter = 0
@@ -1351,6 +1355,16 @@ def in_flight_tasks_count(self) -> int:
         """
         return len(self.in_flight_tasks)

+    @property
+    def transfer_incoming_count(self) -> int:
+        """Count of open data transfers from other workers.
+
+        See also
+        --------
+        WorkerState.in_flight_workers
+        """
+        return len(self.in_flight_workers)
+
     #########################
     # Shared helper methods #
     #########################
@@ -1454,8 +1468,9 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
         if not self.running or not self.data_needed:
             return {}, []
         if (
-            len(self.in_flight_workers) >= self.total_out_connections
-            and self.comm_nbytes >= self.comm_threshold_bytes
+            self.transfer_incoming_count >= self.transfer_incoming_count_limit
+            and self.transfer_incoming_bytes
+            >= self.transfer_incoming_bytes_throttle_threshold
         ):
             return {}, []

@@ -1477,8 +1492,8 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
                 worker,
                 len(available_tasks),
                 len(self.data_needed),
-                len(self.in_flight_workers),
-                self.total_out_connections,
+                self.transfer_incoming_count,
+                self.transfer_incoming_count_limit,
                 len(self.busy_workers),
             )
             self.log.append(
@@ -1507,10 +1522,11 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
             )

             self.in_flight_workers[worker] = to_gather_keys
-            self.comm_nbytes += total_nbytes
+            self.transfer_incoming_bytes += total_nbytes
             if (
-                len(self.in_flight_workers) >= self.total_out_connections
-                and self.comm_nbytes >= self.comm_threshold_bytes
+                self.transfer_incoming_count >= self.transfer_incoming_count_limit
+                and self.transfer_incoming_bytes
+                >= self.transfer_incoming_bytes_throttle_threshold
             ):
                 break

@@ -1586,7 +1602,7 @@ def _select_keys_for_gather(
         """Helper of _ensure_communicating.

         Fetch all tasks that are replicated on the target worker within a single
-        message, up to target_message_size.
+        message, up to transfer_message_target_bytes.
         """
         to_gather: list[TaskState] = []
         total_nbytes = 0
@@ -1594,7 +1610,10 @@ def _select_keys_for_gather(
         while available:
             ts = available.peek()
             # The top-priority task is fetched regardless of its size
-            if to_gather and total_nbytes + ts.get_nbytes() > self.target_message_size:
+            if (
+                to_gather
+                and total_nbytes + ts.get_nbytes() > self.transfer_message_target_bytes
+            ):
                 break
             for worker in ts.who_has:
                 # This also effectively pops from available
@@ -2757,7 +2776,8 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]
         --------
         _execute_done_common
         """
-        self.comm_nbytes -= ev.total_nbytes
+        self.transfer_incoming_bytes -= ev.total_nbytes
+        self.transfer_incoming_count_total += 1
         keys = self.in_flight_workers.pop(ev.worker)
         for key in keys:
             ts = self.tasks[key]
diff --git a/docs/source/diagnosing-performance.rst b/docs/source/diagnosing-performance.rst
index 2d634a1cf5..b41643627b 100644
--- a/docs/source/diagnosing-performance.rst
+++ b/docs/source/diagnosing-performance.rst
@@ -95,7 +95,7 @@ Bandwidth
 ---------

 Dask workers track every incoming and outgoing transfer in the
-``Worker.outgoing_transfer_log`` and ``Worker.incoming_transfer_log``
+``Worker.transfer_outgoing_log`` and ``Worker.transfer_incoming_log``
 attributes including

 1.  Total bytes transferred
@@ -110,8 +110,8 @@ command on the workers:

 .. code-block:: python

-   client.run(lambda dask_worker: dask_worker.outgoing_transfer_log)
-   client.run(lambda dask_worker: dask_worker.incoming_transfer_log)
+   client.run(lambda dask_worker: dask_worker.transfer_outgoing_log)
+   client.run(lambda dask_worker: dask_worker.transfer_incoming_log)


 Performance Reports
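A minimal compatibility sketch, not part of the patch above: downstream code that reads
the worker transfer logs can probe for the new attribute names and fall back to the
pre-rename ones, so the same snippet works on releases from before and after this change.
``get_transfer_logs`` is a hypothetical helper, not an API added by this diff.

.. code-block:: python

    def get_transfer_logs(dask_worker):
        """Return (incoming, outgoing) transfer logs across distributed versions."""
        if hasattr(dask_worker, "transfer_incoming_log"):
            # Attribute names introduced by this patch
            return dask_worker.transfer_incoming_log, dask_worker.transfer_outgoing_log
        # Pre-rename attribute names on older releases (no DeprecationWarning there)
        return dask_worker.incoming_transfer_log, dask_worker.outgoing_transfer_log

    # Collect the logs from every worker, mirroring the docs change above:
    # logs = client.run(get_transfer_logs)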