-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve differentiation between incoming/outgoing connections and transfers #6933
Improve differentiation between incoming/outgoing connections and transfers #6933
Conversation
distributed/worker.py
Outdated
incoming_count: int | ||
outgoing_count: int | ||
outgoing_current_count: int | ||
incoming_transfer_count: int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of talking about incoming_transfer
/outgoing_transfer
, which we already had through the {incoming|outgoing}_transfer_log
, we could also talk about send
/recv
to further avoid confusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[EDIT] let's keep them but rename with a deprecation. Discussion on #6936 (comment)
distributed/worker_state_machine.py
Outdated
@@ -1143,7 +1143,7 @@ class WorkerState: | |||
#: The total number of bytes in flight | |||
comm_nbytes: int | |||
|
|||
#: The maximum number of concurrent incoming requests for data. | |||
#: The maximum number of concurrent outgoing requests for data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixing docstring.
distributed/worker.py
Outdated
@@ -1653,26 +1653,26 @@ async def get_data( | |||
|
|||
if self.status == Status.paused: | |||
max_connections = 1 | |||
throttle_msg = " Throttling outgoing connections because worker is paused." | |||
throttle_msg = " Throttling incoming connections because worker is paused." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixing throttle_msg
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are inbound connections for outbound data. Maybe change it to "Throttling outgoing data transfers"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ± 0 15 suites ±0 6h 48m 11s ⏱️ + 4m 58s Results for commit a0faa5f. ± Comparison against base commit c083790. ♻️ This comment has been updated with latest results. |
distributed/worker.py
Outdated
incoming_count: int | ||
outgoing_count: int | ||
outgoing_current_count: int | ||
incoming_transfer_count: int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes
distributed/worker.py
Outdated
@property | ||
def incoming_count(self): | ||
warnings.warn( | ||
"The `Worker.incoming_count` attribute has been renamed to " | ||
"`Worker.incoming_transfer_count`", | ||
DeprecationWarning, | ||
stacklevel=2, | ||
) | ||
return self.incoming_transfer_count | ||
|
||
@property | ||
def outgoing_count(self): | ||
warnings.warn( | ||
"The `Worker.outgoing_count` attribute has been renamed to " | ||
"`Worker.outgoing_transfer_count`", | ||
DeprecationWarning, | ||
stacklevel=2, | ||
) | ||
return self.outgoing_transfer_count | ||
|
||
@property | ||
def outgoing_current_count(self): | ||
warnings.warn( | ||
"The `Worker.outgoing_current_count` attribute has been renamed to " | ||
"`Worker.current_outgoing_transfer_count`", | ||
DeprecationWarning, | ||
stacklevel=2, | ||
) | ||
return self.current_outgoing_transfer_count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added DeprecationWarning
s for the renamed attributes via properties. Setters have been skipped. IIUC, these attributes should not be modified by others. cc @fjetter
distributed/worker.py
Outdated
outgoing_count: int | ||
outgoing_current_count: int | ||
incoming_transfer_count: int | ||
outgoing_transfer_count: int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we're at it, would total_outgoing_transfer_count
be clearer for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
outgoing_transfer_count_current
outgoing_transfer_count_total
Pros:
- differentiation is very clear
- can be sorted alphabetically and related stuff is grouped
- code autocompletion is straight forward and a developer can search for
outgoing_stuff_*
not a strong opinion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussion on #6936 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copying @crusaderky's comment for visibility:
To recap our meeting:
- rename
WorkerState.comm_nbytes
tocomm_incoming_bytes
.
No deprecation in WorkerState needed; just change the already existingDeprecatedWorkerStateAttribute
in Worker- new property
WorkerState.comm_incoming_count: return len(self.in_flight_workers)
- rename
Worker.outgoing_current_count
tocomm_outgoing_count
. Add a deprecated property (read-only).- new attribute
Worker.comm_outgoing_bytes
- rename
Worker.incoming_count
tocomm_incoming_cumulative_count
with a deprecated accessor- rename
Worker.outgoing_count
tocomm_outgoing_cumulative_count
with a deprecated accessor- heartbeat will now return
comm: incoming_bytes: # incoming_count: # incoming_cumulative_count: # outgoing_bytes: # outgoing_count: # outgoing_cumulative_count: #
update #6933 to match. Prefer "comm" prefix to "transfers" suffix everywhere.
In addition to the suggested renamings, I have renamed Worker.{incoming|outgoing}_transfer_log
to Worker.comm_{incoming|outgoing}_log
for consistency.
I'm wondering if instead of stripping the word transfer
out of names, we should explicitly in for anything that's a data transfer, i.e., a connection but going the other way.
distributed/worker_state_machine.py
Outdated
if ( | ||
len(self.in_flight_workers) >= self.total_out_connections | ||
and self.comm_nbytes >= self.comm_threshold_bytes | ||
self.comm_incoming_count >= self.total_out_connections |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After implementing those changes, I am not a huge fan of comm_incoming_count
. This line illustrates why:
Why do we compare comm_incoming_count
to total_out_connections
? How are they related?
comm_incoming_count
still feels vaguely defined to me. This might be because comm_
feels like a namespacing prefix for everything communications-related, so what are we counting exactly in that namespace that's incoming? I might just need to get more used to the fact that when we're talking about comm_
, we tend to mean Comm
object. Then again, that Comm
object feels equivalent to a connection, so why is the comm incoming while the connection is outgoing? Should we get transfer
back into the naming here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to rename comm
to transfer
everywhere.
comm_incoming_count
-> transfer_incoming_count_current
total_out_connections
-> transfer_incoming_count_max
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Updated naming on #6936. Please let's keep all naming discussion there from now on. |
8bdf7a0
to
4b073aa
Compare
@@ -91,7 +91,7 @@ def update(self): | |||
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)], | |||
"Ready": [len(w.state.ready)], | |||
"Waiting": [w.state.waiting_for_data_count], | |||
"Connections": [len(w.state.in_flight_workers)], | |||
"Connections": [w.state.transfer_incoming_count], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, I don't want to touch "Connections"
.
@@ -985,17 +985,17 @@ async def get_data(self, comm, *args, **kwargs): | |||
|
|||
@gen_cluster() | |||
async def test_deprecated_worker_attributes(s, a, b): | |||
n = a.state.comm_threshold_bytes | |||
n = a.state.target_message_size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This tests the use of a deprecated attribute with the same name on the WorkerState
. Since we renamed comm_threshold_bytes
, I had to pick another variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The abundance transfer_-prefixed variables might indicate a missing abstraction
This is just about instrumentation. We could introduce a dataclass holding these things but I think that'd be overkill.
I don't want to do this now, it might be worth taking a closer look at this once/if we are diving into things like dynamically limiting comms. Currently, the comms-related logic is spread around |
No, it should not rely on dynamic memory measurements. We're not actually measuring any memory here. We're running off of data provided by the scheduler |
Agreed, as I said, this note was merely meant to be kept in mind for future work such as the dynamic limiting ideas mentioned in #6212 or #6208 (comment). |
Thank you! |
The unclear differentiation between incoming/outgoing connections and transfers has made it difficult for me to understand what we are doing. In addition, there were two mistakes in docstrings/messages that we fix.
Following #6936 (comment) while assuming some small liberties, this PR now implements the following changes:
Notes:
current
suffixtotal
suffix to highlight cumulative sums.cumulative
total
, it's currenttransfer_
-prefixed variables might indicate a missing abstractionpre-commit run --all-files