-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve differentiation between incoming/outgoing connections and transfers #6933
Changes from 1 commit
bbe7e18
485e72f
cc83764
f70cceb
074707d
2bfdc2e
aea0210
0742765
4b073aa
87599ed
3afc878
4fb8b7b
1b2034d
4218493
07c96db
ce1d051
abdc243
d28ff03
5d64743
a0faa5f
bdfc87c
1f3dc4f
b10985a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -385,9 +385,9 @@ class Worker(BaseWorker, ServerNode): | |
profile_history: deque[tuple[float, dict[str, Any]]] | ||
incoming_transfer_log: deque[dict[str, Any]] | ||
outgoing_transfer_log: deque[dict[str, Any]] | ||
incoming_count: int | ||
outgoing_count: int | ||
outgoing_current_count: int | ||
incoming_transfer_count: int | ||
outgoing_transfer_count: int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While we're at it, would There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about
Pros:
not a strong opinion There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. discussion on #6936 (comment) |
||
current_outgoing_tranfer_count: int | ||
bandwidth: float | ||
latency: float | ||
profile_cycle_interval: float | ||
|
@@ -540,10 +540,10 @@ def __init__( | |
validate = dask.config.get("distributed.scheduler.validate") | ||
|
||
self.incoming_transfer_log = deque(maxlen=100000) | ||
self.incoming_count = 0 | ||
self.incoming_transfer_count = 0 | ||
self.outgoing_transfer_log = deque(maxlen=100000) | ||
self.outgoing_count = 0 | ||
self.outgoing_current_count = 0 | ||
self.outgoing_transfer_count = 0 | ||
self.current_outgoing_tranfer_count = 0 | ||
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth")) | ||
self.bandwidth_workers = defaultdict( | ||
lambda: (0, 0) | ||
|
@@ -1653,26 +1653,26 @@ async def get_data( | |
|
||
if self.status == Status.paused: | ||
max_connections = 1 | ||
throttle_msg = " Throttling outgoing connections because worker is paused." | ||
throttle_msg = " Throttling incoming connections because worker is paused." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are inbound connections for outbound data. Maybe change it to "Throttling outgoing data transfers"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
else: | ||
throttle_msg = "" | ||
|
||
if ( | ||
max_connections is not False | ||
and self.outgoing_current_count >= max_connections | ||
and self.current_outgoing_tranfer_count >= max_connections | ||
): | ||
logger.debug( | ||
"Worker %s has too many open connections to respond to data request " | ||
"from %s (%d/%d).%s", | ||
self.address, | ||
who, | ||
self.outgoing_current_count, | ||
self.current_outgoing_tranfer_count, | ||
max_connections, | ||
throttle_msg, | ||
) | ||
return {"status": "busy"} | ||
|
||
self.outgoing_current_count += 1 | ||
self.current_outgoing_tranfer_count += 1 | ||
data = {k: self.data[k] for k in keys if k in self.data} | ||
|
||
if len(data) < len(keys): | ||
|
@@ -1704,14 +1704,14 @@ async def get_data( | |
comm.abort() | ||
raise | ||
finally: | ||
self.outgoing_current_count -= 1 | ||
self.current_outgoing_tranfer_count -= 1 | ||
stop = time() | ||
if self.digests is not None: | ||
self.digests["get-data-send-duration"].add(stop - start) | ||
|
||
total_bytes = sum(filter(None, nbytes.values())) | ||
|
||
self.outgoing_count += 1 | ||
self.outgoing_transfer_count += 1 | ||
duration = (stop - start) or 0.5 # windows | ||
self.outgoing_transfer_log.append( | ||
{ | ||
|
@@ -1955,7 +1955,7 @@ def _update_metrics_received_data( | |
self.digests["transfer-bandwidth"].add(total_bytes / duration) | ||
self.digests["transfer-duration"].add(duration) | ||
self.counters["transfer-count"].add(len(data)) | ||
self.incoming_count += 1 | ||
self.incoming_transfer_count += 1 | ||
|
||
@fail_hard | ||
async def gather_dep( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1143,7 +1143,7 @@ class WorkerState: | |
#: The total number of bytes in flight | ||
comm_nbytes: int | ||
|
||
#: The maximum number of concurrent incoming requests for data. | ||
#: The maximum number of concurrent outgoing requests for data. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixing docstring. |
||
#: See also :attr:`distributed.worker.Worker.total_in_connections`. | ||
total_out_connections: int | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of talking about
incoming_transfer
/outgoing_transfer
, which we already had through the{incoming|outgoing}_transfer_log
, we could also talk aboutsend
/recv
to further avoid confusion.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[EDIT] let's keep them but rename with a deprecation. Discussion on #6936 (comment)