diff --git a/distributed/dashboard/components/worker.py b/distributed/dashboard/components/worker.py
index 42d2e85987..0f854bd536 100644
--- a/distributed/dashboard/components/worker.py
+++ b/distributed/dashboard/components/worker.py
@@ -91,7 +91,7 @@ def update(self):
             "Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
             "Ready": [len(w.state.ready)],
             "Waiting": [w.state.waiting_for_data_count],
-            "Connections": [w.state.comm_incoming_count],
+            "Connections": [w.state.transfer_incoming_count],
             "Serving": [len(w._comms)],
         }
         update(self.source, d)
@@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
             "total",
         ]
 
-        self.comm_incoming = ColumnDataSource({name: [] for name in names})
-        self.comm_outgoing = ColumnDataSource({name: [] for name in names})
+        self.transfer_incoming = ColumnDataSource({name: [] for name in names})
+        self.transfer_outgoing = ColumnDataSource({name: [] for name in names})
 
         x_range = DataRange1d(range_padding=0)
         y_range = DataRange1d(range_padding=0)
@@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
         )
 
         fig.rect(
-            source=self.comm_incoming,
+            source=self.transfer_incoming,
             x="middle",
             y="y",
             width="duration",
@@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
             alpha="alpha",
         )
         fig.rect(
-            source=self.comm_outgoing,
+            source=self.transfer_outgoing,
             x="middle",
             y="y",
             width="duration",
@@ -159,36 +159,40 @@ def __init__(self, worker, height=300, **kwargs):
 
         self.root = fig
 
-        self.last_comm_incoming_cumulative_count = 0
-        self.last_comm_outgoing_cumulative_count = 0
+        self.last_transfer_incoming_count_total = 0
+        self.last_transfer_outgoing_count_total = 0
         self.who = dict()
 
     @without_property_validation
     @log_errors
     def update(self):
-        comm_outgoing_log = self.worker.comm_outgoing_log
+        transfer_outgoing_log = self.worker.transfer_outgoing_log
         n = (
-            self.worker.comm_outgoing_cumulative_count
-            - self.last_comm_outgoing_cumulative_count
+            self.worker.transfer_outgoing_count_total
+            - self.last_transfer_outgoing_count_total
         )
-        comm_outgoing_log = [comm_outgoing_log[-i].copy() for i in range(1, n + 1)]
-        self.last_comm_outgoing_cumulative_count = (
-            self.worker.comm_outgoing_cumulative_count
+        transfer_outgoing_log = [
+            transfer_outgoing_log[-i].copy() for i in range(1, n + 1)
+        ]
+        self.last_transfer_outgoing_count_total = (
+            self.worker.transfer_outgoing_count_total
         )
 
-        comm_incoming_log = self.worker.comm_incoming_log
+        transfer_incoming_log = self.worker.transfer_incoming_log
         n = (
-            self.worker.comm_incoming_cumulative_count
-            - self.last_comm_incoming_cumulative_count
+            self.worker.transfer_incoming_count_total
+            - self.last_transfer_incoming_count_total
         )
-        comm_incoming_log = [comm_incoming_log[-i].copy() for i in range(1, n + 1)]
-        self.last_comm_incoming_cumulative_count = (
-            self.worker.comm_incoming_cumulative_count
+        transfer_incoming_log = [
+            transfer_incoming_log[-i].copy() for i in range(1, n + 1)
+        ]
+        self.last_transfer_incoming_count_total = (
+            self.worker.transfer_incoming_count_total
         )
 
         for [msgs, source] in [
-            [comm_incoming_log, self.comm_incoming],
-            [comm_outgoing_log, self.comm_outgoing],
+            [transfer_incoming_log, self.transfer_incoming],
+            [transfer_outgoing_log, self.transfer_outgoing],
         ]:
 
             for msg in msgs:
@@ -235,7 +239,7 @@ def __init__(self, worker, **kwargs):
         fig = figure(
             title="Communication History",
             x_axis_type="datetime",
-            y_range=[-0.1, worker.state.comm_incoming_limit + 0.5],
+            y_range=[-0.1, worker.state.transfer_incoming_count_limit + 0.5],
             height=150,
             tools="",
             x_range=x_range,
diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py
index 2609d9b201..f95cf415b9 100644
--- a/distributed/http/worker/prometheus/core.py
+++ b/distributed/http/worker/prometheus/core.py
@@ -37,7 +37,7 @@ def collect(self):
         yield GaugeMetricFamily(
             self.build_name("concurrent_fetch_requests"),
             "Number of open fetch requests to other workers.",
-            value=self.server.state.comm_incoming_count,
+            value=self.server.state.transfer_incoming_count,
         )
 
         yield GaugeMetricFamily(
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index 97df51f79a..40a5836c44 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -647,7 +647,7 @@ async def test_gather_skip(c, s, a):
 async def test_limit_concurrent_gathering(c, s, a, b):
     futures = c.map(inc, range(100))
     await c.gather(futures)
-    assert len(a.comm_outgoing_log) + len(b.comm_outgoing_log) < 100
+    assert len(a.transfer_outgoing_log) + len(b.transfer_outgoing_log) < 100
 
 
 @gen_cluster(client=True)
diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py
index de314710da..f353f36f50 100644
--- a/distributed/tests/test_nanny.py
+++ b/distributed/tests/test_nanny.py
@@ -241,7 +241,7 @@ async def test_throttle_incoming_connections(c, s, a, *other_workers):
     await wait(remote_data)
 
     a.status = Status.paused
-    a.comm_outgoing_count = 2
+    a.transfer_outgoing_count = 2
 
     requests = [
         await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 4c768121cf..c1efa0c7f5 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -227,7 +227,7 @@ def random(**kwargs):
     # Check that there were few transfers
     unexpected_transfers = []
     for worker in workers:
-        for log in worker.comm_incoming_log:
+        for log in worker.transfer_incoming_log:
             keys = log["keys"]
             # The root-ish tasks should never be transferred
             assert not any(k.startswith("random") for k in keys), keys
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index db2b63e565..98d8bfad45 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -681,11 +681,11 @@ async def test_message_breakup(c, s, a, b):
     y = c.submit(lambda _: None, xs, key="y", workers=[b.address])
     await y
 
-    assert 2 <= len(b.comm_incoming_log) <= 20
-    assert 2 <= len(a.comm_outgoing_log) <= 20
+    assert 2 <= len(b.transfer_incoming_log) <= 20
+    assert 2 <= len(a.transfer_outgoing_log) <= 20
 
-    assert all(msg["who"] == b.address for msg in a.comm_outgoing_log)
-    assert all(msg["who"] == a.address for msg in a.comm_incoming_log)
+    assert all(msg["who"] == b.address for msg in a.transfer_outgoing_log)
+    assert all(msg["who"] == a.address for msg in a.transfer_incoming_log)
 
 
 @gen_cluster(client=True)
@@ -764,7 +764,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
     concurrent outgoing connections. If multiple small fetches from the same worker
     are scheduled all at once, they will result in a single call to gather_dep.
""" - a.state.comm_incoming_limit = 2 + a.state.transfer_incoming_count_limit = 2 futures = await c.scatter( {f"x{i}": i for i in range(100)}, workers=[w.address for w in snd_workers], @@ -779,7 +779,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps): while len(a.data) < 100: await asyncio.sleep(0.01) - assert a.state.comm_incoming_bytes == 0 + assert a.state.transfer_incoming_bytes == 0 story = a.state.story("request-dep", "receive-dep") assert len(story) == 40 # 1 request-dep + 1 receive-dep per sender worker @@ -814,9 +814,9 @@ async def test_share_communication(c, s, w1, w2, w3): await c._replicate([x, y], workers=[w1.address, w2.address]) z = c.submit(add, x, y, workers=w3.address) await wait(z) - assert len(w3.comm_incoming_log) == 2 - assert w1.comm_outgoing_log - assert w2.comm_outgoing_log + assert len(w3.transfer_incoming_log) == 2 + assert w1.transfer_outgoing_log + assert w2.transfer_outgoing_log @pytest.mark.xfail(reason="very high flakiness") @@ -827,8 +827,8 @@ async def test_dont_overlap_communications_to_same_worker(c, s, a, b): await wait([x, y]) z = c.submit(add, x, y, workers=b.address) await wait(z) - assert len(b.comm_incoming_log) == 2 - l1, l2 = b.comm_incoming_log + assert len(b.transfer_incoming_log) == 2 + l1, l2 = b.transfer_incoming_log assert l1["stop"] < l2["start"] @@ -1245,9 +1245,9 @@ async def test_wait_for_outgoing(c, s, a, b): y = c.submit(inc, future, workers=b.address) await wait(y) - assert len(b.comm_incoming_log) == len(a.comm_outgoing_log) == 1 - bb = b.comm_incoming_log[0]["duration"] - aa = a.comm_incoming_log[0]["duration"] + assert len(b.transfer_incoming_log) == len(a.transfer_outgoing_log) == 1 + bb = b.transfer_incoming_log[0]["duration"] + aa = a.transfer_incoming_log[0]["duration"] ratio = aa / bb assert 1 / 3 < ratio < 3 @@ -1263,8 +1263,8 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3): y = c.submit(inc, x, workers=[w2.address]) await wait(y) - assert any(d["who"] == w2.address for d in w1.comm_outgoing_log) - assert not any(d["who"] == w2.address for d in w3.comm_outgoing_log) + assert any(d["who"] == w2.address for d in w1.transfer_outgoing_log) + assert not any(d["who"] == w2.address for d in w3.transfer_outgoing_log) @gen_cluster( @@ -1282,10 +1282,10 @@ async def test_avoid_oversubscription(c, s, *workers): await wait(futures) # Original worker not responsible for all transfers - assert len(workers[0].comm_outgoing_log) < len(workers) - 2 + assert len(workers[0].transfer_outgoing_log) < len(workers) - 2 # Some other workers did some work - assert len([w for w in workers if len(w.comm_outgoing_log) > 0]) >= 3 + assert len([w for w in workers if len(w.transfer_outgoing_log) > 0]) >= 3 @gen_cluster(client=True, worker_kwargs={"metrics": {"my_port": lambda w: w.port}}) @@ -1967,7 +1967,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b): # We will block A for any outgoing communication. 
This simulates an # overloaded worker which will always return "busy" for get_data requests, # effectively blocking H indefinitely - a.comm_outgoing_count = 10000000 + a.transfer_outgoing_count = 10000000 h = c.submit(add, f, g, key="h", workers=[b.address]) @@ -2029,7 +2029,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy( ) )["f"] for w in lws: - w.comm_outgoing_count = 10000000 + w.transfer_outgoing_count = 10000000 g = c.submit(inc, f, key="g", workers=[a.address]) assert await g == 2 @@ -2726,9 +2726,7 @@ async def test_acquire_replicas_same_channel(c, s, a, b): ("request-dep", a.address, {fut.key}), ], ) - assert any( - fut.key in msg["keys"] for msg in b.outgocomm_outgoing_loging_transfer_log - ) + assert any(fut.key in msg["keys"] for msg in b.transfer_outgoing_log) @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) @@ -3036,7 +3034,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b): Ensure that no fetch/flight tasks are left in the task dict of a worker after everything was released """ - a.comm_outgoing_limit = 0 + a.transfer_outgoing_count_limit = 0 f1 = c.submit(inc, 1, key="f1", workers=[a.address]) f2 = c.submit(inc, f1, key="f2", workers=[b.address]) key = f1.key @@ -3320,8 +3318,8 @@ async def test_Worker__to_dict(c, s, a): "thread_id", "logs", "config", - "comm_incoming_log", - "comm_outgoing_log", + "transfer_incoming_log", + "transfer_outgoing_log", # Attributes of WorkerMemoryManager "data", "max_spill", @@ -3567,21 +3565,21 @@ async def test_execute_preamble_abort_retirement(c, s): async def test_deprecation_of_renamed_worker_attributes(s, a, b): msg = ( "The `Worker.incoming_count` attribute has been renamed to " - "`Worker.comm_incoming_cumulative_count`" + "`Worker.transfer_incoming_count_total`" ) with pytest.warns(DeprecationWarning, match=msg): - assert a.incoming_count == a.comm_incoming_cumulative_count + assert a.incoming_count == a.transfer_incoming_count_total msg = ( "The `Worker.outgoing_count` attribute has been renamed to " - "`Worker.comm_outgoing_cumulative_count`" + "`Worker.transfer_outgoing_count_total`" ) with pytest.warns(DeprecationWarning, match=msg): - assert a.outgoing_count == a.comm_outgoing_cumulative_count + assert a.outgoing_count == a.transfer_outgoing_count_total msg = ( "The `Worker.outgoing_current_count` attribute has been renamed to " - "`Worker.comm_outgoing_count`" + "`Worker.transfer_outgoing_count`" ) with pytest.warns(DeprecationWarning, match=msg): - assert a.outgoing_current_count == a.comm_outgoing_count + assert a.outgoing_current_count == a.transfer_outgoing_count diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 618edc670d..674b584258 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -673,7 +673,7 @@ async def test_fetch_to_missing_on_busy(c, s, a, b): x = c.submit(inc, 1, key="x", workers=[b.address]) await x - b.comm_outgoing_limit = 0 + b.transfer_outgoing_count_limit = 0 # Crucially, unlike with `c.submit(inc, x, workers=[a.address])`, the scheduler # doesn't keep track of acquire-replicas requests, so it won't proactively inform a # when we call remove_worker later on @@ -923,7 +923,7 @@ async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3): x = c.submit(inc, 1, key="x", workers=[w1.address]) y = c.submit(inc, 2, key="y", workers=[w1.address]) await wait([x, y]) - w1.comm_outgoing_limit = 0 + 
     s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")
 
     # The tasks will now flip-flop between fetch and flight every 150ms
@@ -985,17 +985,17 @@ async def get_data(self, comm, *args, **kwargs):
 
 @gen_cluster()
 async def test_deprecated_worker_attributes(s, a, b):
-    n = a.state.comm_threshold_bytes
+    n = a.state.transfer_incoming_throttle_size_threshold
     msg = (
         "The `Worker.comm_threshold_bytes` attribute has been moved to "
-        "`Worker.state.comm_threshold_bytes`"
+        "`Worker.state.transfer_incoming_throttle_size_threshold`"
     )
     with pytest.warns(FutureWarning, match=msg):
         assert a.comm_threshold_bytes == n
     with pytest.warns(FutureWarning, match=msg):
         a.comm_threshold_bytes += 1
     assert a.comm_threshold_bytes == n + 1
-    assert a.state.comm_threshold_bytes == n + 1
+    assert a.state.transfer_incoming_throttle_size_threshold == n + 1
 
     # Old and new names differ
     msg = (
@@ -1040,7 +1040,7 @@ def test_gather_priority(ws):
     3. in case of tie, from the worker with the most tasks queued
     4. in case of tie, from a random worker (which is actually deterministic).
     """
-    ws.comm_incoming_limit = 4
+    ws.transfer_incoming_count_limit = 4
 
     instructions = ws.handle_stimulus(
         PauseEvent(stimulus_id="pause"),
@@ -1060,8 +1060,8 @@ def test_gather_priority(ws):
                 # This will be fetched first because it's on the same worker as y
                 "x8": ["127.0.0.7:1"],
             },
-            # Substantial nbytes prevents comm_incoming_limit to be overridden by
-            # comm_threshold_bytes, but it's less than target_message_size
+            # Substantial nbytes prevents transfer_incoming_count_limit from being overridden by
+            # transfer_incoming_throttle_size_threshold, but it's less than target_message_size
             nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)},
             stimulus_id="compute1",
         ),
diff --git a/distributed/utils_test.py b/distributed/utils_test.py
index c1649a39b8..de7f9139e1 100644
--- a/distributed/utils_test.py
+++ b/distributed/utils_test.py
@@ -2281,13 +2281,13 @@ def freeze_data_fetching(w: Worker, *, jump_start: bool = False) -> Iterator[None]:
         If True, trigger ensure_communicating on exit; this simulates e.g. an unrelated
         worker moving out of in_flight_workers.
     """
-    old_out_connections = w.state.comm_incoming_limit
-    old_comm_threshold = w.state.comm_threshold_bytes
-    w.state.comm_incoming_limit = 0
-    w.state.comm_threshold_bytes = 0
+    old_out_connections = w.state.transfer_incoming_count_limit
+    old_comm_threshold = w.state.transfer_incoming_throttle_size_threshold
+    w.state.transfer_incoming_count_limit = 0
+    w.state.transfer_incoming_throttle_size_threshold = 0
     yield
-    w.state.comm_incoming_limit = old_out_connections
-    w.state.comm_threshold_bytes = old_comm_threshold
+    w.state.transfer_incoming_count_limit = old_out_connections
+    w.state.transfer_incoming_throttle_size_threshold = old_comm_threshold
     if jump_start:
         w.status = Status.paused
         w.status = Status.running
diff --git a/distributed/worker.py b/distributed/worker.py
index e9f02d693f..6b5c887f4a 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -271,10 +271,10 @@ class Worker(BaseWorker, ServerNode):
     * **services:** ``{str: Server}``:
         Auxiliary web servers running on this worker
     * **service_ports:** ``{str: port}``:
-    * **comm_outgoing_limit**: ``int``
+    * **transfer_outgoing_count_limit**: ``int``
         The maximum number of concurrent outgoing data transfers.
         See also
-        :attr:`distributed.worker_state_machine.WorkerState.comm_incoming_limit`.
+        :attr:`distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit`.
     * **batched_stream**: ``BatchedSend``
         A batched stream along which we communicate to the scheduler
     * **log**: ``[(message)]``
@@ -374,7 +374,7 @@ class Worker(BaseWorker, ServerNode):
     nanny: Nanny | None
     _lock: threading.Lock
-    comm_outgoing_limit: int
+    transfer_outgoing_count_limit: int
     threads: dict[str, int]  # {ts.key: thread ID}
     active_threads_lock: threading.Lock
     active_threads: dict[int, str]  # {thread ID: ts.key}
@@ -383,14 +383,14 @@ class Worker(BaseWorker, ServerNode):
     profile_keys_history: deque[tuple[float, dict[str, dict[str, Any]]]]
     profile_recent: dict[str, Any]
     profile_history: deque[tuple[float, dict[str, Any]]]
-    comm_incoming_log: deque[dict[str, Any]]
-    comm_outgoing_log: deque[dict[str, Any]]
-    #: Number of total communications used to transfer data to other workers.
-    comm_incoming_cumulative_count: int
-    #: Number of total communications used to transfer data to other workers.
-    comm_outgoing_cumulative_count: int
-    #: Number of open communications used to transfer data to other workers.
-    comm_outgoing_count: int
+    transfer_incoming_log: deque[dict[str, Any]]
+    transfer_outgoing_log: deque[dict[str, Any]]
+    #: Number of total data transfers from other workers.
+    transfer_incoming_count_total: int
+    #: Number of total data transfers to other workers.
+    transfer_outgoing_count_total: int
+    #: Number of open data transfers to other workers.
+    transfer_outgoing_count: int
     bandwidth: float
     latency: float
     profile_cycle_interval: float
@@ -522,8 +522,10 @@ def __init__(
         self.nanny = nanny
         self._lock = threading.Lock()
 
-        comm_incoming_limit = dask.config.get("distributed.worker.connections.outgoing")
-        self.comm_outgoing_limit = dask.config.get(
+        transfer_incoming_count_limit = dask.config.get(
+            "distributed.worker.connections.outgoing"
+        )
+        self.transfer_outgoing_count_limit = dask.config.get(
             "distributed.worker.connections.incoming"
         )
@@ -540,11 +542,11 @@ def __init__(
         if validate is None:
             validate = dask.config.get("distributed.scheduler.validate")
 
-        self.comm_incoming_log = deque(maxlen=100000)
-        self.comm_incoming_cumulative_count = 0
-        self.comm_outgoing_log = deque(maxlen=100000)
-        self.comm_outgoing_cumulative_count = 0
-        self.comm_outgoing_count = 0
+        self.transfer_incoming_log = deque(maxlen=100000)
+        self.transfer_incoming_count_total = 0
+        self.transfer_outgoing_log = deque(maxlen=100000)
+        self.transfer_outgoing_count_total = 0
+        self.transfer_outgoing_count = 0
         self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
         self.bandwidth_workers = defaultdict(
             lambda: (0, 0)
@@ -749,7 +751,7 @@ def __init__(
             threads=self.threads,
             plugins=self.plugins,
             resources=resources,
-            comm_incoming_limit=comm_incoming_limit,
+            transfer_incoming_count_limit=transfer_incoming_count_limit,
            validate=validate,
             transition_counter_max=transition_counter_max,
         )
@@ -850,8 +852,10 @@ def data(self) -> MutableMapping[str, Any]:
     actors = DeprecatedWorkerStateAttribute()
     available_resources = DeprecatedWorkerStateAttribute()
     busy_workers = DeprecatedWorkerStateAttribute()
-    comm_nbytes = DeprecatedWorkerStateAttribute(target="comm_incoming_bytes")
-    comm_threshold_bytes = DeprecatedWorkerStateAttribute()
+    comm_nbytes = DeprecatedWorkerStateAttribute(target="transfer_incoming_bytes")
+    comm_threshold_bytes = DeprecatedWorkerStateAttribute(
+        target="transfer_incoming_throttle_size_threshold"
+    )
     constrained = DeprecatedWorkerStateAttribute()
     data_needed_per_worker = DeprecatedWorkerStateAttribute(target="data_needed")
     executed_count = DeprecatedWorkerStateAttribute()
@@ -869,7 +873,9 @@ def data(self) -> MutableMapping[str, Any]:
     ready = DeprecatedWorkerStateAttribute()
     tasks = DeprecatedWorkerStateAttribute()
     target_message_size = DeprecatedWorkerStateAttribute()
-    total_out_connections = DeprecatedWorkerStateAttribute(target="comm_incoming_limit")
+    total_out_connections = DeprecatedWorkerStateAttribute(
+        target="transfer_incoming_count_limit"
+    )
     total_resources = DeprecatedWorkerStateAttribute()
     transition_counter = DeprecatedWorkerStateAttribute()
     transition_counter_max = DeprecatedWorkerStateAttribute()
@@ -1028,8 +1034,8 @@ def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
             "status": self.status,
             "logs": self.get_logs(),
             "config": dask.config.config,
-            "comm_incoming_log": self.comm_incoming_log,
-            "comm_outgoing_log": self.comm_outgoing_log,
+            "transfer_incoming_log": self.transfer_incoming_log,
+            "transfer_outgoing_log": self.transfer_outgoing_log,
         }
         extra = {k: v for k, v in extra.items() if k not in exclude}
         info.update(extra)
@@ -1642,7 +1648,7 @@ async def get_data(
         start = time()
 
         if max_connections is None:
-            max_connections = self.comm_outgoing_limit
+            max_connections = self.transfer_outgoing_count_limit
 
         # Allow same-host connections more liberally
         if (
@@ -1660,19 +1666,22 @@ async def get_data(
         else:
             throttle_msg = ""
 
-        if max_connections is not False and self.comm_outgoing_count >= max_connections:
+        if (
+            max_connections is not False
+            and self.transfer_outgoing_count >= max_connections
+        ):
             logger.debug(
                 "Worker %s has too many open connections to respond to data request "
                 "from %s (%d/%d).%s",
                 self.address,
                 who,
-                self.comm_outgoing_count,
+                self.transfer_outgoing_count,
                 max_connections,
                 throttle_msg,
             )
             return {"status": "busy"}
 
-        self.comm_outgoing_count += 1
+        self.transfer_outgoing_count += 1
         data = {k: self.data[k] for k in keys if k in self.data}
 
         if len(data) < len(keys):
@@ -1704,16 +1713,16 @@ async def get_data(
             comm.abort()
             raise
         finally:
-            self.comm_outgoing_count -= 1
+            self.transfer_outgoing_count -= 1
         stop = time()
         if self.digests is not None:
             self.digests["get-data-send-duration"].add(stop - start)
 
         total_bytes = sum(filter(None, nbytes.values()))
 
-        self.comm_outgoing_cumulative_count += 1
+        self.transfer_outgoing_count_total += 1
         duration = (stop - start) or 0.5  # windows
-        self.comm_outgoing_log.append(
+        self.transfer_outgoing_log.append(
             {
                 "start": start + self.scheduler_delay,
                 "stop": stop + self.scheduler_delay,
@@ -1928,7 +1937,7 @@ def _update_metrics_received_data(
         )
         duration = (stop - start) or 0.010
         bandwidth = total_bytes / duration
-        self.comm_incoming_log.append(
+        self.transfer_incoming_log.append(
             {
                 "start": start + self.scheduler_delay,
                 "stop": stop + self.scheduler_delay,
@@ -1955,7 +1964,7 @@ def _update_metrics_received_data(
             self.digests["transfer-bandwidth"].add(total_bytes / duration)
             self.digests["transfer-duration"].add(duration)
         self.counters["transfer-count"].add(len(data))
-        self.comm_incoming_cumulative_count += 1
+        self.transfer_incoming_count_total += 1
 
     @fail_hard
     async def gather_dep(
@@ -2537,51 +2546,51 @@ def validate_state(self) -> None:
     def incoming_count(self):
         warnings.warn(
             "The `Worker.incoming_count` attribute has been renamed to "
-            "`Worker.comm_incoming_cumulative_count`",
+            "`Worker.transfer_incoming_count_total`",
             DeprecationWarning,
             stacklevel=2,
         )
-        return self.comm_incoming_cumulative_count
+        return self.transfer_incoming_count_total
 
     @property
     def incoming_transfer_log(self):
         warnings.warn(
             "The `Worker.incoming_transfer_log` attribute has been renamed to "
-            "`Worker.comm_incoming_log`",
+            "`Worker.transfer_incoming_log`",
             DeprecationWarning,
             stacklevel=2,
         )
-        return self.comm_incoming_log
+        return self.transfer_incoming_log
 
     @property
     def outgoing_count(self):
         warnings.warn(
             "The `Worker.outgoing_count` attribute has been renamed to "
-            "`Worker.comm_outgoing_cumulative_count`",
+            "`Worker.transfer_outgoing_count_total`",
             DeprecationWarning,
             stacklevel=2,
         )
-        return self.comm_outgoing_cumulative_count
+        return self.transfer_outgoing_count_total
 
     @property
     def outgoing_current_count(self):
         warnings.warn(
             "The `Worker.outgoing_current_count` attribute has been renamed to "
-            "`Worker.comm_outgoing_count`",
+            "`Worker.transfer_outgoing_count`",
             DeprecationWarning,
             stacklevel=2,
         )
-        return self.comm_outgoing_count
+        return self.transfer_outgoing_count
 
     @property
     def outgoing_transfer_log(self):
         warnings.warn(
             "The `Worker.outgoing_transfer_log` attribute has been renamed to "
-            "`Worker.comm_outgoing_log`",
+            "`Worker.transfer_outgoing_log`",
             DeprecationWarning,
             stacklevel=2,
         )
-        return self.comm_outgoing_log
+        return self.transfer_outgoing_log
 
 
 def get_worker() -> Worker:
diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py
index bc50520fae..d16d36b0b7 100644
--- a/distributed/worker_state_machine.py
+++ b/distributed/worker_state_machine.py
@@ -1140,16 +1140,16 @@ class WorkerState:
     #: dependencies until the current query returns.
     in_flight_workers: dict[str, set[str]]
 
-    #: The total number of incoming bytes through in-flight tasks.
-    comm_incoming_bytes: int
+    #: The total size of incoming data transfers for in-flight tasks.
+    transfer_incoming_bytes: int
 
     #: The maximum number of concurrent incoming data transfers from other workers.
-    #: See also :attr:`distributed.worker.Worker.comm_outgoing_limit`.
-    comm_incoming_limit: int
+    #: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`.
+    transfer_incoming_count_limit: int
 
-    #: Ignore :attr:`comm_incoming_limit` as long as :attr:`comm_incoming_bytes` is
+    #: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is
     #: less than this value.
-    comm_threshold_bytes: int
+    transfer_incoming_throttle_size_threshold: int
 
     #: Peer workers that recently returned a busy status. Workers in this set won't be
     #: asked for additional dependencies for some time.
@@ -1235,7 +1235,7 @@ def __init__(
         threads: dict[str, int] | None = None,
         plugins: dict[str, WorkerPlugin] | None = None,
         resources: Mapping[str, float] | None = None,
-        comm_incoming_limit: int = 9999,
+        transfer_incoming_count_limit: int = 9999,
         validate: bool = True,
         transition_counter_max: int | Literal[False] = False,
     ):
@@ -1264,9 +1264,9 @@ def __init__(
         )
         self.in_flight_workers = {}
         self.busy_workers = set()
-        self.comm_incoming_limit = comm_incoming_limit
-        self.comm_threshold_bytes = int(10e6)
-        self.comm_incoming_bytes = 0
+        self.transfer_incoming_count_limit = transfer_incoming_count_limit
+        self.transfer_incoming_throttle_size_threshold = int(10e6)
+        self.transfer_incoming_bytes = 0
         self.missing_dep_flight = set()
         self.generation = 0
         self.ready = HeapSet(key=operator.attrgetter("priority"))
@@ -1350,8 +1350,8 @@ def in_flight_tasks_count(self) -> int:
         return len(self.in_flight_tasks)
 
     @property
-    def comm_incoming_count(self) -> int:
-        """Count of open communications used to receive data from other workers.
+    def transfer_incoming_count(self) -> int:
+        """Count of open data transfers from other workers.
 
         See also
         --------
@@ -1462,8 +1462,9 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
         if not self.running or not self.data_needed:
             return {}, []
         if (
-            self.comm_incoming_count >= self.comm_incoming_limit
-            and self.comm_incoming_bytes >= self.comm_threshold_bytes
+            self.transfer_incoming_count >= self.transfer_incoming_count_limit
+            and self.transfer_incoming_bytes
+            >= self.transfer_incoming_throttle_size_threshold
         ):
             return {}, []
@@ -1485,8 +1486,8 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
                 worker,
                 len(available_tasks),
                 len(self.data_needed),
-                self.comm_incoming_count,
-                self.comm_incoming_limit,
+                self.transfer_incoming_count,
+                self.transfer_incoming_count_limit,
                 len(self.busy_workers),
             )
             self.log.append(
@@ -1515,10 +1516,11 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
             )
 
             self.in_flight_workers[worker] = to_gather_keys
-            self.comm_incoming_bytes += total_nbytes
+            self.transfer_incoming_bytes += total_nbytes
             if (
-                self.comm_incoming_count >= self.comm_incoming_limit
-                and self.comm_incoming_bytes >= self.comm_threshold_bytes
+                self.transfer_incoming_count >= self.transfer_incoming_count_limit
+                and self.transfer_incoming_bytes
+                >= self.transfer_incoming_throttle_size_threshold
             ):
                 break
@@ -2765,7 +2767,7 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]:
         --------
         _execute_done_common
         """
-        self.comm_incoming_bytes -= ev.total_nbytes
+        self.transfer_incoming_bytes -= ev.total_nbytes
         keys = self.in_flight_workers.pop(ev.worker)
         for key in keys:
             ts = self.tasks[key]
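
Note on the renamed throttle attributes: as the `_ensure_communicating` hunks above show, `transfer_incoming_count_limit` and `transfer_incoming_throttle_size_threshold` are checked together, so a worker keeps starting new fetches past the count limit as long as the total bytes in flight stay below the size threshold. Below is a minimal standalone sketch of that gating rule; the `IncomingTransferThrottle` class and its field names are illustrative only and are not part of dask/distributed.

```python
# Illustrative sketch of the gating rule applied in
# WorkerState._ensure_communicating; this class is NOT dask code, it only
# mirrors the semantics of the renamed attributes.
from dataclasses import dataclass


@dataclass
class IncomingTransferThrottle:
    count_limit: int = 9999          # cf. transfer_incoming_count_limit
    size_threshold: int = int(10e6)  # cf. transfer_incoming_throttle_size_threshold
    count: int = 0                   # cf. transfer_incoming_count (open transfers)
    nbytes: int = 0                  # cf. transfer_incoming_bytes (bytes in flight)

    def may_start_transfer(self) -> bool:
        # Fetching stops only when *both* limits are exceeded: the count limit
        # is ignored while the bytes in flight stay below the size threshold.
        return not (
            self.count >= self.count_limit and self.nbytes >= self.size_threshold
        )


throttle = IncomingTransferThrottle(count_limit=4)
throttle.count, throttle.nbytes = 10, 2**20  # many open transfers, few bytes
assert throttle.may_start_transfer()         # byte threshold not reached yet
throttle.nbytes = 64 * 2**20                 # now well past the 10 MB threshold
assert not throttle.may_start_transfer()     # both conditions met: stop fetching
```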
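
The renames stay backwards compatible: the pre-existing public names (`Worker.incoming_count`, `Worker.outgoing_transfer_log`, `Worker.comm_threshold_bytes`, ...) are kept as `@property` aliases or `DeprecatedWorkerStateAttribute` descriptors that warn and forward to the new attributes. A generic sketch of that warn-and-forward pattern follows; the `RenamedAttribute` descriptor and `ExampleWorker` class are hypothetical stand-ins, not the helpers used in the diff.

```python
# Hypothetical descriptor illustrating the warn-and-forward alias pattern the
# diff relies on for renamed Worker attributes; not the dask implementation.
import warnings


class RenamedAttribute:
    """Read-only alias that forwards an old attribute name to a new one."""

    def __init__(self, new_name: str) -> None:
        self.new_name = new_name

    def __set_name__(self, owner: type, name: str) -> None:
        self.old_name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        warnings.warn(
            f"The `{self.old_name}` attribute has been renamed to `{self.new_name}`",
            DeprecationWarning,
            stacklevel=2,
        )
        return getattr(instance, self.new_name)


class ExampleWorker:
    transfer_outgoing_count_total = 0
    # The old name keeps working but emits a DeprecationWarning when read.
    outgoing_count = RenamedAttribute("transfer_outgoing_count_total")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert ExampleWorker().outgoing_count == 0  # forwards to the new attribute
assert issubclass(caught[0].category, DeprecationWarning)
```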