Optimise scheduler.get_comm_cost set difference #6931
Conversation
This is one simple go at addressing #6899.
Unit Test Results — see the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests.
15 files ±0, 15 suites ±0, 6h 44m 23s ⏱️ -1m 14s
For more details on these failures, see this check. Results for commit ce87313. ± Comparison against base commit 2a2c3bb.
♻️ This comment has been updated with latest results.
This seems like a simple and sensible improvement!
The only failure is the highly flaky #6896, so I think we could merge this.
if 10 * len(ts.dependencies) < len(ws.has_what):
    # In the common case where the number of dependencies is
    # much less than the number of tasks that we have,
    # construct the set of deps that require communication in
    # O(len(dependencies)) rather than O(len(has_what)) time.
    # Factor of 10 is a guess at the overhead of explicit
    # iteration as opposed to just calling set.difference
    deps = {dep for dep in ts.dependencies if dep not in ws.has_what}
else:
    deps = ts.dependencies.difference(ws.has_what)
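For readers without the surrounding scheduler context, the heuristic in the diff can be sketched as a standalone function. `dependencies` and `has_what` here are illustrative stand-ins for `ts.dependencies` (a set of tasks) and `ws.has_what` (a large dict keyed by task); the function name is hypothetical, not the scheduler's real API:

```python
def deps_needing_comms(dependencies: set, has_what: dict) -> set:
    """Return the dependencies not already held by the worker."""
    if 10 * len(dependencies) < len(has_what):
        # Common case: far fewer dependencies than tasks on the worker,
        # so build the difference in O(len(dependencies)) time by
        # iterating the small set and probing the large dict.
        return {dep for dep in dependencies if dep not in has_what}
    # Otherwise fall back to set.difference.
    return dependencies.difference(has_what)
```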
Micro-benchmarking this, I get a crossover factor of ~2 rather than 10:

import time
import uuid
from random import sample

def timing(func):
    start = time.time_ns()
    iterations = 10
    for iteration in range(iterations):
        func()
    end = time.time_ns()
    return (end - start) / iterations

for dict_size in [100, 1_000, 10_000, 100_000, 1_000_000]:
    a_large_dict = {f"{ix}-{uuid.uuid4()}": "foo" for ix in range(dict_size)}
    for factor in [0.1, 0.4, 0.45, 0.5]:
        # sample() needs a sequence, so materialise the keys first
        small_set = set(sample(list(a_large_dict), int(factor * dict_size)))
        intersect = timing(lambda: small_set.intersection(a_large_dict))
        iterate = timing(lambda: {k for k in small_set if k in a_large_dict})
        if iterate < intersect:
            print(f"Iterating faster for {dict_size=} and {factor=}")
Iterating faster for dict_size=100 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.4
Iterating faster for dict_size=1000 and factor=0.5
Iterating faster for dict_size=10000 and factor=0.1
Iterating faster for dict_size=10000 and factor=0.4
Iterating faster for dict_size=10000 and factor=0.45
Iterating faster for dict_size=100000 and factor=0.1
Iterating faster for dict_size=100000 and factor=0.4
Iterating faster for dict_size=100000 and factor=0.45
Iterating faster for dict_size=100000 and factor=0.5
Iterating faster for dict_size=1000000 and factor=0.1
Conversely, on my (admittedly slightly antediluvian) Broadwell box, with Python 3.9.13:
Iterating faster for dict_size=100 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.1
Iterating faster for dict_size=1000 and factor=0.4
Iterating faster for dict_size=10000 and factor=0.1
Iterating faster for dict_size=100000 and factor=0.1
Iterating faster for dict_size=1000000 and factor=0.1
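Manual `time_ns` loops like the one above can be noisy; a `timeit`-based variant of the same membership-probe comparison might look like this (a sketch — the sizes, factor, and iteration count are illustrative, not from the benchmark above):

```python
import timeit
import uuid
from random import sample

# Build one large dict and a small subset of its keys to probe with.
dict_size = 10_000
a_large_dict = {f"{ix}-{uuid.uuid4()}": "foo" for ix in range(dict_size)}
small_set = set(sample(list(a_large_dict), dict_size // 10))

# Time set.intersection against explicit iteration over the small set.
intersect = timeit.timeit(lambda: small_set.intersection(a_large_dict), number=100)
iterate = timeit.timeit(lambda: {k for k in small_set if k in a_large_dict}, number=100)
print(f"intersection: {intersect:.4f}s, iteration: {iterate:.4f}s")
```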
Computing set(A).difference(B) is O(max(len(A), len(B))). When estimating the communication cost of a task's dependencies, it is usual that the number of dependencies (A) will be small but the number of tasks the worker has (B) is large. In this case it is better to manually construct the set difference by iterating over A and checking whether each element is in B.

Performing a left.merge(right, on="key", how="inner") of a distributed dataframe with eight workers, with chunks_per_worker * rows_per_chunk held constant, I observe the following timings using the tcp communication protocol:

| chunks_per_worker | rows_per_chunk | before | after |
|-------------------|----------------|--------|-------|
| 100               | 50000          | 75s    | 48s   |
| 10                | 500000         | ~9s    | ~9s   |
| 1                 | 5000000        | ~8s    | ~8s   |
Force-pushed 01d4e05 to ce87313.
Updated the commit message/summary with timings using the tcp rather than UCX comms protocol (otherwise no change in the force push). I can adapt the heuristic for when to select between the two options, but as above, the threshold varies depending on hardware.
In my benchmarking of the workflow, a factor of 10 or 2 didn't really make a difference, I guess because the …
Thanks for checking the factor again. I guess you are right and that's good enough.
- Passes `pre-commit run --all-files`