Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve differentiation between incoming/outgoing connections and transfers #6933

Merged
merged 23 commits into from
Aug 30, 2022

Conversation

hendrikmakait
Copy link
Member

@hendrikmakait hendrikmakait commented Aug 23, 2022

The unclear differentiation between incoming/outgoing connections and transfers has made it difficult for me to understand what we are doing. In addition, there were two mistakes in docstrings/messages that we fix.

Following #6936 (comment) while assuming some small liberties, this PR now implements the following changes:

main new notes
WorkerState.comm_nbytes WorkerState.transfer_incoming_bytes  
(new property) WorkerState.transfer_incoming_count len(self.in_flight_workers)
WorkerState.total_out_connections WorkerState.transfer_incoming_count_limit  
WorkerState.comm_threshold_bytes WorkerState.transfer_incoming_bytes_throttle_threshold  
WorkerState.target_message_size WorkerState.transfer_message_target_bytes
Worker.outgoing_current_count Worker.transfer_outgoing_count  
(new attribute) Worker.transfer_outgoing_bytes  
Worker.incoming_count WorkerState.transfer_incoming_count_total Moved to WorkerState
Worker.outgoing_count Worker.transfer_outgoing_count_total  
Worker.total_in_connections Worker.transfer_outgoing_count_limit
Worker.incoming_transfer_log Worker.transfer_incoming_log
Worker.outgoing_transfer_log Worker.transfer_outgoing_log

Notes:

  • Skip current suffix
  • Use total suffix to highlight cumulative sums.
    • shorter than cumulative
    • if it's not total, it's current
  • The abundance transfer_-prefixed variables might indicate a missing abstraction
  • Tests added / passed
  • Passes pre-commit run --all-files

incoming_count: int
outgoing_count: int
outgoing_current_count: int
incoming_transfer_count: int
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of talking about incoming_transfer/outgoing_transfer, which we already had through the {incoming|outgoing}_transfer_log, we could also talk about send/recv to further avoid confusion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes

Copy link
Collaborator

@crusaderky crusaderky Aug 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[EDIT] let's keep them but rename with a deprecation. Discussion on #6936 (comment)

@@ -1143,7 +1143,7 @@ class WorkerState:
#: The total number of bytes in flight
comm_nbytes: int

#: The maximum number of concurrent incoming requests for data.
#: The maximum number of concurrent outgoing requests for data.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing docstring.

