Skip to content

Commit

Permalink
Merge branch 'main' into workerstate_doc
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Aug 30, 2022
2 parents 67c3ad0 + 34b18a9 commit e40337d
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 137 deletions.
52 changes: 33 additions & 19 deletions distributed/dashboard/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def update(self):
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
"Ready": [len(w.state.ready)],
"Waiting": [w.state.waiting_for_data_count],
"Connections": [len(w.state.in_flight_workers)],
"Connections": [w.state.transfer_incoming_count],
"Serving": [len(w._comms)],
}
update(self.source, d)
Expand All @@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
"total",
]

self.incoming = ColumnDataSource({name: [] for name in names})
self.outgoing = ColumnDataSource({name: [] for name in names})
self.transfer_incoming = ColumnDataSource({name: [] for name in names})
self.transfer_outgoing = ColumnDataSource({name: [] for name in names})

x_range = DataRange1d(range_padding=0)
y_range = DataRange1d(range_padding=0)
Expand All @@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
)

fig.rect(
source=self.incoming,
source=self.transfer_incoming,
x="middle",
y="y",
width="duration",
Expand All @@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
alpha="alpha",
)
fig.rect(
source=self.outgoing,
source=self.transfer_outgoing,
x="middle",
y="y",
width="duration",
Expand All @@ -159,26 +159,40 @@ def __init__(self, worker, height=300, **kwargs):

self.root = fig

self.last_incoming = 0
self.last_outgoing = 0
self.last_transfer_incoming_count_total = 0
self.last_transfer_outgoing_count_total = 0
self.who = dict()

@without_property_validation
@log_errors
def update(self):
outgoing = self.worker.outgoing_transfer_log
n = self.worker.outgoing_count - self.last_outgoing
outgoing = [outgoing[-i].copy() for i in range(1, n + 1)]
self.last_outgoing = self.worker.outgoing_count
transfer_outgoing_log = self.worker.transfer_outgoing_log
n = (
self.worker.transfer_outgoing_count_total
- self.last_transfer_outgoing_count_total
)
transfer_outgoing_log = [
transfer_outgoing_log[-i].copy() for i in range(1, n + 1)
]
self.last_transfer_outgoing_count_total = (
self.worker.transfer_outgoing_count_total
)

incoming = self.worker.incoming_transfer_log
n = self.worker.incoming_count - self.last_incoming
incoming = [incoming[-i].copy() for i in range(1, n + 1)]
self.last_incoming = self.worker.incoming_count
transfer_incoming_log = self.worker.transfer_incoming_log
n = (
self.worker.state.transfer_incoming_count_total
- self.last_transfer_incoming_count_total
)
transfer_incoming_log = [
transfer_incoming_log[-i].copy() for i in range(1, n + 1)
]
self.last_transfer_incoming_count_total = (
self.worker.state.transfer_incoming_count_total
)

for [msgs, source] in [
[incoming, self.incoming],
[outgoing, self.outgoing],
[transfer_incoming_log, self.transfer_incoming],
[transfer_outgoing_log, self.transfer_outgoing],
]:

for msg in msgs:
Expand Down Expand Up @@ -225,7 +239,7 @@ def __init__(self, worker, **kwargs):
fig = figure(
title="Communication History",
x_axis_type="datetime",
y_range=[-0.1, worker.state.total_out_connections + 0.5],
y_range=[-0.1, worker.state.transfer_incoming_count_limit + 0.5],
height=150,
tools="",
x_range=x_range,
Expand All @@ -247,7 +261,7 @@ def update(self):
{
"x": [time() * 1000],
"out": [len(self.worker._comms)],
"in": [len(self.worker.state.in_flight_workers)],
"in": [self.worker.state.transfer_incoming_count],
},
10000,
)
Expand Down
8 changes: 4 additions & 4 deletions distributed/dashboard/tests/test_worker_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ async def test_CommunicatingStream(c, s, a, b):
aa.update()
bb.update()

assert len(first(aa.outgoing.data.values())) and len(
first(bb.outgoing.data.values())
assert len(first(aa.transfer_outgoing.data.values())) and len(
first(bb.transfer_outgoing.data.values())
)
assert len(first(aa.incoming.data.values())) and len(
first(bb.incoming.data.values())
assert len(first(aa.transfer_incoming.data.values())) and len(
first(bb.transfer_incoming.data.values())
)


Expand Down
2 changes: 1 addition & 1 deletion distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def collect(self):
yield GaugeMetricFamily(
self.build_name("concurrent_fetch_requests"),
"Number of open fetch requests to other workers.",
value=len(self.server.state.in_flight_workers),
value=self.server.state.transfer_incoming_count,
)

yield GaugeMetricFamily(
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ async def test_gather_skip(c, s, a):
async def test_limit_concurrent_gathering(c, s, a, b):
futures = c.map(inc, range(100))
await c.gather(futures)
assert len(a.outgoing_transfer_log) + len(b.outgoing_transfer_log) < 100
assert len(a.transfer_outgoing_log) + len(b.transfer_outgoing_log) < 100


@gen_cluster(client=True)
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ async def test_nanny_timeout(c, s, a):
clean_kwargs={"threads": False},
config={"distributed.worker.memory.pause": False},
)
async def test_throttle_outgoing_connections(c, s, a, *other_workers):
async def test_throttle_outgoing_transfers(c, s, a, *other_workers):
# Put a bunch of small data on worker a
logging.getLogger("distributed.worker").setLevel(logging.DEBUG)
remote_data = c.map(
Expand All @@ -241,7 +241,7 @@ async def test_throttle_outgoing_connections(c, s, a, *other_workers):
await wait(remote_data)

a.status = Status.paused
a.outgoing_current_count = 2
a.transfer_outgoing_count = 2

requests = [
await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def random(**kwargs):
# Check that there were few transfers
unexpected_transfers = []
for worker in workers:
for log in worker.incoming_transfer_log:
for log in worker.transfer_incoming_log:
keys = log["keys"]
# The root-ish tasks should never be transferred
assert not any(k.startswith("random") for k in keys), keys
Expand Down
83 changes: 52 additions & 31 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,19 +673,19 @@ async def test_clean(c, s, a, b):
@gen_cluster(client=True)
async def test_message_breakup(c, s, a, b):
n = 100_000
a.state.target_message_size = 10 * n
b.state.target_message_size = 10 * n
a.state.transfer_message_target_bytes = 10 * n
b.state.transfer_message_target_bytes = 10 * n
xs = [
c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
]
y = c.submit(lambda _: None, xs, key="y", workers=[b.address])
await y

assert 2 <= len(b.incoming_transfer_log) <= 20
assert 2 <= len(a.outgoing_transfer_log) <= 20
assert 2 <= len(b.transfer_incoming_log) <= 20
assert 2 <= len(a.transfer_outgoing_log) <= 20

assert all(msg["who"] == b.address for msg in a.outgoing_transfer_log)
assert all(msg["who"] == a.address for msg in a.incoming_transfer_log)
assert all(msg["who"] == b.address for msg in a.transfer_outgoing_log)
assert all(msg["who"] == a.address for msg in a.transfer_incoming_log)


@gen_cluster(client=True)
Expand Down Expand Up @@ -764,7 +764,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
concurrent outgoing connections. If multiple small fetches from the same worker are
scheduled all at once, they will result in a single call to gather_dep.
"""
a.state.total_out_connections = 2
a.state.transfer_incoming_count_limit = 2
futures = await c.scatter(
{f"x{i}": i for i in range(100)},
workers=[w.address for w in snd_workers],
Expand All @@ -779,7 +779,7 @@ async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
while len(a.data) < 100:
await asyncio.sleep(0.01)

assert a.state.comm_nbytes == 0
assert a.state.transfer_incoming_bytes == 0

story = a.state.story("request-dep", "receive-dep")
assert len(story) == 40 # 1 request-dep + 1 receive-dep per sender worker
Expand Down Expand Up @@ -808,27 +808,31 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_share_communication(c, s, w1, w2, w3):
x = c.submit(mul, b"1", int(w3.target_message_size + 1), workers=w1.address)
y = c.submit(mul, b"2", int(w3.target_message_size + 1), workers=w2.address)
x = c.submit(
mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
)
y = c.submit(
mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
)
await wait([x, y])
await c._replicate([x, y], workers=[w1.address, w2.address])
z = c.submit(add, x, y, workers=w3.address)
await wait(z)
assert len(w3.incoming_transfer_log) == 2
assert w1.outgoing_transfer_log
assert w2.outgoing_transfer_log
assert len(w3.transfer_incoming_log) == 2
assert w1.transfer_outgoing_log
assert w2.transfer_outgoing_log


@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True)
async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
x = c.submit(mul, b"1", int(b.target_message_size + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.target_message_size + 1), workers=a.address)
x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
await wait([x, y])
z = c.submit(add, x, y, workers=b.address)
await wait(z)
assert len(b.incoming_transfer_log) == 2
l1, l2 = b.incoming_transfer_log
assert len(b.transfer_incoming_log) == 2
l1, l2 = b.transfer_incoming_log

assert l1["stop"] < l2["start"]

Expand Down Expand Up @@ -1244,8 +1248,8 @@ async def test_wait_for_outgoing(c, s, a, b):
y = c.submit(inc, future, workers=b.address)
await wait(y)

assert len(b.incoming_transfer_log) == len(a.outgoing_transfer_log) == 1
bb = b.incoming_transfer_log[0]["duration"]
assert len(b.transfer_incoming_log) == len(a.transfer_outgoing_log) == 1
bb = b.transfer_incoming_log[0]["duration"]
aa = a.outgoing_transfer_log[0]["duration"]
ratio = aa / bb

Expand All @@ -1262,8 +1266,8 @@ async def test_prefer_gather_from_local_address(c, s, w1, w2, w3):
y = c.submit(inc, x, workers=[w2.address])
await wait(y)

assert any(d["who"] == w2.address for d in w1.outgoing_transfer_log)
assert not any(d["who"] == w2.address for d in w3.outgoing_transfer_log)
assert any(d["who"] == w2.address for d in w1.transfer_outgoing_log)
assert not any(d["who"] == w2.address for d in w3.transfer_outgoing_log)


@gen_cluster(
Expand All @@ -1281,10 +1285,10 @@ async def test_avoid_oversubscription(c, s, *workers):
await wait(futures)

# Original worker not responsible for all transfers
assert len(workers[0].outgoing_transfer_log) < len(workers) - 2
assert len(workers[0].transfer_outgoing_log) < len(workers) - 2

# Some other workers did some work
assert len([w for w in workers if len(w.outgoing_transfer_log) > 0]) >= 3
assert len([w for w in workers if len(w.transfer_outgoing_log) > 0]) >= 3


@gen_cluster(client=True, worker_kwargs={"metrics": {"my_port": lambda w: w.port}})
Expand Down Expand Up @@ -1959,7 +1963,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b):
# We will block A for any outgoing communication. This simulates an
# overloaded worker which will always return "busy" for get_data requests,
# effectively blocking H indefinitely
a.outgoing_current_count = 10000000
a.transfer_outgoing_count = 10000000

h = c.submit(add, f, g, key="h", workers=[b.address])

Expand Down Expand Up @@ -2021,7 +2025,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy(
)
)["f"]
for w in lws:
w.outgoing_current_count = 10000000
w.transfer_outgoing_count = 10000000

g = c.submit(inc, f, key="g", workers=[a.address])
assert await g == 2
Expand Down Expand Up @@ -2718,7 +2722,7 @@ async def test_acquire_replicas_same_channel(c, s, a, b):
("request-dep", a.address, {fut.key}),
],
)
assert any(fut.key in msg["keys"] for msg in b.incoming_transfer_log)
assert any(fut.key in msg["keys"] for msg in b.transfer_incoming_log)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
Expand Down Expand Up @@ -2997,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_acquire_replicas_large_data(c, s, a):
"""When acquire-replicas is used to acquire multiple sizeable tasks, it respects
target_message_size and acquires them over multiple iterations.
transfer_message_target_bytes and acquires them over multiple iterations.
"""
size = a.state.target_message_size // 5 - 10_000
size = a.state.transfer_message_target_bytes // 5 - 10_000

class C:
def __sizeof__(self):
Expand All @@ -3026,7 +3030,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b):
Ensure that no fetch/flight tasks are left in the task dict of a
worker after everything was released
"""
a.total_in_connections = 0
a.transfer_outgoing_count_limit = 0
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[b.address])
key = f1.key
Expand Down Expand Up @@ -3310,8 +3314,8 @@ async def test_Worker__to_dict(c, s, a):
"thread_id",
"logs",
"config",
"incoming_transfer_log",
"outgoing_transfer_log",
"transfer_incoming_log",
"transfer_outgoing_log",
# Attributes of WorkerMemoryManager
"data",
"max_spill",
Expand Down Expand Up @@ -3551,3 +3555,20 @@ async def test_execute_preamble_abort_retirement(c, s):

# Test that y does not get stuck.
assert await y == 2


@gen_cluster()
async def test_deprecation_of_renamed_worker_attributes(s, a, b):
    """Accessing a renamed ``Worker`` attribute by its old name must emit a
    DeprecationWarning naming the replacement, while still returning the same
    value as the new attribute.

    Covers the renames introduced in this commit:
    ``outgoing_count`` -> ``transfer_outgoing_count_total`` and
    ``outgoing_current_count`` -> ``transfer_outgoing_count``.
    """
    # Old name outgoing_count: deprecated alias of transfer_outgoing_count_total.
    msg = (
        "The `Worker.outgoing_count` attribute has been renamed to "
        "`Worker.transfer_outgoing_count_total`"
    )
    # pytest.warns(..., match=msg) asserts both that the warning fires and
    # that its message matches; the equality check confirms the alias still
    # forwards to the new attribute's value.
    with pytest.warns(DeprecationWarning, match=msg):
        assert a.outgoing_count == a.transfer_outgoing_count_total

    # Old name outgoing_current_count: deprecated alias of transfer_outgoing_count.
    msg = (
        "The `Worker.outgoing_current_count` attribute has been renamed to "
        "`Worker.transfer_outgoing_count`"
    )
    with pytest.warns(DeprecationWarning, match=msg):
        assert a.outgoing_current_count == a.transfer_outgoing_count
Loading

0 comments on commit e40337d

Please sign in to comment.