Improve differentiation between incoming/outgoing connections and transfers #6933

Merged · 23 commits · Aug 30, 2022
Changes from 1 commit
36 changes: 20 additions & 16 deletions distributed/dashboard/components/worker.py
@@ -114,8 +114,8 @@ def __init__(self, worker, height=300, **kwargs):
"total",
]

- self.incoming = ColumnDataSource({name: [] for name in names})
- self.outgoing = ColumnDataSource({name: [] for name in names})
+ self.incoming_transfers = ColumnDataSource({name: [] for name in names})
+ self.outgoing_transfers = ColumnDataSource({name: [] for name in names})

x_range = DataRange1d(range_padding=0)
y_range = DataRange1d(range_padding=0)
@@ -131,7 +131,7 @@ def __init__(self, worker, height=300, **kwargs):
)

fig.rect(
- source=self.incoming,
+ source=self.incoming_transfers,
x="middle",
y="y",
width="duration",
@@ -140,7 +140,7 @@ def __init__(self, worker, height=300, **kwargs):
alpha="alpha",
)
fig.rect(
- source=self.outgoing,
+ source=self.outgoing_transfers,
x="middle",
y="y",
width="duration",
@@ -159,26 +159,30 @@ def __init__(self, worker, height=300, **kwargs):

self.root = fig

- self.last_incoming = 0
- self.last_outgoing = 0
+ self.last_incoming_transfer_count = 0
+ self.last_outgoing_transfer_count = 0
self.who = dict()

@without_property_validation
@log_errors
def update(self):
- outgoing = self.worker.outgoing_transfer_log
- n = self.worker.outgoing_count - self.last_outgoing
- outgoing = [outgoing[-i].copy() for i in range(1, n + 1)]
- self.last_outgoing = self.worker.outgoing_count
+ outgoing_transfer_log = self.worker.outgoing_transfer_log
+ n = self.worker.outgoing_transfer_count - self.last_outgoing_transfer_count
+ outgoing_transfer_log = [
+     outgoing_transfer_log[-i].copy() for i in range(1, n + 1)
+ ]
+ self.last_outgoing_transfer_count = self.worker.outgoing_transfer_count

- incoming = self.worker.incoming_transfer_log
- n = self.worker.incoming_count - self.last_incoming
- incoming = [incoming[-i].copy() for i in range(1, n + 1)]
- self.last_incoming = self.worker.incoming_count
+ incoming_transfer_log = self.worker.incoming_transfer_log
+ n = self.worker.incoming_transfer_count - self.last_incoming_transfer_count
+ incoming_transfer_log = [
+     incoming_transfer_log[-i].copy() for i in range(1, n + 1)
+ ]
+ self.last_incoming_transfer_count = self.worker.incoming_transfer_count

for [msgs, source] in [
- [incoming, self.incoming],
- [outgoing, self.outgoing],
+ [incoming_transfer_log, self.incoming_transfers],
+ [outgoing_transfer_log, self.outgoing_transfers],
]:

for msg in msgs:
8 changes: 4 additions & 4 deletions distributed/dashboard/tests/test_worker_bokeh.py
@@ -142,11 +142,11 @@ async def test_CommunicatingStream(c, s, a, b):
aa.update()
bb.update()

- assert len(first(aa.outgoing.data.values())) and len(
-     first(bb.outgoing.data.values())
+ assert len(first(aa.outgoing_transfers.data.values())) and len(
+     first(bb.outgoing_transfers.data.values())
)
- assert len(first(aa.incoming.data.values())) and len(
-     first(bb.incoming.data.values())
+ assert len(first(aa.incoming_transfers.data.values())) and len(
+     first(bb.incoming_transfers.data.values())
)


4 changes: 2 additions & 2 deletions distributed/tests/test_nanny.py
@@ -232,7 +232,7 @@ async def test_nanny_timeout(c, s, a):
clean_kwargs={"threads": False},
config={"distributed.worker.memory.pause": False},
)
- async def test_throttle_outgoing_connections(c, s, a, *other_workers):
+ async def test_throttle_incoming_connections(c, s, a, *other_workers):
# Put a bunch of small data on worker a
logging.getLogger("distributed.worker").setLevel(logging.DEBUG)
remote_data = c.map(
@@ -241,7 +241,7 @@ async def test_throttle_outgoing_connections(c, s, a, *other_workers):
await wait(remote_data)

a.status = Status.paused
- a.outgoing_current_count = 2
+ a.current_outgoing_transfer_count = 2

requests = [
await a.get_data(await w.rpc.connect(w.address), keys=[f.key], who=w.address)
4 changes: 2 additions & 2 deletions distributed/tests/test_worker.py
@@ -1967,7 +1967,7 @@ async def test_gather_dep_one_worker_always_busy(c, s, a, b):
# We will block A for any outgoing communication. This simulates an
# overloaded worker which will always return "busy" for get_data requests,
# effectively blocking H indefinitely
- a.outgoing_current_count = 10000000
+ a.current_outgoing_transfer_count = 10000000

h = c.submit(add, f, g, key="h", workers=[b.address])

@@ -2029,7 +2029,7 @@ async def test_gather_dep_from_remote_workers_if_all_local_workers_are_busy(
)
)["f"]
for w in lws:
- w.outgoing_current_count = 10000000
+ w.current_outgoing_transfer_count = 10000000

g = c.submit(inc, f, key="g", workers=[a.address])
assert await g == 2
26 changes: 13 additions & 13 deletions distributed/worker.py
@@ -385,9 +385,9 @@ class Worker(BaseWorker, ServerNode):
profile_history: deque[tuple[float, dict[str, Any]]]
incoming_transfer_log: deque[dict[str, Any]]
outgoing_transfer_log: deque[dict[str, Any]]
- incoming_count: int
- outgoing_count: int
- outgoing_current_count: int
+ incoming_transfer_count: int
[Member Author] Instead of talking about incoming_transfer/outgoing_transfer, which we already had through the {incoming|outgoing}_transfer_log, we could also talk about send/recv to further avoid confusion.

[Member] This stuff is "public"; we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream projects rely on these attributes.

[Collaborator @crusaderky, Aug 26, 2022] [EDIT] Let's keep them but rename with a deprecation. Discussion on #6936 (comment).
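A minimal sketch of the property-based deprecation suggested above (hypothetical, not part of this PR's diff): the old public name is kept as a read-only alias that warns and forwards to the renamed attribute.

    import warnings

    class Worker:
        def __init__(self) -> None:
            # New attribute name introduced by the rename.
            self.outgoing_transfer_count = 0

        @property
        def outgoing_count(self) -> int:
            # Old public name kept as a deprecated alias so downstream
            # projects that still read it keep working for now.
            warnings.warn(
                "Worker.outgoing_count has been renamed to "
                "Worker.outgoing_transfer_count",
                DeprecationWarning,
                stacklevel=2,
            )
            return self.outgoing_transfer_count

Reading worker.outgoing_count would then emit a DeprecationWarning while still returning the current value of the renamed counter.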

+ outgoing_transfer_count: int
[Member Author] While we're at it, would total_outgoing_transfer_count be clearer for this?

[Member] How about

• outgoing_transfer_count_current
• outgoing_transfer_count_total

Pros:

• differentiation is very clear
• can be sorted alphabetically and related stuff is grouped
• code autocompletion is straightforward and a developer can search for outgoing_stuff_*

Not a strong opinion.

[Collaborator] Discussion on #6936 (comment).

+ current_outgoing_transfer_count: int
bandwidth: float
latency: float
profile_cycle_interval: float
@@ -540,10 +540,10 @@ def __init__(
validate = dask.config.get("distributed.scheduler.validate")

self.incoming_transfer_log = deque(maxlen=100000)
- self.incoming_count = 0
+ self.incoming_transfer_count = 0
self.outgoing_transfer_log = deque(maxlen=100000)
- self.outgoing_count = 0
- self.outgoing_current_count = 0
+ self.outgoing_transfer_count = 0
+ self.current_outgoing_transfer_count = 0
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
self.bandwidth_workers = defaultdict(
lambda: (0, 0)
@@ -1653,26 +1653,26 @@ async def get_data(

if self.status == Status.paused:
max_connections = 1
throttle_msg = " Throttling outgoing connections because worker is paused."
throttle_msg = " Throttling incoming connections because worker is paused."
[Member Author] Fixing throttle_msg.

[Collaborator] These are inbound connections for outbound data. Maybe change it to "Throttling outgoing data transfers"?

[Member Author] Done.
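To illustrate the direction distinction raised above (a toy sketch, not distributed's actual comm layer): the requesting peer opens the connection, so it is inbound from the serving worker's point of view, but the payload flows out of the serving worker, which is why the throttle checks a count of outgoing transfers.

    class ToyWorker:
        # Toy model of get_data throttling; names are illustrative only.
        def __init__(self, data: dict, max_transfers: int = 10) -> None:
            self.data = data
            self.max_transfers = max_transfers
            self.current_outgoing_transfer_count = 0

        def get_data(self, keys: list) -> dict:
            # The peer connected *in* to us, but the requested data flows *out*,
            # so the limit applies to concurrent outgoing data transfers.
            if self.current_outgoing_transfer_count >= self.max_transfers:
                return {"status": "busy"}
            self.current_outgoing_transfer_count += 1
            try:
                payload = {k: self.data[k] for k in keys if k in self.data}
                return {"status": "OK", "data": payload}
            finally:
                self.current_outgoing_transfer_count -= 1

A requester that receives {"status": "busy"} has to retry later or ask another worker, which is what the busy-worker tests above simulate by inflating this counter.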

else:
throttle_msg = ""

if (
max_connections is not False
- and self.outgoing_current_count >= max_connections
+ and self.current_outgoing_transfer_count >= max_connections
):
logger.debug(
"Worker %s has too many open connections to respond to data request "
"from %s (%d/%d).%s",
self.address,
who,
- self.outgoing_current_count,
+ self.current_outgoing_transfer_count,
max_connections,
throttle_msg,
)
return {"status": "busy"}

- self.outgoing_current_count += 1
+ self.current_outgoing_transfer_count += 1
data = {k: self.data[k] for k in keys if k in self.data}

if len(data) < len(keys):
@@ -1704,14 +1704,14 @@ async def get_data(
comm.abort()
raise
finally:
- self.outgoing_current_count -= 1
+ self.current_outgoing_transfer_count -= 1
stop = time()
if self.digests is not None:
self.digests["get-data-send-duration"].add(stop - start)

total_bytes = sum(filter(None, nbytes.values()))

- self.outgoing_count += 1
+ self.outgoing_transfer_count += 1
duration = (stop - start) or 0.5 # windows
self.outgoing_transfer_log.append(
{
@@ -1955,7 +1955,7 @@ def _update_metrics_received_data(
self.digests["transfer-bandwidth"].add(total_bytes / duration)
self.digests["transfer-duration"].add(duration)
self.counters["transfer-count"].add(len(data))
- self.incoming_count += 1
+ self.incoming_transfer_count += 1

@fail_hard
async def gather_dep(
2 changes: 1 addition & 1 deletion distributed/worker_state_machine.py
@@ -1143,7 +1143,7 @@ class WorkerState:
#: The total number of bytes in flight
comm_nbytes: int

- #: The maximum number of concurrent incoming requests for data.
+ #: The maximum number of concurrent outgoing requests for data.
[Member Author] Fixing docstring.

#: See also :attr:`distributed.worker.Worker.total_in_connections`.
total_out_connections: int