@@ -1653,26 +1653,26 @@ async def get_data(

if self.status == Status.paused:
max_connections = 1
throttle_msg = " Throttling outgoing connections because worker is paused."
throttle_msg = " Throttling incoming connections because worker is paused."
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing throttle_msg.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are inbound connections for outbound data. Maybe change it to "Throttling outgoing data transfers"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@github-actions
Copy link
Contributor

github-actions bot commented Aug 23, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       15 files  ±  0         15 suites  ±0   6h 48m 11s ⏱️ + 4m 58s
  3 053 tests +  2    2 969 ✔️ +  1    84 💤 +1  0 ±0 
22 585 runs  +16  21 603 ✔️ +10  982 💤 +6  0 ±0 

Results for commit a0faa5f. ± Comparison against base commit c083790.

♻️ This comment has been updated with latest results.

@hendrikmakait
Copy link
Member Author

CI flakes: #6896, #6934, #6935

@hendrikmakait hendrikmakait marked this pull request as ready for review August 23, 2022 12:37
@hendrikmakait hendrikmakait self-assigned this Aug 23, 2022
incoming_count: int
outgoing_count: int
outgoing_current_count: int
incoming_transfer_count: int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes

Comment on lines 2536 to 2564
@property
def incoming_count(self):
warnings.warn(
"The `Worker.incoming_count` attribute has been renamed to "
"`Worker.incoming_transfer_count`",
DeprecationWarning,
stacklevel=2,
)
return self.incoming_transfer_count

@property
def outgoing_count(self):
warnings.warn(
"The `Worker.outgoing_count` attribute has been renamed to "
"`Worker.outgoing_transfer_count`",
DeprecationWarning,
stacklevel=2,
)
return self.outgoing_transfer_count

@property
def outgoing_current_count(self):
warnings.warn(
"The `Worker.outgoing_current_count` attribute has been renamed to "
"`Worker.current_outgoing_transfer_count`",
DeprecationWarning,
stacklevel=2,
)
return self.current_outgoing_transfer_count
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added DeprecationWarnings for the renamed attributes via properties. Setters have been skipped. IIUC, these attributes should not be modified by others. cc @fjetter

outgoing_count: int
outgoing_current_count: int
incoming_transfer_count: int
outgoing_transfer_count: int
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we're at it, would total_outgoing_transfer_count be clearer for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

  • outgoing_transfer_count_current
  • outgoing_transfer_count_total

Pros:

  • differentiation is very clear
  • can be sorted alphabetically and related stuff is grouped
  • code autocompletion is straight forward and a developer can search for outgoing_stuff_*

not a strong opinion

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussion on #6936 (comment)

Copy link
Member Author

@hendrikmakait hendrikmakait left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying @crusaderky's comment for visibility:

@hendrikmakait

To recap our meeting:

  • rename WorkerState.comm_nbytes to comm_incoming_bytes.
    No deprecation in WorkerState needed; just change the already existing DeprecatedWorkerStateAttribute in Worker
  • new property WorkerState.comm_incoming_count: return len(self.in_flight_workers)
  • rename Worker.outgoing_current_count to comm_outgoing_count. Add a deprecated property (read-only).
  • new attribute Worker.comm_outgoing_bytes
  • rename Worker.incoming_count to comm_incoming_cumulative_count with a deprecated accessor
  • rename Worker.outgoing_count to comm_outgoing_cumulative_count with a deprecated accessor
  • heartbeat will now return
comm:
      incoming_bytes: #
      incoming_count: #
      incoming_cumulative_count: #
      outgoing_bytes: #
      outgoing_count: #
      outgoing_cumulative_count: #

update #6933 to match. Prefer "comm" prefix to "transfers" suffix everywhere.

#6936 (comment)

In addition to the suggested renamings, I have renamed Worker.{incoming|outgoing}_transfer_log to Worker.comm_{incoming|outgoing}_log for consistency.

I'm wondering if instead of stripping the word transfer out of names, we should explicitly in for anything that's a data transfer, i.e., a connection but going the other way.

if (
len(self.in_flight_workers) >= self.total_out_connections
and self.comm_nbytes >= self.comm_threshold_bytes
self.comm_incoming_count >= self.total_out_connections
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After implementing those changes, I am not a huge fan of comm_incoming_count. This line illustrates why:
Why do we compare comm_incoming_count to total_out_connections? How are they related?

comm_incoming_count still feels vaguely defined to me. This might be because comm_ feels like a namespacing prefix for everything communications-related, so what are we counting exactly in that namespace that's incoming? I might just need to get more used to the fact that when we're talking about comm_, we tend to mean Comm object. Then again, that Comm object feels equivalent to a connection, so why is the comm incoming while the connection is outgoing? Should we get transfer back into the naming here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to rename comm to transfer everywhere.
comm_incoming_count -> transfer_incoming_count_current
total_out_connections -> transfer_incoming_count_max

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@crusaderky
Copy link
Collaborator

Updated naming on #6936. Please let's keep all naming discussion there from now on.

@@ -91,7 +91,7 @@ def update(self):
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
"Ready": [len(w.state.ready)],
"Waiting": [w.state.waiting_for_data_count],
"Connections": [len(w.state.in_flight_workers)],
"Connections": [w.state.transfer_incoming_count],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I don't want to touch "Connections".

@@ -985,17 +985,17 @@ async def get_data(self, comm, *args, **kwargs):

@gen_cluster()
async def test_deprecated_worker_attributes(s, a, b):
n = a.state.comm_threshold_bytes
n = a.state.target_message_size
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests the use of a deprecated attribute with the same name on the WorkerState. Since we renamed comm_threshold_bytes, I had to pick another variable.

Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The abundance transfer_-prefixed variables might indicate a missing abstraction

This is just about instrumentation. We could introduce a dataclass holding these things but I think that'd be overkill.

@hendrikmakait
Copy link
Member Author

hendrikmakait commented Aug 30, 2022

This is just about instrumentation. We could introduce a dataclass holding these things but I think that'd be overkill.

I don't want to do this now, it might be worth taking a closer look at this once/if we are diving into things like dynamically limiting comms. Currently, the comms-related logic is spread around Worker and WorkerState and I'm not sure the WorkerState should contain any logic relating to memory-aware throttling.

@fjetter
Copy link
Member

fjetter commented Aug 30, 2022

I'm not sure the WorkerState should contain any logic relating to memory-aware throttling.

No, it should not rely on dynamic memory measurements. We're not actually measuring any memory here. We're running off of data provided by the scheduler TaskState.nbytes so this is a static attribute which makes it suitable for the worker state.

@hendrikmakait
Copy link
Member Author

No, it should not rely on dynamic memory measurements. We're not actually measuring any memory here. We're running off of data provided by the scheduler TaskState.nbytes so this is a static attribute which makes it suitable for the worker state.

Agreed, as I said, this note was merely meant to be kept in mind for future work such as the dynamic limiting ideas mentioned in #6212 or #6208 (comment).

distributed/utils_test.py Outdated Show resolved Hide resolved
distributed/utils_test.py Outdated Show resolved Hide resolved
distributed/worker_state_machine.py Outdated Show resolved Hide resolved
@crusaderky crusaderky merged commit 34b18a9 into dask:main Aug 30, 2022
@crusaderky
Copy link
Collaborator

Thank you!

crusaderky added a commit to crusaderky/distributed that referenced this pull request Aug 30, 2022
gjoseph92 pushed a commit to gjoseph92/distributed that referenced this pull request Oct 31, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants