FIX-#4597: Refactor Partition handling of func, args, kwargs #4715
Codecov Report
@@ Coverage Diff @@
## master #4715 +/- ##
==========================================
+ Coverage 84.91% 89.64% +4.73%
==========================================
Files 267 268 +1
Lines 19740 20027 +287
==========================================
+ Hits 16762 17953 +1191
+ Misses 2978 2074 -904
662ca49
to
4985e00
Compare
This pull request introduces 1 alert when merging 4985e007afe6997b5d88b0fa6276f3a3e056d116 into cfafbb2 - view on LGTM.com new alerts:
be8c58a
to
9e321ad
Compare
  num_returns : int, default: 1
      The number of returned objects.
- pure : bool, optional
+ pure : bool, default: True
pure=True is the default behavior of Dask (see here); making this the case in our codebase as well may help avoid potential confusion in the future.
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py
Overall looks nice, left a bunch of small suggestions
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
a13425d
to
943e978
Compare
I am not sure we should merge the changes until we check performance. As far as I can see, there might be performance penalties for both the Ray and Dask engines.
Also, I was wondering if we could do preprocessing (ray.put, client.scatter) for the arguments to be passed to a Ray/Dask remote function? That may drastically increase performance.
From what I ran into while testing, there are some points in the codebase where the arguments are already Ray object IDs/Dask futures when they're passed into a function. I can try investigating further to see if this can be made uniform, perhaps such that |
As to the performance penalties I mentioned earlier, you can see them in the following simple example.

    if __name__ == "__main__":
        import modin.pandas as pd
        import numpy as np
        from timeit import default_timer as dt

        data = np.random.randint(0, 100, size=(2**16, 2**12))
        df = pd.DataFrame(data)
        start = dt()
        result = df.abs()
        end = dt()
        print("time =", end - start)

Ray, master | Ray, PR4715 | Dask, master | Dask, PR4715
I ran it on my laptop with 8 cores available.
Please look at my comment first here. Yes, some of the arguments can already be futures. We should probably go through |
Thanks for the pointers. I've removed the Ray, master: 0.009s I'll look into how easy it is to call |
943e978
to
8302bb4
Compare
This pull request introduces 1 alert when merging 8302bb44f37b04be20300d08ebb39c0d3f73c60c into 3f985ed - view on LGTM.com new alerts:
It seems that CI is still failing because when Dask encounters a nested tuple, where functions are embedded deeper inside the tuple, it seems to try to evaluate the functions in a depth-first fashion. Here's a simple example of what I mean:

    from distributed import Client

    c = Client()

    def f1(f, arg):
        print("in f1")
        return f(arg)

    def f2(arg):
        print("in f2")
        return arg + 1

    future = c.submit(f1, f2, 100)  # passes
    print(future)
    print(c.gather(future))
    print("passed")
    future = c.submit(f1, (f2, 100))  # crashes
    print(future)
    print(c.gather(future))

In the first configuration, where stdout/stderr for evidence:

    in f1
    in f2
    101
    passed
    in f2
    2022-08-10 18:44:09,542 - distributed.worker - WARNING - Compute Failed
    Key: f1-db5fb5becdfd2e9b2edf6ef903a81100
    Function: execute_task
    args: ((, (, 100)))
    kwargs: {}
    Exception: 'TypeError("f1() missing 1 required positional argument: \'arg\'")'

I'm not familiar enough with Dask to say whether or not this is a bug on Dask's end, since I think it's reasonable for the scheduler to try to evaluate depth-first, although this behavior is somewhat counterintuitive. It looks like we may have to go back to @jeffreykennethli's original suggestion of just leaving the Dask call queue of flattened tuples.
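The output above is consistent with Dask's graph convention, in which a tuple whose first element is callable is treated as a task, while lists are only traversed element by element. The following is a minimal, Dask-free sketch of that convention. `toy_execute_task` is a hypothetical stand-in written for illustration, not Dask's implementation, and it assumes the same convention is applied to arguments passed through `submit`.

```python
def toy_execute_task(obj):
    """Evaluate obj under a task-tuple convention (toy model only)."""
    # A tuple whose first element is callable is treated as a call.
    if isinstance(obj, tuple) and obj and callable(obj[0]):
        func, args = obj[0], obj[1:]
        return func(*(toy_execute_task(a) for a in args))
    # Lists are traversed, but a list itself is never treated as a call.
    if isinstance(obj, list):
        return [toy_execute_task(item) for item in obj]
    return obj


def f1(f, arg):
    return f(arg)


def f2(arg):
    return arg + 1


# Flat form: f1 receives f2 and 100 as separate arguments.
flat_result = toy_execute_task((f1, f2, 100))

# Nested form: (f2, 100) is evaluated first, so f1 receives a single value.
try:
    toy_execute_task((f1, (f2, 100)))
    nested_error = None
except TypeError as exc:
    nested_error = str(exc)

# A list with the same contents is left as data.
as_list = toy_execute_task([f2, 100])
```

In this toy model the nested form fails for the same reason as in the log: the inner tuple is collapsed into one value before `f1` is called, so `f1` is short one argument.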
@noloerino, please note that you are getting the error because f1 didn't get enough arguments. The following message says about that - |
Also, note that Dask may not work with tuples passed in to a remote function. That's why we have used a list for the call queue so far. |
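One way to picture a list-based call queue is as a sequence of [func, args, kwargs] entries that are applied in order to a partition's data. The sketch below uses a plain Python list in place of a pandas DataFrame. `drain_call_queue` and the entry layout are assumptions made for illustration, not Modin's API.

```python
def drain_call_queue(data, call_queue):
    """Apply each queued [func, args, kwargs] entry to data, in order."""
    for func, f_args, f_kwargs in call_queue:
        data = func(data, *f_args, **f_kwargs)
    return data


def add(values, amount):
    return [v + amount for v in values]


def scale(values, factor=1):
    return [v * factor for v in values]


# Each entry is a list rather than a tuple, so an entry that starts with a
# callable is not shaped like a task tuple.
call_queue = [
    [add, [1], {}],
    [scale, [], {"factor": 10}],
]

result = drain_call_queue([1, 2, 3], call_queue)
```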
2ec21d6
to
d55daad
Compare
Now the perf is on par with master on Ray engine, but there is a little slowdown on Dask engine. |
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py
@@ -308,7 +300,7 @@ def ip(self):
         return self._ip_cache


-def apply_func(partition, func, *args, **kwargs):
+def apply_func(partition, func, f_args, f_kwargs):
-def apply_func(partition, func, f_args, f_kwargs):
+def apply_func(partition, func, *args, **kwargs):
Why doesn't this work?
Doing this made it easier to pass arguments to DaskWrapper.deploy, as here, and this change fixed the performance difference between this PR and master on my micro-benchmark (calling df.describe() on a 2^16 x 2^12 dataframe). I'll refactor it to your version for consistency with Ray.
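The two signatures discussed in this thread differ only in how the caller hands over the extra arguments. Here is a minimal sketch, using a plain list as a stand-in for the partition's data. Both function names and the `clip` helper are hypothetical, and neither variant is Modin's actual `apply_func`.

```python
def apply_func_packed(partition, func, f_args, f_kwargs):
    """Variant that takes the extra arguments as one sequence and one dict."""
    return func(partition, *f_args, **f_kwargs)


def apply_func_unpacked(partition, func, *args, **kwargs):
    """Variant that takes the extra arguments unpacked at the call site."""
    return func(partition, *args, **kwargs)


def clip(values, low, high=None):
    # Clamp every value into the range [low, high].
    upper = max(values) if high is None else high
    return [min(max(v, low), upper) for v in values]


data = [-5, 0, 5, 50]
packed = apply_func_packed(data, clip, (0,), {"high": 10})
unpacked = apply_func_unpacked(data, clip, 0, high=10)
```

Both calls produce the same result. The packed form keeps the forwarded arguments in two separate objects when they pass through another layer, while the unpacked form is the one suggested in the review.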
b0618f7
to
2bdf0f7
Compare
@noloerino can you rebase this on master so that you can redo the OmniSci runs?
I've already rebased on latest master; Mahesh initiated a rerun of CI for me and it looks like Omnisci tests are passing. |
I think Mahesh re-ran some other stuff. This seems to be failing: https://github.com/modin-project/modin/actions/runs/3048755188/jobs/4914151049 |
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py
…s, kwargs Signed-off-by: Jonathan Shi <jhshi@ponder.io>
2bdf0f7
to
ad8e3dd
Compare
Signed-off-by: Jonathan Shi <jhshi@ponder.io>
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
Left a minor comment, other than that, LGTM!
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
…tioning/virtual_partition.py Co-authored-by: Iaroslav Igoshev <Poolliver868@mail.ru>
Signed-off-by: Jonathan Shi <jhshi@ponder.io>
Thanks, I've gone through and cleaned up the docstrings. |
LGTM, assuming tests pass. Thanks.
What do these changes do?
(picking up from #4600)
This introduces the Invocable object, which wraps tuples of func, *args, **kwargs that were previously being passed around the codebase, and led to possible mixing of modin-internal arguments with API-level ones.
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst is up-to-date
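As an illustration of the idea in the summary above, the sketch below bundles a function with its positional and keyword arguments in one object so that they cannot be confused with the arguments of the code that forwards them. The field names, the `invoke` method, and the `apply` helper are assumptions made for this example; the class added by this PR may look different.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class Invocable:
    """Hypothetical bundle of a callable with the arguments meant for it."""

    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def invoke(self, data: Any) -> Any:
        # data is passed first, followed by the stored arguments.
        return self.func(data, *self.args, **self.kwargs)


def apply(data, invocable, num_splits=1):
    # num_splits is an option of apply itself. The user's arguments live
    # inside invocable, so the two namespaces cannot collide.
    result = invocable.invoke(data)
    return [result] * num_splits


def add(values, amount, num_splits=0):
    # This function has its own parameter that happens to share the name.
    return [v + amount + num_splits for v in values]


inv = Invocable(add, args=(1,), kwargs={"num_splits": 100})
out = apply([1, 2], inv, num_splits=2)
```

With a flat `func, *args, **kwargs` signature, the `num_splits` keyword intended for `add` would have been consumed by `apply` instead, which is the kind of mixing the summary describes.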