
FIX-#4597: Refactor Partition handling of func, args, kwargs #4715

Merged — 12 commits merged into modin-project:master from partition-args on Sep 21, 2022

Conversation

noloerino (Collaborator) commented Jul 25, 2022

What do these changes do?

(picking up from #4600)

This introduces the Invocable object, which wraps the tuples of func, *args, **kwargs that were previously passed around the codebase and allowed Modin-internal arguments to be mixed with API-level ones.
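For illustration, a minimal sketch of the kind of wrapper this describes (the field names and call behavior here are hypothetical, not the PR's actual implementation):

from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple

@dataclass
class Invocable:
    # Bundles a user function with its API-level arguments so that
    # Modin-internal arguments can be supplied separately at call time.
    func: Callable
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def __call__(self, *internal_args: Any) -> Any:
        # Internal callers (e.g. a partition apply) prepend their own
        # positional arguments without touching the user-supplied ones.
        return self.func(*internal_args, *self.args, **self.kwargs)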

  • commit message follows format outlined here
  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Refactor Partition handling of func, args, kwargs #4597
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date
  • added (Issue Number: PR title (PR Number)) and github username to release notes for next major release

@noloerino noloerino requested a review from a team as a code owner July 25, 2022 20:57
@noloerino noloerino marked this pull request as draft July 25, 2022 20:57
codecov bot commented Jul 25, 2022

Codecov Report

Merging #4715 (8808834) into master (d071b43) will increase coverage by 4.73%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master    #4715      +/-   ##
==========================================
+ Coverage   84.91%   89.64%   +4.73%     
==========================================
  Files         267      268       +1     
  Lines       19740    20027     +287     
==========================================
+ Hits        16762    17953    +1191     
+ Misses       2978     2074     -904     
Impacted Files Coverage Δ
...core/dataframe/base/partitioning/axis_partition.py 100.00% <ø> (ø)
...ns/pandas_on_ray/partitioning/virtual_partition.py 91.17% <ø> (-0.26%) ⬇️
...n/core/io/column_stores/column_store_dispatcher.py 96.00% <ø> (ø)
modin/core/io/column_stores/parquet_dispatcher.py 96.25% <ø> (+2.08%) ⬆️
modin/core/io/pickle/pickle_dispatcher.py 92.30% <ø> (ø)
modin/core/io/sql/sql_dispatcher.py 100.00% <ø> (ø)
modin/core/io/text/csv_glob_dispatcher.py 80.99% <ø> (ø)
modin/core/io/text/excel_dispatcher.py 94.01% <ø> (+0.85%) ⬆️
modin/core/io/text/json_dispatcher.py 97.67% <ø> (ø)
modin/core/io/text/text_file_dispatcher.py 97.42% <ø> (ø)
... and 56 more


lgtm-com bot commented Jul 27, 2022

This pull request introduces 1 alert when merging 4985e007afe6997b5d88b0fa6276f3a3e056d116 into cfafbb2 - view on LGTM.com

new alerts:

  • 1 for Unused import

@noloerino noloerino force-pushed the partition-args branch 3 times, most recently from be8c58a to 9e321ad on August 3, 2022 21:18
num_returns : int, default: 1
    The number of returned objects.
-pure : bool, optional
+pure : bool, default: True

noloerino (Collaborator, Author) commented on this diff:
pure=True is the default behavior of Dask (see here); making this the case in our codebase as well may help avoid potential confusion in the future.
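For reference, a small standalone example of the pure semantics (this is not code from the PR; it follows the behavior documented by Dask):

import random
from distributed import Client

client = Client()

# pure=True (Dask's default): identical calls share one task key,
# so the second submit reuses the first result.
a = client.submit(random.random, pure=True)
b = client.submit(random.random, pure=True)
assert a.key == b.key

# pure=False: each call gets a unique key and runs independently.
c = client.submit(random.random, pure=False)
d = client.submit(random.random, pure=False)
assert c.key != d.key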

@noloerino noloerino marked this pull request as ready for review August 3, 2022 22:04
vnlitvinov (Collaborator) reviewed:

Overall looks nice, left a bunch of small suggestions

@noloerino noloerino force-pushed the partition-args branch 2 times, most recently from a13425d to 943e978 on August 9, 2022 18:18
YarShev (Collaborator) reviewed:

I am not sure we should merge these changes until we check performance. As I see it, there might be performance penalties for both the Ray and Dask engines.

Also, I was wondering if we could do preprocessing (ray.put / client.scatter) on the arguments to be passed to a Ray/Dask remote function? That could drastically improve performance.

noloerino (Collaborator, Author) replied:

> Also, I was wondering if we could do preprocessing (ray.put / client.scatter) on the arguments to be passed to a Ray/Dask remote function? That could drastically improve performance.

From what I ran into while testing, there are some points in the codebase where the arguments are already Ray object IDs/Dask futures when they're passed into a function. I can investigate further to see whether this can be made uniform, perhaps so that Invocable (or some other wrapper) is always called with concrete Python objects as arguments and then calls ray.put/client.scatter in its constructor.

YarShev (Collaborator) commented Aug 10, 2022

As to the performance penalties I mentioned earlier, you can see them in the following simple example.

if __name__ == "__main__":
    import modin.pandas as pd
    import numpy as np
    from timeit import default_timer as dt

    data = np.random.randint(0, 100, size=(2**16, 2**12))
    df = pd.DataFrame(data)
    start = dt()
    result = df.abs()
    end = dt()
    print("time =", end - start)

Ray, master
time = 0.024973500025225803

Ray, PR4715
time = 0.047648999985540286

Dask, master
time = 0.4006148000189569

Dask, PR4715
time = 1.6876778000150807

I ran this on my laptop with 8 cores available.

YarShev (Collaborator) commented Aug 10, 2022

> > Also, I was wondering if we could do preprocessing (ray.put / client.scatter) on the arguments to be passed to a Ray/Dask remote function? That could drastically improve performance.
>
> From what I ran into while testing, there are some points in the codebase where the arguments are already Ray object IDs/Dask futures when they're passed into a function. I can investigate further to see whether this can be made uniform, perhaps so that Invocable (or some other wrapper) is always called with concrete Python objects as arguments and then calls ray.put/client.scatter in its constructor.

Please look at my comment here first.

Yes, some of the arguments can already be futures. We should probably go through *args and **kwargs, find non-future objects, and make those futures, roughly as sketched below.
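A rough sketch of that preprocessing for the Ray side (ensure_futures is a hypothetical helper, not part of this PR; the Dask analogue would check for distributed.Future and call client.scatter):

import ray

def ensure_futures(args, kwargs):
    # Replace each non-future argument with a Ray object reference so
    # large values are put into the object store once, not per task.
    new_args = tuple(
        a if isinstance(a, ray.ObjectRef) else ray.put(a) for a in args
    )
    new_kwargs = {
        k: v if isinstance(v, ray.ObjectRef) else ray.put(v)
        for k, v in kwargs.items()
    }
    return new_args, new_kwargs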

noloerino (Collaborator, Author) replied:

Thanks for the pointers. I've removed the Invocable class and replaced it with a type alias for a 3-tuple of (func, args, kwargs) (sketched at the end of this comment). Here are the timings:

Ray, master: 0.009s
Dask, master: 0.129s
Ray, old PR version: 0.012s
Dask, old PR version: 0.537s
Ray, latest PR version: 0.011s
Dask, latest PR version: 0.132s

I'll look into how easy it is to call ray.put/client.scatter over args and kwargs, and see if it's simple enough to include in this PR.
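For concreteness, the alias could be spelled roughly like this (names here are hypothetical, not necessarily what the PR uses):

from typing import Any, Callable, Dict, Tuple

# A call entry: the user function plus its positional and keyword arguments.
CallTriple = Tuple[Callable, Tuple[Any, ...], Dict[str, Any]]

def invoke(entry: CallTriple) -> Any:
    func, args, kwargs = entry
    return func(*args, **kwargs)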

lgtm-com bot commented Aug 10, 2022

This pull request introduces 1 alert when merging 8302bb44f37b04be20300d08ebb39c0d3f73c60c into 3f985ed - view on LGTM.com

new alerts:

  • 1 for Unused import

noloerino (Collaborator, Author) commented:

It seems that CI is still failing because when Dask encounters a nested tuple with functions embedded deeper inside, it tries to evaluate those functions in a depth-first fashion. Here's a simple example of what I mean:

from distributed import Client

c = Client()

def f1(f, arg):
    print("in f1")
    return f(arg)

def f2(arg):
    print("in f2")
    return arg + 1

future = c.submit(f1, f2, 100) # passes
print(future)
print(c.gather(future))
print("passed")

future = c.submit(f1, (f2, 100)) # crashes
print(future)

print(c.gather(future))

In the first configuration, where f1, f2, 100 are the arguments passed to c.submit, everything passes fine; f1 is properly called with f2, 100 as its arguments. In the second, where the arguments are instead f1, (f2, 100), Dask attempts to evaluate f2() before f1.

stdout/stderr for evidence:
in f1
in f2
101
passed

in f2
2022-08-10 18:44:09,542 - distributed.worker - WARNING - Compute Failed
Key:       f1-db5fb5becdfd2e9b2edf6ef903a81100
Function:  execute_task
args:      ((<function f1 at 0x...>, (<function f2 at 0x...>, 100)))
kwargs:    {}
Exception: 'TypeError("f1() missing 1 required positional argument: \'arg\'")'

Traceback (most recent call last):
File "", line 21, in
File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/client.py", line 2210, in gather
return self.sync(
File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/utils.py", line 338, in sync
return sync(
File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
raise exc.with_traceback(tb)
File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
result = yield future
File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/jhshi/opt/miniconda3/envs/modin-dev/lib/python3.10/site-packages/distributed/client.py", line 2073, in _gather
raise exception.with_traceback(traceback)
TypeError: f1() missing 1 required positional argument: 'arg'

I'm not familiar enough with Dask to say whether this is a bug on Dask's end, since I think it's reasonable for the scheduler to evaluate depth-first, although the behavior is somewhat counterintuitive. It looks like we may have to go back to @jeffreykennethli's original suggestion of just leaving the Dask call queue as flattened tuples.

Another really messy (and possibly confusing) option is to compose call_queue not with Invocables but with the original (func, args, kwargs) tuples. That means we'd go from (func, args, kwargs) in the API layer, convert to Invocable when passing them around the partition layer, and unpack them again in the execution layer, which seems bad. Again, all of this is to make the partition layer cleaner, at the cost of... making the partition layer complex again.

YarShev (Collaborator) commented Aug 11, 2022

@noloerino, please note that you are getting the error because f1 didn't receive enough arguments; the message Exception: 'TypeError("f1() missing 1 required positional argument: 'arg'")' says as much. You should unpack the tuple with *(f2, 100) to get this to work.
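In terms of the earlier snippet, that fix would look like this (reusing c, f1, and f2 from above):

# Unpacking the tuple gives f1 both positional arguments it expects.
future = c.submit(f1, *(f2, 100))  # equivalent to c.submit(f1, f2, 100)
print(c.gather(future))  # 101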

YarShev (Collaborator) commented Aug 11, 2022

Also, note that Dask may not work with tuples passed into a remote function. That's why we have used a list for the call queue so far.
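To illustrate the difference (again reusing c and f2 from the earlier snippet; apply_entry is just an illustrative helper): a list is not interpreted by Dask as a task the way a tuple with a callable first element is.

def apply_entry(entry):
    # Unpack a [func, arg] pair ourselves instead of letting Dask see a
    # tuple whose first element is callable.
    f, arg = entry
    return f(arg)

future = c.submit(apply_entry, [f2, 100])
print(c.gather(future))  # 101; f2 is not evaluated eagerly by Dask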

YarShev (Collaborator) commented Sep 8, 2022

Now the perf is on par with master on the Ray engine, but there is a little slowdown on the Dask engine.

@@ -308,7 +300,7 @@ def ip(self):
        return self._ip_cache


-def apply_func(partition, func, *args, **kwargs):
+def apply_func(partition, func, f_args, f_kwargs):

Collaborator commented on this diff:

Suggested change:

-def apply_func(partition, func, f_args, f_kwargs):
+def apply_func(partition, func, *args, **kwargs):

Why doesn't this work?

noloerino (Collaborator, Author) replied:

Doing this made it easier to pass arguments to DaskWrapper.deploy, as here, and this change fixed the performance difference between this PR and master on my micro-benchmark (calling df.describe() on a 2^16 x 2^12 dataframe). I'll refactor it to your version for consistency with Ray.
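A simplified contrast of the two signatures under discussion (illustrative only; the real apply_func does more than this):

def apply_func_explicit(partition, func, f_args, f_kwargs):
    # Explicit containers: arguments destined for the user function are
    # sealed in f_args/f_kwargs, so engine-level keywords passed to the
    # deploy call cannot collide with them.
    return func(partition, *f_args, **f_kwargs)

def apply_func_splat(partition, func, *args, **kwargs):
    # Splatted form: simpler to call, but everything after func must
    # belong to the user function.
    return func(partition, *args, **kwargs)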

pyrito (Collaborator) reviewed:

@noloerino can you rebase this on master so that you can redo the OmniSci runs?

noloerino (Collaborator, Author) replied:

I've already rebased on latest master; Mahesh initiated a rerun of CI for me and it looks like Omnisci tests are passing.

pyrito (Collaborator) commented Sep 14, 2022

I think Mahesh re-ran some other stuff. This seems to be failing: https://github.com/modin-project/modin/actions/runs/3048755188/jobs/4914151049

…s, kwargs

Signed-off-by: Jonathan Shi <jhshi@ponder.io>
YarShev previously approved these changes Sep 20, 2022

YarShev (Collaborator) reviewed:

Left a minor comment, other than that, LGTM!

…tioning/virtual_partition.py

Co-authored-by: Iaroslav Igoshev <Poolliver868@mail.ru>
Signed-off-by: Jonathan Shi <jhshi@ponder.io>
noloerino (Collaborator, Author) replied:

Thanks, I've gone through and cleaned up the docstrings.

YarShev (Collaborator) reviewed:

LGTM, assuming tests pass. Thanks.

@YarShev YarShev merged commit d6d503a into modin-project:master Sep 21, 2022
@noloerino noloerino deleted the partition-args branch September 21, 2022 17:46