Poor Performance on TPC-H Queries #6948
Hi @hesamshahrokhi! Thanks for the question! The method you use to initialize Dask is not intended for Modin.
Let's see what's wrong. Could you please provide more information?
Hi @anmyachev. I did the explicit initialization of Dask and Ray because of a runtime warning that asked me to do so. Now, based on your suggestion, I entirely removed the explicit initializations for Ray and Dask. But after re-running the benchmark, I still see poor performance from Modin (on both backends). Here are the updated benchmark results and the mentioned warnings for Ray and Dask. Regarding the correctness issue in Q1, which still exists, I would refer you to the correct answer by Pandas and the incorrect answers by Modin on Ray and Dask. In the Modin outputs, the titles for these columns are not correct.
This was not the most obvious warning; in fact, it simply indicated that Modin initializes the engine itself. We have already removed it in Modin 0.27.0. What version of Modin are you using? Can you use version 0.27.0? Perhaps the bug has already been fixed and the performance is better.
We'll look into this problem.
I'm able to reproduce the problem with the following reproducer (for Modin 0.27.0):

```python
import numpy as np
import modin.pandas as pd
from modin.pandas.test.utils import df_equals

np.random.seed(42)
columns = ['l_returnflag', 'l_linestatus', 'l_discount', 'l_extendedprice', 'l_quantity', 'b', 'a']  # , 'l_extendedprice'] first fail
columns = ['l_returnflag', 'l_linestatus', 'l_discount', 'l_extendedprice', 'l_quantity']
df = pd.DataFrame(np.random.randint(0, 100, size=(1000, len(columns))), columns=columns)
df['a'] = (df.l_extendedprice) * (1 - (df.l_discount))
df['b'] = ((df.l_extendedprice) * (1 - (df.l_discount))) * 1.94

agg_df = df \
    .groupby(['l_returnflag', 'l_linestatus']) \
    .agg(
        sum_qty=("l_quantity", "sum"),
        sum_base_price=("l_extendedprice", "sum"),
        sum_disc_price=("a", "sum"),
        sum_charge=("b", "sum"),
        avg_qty=("l_quantity", "mean"),
        avg_price=("l_extendedprice", "mean"),
        avg_disc=("l_discount", "mean"),
        count_order=("l_returnflag", "count"),
    ).reset_index()

agg_df_pd = df._to_pandas() \
    .groupby(['l_returnflag', 'l_linestatus']) \
    .agg(
        sum_qty=("l_quantity", "sum"),
        sum_base_price=("l_extendedprice", "sum"),
        sum_disc_price=("a", "sum"),
        sum_charge=("b", "sum"),
        avg_qty=("l_quantity", "mean"),
        avg_price=("l_extendedprice", "mean"),
        avg_disc=("l_discount", "mean"),
        count_order=("l_returnflag", "count"),
    ).reset_index()

df_equals(agg_df, agg_df_pd)
```
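For reference, the named-aggregation pattern used in the reproducer pins each output column to its keyword argument, which makes a column-title mismatch (the correctness bug discussed here) easy to detect. A minimal sketch with plain pandas and synthetic data (not the TPC-H dataset):

```python
import numpy as np
import pandas as pd

np.random.seed(42)
cols = ['l_returnflag', 'l_linestatus', 'l_discount', 'l_extendedprice', 'l_quantity']
df = pd.DataFrame(np.random.randint(0, 100, size=(1000, len(cols))), columns=cols)
df['a'] = df.l_extendedprice * (1 - df.l_discount)

agg = (
    df.groupby(['l_returnflag', 'l_linestatus'])
    .agg(sum_disc_price=('a', 'sum'), avg_qty=('l_quantity', 'mean'))
    .reset_index()
)
# Named aggregation guarantees these exact output column names, so any
# backend that mislabels or reorders result columns fails this check:
print(list(agg.columns))
```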
I changed my Modin version to 0.27.0. The warnings are gone, but the performance and correctness issues still exist.
Thanks :)
Referenced commit: …column partitions. Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
It seems that you have fixed the correctness issue and made a pull request. This is great! Do you have any idea how I can resolve the performance issue as well? The current performance for both workloads is much lower than Pandas. Thanks.
Not yet completely; I fixed it only for your case, but this approach does not always work.
Could you tell me how many cores there are on your system? Judging by the time spent on the operations that you have in the file, the data is not very large. It might make sense to reduce the number of cores used via `MODIN_CPUS`. I would also like to draw your attention to the fact that the first operation's time also includes the engine initialization time. If you are interested in the operation time without that overhead, you can perform some operation beforehand as a kind of warm-up. For example:

```python
import modin.pandas as pd
import modin.config as cfg

# init the workers
pd.DataFrame([cfg.MinPartitionSize.get() * cfg.NPartitions.get()]).to_numpy()
```
Here is some contextual information:
By default, I let Modin use the 4 available cores. However, after your suggestion, I set MODIN_CPUS to 2, which made the results worse.
I used your suggested code for a warm-up phase (repeating it 100 times). Then I executed each query 2 times after the warm-up phase. Unfortunately, I still see bad performance, and it seems to be the real operation time. In summary, can I conclude that Modin is not a good match for my workload? Based on your experience, for what kind of workloads do you suggest using Modin? Thank you so much for your follow-ups.
Thanks for trying!
4 cores is the minimum; 2 almost always works worse.
Could you also try to use
I don't see any fundamental reasons why your workload is not suitable for Modin; rather, it's insufficient optimization on our part. @YarShev @dchigarev thoughts?
I tried to use it:
I see.
Responses to these questions may shed some light on the performance slowdown. Eventually, we would need the data you are using to reproduce the performance issue.
@YarShev Dataset: TPC-H SF-1 (~1GB)
I think it's quite easy to generate (with https://github.com/electrum/tpch-dbgen). |
After doing some measurements, I can exclude the problem of "not having enough data to efficiently parallelize", as I tried running the queries with different data scale factors (up to 10x) and Modin performs poorly at all data sizes. I can also exclude the problem of @hesamshahrokhi not having suitable hardware, as I did the measurements on a 16-core machine that has more than enough memory. I also verified that the partitioning is perfect and that the dataset doesn't have tricky dtypes. I then measured the individual methods in the queries and found that the problem seems to be in row filtering, as it's the slowest part. It's unclear to me why row filtering and column insertion work that slowly; I need more time to profile.
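A minimal way to time one operation, such as the row filter, in isolation. This is plain pandas and purely illustrative: the column name mirrors the TPC-H schema, but the data is synthetic.

```python
import numpy as np
import pandas as pd
from timeit import default_timer as timer

# Synthetic l_shipdate column: random dates within the TPC-H date range
rng = np.random.default_rng(0)
dates = pd.to_datetime('1992-01-01') + pd.to_timedelta(
    rng.integers(0, 2500, size=100_000), unit='D'
)
df = pd.DataFrame({'l_shipdate': dates})

# Time only the filter predicate, separate from I/O and other query steps
t0 = timer()
mask = df['l_shipdate'] <= '1998-09-02'
elapsed = timer() - t0
print(f"filter took {elapsed:.6f}s, kept {int(mask.sum())} of {len(df)} rows")
```

Measuring each step like this is how the row filter was identified as the slow part above; with Modin one would additionally need to trigger execution before stopping the timer, since operations are lazy.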
Referenced commit: …ions (#6951). Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
I took a deeper look and found several problems related to our lazy execution mechanism:

0. We should trigger execution after reading, for fair measurements.

1. We trigger materialization of partition metadata sequentially. When there is a queue of operations hanging on a dataframe, the metadata of the partitions (length/width cache) is represented via an unmaterialized meta list. Accessing a partition's metadata materializes this meta list. Then, when computing row lengths for the whole dataframe, we request each partition's cache in a sequential for-loop, meaning that we trigger the meta-list computation for each partition sequentially rather than in parallel.

As proof that this is indeed a bottleneck, I've prepared this patch, where instead of materializing the meta list on every cache access, the element is fetched lazily:

```diff
diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py
index 5198d835..6d4a2766 100644
--- a/modin/core/execution/ray/common/deferred_execution.py
+++ b/modin/core/execution/ray/common/deferred_execution.py
@@ -465,6 +465,9 @@ class DeferredExecution:
         """Not serializable."""
         raise NotImplementedError("DeferredExecution is not serializable!")

+@ray.remote
+def get_index(obj, index):
+    return obj[index]

 class MetaList:
     """
@@ -478,6 +481,12 @@ class MetaList:
     def __init__(self, obj: Union[ray.ObjectID, ClientObjectRef, List]):
         self._obj = obj

+    def lazy_get(self, index):
+        if isinstance(self._obj, list):
+            return self._obj[index]
+
+        return get_index.remote(self._obj, index)
+
     def __getitem__(self, index):
         """
         Get item at the specified index.
diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
index 2e0ded45..12bd57d2 100644
--- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
+++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
@@ -338,7 +338,8 @@ class PandasOnRayDataframePartition(PandasDataframePartition):
     @property
     def _length_cache(self):  # noqa: GL08
-        return self._meta[self._meta_offset]
+        # return self._meta[self._meta_offset]
+        return self._meta.lazy_get(self._meta_offset)

     @_length_cache.setter
     def _length_cache(self, value):  # noqa: GL08
@@ -346,7 +347,8 @@ class PandasOnRayDataframePartition(PandasDataframePartition):
     @property
     def _width_cache(self):  # noqa: GL08
-        return self._meta[self._meta_offset + 1]
+        # return self._meta[self._meta_offset + 1]
+        return self._meta.lazy_get(self._meta_offset + 1)

     @_width_cache.setter
     def _width_cache(self, value):  # noqa: GL08
@@ -354,7 +356,8 @@ class PandasOnRayDataframePartition(PandasDataframePartition):
     @property
     def _ip_cache(self):  # noqa: GL08
-        return self._meta[-1]
+        # return self._meta[-1]
+        return self._meta.lazy_get(-1)

     @_ip_cache.setter
     def _ip_cache(self, value):  # noqa: GL08
```

After applying this patch, execution time for the following reproducer decreased from 4.23s to 0.47s.

Reproducer for problem 1:

```python
import modin.pandas as pd
import modin.config as cfg
from modin.utils import execute

cfg.CpuCount.put(16)

from timeit import default_timer as timer

l_columnnames = [
    'l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber', 'l_quantity', 'l_extendedprice',
    'l_discount', 'l_tax', 'l_returnflag', 'l_linestatus', 'l_shipdate', 'l_commitdate',
    'l_receiptdate', 'l_shipinstruct', 'l_shipmode', 'l_comment'
]
l_parse_dates = ['l_shipdate', 'l_commitdate', 'l_receiptdate']

# see instructions on how to generate data at the bottom of the message
path = "../tpch-dbgen/lineitem_3.tbl"
df = pd.read_table(path, sep="|", names=l_columnnames, parse_dates=l_parse_dates, index_col=False)
print("data read")

def problem1(df):
    res = df["l_shipdate"] <= "1998-09-02"
    execute(res)

t1 = timer()
problem1(df)
print(timer() - t1)
```

I'm not sure though whether this is the optimal way to fix it; I would like to hear @AndreyPavlenko's opinion on that.

2. Unnecessary triggering of lazy execution in binary operations. This block with binary operations unnecessarily triggers lazy executions:

```python
df['a'] = (df.l_extendedprice) * (1 - (df.l_discount))
df['b'] = (((df.l_extendedprice) * (1 - (df.l_discount))) * (1 + (df.l_tax)))
```

3. Unnecessary `._filter_empties()` calls. The dataframes in this workflow often have empty partitions, and this `._filter_empties()` call, introduced ~5 years ago by PR #721, isn't really required for the indices' propagation to work. I suggest we replace this call with `._filter_empties(compute_metadata=False)` to not trigger computations here.

TL;DR: There are several problems with the current lazy execution in Modin that should be fixed soon. After applying all the fixes from the list, I've been able to drop the execution time of the first query from 5.0s to 3.7s, and the whole query now runs completely lazily from the beginning up to the groupby call.

P.S. Disabling lazy execution completely helps a bit, but not much.
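To illustrate point 1 with a toy, pure-Python sketch (these are not Modin's real classes): fetching metadata element-by-element pays the full materialization cost once per partition, while a single batched request pays it once.

```python
class DeferredMetaList:
    """Toy stand-in for a lazily computed metadata list (illustrative only)."""

    def __init__(self, compute):
        self._compute = compute       # deferred computation producing the list
        self.materializations = 0     # counts how often the full list is built

    def get(self, index):
        # Element-wise access: each call materializes the whole list again.
        self.materializations += 1
        return self._compute()[index]

    def get_many(self, indices):
        # Batched access: one materialization serves all requests.
        self.materializations += 1
        data = self._compute()
        return [data[i] for i in indices]


seq = DeferredMetaList(lambda: [10, 20, 30, 40])
lengths_seq = [seq.get(i) for i in range(4)]       # 4 materializations

batch = DeferredMetaList(lambda: [10, 20, 30, 40])
lengths_batch = batch.get_many(range(4))           # 1 materialization

print(seq.materializations, batch.materializations)
```

The sequential for-loop over partition caches described above behaves like the first variant; the patch makes each access return a lazy handle instead, so the full list is never pulled just to read one element.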
I'll address this issue: #7001
So we've merged all performance fixes for the problem described in this comment and released version 0.28.0 containing all of them. Here are the performance measurements I got: with 16 cores provided to Modin, there's a noticeable performance gain (~40%) for query 1. With 4 cores, Modin doesn't perform that well (we generally don't recommend using Modin with such a small number of available cores; 8+ cores is the recommended minimum).

There's no performance gain for query 6, as the main bottleneck there is columns containing long strings. Here is how to rewrite the 6th query:

```diff
 def q6(df):
+    df = df[['l_shipdate', 'l_discount', 'l_quantity', 'l_extendedprice']]
     df = df[
         (df.l_shipdate >= '1994-01-01') &
         (df.l_shipdate < '1995-01-01') &
         (df.l_discount >= 0.050) &
         (df.l_discount <= 0.070) &
         (df.l_quantity < 24)
     ]
-    df = df[['l_shipdate', 'l_discount', 'l_quantity', 'l_extendedprice']]
     df['l_extendedpricel_discount'] = ((df.l_extendedprice) * (df.l_discount))
     res = (df.l_extendedpricel_discount).sum()
     return res
```

It still performs slower than Pandas, though. An explanation could be that Modin doesn't yet work well on tasks that take Pandas less than 5 seconds; the overheads of parallelization simply don't justify the gain.
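The same projection-before-filter idea in plain pandas, as a synthetic sketch (column names follow the TPC-H schema; the values and the hypothetical `l_comment` strings are made up). Selecting only the needed columns first means the wide string column is never carried through the filter:

```python
import pandas as pd

n = 1_000
df = pd.DataFrame({
    'l_shipdate': pd.to_datetime('1994-06-01'),   # scalars broadcast to n rows
    'l_discount': 0.06,
    'l_quantity': 10,
    'l_extendedprice': 100.0,
    'l_comment': ['a long comment string'] * n,   # the expensive column to avoid
})

# Project to the needed columns *before* filtering, so the string column
# is dropped up front instead of being copied through every boolean mask:
cols = ['l_shipdate', 'l_discount', 'l_quantity', 'l_extendedprice']
sub = df[cols]
sub = sub[
    (sub.l_shipdate >= '1994-01-01') &
    (sub.l_shipdate < '1995-01-01') &
    (sub.l_discount >= 0.05) &
    (sub.l_discount <= 0.07) &
    (sub.l_quantity < 24)
]
revenue = (sub.l_extendedprice * sub.l_discount).sum()
print(revenue)
```

In plain pandas this reordering mostly saves memory traffic; in a partitioned engine it also shrinks what every worker has to serialize and move, which is why it matters more for Modin.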
Feel free to reopen if you have any follow-up questions.
In a benchmark project (link), I tried to compare the performance of Pandas and Modin (on Ray and Dask) on simple TPC-H queries (Q1 and Q6) written in Pandas. The results (link) show that the `read_table` API is 2x faster than its Pandas equivalent on Ray. However, the same API on Dask, and also the individual query run times on both backends, are much slower than Pandas. Moreover, the results of Q1 are not correct. Could you please help me understand why this is the case? I tried to use the suggested configurations and imported the data using `read_table`, which is officially supported by Modin.