Improve differentiation between incoming/outgoing connections and transfers #6933

Merged
merged 23 commits on Aug 30, 2022
Changes from 20 commits
52 changes: 33 additions & 19 deletions distributed/dashboard/components/worker.py
@@ -91,7 +91,7 @@ def update(self):
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
"Ready": [len(w.state.ready)],
"Waiting": [w.state.waiting_for_data_count],
"Connections": [len(w.state.in_flight_workers)],
"Connections": [w.state.transfer_incoming_count],
Member Author commented:

For now, I don't want to touch "Connections".

"Serving": [len(w._comms)],
}
update(self.source, d)
@@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
"total",
]

self.incoming = ColumnDataSource({name: [] for name in names})
self.outgoing = ColumnDataSource({name: [] for name in names})
self.transfer_incoming = ColumnDataSource({name: [] for name in names})
self.transfer_outgoing = ColumnDataSource({name: [] for name in names})

x_range = DataRange1d(range_padding=0)
y_range = DataRange1d(range_padding=0)
@@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
)

fig.rect(
source=self.incoming,
source=self.transfer_incoming,
x="middle",
y="y",
width="duration",
@@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
alpha="alpha",
)
fig.rect(
source=self.outgoing,
source=self.transfer_outgoing,
x="middle",
y="y",
width="duration",
@@ -159,26 +159,40 @@ def __init__(self, worker, height=300, **kwargs):

self.root = fig

self.last_incoming = 0
self.last_outgoing = 0
self.last_transfer_incoming_count_total = 0
self.last_transfer_outgoing_count_total = 0
self.who = dict()

@without_property_validation
@log_errors
def update(self):
outgoing = self.worker.outgoing_transfer_log
n = self.worker.outgoing_count - self.last_outgoing
outgoing = [outgoing[-i].copy() for i in range(1, n + 1)]
self.last_outgoing = self.worker.outgoing_count
transfer_outgoing_log = self.worker.transfer_outgoing_log
n = (
self.worker.transfer_outgoing_count_total
- self.last_transfer_outgoing_count_total
)
transfer_outgoing_log = [
transfer_outgoing_log[-i].copy() for i in range(1, n + 1)
]
self.last_transfer_outgoing_count_total = (
self.worker.transfer_outgoing_count_total
)

incoming = self.worker.incoming_transfer_log
n = self.worker.incoming_count - self.last_incoming
incoming = [incoming[-i].copy() for i in range(1, n + 1)]
self.last_incoming = self.worker.incoming_count
transfer_incoming_log = self.worker.transfer_incoming_log
n = (
self.worker.state.transfer_incoming_count_total
- self.last_transfer_incoming_count_total
)
transfer_incoming_log = [
transfer_incoming_log[-i].copy() for i in range(1, n + 1)
]
self.last_transfer_incoming_count_total = (
self.worker.state.transfer_incoming_count_total
)

for [msgs, source] in [
[incoming, self.incoming],
[outgoing, self.outgoing],
[transfer_incoming_log, self.transfer_incoming],
[transfer_outgoing_log, self.transfer_outgoing],
]:

for msg in msgs:
@@ -225,7 +239,7 @@ def __init__(self, worker, **kwargs):
fig = figure(
title="Communication History",
x_axis_type="datetime",
y_range=[-0.1, worker.state.total_out_connections + 0.5],
y_range=[-0.1, worker.state.transfer_incoming_count_limit + 0.5],
height=150,
tools="",
x_range=x_range,
@@ -247,7 +261,7 @@ def update(self):
{
"x": [time() * 1000],
"out": [len(self.worker._comms)],
"in": [len(self.worker.state.in_flight_workers)],
"in": [self.worker.state.transfer_incoming_count],
},
10000,
)
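Note: the `CommunicatingStream.update` hunk above drains only the transfer-log entries appended since the previous dashboard refresh, by diffing the cumulative `transfer_*_count_total` counters against their values at the last call and slicing that many entries off the end of the log. Below is a minimal standalone sketch of that pattern; `TransferLogTail` and `new_entries` are hypothetical names used only for illustration, not part of distributed's API.

```python
from collections import deque


class TransferLogTail:
    """Illustrative helper: given an append-only (possibly bounded) log and a
    cumulative event counter, return only the entries added since the last call."""

    def __init__(self, log: deque):
        self.log = log
        self.last_count_total = 0

    def new_entries(self, count_total: int) -> list:
        n = count_total - self.last_count_total
        self.last_count_total = count_total
        # Newest entries sit at the right end of the deque; a bounded deque may
        # already have dropped older ones, so never read past its current length.
        n = min(n, len(self.log))
        return [self.log[-i].copy() for i in range(1, n + 1)]


log = deque(maxlen=100)
tail = TransferLogTail(log)
log.append({"who": "worker-a", "duration": 0.1})
print(tail.new_entries(count_total=1))  # -> [{'who': 'worker-a', 'duration': 0.1}]
```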
8 changes: 4 additions & 4 deletions distributed/dashboard/tests/test_worker_bokeh.py
@@ -142,11 +142,11 @@ async def test_CommunicatingStream(c, s, a, b):
aa.update()
bb.update()

assert len(first(aa.outgoing.data.values())) and len(
first(bb.outgoing.data.values())
assert len(first(aa.transfer_outgoing.data.values())) and len(
first(bb.transfer_outgoing.data.values())
)
assert len(first(aa.incoming.data.values())) and len(
first(bb.incoming.data.values())
assert len(first(aa.transfer_incoming.data.values())) and len(
first(bb.transfer_incoming.data.values())
)


2 changes: 1 addition & 1 deletion distributed/http/worker/prometheus/core.py
@@ -37,7 +37,7 @@ def collect(self):
yield GaugeMetricFamily(
self.build_name("concurrent_fetch_requests"),
"Number of open fetch requests to other workers.",
value=len(self.server.state.in_flight_workers),
value=self.server.state.transfer_incoming_count,
)

yield GaugeMetricFamily(
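For context, a minimal self-contained sketch of how a Prometheus collector exposes such a gauge from a live worker-like object. Only the metric description and the `transfer_incoming_count` attribute come from the hunk above; the collector class, the `worker` argument, and the fully qualified metric name are illustrative assumptions, not distributed's actual code.

```python
from prometheus_client.core import GaugeMetricFamily


class ExampleWorkerCollector:
    """Illustrative custom collector for a worker-like object."""

    def __init__(self, worker):
        self.worker = worker

    def collect(self):
        # Read the current number of open incoming transfer requests each time
        # Prometheus scrapes the endpoint.
        yield GaugeMetricFamily(
            "example_worker_concurrent_fetch_requests",
            "Number of open fetch requests to other workers.",
            value=self.worker.state.transfer_incoming_count,
        )


# Hypothetical registration against the default registry:
# prometheus_client.REGISTRY.register(ExampleWorkerCollector(worker))
```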
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
@@ -647,7 +647,7 @@ async def test_gather_skip(c, s, a):
async def test_limit_concurrent_gathering(c, s, a, b):
futures = c.map(inc, range(100))
await c.gather(futures)
assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100
assert len(a.transfer_outgoing_log) + len(b.transfer_outgoing_log) < 100


@gen_cluster(client=True)
4 changes: 2 additions & 2 deletions distributed/tests/test_nanny.py
@@ -232,7 +232,7 @@ async def test_nanny_timeout(c, s, a):
clean_kwargs={"threads": False},
config={"distributed.worker.memory.pause": False},
)
async def test_throttle_outgoing_connections(c, s, a, *other_workers):
async def test_throttle_outgoing_transfers(c, s, a, *other_workers):
# Put a bunch of small data on worker a
logging.getLogger("distributed.worker").setLevel(logging.DEBUG)
remote_data = c.map(
@@ -241,7 +241,7 @@ async def test_throttle_outgoing_connections(c, s, a, *other_workers):
await wait(remote_data)

a.status = Status.paused
a.outgoing_current_count = 2
a.transfer_outgoing_count = 2

requests = [
await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
@@ -227,7 +227,7 @@ def random(**kwargs):
# Check that there were few transfers
unexpected_transfers = []
for worker in workers:
for log in worker.incoming_transfer_log:
for log in worker.transfer_incoming_log:
keys = log["keys"]
# The root-ish tasks should never be transferred
assert not any(k.startswith("random") for k in keys), keys
83 changes: 52 additions & 31 deletions distributed/tests/test_worker.py
@@ -673,19 +673,19 @@ async def test_clean(c, s, a, b):
@gen_cluster(client=True)
async def test_message_breakup(c, s, a, b):
n = 100_000
a.state.target_message_size = 10 * n
b.state.target_message_size = 10 * n
a.state.transfer_message_target_bytes = 10 * n
b.state.transfer_message_target_bytes = 10 * n
xs = [
c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
]
y = c.submit(lambda _: None, xs, key="y", workers=[b.address])
await y

assert 2 <= len(b.incoming_transfer_log) <= 20
assert 2 <= len(a.outgoing_transfer_log) <= 20
assert 2 <= len(b.transfer_incoming_log) <= 20
assert 2 <= len(a.transfer_outgoing_log) <= 20

assert all(msg["who"] == b.address for msg in a.outgoing_transfer_log)
assert all(msg["who"] == a.address for msg in a.incoming_transfer_log)
assert all(msg["who"] == b.address for msg in a.transfer_outgoing_log)
assert all(msg["who"] == a.address for msg in a.transfer_incoming_log)


@gen_cluster(client=True)
@@ -764,7 +764,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
concurrent outgoing connections. If multiple small fetches from the same worker are
scheduled all at once, they will result in a single call to gather_dep.
"""
a.state.total_out_connections = 2
a.state.transfer_incoming_count_limit = 2
futures = await c.scatter(
{f"x{i}": i for i in range(100)},
workers=[w.address for w in snd_workers],
@@ -779,7 +779,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
while len(a.data) < 100:
await asyncio.sleep(0.01)

assert a.state.comm_nbytes == 0
assert a.state.transfer_incoming_bytes == 0

story = a.state.story("request-dep", "receive-dep")
assert len(story) == 40 # 1 request-dep + 1 receive-dep per sender worker
@@ -808,27 +808,31 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_share_communication(c, s, w1, w2, w3):
x = c.submit(mul, b"1", int(w3.target_message_size + 1), workers=w1.address)
y = c.submit(mul, b"2", int(w3.target_message_size + 1), workers=w2.address)
x = c.submit(
mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
)
y = c.submit(
mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
)
await wait([x, y])
await c._replicate([x, y], workers=[w1.address, w2.address])
z = c.submit(add, x, y, workers=w3.address)
await wait(z)
assert len(w3.incoming_transfer_log) == 2
assert w1.outgoing_transfer_log
assert w2.outgoing_transfer_log
assert len(w3.transfer_incoming_log) == 2
assert w1.transfer_outgoing_log
assert w2.transfer_outgoing_log


@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True)
async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
x = c.submit(mul, b"1", int(b.target_message_size + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.target_message_size + 1), workers=a.address)
x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
await wait([x, y])
z = c.submit(add, x, y, workers=b.address)
await wait(z)
assert len(b.incoming_transfer_log) == 2
l1, l2 = b.incoming_transfer_log
assert len(b.transfer_incoming_log) == 2
l1, l2 = b.transfer_incoming_log

assert l1["stop"] < l2["start"]

@@ -1245,8 +1249,8 @@ async def test_wait_for_outgoing(c, s, a, b):
y = c.submit(inc, future, workers=b.address)
await wait(y)

assert len(b.incoming_transfer_log) == len(a.outgoing_transfer_log) == 1
bb = b.incoming_transfer_log[0]["duration"]
assert len(b.transfer_incoming_log) == len(a.transfer_outgoing_log) == 1
bb = b.transfer_incoming_log[0]["duration"]
aa = a.outgoing_transfer_log[0]["duration"]
ratio = aa / bb

@@ -1263,8 +1267,8 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
y = c.submit(inc, x, workers=[w2.address])
await wait(y)

assert any(d["who"] == w2.address for d in w1.outgoing_transfer_log)
assert not any(d["who"] == w2.address for d in w3.outgoing_transfer_log)
assert any(d["who"] == w2.address for d in w1.transfer_outgoing_log)
assert not any(d["who"] == w2.address for d in w3.transfer_outgoing_log)


@gen_cluster(
@@ -1282,10 +1286,10 @@ async def test_avoid_oversubscription(c, s, *workers):
await wait(futures)

# Original worker not responsible for all transfers
assert len(workers[0].outgoing_transfer_log) < len(workers) - 2
assert len(workers[0].transfer_outgoing_log) < len(workers) - 2

# Some other workers did some work
assert len([w for w in workers if len(w.outgoing_transfer_log) > 0]) >= 3
assert len([w for w in workers if len(w.transfer_outgoing_log) > 0]) >= 3


@gen_cluster(client=True, worker_kwargs={"metrics": {"my_port": lambda w: w.port}})
@@ -1967,7 +1971,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b):
# We will block A for any outgoing communication. This simulates an
# overloaded worker which will always return "busy" for get_data requests,
# effectively blocking H indefinitely
a.outgoing_current_count = 10000000
a.transfer_outgoing_count = 10000000

h = c.submit(add, f, g, key="h", workers=[b.address])

@@ -2029,7 +2033,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy(
)
)["f"]
for w in lws:
w.outgoing_current_count = 10000000
w.transfer_outgoing_count = 10000000

g = c.submit(inc, f, key="g", workers=[a.address])
assert await g == 2
@@ -2726,7 +2730,7 @@ async def test_acquire_replicas_same_channel(c, s, a, b):
("request-dep", a.address, {fut.key}),
],
)
assert any(fut.key in msg["keys"] for msg in b.incoming_transfer_log)
assert any(fut.key in msg["keys"] for msg in b.transfer_incoming_log)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
@@ -3005,9 +3009,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_acquire_replicas_large_data(c, s, a):
"""When acquire-replicas is used to acquire multiple sizeable tasks, it respects
target_message_size and acquires them over multiple iterations.
transfer_message_target_bytes and acquires them over multiple iterations.
"""
size = a.state.target_message_size // 5 - 10_000
size = a.state.transfer_message_target_bytes // 5 - 10_000

class C:
def __sizeof__(self):
@@ -3034,7 +3038,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b):
Ensure that no fetch/flight tasks are left in the task dict of a
worker after everything was released
"""
a.total_in_connections = 0
a.transfer_outgoing_count_limit = 0
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[b.address])
key = f1.key
@@ -3318,8 +3322,8 @@ async def test_Worker__to_dict(c, s, a):
"thread_id",
"logs",
"config",
"incoming_transfer_log",
"outgoing_transfer_log",
"transfer_incoming_log",
"transfer_outgoing_log",
# Attributes of WorkerMemoryManager
"data",
"max_spill",
@@ -3559,3 +3563,20 @@ async def test_execute_preamble_abort_retirement(c, s):

# Test that y does not get stuck.
assert await y == 2


@gen_cluster()
async def test_deprecation_of_renamed_worker_attributes(s, a, b):
msg = (
"The `Worker.outgoing_count` attribute has been renamed to "
"`Worker.transfer_outgoing_count_total`"
)
with pytest.warns(DeprecationWarning, match=msg):
assert a.outgoing_count == a.transfer_outgoing_count_total

msg = (
"The `Worker.outgoing_current_count` attribute has been renamed to "
"`Worker.transfer_outgoing_count`"
)
with pytest.warns(DeprecationWarning, match=msg):
assert a.outgoing_current_count == a.transfer_outgoing_count
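The deprecation test above relies on the renamed attributes remaining reachable under their old names while emitting a DeprecationWarning. A minimal sketch of how such a rename shim can work is shown below; the class name, the mapping dict, and the `__getattr__`-based dispatch are illustrative assumptions rather than distributed's actual implementation.

```python
import warnings

# Hypothetical mapping from deprecated attribute names to their replacements.
_DEPRECATED_WORKER_ATTRIBUTES = {
    "outgoing_count": "transfer_outgoing_count_total",
    "outgoing_current_count": "transfer_outgoing_count",
}


class ExampleWorker:
    def __init__(self):
        self.transfer_outgoing_count_total = 0
        self.transfer_outgoing_count = 0

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails, i.e. for the old names.
        if name in _DEPRECATED_WORKER_ATTRIBUTES:
            new_name = _DEPRECATED_WORKER_ATTRIBUTES[name]
            warnings.warn(
                f"The `Worker.{name}` attribute has been renamed to "
                f"`Worker.{new_name}`",
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(self, new_name)
        raise AttributeError(name)


w = ExampleWorker()
with warnings.catch_warnings(record=True):
    warnings.simplefilter("always")
    assert w.outgoing_count == w.transfer_outgoing_count_total  # warns, then forwards
```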