comm -> transfer
hendrikmakait committed Aug 29, 2022
1 parent 0742765 commit 4b073aa
Showing 10 changed files with 148 additions and 135 deletions.
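
For reference, the rename applied throughout the hunks below, collected into a single mapping. This is a convenience sketch derived from this diff, not code from the repository; the `_total` suffix marks cumulative counters, while the unsuffixed names are point-in-time gauges.

# Old -> new attribute names, as applied in the hunks below.
RENAMED_ATTRIBUTES = {
    "comm_incoming_count": "transfer_incoming_count",
    "comm_incoming_cumulative_count": "transfer_incoming_count_total",
    "comm_incoming_log": "transfer_incoming_log",
    "comm_incoming_limit": "transfer_incoming_count_limit",
    "comm_incoming_bytes": "transfer_incoming_bytes",
    "comm_outgoing_count": "transfer_outgoing_count",
    "comm_outgoing_cumulative_count": "transfer_outgoing_count_total",
    "comm_outgoing_log": "transfer_outgoing_log",
    "comm_outgoing_limit": "transfer_outgoing_count_limit",
    "comm_threshold_bytes": "transfer_incoming_throttle_size_threshold",
}
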
48 changes: 26 additions & 22 deletions distributed/dashboard/components/worker.py
@@ -91,7 +91,7 @@ def update(self):
             "Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
             "Ready": [len(w.state.ready)],
             "Waiting": [w.state.waiting_for_data_count],
-            "Connections": [w.state.comm_incoming_count],
+            "Connections": [w.state.transfer_incoming_count],
             "Serving": [len(w._comms)],
         }
         update(self.source, d)
@@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
             "total",
         ]
 
-        self.comm_incoming = ColumnDataSource({name: [] for name in names})
-        self.comm_outgoing = ColumnDataSource({name: [] for name in names})
+        self.transfer_incoming = ColumnDataSource({name: [] for name in names})
+        self.transfer_outgoing = ColumnDataSource({name: [] for name in names})
 
         x_range = DataRange1d(range_padding=0)
         y_range = DataRange1d(range_padding=0)
@@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
         )
 
         fig.rect(
-            source=self.comm_incoming,
+            source=self.transfer_incoming,
             x="middle",
             y="y",
             width="duration",
@@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
             alpha="alpha",
         )
         fig.rect(
-            source=self.comm_outgoing,
+            source=self.transfer_outgoing,
             x="middle",
             y="y",
             width="duration",
@@ -159,36 +159,40 @@ def __init__(self, worker, height=300, **kwargs):
 
         self.root = fig
 
-        self.last_comm_incoming_cumulative_count = 0
-        self.last_comm_outgoing_cumulative_count = 0
+        self.last_transfer_incoming_count_total = 0
+        self.last_transfer_outgoing_count_total = 0
         self.who = dict()
 
     @without_property_validation
     @log_errors
     def update(self):
-        comm_outgoing_log = self.worker.comm_outgoing_log
+        transfer_outgoing_log = self.worker.transfer_outgoing_log
         n = (
-            self.worker.comm_outgoing_cumulative_count
-            - self.last_comm_outgoing_cumulative_count
+            self.worker.transfer_outgoing_count_total
+            - self.last_transfer_outgoing_count_total
         )
-        comm_outgoing_log = [comm_outgoing_log[-i].copy() for i in range(1, n + 1)]
-        self.last_comm_outgoing_cumulative_count = (
-            self.worker.comm_outgoing_cumulative_count
+        transfer_outgoing_log = [
+            transfer_outgoing_log[-i].copy() for i in range(1, n + 1)
+        ]
+        self.last_transfer_outgoing_count_total = (
+            self.worker.transfer_outgoing_count_total
         )
 
-        comm_incoming_log = self.worker.comm_incoming_log
+        transfer_incoming_log = self.worker.transfer_incoming_log
         n = (
-            self.worker.comm_incoming_cumulative_count
-            - self.last_comm_incoming_cumulative_count
+            self.worker.transfer_incoming_count_total
+            - self.last_transfer_incoming_count_total
         )
-        comm_incoming_log = [comm_incoming_log[-i].copy() for i in range(1, n + 1)]
-        self.last_comm_incoming_cumulative_count = (
-            self.worker.comm_incoming_cumulative_count
+        transfer_incoming_log = [
+            transfer_incoming_log[-i].copy() for i in range(1, n + 1)
+        ]
+        self.last_transfer_incoming_count_total = (
+            self.worker.transfer_incoming_count_total
        )
 
         for [msgs, source] in [
-            [comm_incoming_log, self.comm_incoming],
-            [comm_outgoing_log, self.comm_outgoing],
+            [transfer_incoming_log, self.transfer_incoming],
+            [transfer_outgoing_log, self.transfer_outgoing],
         ]:
 
             for msg in msgs:
@@ -235,7 +239,7 @@ def __init__(self, worker, **kwargs):
         fig = figure(
             title="Communication History",
             x_axis_type="datetime",
-            y_range=[-0.1, worker.state.comm_incoming_limit + 0.5],
+            y_range=[-0.1, worker.state.transfer_incoming_count_limit + 0.5],
             height=150,
             tools="",
             x_range=x_range,
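
The update() hunk above pulls only the entries that arrived since the last dashboard refresh: a cumulative counter is compared against its remembered previous value, and exactly that many entries are copied off the tail of the log. A minimal, self-contained sketch of the pattern (the names log, count_total, and poll are placeholders for illustration, not repository code):

from collections import deque

log = deque(maxlen=1000)  # stands in for Worker.transfer_outgoing_log
count_total = 0           # stands in for Worker.transfer_outgoing_count_total
last_seen = 0

def poll():
    """Return the log entries added since the previous poll, newest first."""
    global last_seen
    n = count_total - last_seen
    # Assumes fewer than maxlen entries arrive between two polls, so the
    # last n deque entries are exactly the new ones.
    new_entries = [log[-i].copy() for i in range(1, n + 1)]
    last_seen = count_total
    return new_entries
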
2 changes: 1 addition & 1 deletion distributed/http/worker/prometheus/core.py
@@ -37,7 +37,7 @@ def collect(self):
         yield GaugeMetricFamily(
             self.build_name("concurrent_fetch_requests"),
             "Number of open fetch requests to other workers.",
-            value=self.server.state.comm_incoming_count,
+            value=self.server.state.transfer_incoming_count,
         )
 
         yield GaugeMetricFamily(
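
For context, GaugeMetricFamily reports a point-in-time value on every scrape, which is why the instantaneous transfer_incoming_count fits here. A minimal sketch of the surrounding collector pattern with prometheus_client; the metric name (the real code derives it via self.build_name) and the get_count callback are assumptions for illustration:

from prometheus_client.core import REGISTRY, GaugeMetricFamily

class FetchRequestsCollector:
    def __init__(self, get_count):
        # e.g. get_count = lambda: worker.state.transfer_incoming_count
        self.get_count = get_count

    def collect(self):
        # Gauges expose the current value at scrape time; no history is kept.
        yield GaugeMetricFamily(
            "dask_worker_concurrent_fetch_requests",  # assumed metric name
            "Number of open fetch requests to other workers.",
            value=self.get_count(),
        )

REGISTRY.register(FetchRequestsCollector(lambda: 3))
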
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
@@ -647,7 +647,7 @@ async def test_gather_skip(c, s, a):
 async def test_limit_concurrent_gathering(c, s, a, b):
     futures = c.map(inc, range(100))
     await c.gather(futures)
-    assert len(a.comm_outgoing_log) + len(b.comm_outgoing_log) < 100
+    assert len(a.transfer_outgoing_log) + len(b.transfer_outgoing_log) < 100
 
 
 @gen_cluster(client=True)
2 changes: 1 addition & 1 deletion distributed/tests/test_nanny.py
@@ -241,7 +241,7 @@ async def test_throttle_incoming_connections(c, s, a, *other_workers):
     await wait(remote_data)
 
     a.status = Status.paused
-    a.comm_outgoing_count = 2
+    a.transfer_outgoing_count = 2
 
     requests = [
         await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
@@ -227,7 +227,7 @@ def random(**kwargs):
     # Check that there were few transfers
     unexpected_transfers = []
     for worker in workers:
-        for log in worker.comm_incoming_log:
+        for log in worker.transfer_incoming_log:
             keys = log["keys"]
             # The root-ish tasks should never be transferred
             assert not any(k.startswith("random") for k in keys), keys
62 changes: 30 additions & 32 deletions distributed/tests/test_worker.py
@@ -681,11 +681,11 @@ async def test_message_breakup(c, s, a, b):
     y = c.submit(lambda _: None, xs, key="y", workers=[b.address])
     await y
 
-    assert 2 <= len(b.comm_incoming_log) <= 20
-    assert 2 <= len(a.comm_outgoing_log) <= 20
+    assert 2 <= len(b.transfer_incoming_log) <= 20
+    assert 2 <= len(a.transfer_outgoing_log) <= 20
 
-    assert all(msg["who"] == b.address for msg in a.comm_outgoing_log)
-    assert all(msg["who"] == a.address for msg in a.comm_incoming_log)
+    assert all(msg["who"] == b.address for msg in a.transfer_outgoing_log)
+    assert all(msg["who"] == a.address for msg in a.transfer_incoming_log)
 
 
 @gen_cluster(client=True)
@@ -764,7 +764,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
     concurrent outgoing connections. If multiple small fetches from the same worker are
     scheduled all at once, they will result in a single call to gather_dep.
     """
-    a.state.comm_incoming_limit = 2
+    a.state.transfer_incoming_count_limit = 2
     futures = await c.scatter(
         {f"x{i}": i for i in range(100)},
         workers=[w.address for w in snd_workers],
@@ -779,7 +779,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
     while len(a.data) < 100:
         await asyncio.sleep(0.01)
 
-    assert a.state.comm_incoming_bytes == 0
+    assert a.state.transfer_incoming_bytes == 0
 
     story = a.state.story("request-dep", "receive-dep")
     assert len(story) == 40  # 1 request-dep + 1 receive-dep per sender worker
@@ -814,9 +814,9 @@ async def test_share_communication(c, s, w1, w2, w3):
     await c._replicate([x, y], workers=[w1.address, w2.address])
     z = c.submit(add, x, y, workers=w3.address)
     await wait(z)
-    assert len(w3.comm_incoming_log) == 2
-    assert w1.comm_outgoing_log
-    assert w2.comm_outgoing_log
+    assert len(w3.transfer_incoming_log) == 2
+    assert w1.transfer_outgoing_log
+    assert w2.transfer_outgoing_log
 
 
 @pytest.mark.xfail(reason="very high flakiness")
@@ -827,8 +827,8 @@ async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
     await wait([x, y])
     z = c.submit(add, x, y, workers=b.address)
     await wait(z)
-    assert len(b.comm_incoming_log) == 2
-    l1, l2 = b.comm_incoming_log
+    assert len(b.transfer_incoming_log) == 2
+    l1, l2 = b.transfer_incoming_log
 
     assert l1["stop"] < l2["start"]
 
@@ -1245,9 +1245,9 @@ async def test_wait_for_outgoing(c, s, a, b):
     y = c.submit(inc, future, workers=b.address)
     await wait(y)
 
-    assert len(b.comm_incoming_log) == len(a.comm_outgoing_log) == 1
-    bb = b.comm_incoming_log[0]["duration"]
-    aa = a.comm_outgoing_log[0]["duration"]
+    assert len(b.transfer_incoming_log) == len(a.transfer_outgoing_log) == 1
+    bb = b.transfer_incoming_log[0]["duration"]
+    aa = a.transfer_outgoing_log[0]["duration"]
     ratio = aa / bb
 
     assert 1 / 3 < ratio < 3
@@ -1263,8 +1263,8 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
     y = c.submit(inc, x, workers=[w2.address])
     await wait(y)
 
-    assert any(d["who"] == w2.address for d in w1.comm_outgoing_log)
-    assert not any(d["who"] == w2.address for d in w3.comm_outgoing_log)
+    assert any(d["who"] == w2.address for d in w1.transfer_outgoing_log)
+    assert not any(d["who"] == w2.address for d in w3.transfer_outgoing_log)
 
 
 @gen_cluster(
@@ -1282,10 +1282,10 @@ async def test_avoid_oversubscription(c, s, *workers):
     await wait(futures)
 
     # Original worker not responsible for all transfers
-    assert len(workers[0].comm_outgoing_log) < len(workers) - 2
+    assert len(workers[0].transfer_outgoing_log) < len(workers) - 2
 
     # Some other workers did some work
-    assert len([w for w in workers if len(w.comm_outgoing_log) > 0]) >= 3
+    assert len([w for w in workers if len(w.transfer_outgoing_log) > 0]) >= 3
 
 
 @gen_cluster(client=True, worker_kwargs={"metrics": {"my_port": lambda w: w.port}})
@@ -1967,7 +1967,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b):
     # We will block A for any outgoing communication. This simulates an
     # overloaded worker which will always return "busy" for get_data requests,
     # effectively blocking H indefinitely
-    a.comm_outgoing_count = 10000000
+    a.transfer_outgoing_count = 10000000
 
     h = c.submit(add, f, g, key="h", workers=[b.address])
 
@@ -2029,7 +2029,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy(
         )
     )["f"]
     for w in lws:
-        w.comm_outgoing_count = 10000000
+        w.transfer_outgoing_count = 10000000
 
     g = c.submit(inc, f, key="g", workers=[a.address])
     assert await g == 2
@@ -2726,9 +2726,7 @@ async def test_acquire_replicas_same_channel(c, s, a, b):
             ("request-dep", a.address, {fut.key}),
         ],
     )
-    assert any(
-        fut.key in msg["keys"] for msg in b.comm_outgoing_log
-    )
+    assert any(fut.key in msg["keys"] for msg in b.transfer_outgoing_log)
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
@@ -3036,7 +3034,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b):
     Ensure that no fetch/flight tasks are left in the task dict of a
     worker after everything was released
     """
-    a.comm_outgoing_limit = 0
+    a.transfer_outgoing_count_limit = 0
     f1 = c.submit(inc, 1, key="f1", workers=[a.address])
     f2 = c.submit(inc, f1, key="f2", workers=[b.address])
     key = f1.key
@@ -3320,8 +3318,8 @@ async def test_Worker__to_dict(c, s, a):
         "thread_id",
         "logs",
         "config",
-        "comm_incoming_log",
-        "comm_outgoing_log",
+        "transfer_incoming_log",
+        "transfer_outgoing_log",
         # Attributes of WorkerMemoryManager
         "data",
         "max_spill",
@@ -3567,21 +3565,21 @@ async def test_execute_preamble_abort_retirement(c, s):
 async def test_deprecation_of_renamed_worker_attributes(s, a, b):
     msg = (
         "The `Worker.incoming_count` attribute has been renamed to "
-        "`Worker.comm_incoming_cumulative_count`"
+        "`Worker.transfer_incoming_count_total`"
     )
     with pytest.warns(DeprecationWarning, match=msg):
-        assert a.incoming_count == a.comm_incoming_cumulative_count
+        assert a.incoming_count == a.transfer_incoming_count_total
 
     msg = (
         "The `Worker.outgoing_count` attribute has been renamed to "
-        "`Worker.comm_outgoing_cumulative_count`"
+        "`Worker.transfer_outgoing_count_total`"
     )
     with pytest.warns(DeprecationWarning, match=msg):
-        assert a.outgoing_count == a.comm_outgoing_cumulative_count
+        assert a.outgoing_count == a.transfer_outgoing_count_total
 
     msg = (
         "The `Worker.outgoing_current_count` attribute has been renamed to "
-        "`Worker.comm_outgoing_count`"
+        "`Worker.transfer_outgoing_count`"
     )
     with pytest.warns(DeprecationWarning, match=msg):
-        assert a.outgoing_current_count == a.comm_outgoing_count
+        assert a.outgoing_current_count == a.transfer_outgoing_count
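
The test above pins down both the DeprecationWarning text and the forwarding behaviour for each renamed attribute. A minimal sketch of the kind of shim that would satisfy it, assuming a __getattr__-based forwarder; the real implementation lives in distributed/worker.py and is not part of this diff:

import warnings

_RENAMED = {
    "incoming_count": "transfer_incoming_count_total",
    "outgoing_count": "transfer_outgoing_count_total",
    "outgoing_current_count": "transfer_outgoing_count",
}

class _WorkerShimExample:
    transfer_incoming_count_total = 0
    transfer_outgoing_count_total = 0
    transfer_outgoing_count = 0

    def __getattr__(self, name):
        # Only invoked for attributes not found normally, i.e. the old names.
        if name in _RENAMED:
            new = _RENAMED[name]
            warnings.warn(
                f"The `Worker.{name}` attribute has been renamed to "
                f"`Worker.{new}`",
                DeprecationWarning,
            )
            return getattr(self, new)
        raise AttributeError(name)
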
16 changes: 8 additions & 8 deletions distributed/tests/test_worker_state_machine.py
@@ -673,7 +673,7 @@ async def test_fetch_to_missing_on_busy(c, s, a, b):
     x = c.submit(inc, 1, key="x", workers=[b.address])
     await x
 
-    b.comm_outgoing_limit = 0
+    b.transfer_outgoing_count_limit = 0
     # Crucially, unlike with `c.submit(inc, x, workers=[a.address])`, the scheduler
     # doesn't keep track of acquire-replicas requests, so it won't proactively inform a
     # when we call remove_worker later on
@@ -923,7 +923,7 @@ async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
     x = c.submit(inc, 1, key="x", workers=[w1.address])
     y = c.submit(inc, 2, key="y", workers=[w1.address])
     await wait([x, y])
-    w1.comm_outgoing_limit = 0
+    w1.transfer_outgoing_count_limit = 0
     s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")
 
     # The tasks will now flip-flop between fetch and flight every 150ms
@@ -985,17 +985,17 @@ async def get_data(self, comm, *args, **kwargs):
 
 @gen_cluster()
 async def test_deprecated_worker_attributes(s, a, b):
-    n = a.state.comm_threshold_bytes
+    n = a.state.transfer_incoming_throttle_size_threshold
     msg = (
         "The `Worker.comm_threshold_bytes` attribute has been moved to "
-        "`Worker.state.comm_threshold_bytes`"
+        "`Worker.state.transfer_incoming_throttle_size_threshold`"
     )
     with pytest.warns(FutureWarning, match=msg):
         assert a.comm_threshold_bytes == n
     with pytest.warns(FutureWarning, match=msg):
         a.comm_threshold_bytes += 1
     assert a.comm_threshold_bytes == n + 1
-    assert a.state.comm_threshold_bytes == n + 1
+    assert a.state.transfer_incoming_throttle_size_threshold == n + 1
 
     # Old and new names differ
     msg = (
@@ -1040,7 +1040,7 @@ def test_gather_priority(ws):
     3. in case of tie, from the worker with the most tasks queued
     4. in case of tie, from a random worker (which is actually deterministic).
     """
-    ws.comm_incoming_limit = 4
+    ws.transfer_incoming_count_limit = 4
 
     instructions = ws.handle_stimulus(
         PauseEvent(stimulus_id="pause"),
@@ -1060,8 +1060,8 @@ def test_gather_priority(ws):
             # This will be fetched first because it's on the same worker as y
             "x8": ["127.0.0.7:1"],
         },
-        # Substantial nbytes prevents comm_incoming_limit from being overridden by
-        # comm_threshold_bytes, but it's less than target_message_size
+        # Substantial nbytes prevents transfer_incoming_count_limit from being overridden by
+        # transfer_incoming_throttle_size_threshold, but it's less than target_message_size
         nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)},
         stimulus_id="compute1",
     ),
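
Earlier in this file, the test_deprecated_worker_attributes hunk exercises both reads and an in-place += through the old Worker-level name, warning with FutureWarning because the attribute moved onto Worker.state rather than being renamed in place. A property-based sketch of a shim with that behaviour, assuming a state object that holds the new attribute (hypothetical, not the repository implementation):

import warnings

_MOVED_MSG = (
    "The `Worker.comm_threshold_bytes` attribute has been moved to "
    "`Worker.state.transfer_incoming_throttle_size_threshold`"
)

class _MovedAttributeExample:
    def __init__(self, state):
        self.state = state  # holds transfer_incoming_throttle_size_threshold

    @property
    def comm_threshold_bytes(self):
        warnings.warn(_MOVED_MSG, FutureWarning)
        return self.state.transfer_incoming_throttle_size_threshold

    @comm_threshold_bytes.setter
    def comm_threshold_bytes(self, value):
        # An augmented assignment like `w.comm_threshold_bytes += 1` goes
        # through the getter and then this setter, warning on both steps.
        warnings.warn(_MOVED_MSG, FutureWarning)
        self.state.transfer_incoming_throttle_size_threshold = value
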
12 changes: 6 additions & 6 deletions distributed/utils_test.py
@@ -2281,13 +2281,13 @@ def freeze_data_fetching(w: Worker, *, jump_start: bool = False) -> Iterator[Non
         If True, trigger ensure_communicating on exit; this simulates e.g. an unrelated
         worker moving out of in_flight_workers.
     """
-    old_out_connections = w.state.comm_incoming_limit
-    old_comm_threshold = w.state.comm_threshold_bytes
-    w.state.comm_incoming_limit = 0
-    w.state.comm_threshold_bytes = 0
+    old_out_connections = w.state.transfer_incoming_count_limit
+    old_comm_threshold = w.state.transfer_incoming_throttle_size_threshold
+    w.state.transfer_incoming_count_limit = 0
+    w.state.transfer_incoming_throttle_size_threshold = 0
     yield
-    w.state.comm_incoming_limit = old_out_connections
-    w.state.comm_threshold_bytes = old_comm_threshold
+    w.state.transfer_incoming_count_limit = old_out_connections
+    w.state.transfer_incoming_throttle_size_threshold = old_comm_threshold
     if jump_start:
         w.status = Status.paused
         w.status = Status.running
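
A hypothetical usage of the context manager patched above, to show why both knobs are zeroed: with transfer_incoming_count_limit at 0 and the size-based throttle threshold at 0, the worker cannot start any gather_dep, so tasks that need remote data sit deterministically in the fetch state until the block exits:

with freeze_data_fetching(w, jump_start=True):
    ...  # schedule work whose dependencies live on other workers
# On exit the old limits are restored; with jump_start=True the worker is
# bounced through paused -> running to retrigger ensure_communicating.
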