-
Notifications
You must be signed in to change notification settings - Fork 651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT-#5394: Reduce amount of remote calls for Map operator #7136
Conversation
Is this PR ready for review? |
3c2f61e
to
8917466
Compare
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
8917466
to
5aa374c
Compare
7874189
to
a6dd5e7
Compare
@Retribution98 I see your graphs above, but I don’t really understand what the axes mean. Please label them. |
@anmyachev Thanks, updated it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
nrows = MinPartitionSize.get() * CpuCount.get() * 2 | ||
data = {f"col{i}": np.ones(nrows) for i in range(ncols)} | ||
df = pd.DataFrame(data) | ||
partitions = df._query_compiler._modin_frame._partitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of partition_manager_class
?
partitions = df._query_compiler._modin_frame._partitions | |
partitions = df._query_compiler._modin_frame._partitions | |
partition_mgr_cls = df._query_compiler._modin_frame._partition_mgr_cls |
|
||
|
||
def test_map_partitions_joined_by_column(): | ||
# Set the config to 'True' inside of the context-manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it mean?
kw = { | ||
"num_splits": step, | ||
} | ||
result = np.empty(partitions.shape, dtype=cls._partition_class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are equivalent actions, but let's make it more explicit.
result = np.empty(partitions.shape, dtype=cls._partition_class) | |
result = np.empty(partitions.shape, dtype=object) |
@YarShev any more comments? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Retribution98, LGTM, thanks!
What do these changes do?
This PR includes an implementation of the simple method proposed in the task:
Check for partitioning before every Map call and if there're too many partitions then call the function across row/column axis so the number of remote calls would equal to the number of row/column partitions (fewer than the total amount of partitions).
But this way got slow when the Dataframe has few columnar partitions and many row partitions (much more than the Cpu count), otherwise modin would only use one remote task.
To solve this problem, another strategy was implemented. If we use columnar partitions to reduce the number of remote tasks, we can try to split them to fill all processors. If possible, we use a new implementation, otherwise the simple method.
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst
is up-to-date