Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#7047: Add range-partitioning implementation for '.pivot_table()' #7048

Merged
merged 7 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 253 additions & 0 deletions modin/core/storage_formats/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,256 @@ def mean_reduce(dfgb, **kwargs):
"skew": GroupbyReduceImpl._build_skew_impl(),
"sum": ("sum", "sum", lambda grp, *args, **kwargs: grp.sum(*args, **kwargs)),
}


class PivotTableImpl:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.pivot_table() is literally a groupby + fancy post-processing, so I decided to put it into groupby.py

"""Provide MapReduce, Range-Partitioning and Full-Column implementations for 'pivot_table()'."""

@classmethod
def map_reduce_impl(
cls, qc, unique_keys, drop_column_level, pivot_kwargs
): # noqa: PR01
"""Compute 'pivot_table()' using MapReduce implementation."""
if pivot_kwargs["margins"]:
raise NotImplementedError(
"MapReduce 'pivot_table' implementation doesn't support 'margins=True' parameter"
)

index, columns, values = (
pivot_kwargs["index"],
pivot_kwargs["columns"],
pivot_kwargs["values"],
)
aggfunc = pivot_kwargs["aggfunc"]

if not GroupbyReduceImpl.has_impl_for(aggfunc):
raise NotImplementedError(
"MapReduce 'pivot_table' implementation only supports 'aggfuncs' that are implemented in 'GroupbyReduceImpl'"
)

if len(set(index).intersection(columns)) > 0:
raise NotImplementedError(
"MapReduce 'pivot_table' implementation doesn't support intersections of 'index' and 'columns'"
)

to_group, keys_columns = cls._separate_data_from_grouper(
qc, values, unique_keys
)
to_unstack = columns if index else None

result = GroupbyReduceImpl.build_qc_method(
aggfunc,
finalizer_fn=lambda df: cls._pivot_table_from_groupby(
df,
pivot_kwargs["dropna"],
drop_column_level,
to_unstack,
pivot_kwargs["fill_value"],
),
)(
to_group,
by=keys_columns,
axis=0,
groupby_kwargs={
"observed": pivot_kwargs["observed"],
"sort": pivot_kwargs["sort"],
},
agg_args=(),
agg_kwargs={},
drop=True,
)

if to_unstack is None:
result = result.transpose()
return result

@classmethod
def full_axis_impl(
    cls, qc, unique_keys, drop_column_level, pivot_kwargs
):  # noqa: PR01
    """
    Compute 'pivot_table()' using full-column-axis implementation.

    The value columns and the key columns are split apart, the key columns
    are broadcast to the value partitions, and ``pandas.pivot_table`` is
    applied along the full axis.

    Parameters
    ----------
    qc : PandasQueryCompiler
        Query compiler holding the source data.
    unique_keys : list of labels
        Key columns to group by.
    drop_column_level : int or None
        An extra columns level to drop from the result.
    pivot_kwargs : dict
        Keyword arguments forwarded as-is to ``pandas.pivot_table``; the
        'index', 'columns' and 'values' entries are also read here.

    Returns
    -------
    object
        Whatever ``qc.__constructor__`` builds from the resulting frame.
    """
    # NOTE: according to the PR author's review note, this logic
    # was copied from ``qc.pivot_table()``
    index = pivot_kwargs["index"]
    columns = pivot_kwargs["columns"]
    values = pivot_kwargs["values"]

    to_group, keys_columns = cls._separate_data_from_grouper(
        qc, values, unique_keys
    )

    # with no 'index' but some 'columns' pandas returns a transposed table
    is_transposed = len(index) == 0 and len(columns) > 0

    def applyier(df, other):  # pragma: no cover
        """
        Build pivot table for a single partition.

        Parameters
        ----------
        df : pandas.DataFrame
            Partition of the frame holding the columns to aggregate.
        other : pandas.DataFrame
            Broadcasted partition of the frame holding the key columns.

        Returns
        -------
        pandas.DataFrame
            Pivot table for this particular partition.
        """
        table = pandas.pivot_table(
            pandas.concat([df, other], axis=1, copy=False),
            **pivot_kwargs,
        )

        # when a single value is specified, remove the level that maps
        # columns from `values` to the actual values
        if drop_column_level is not None:
            table = table.droplevel(drop_column_level, axis=1)

        # undo the transposition done by pandas so that the column axis
        # stays consistent across different partitions
        return table.transpose() if is_transposed else table

    new_frame = to_group._modin_frame.broadcast_apply_full_axis(
        axis=0, func=applyier, other=keys_columns._modin_frame
    )
    result = qc.__constructor__(new_frame)

    # transpose once more to match the orientation pandas would return
    return result.transpose() if is_transposed else result

@classmethod
def range_partition_impl(
cls, qc, unique_keys, drop_column_level, pivot_kwargs
): # noqa: PR01
"""Compute 'pivot_table()' using Range-Partitioning implementation."""
if pivot_kwargs["margins"]:
raise NotImplementedError(
"Range-partitioning 'pivot_table' implementation doesn't support 'margins=True' parameter"
)

index, columns, values = (
pivot_kwargs["index"],
pivot_kwargs["columns"],
pivot_kwargs["values"],
)

if len(set(index).intersection(columns)) > 0:
raise NotImplementedError(
"Range-partitioning 'pivot_table' implementation doesn't support intersections of 'index' and 'columns'"
)

if values is not None:
to_take = list(np.unique(list(index) + list(columns) + list(values)))
qc = qc.getitem_column_array(to_take, ignore_order=True)

to_unstack = columns if index else None

groupby_result = qc._groupby_shuffle(
by=list(unique_keys),
agg_func=pivot_kwargs["aggfunc"],
axis=0,
groupby_kwargs={
"observed": pivot_kwargs["observed"],
"sort": pivot_kwargs["sort"],
},
agg_args=(),
agg_kwargs={},
drop=True,
)

# the length of 'groupby_result' is typically really small here,
# so it's okay to call full-column function
result = groupby_result._modin_frame.apply_full_axis(
axis=0,
func=lambda df: cls._pivot_table_from_groupby(
df,
pivot_kwargs["dropna"],
drop_column_level,
to_unstack,
pivot_kwargs["fill_value"],
# FIXME: Range-partitioning impl has a problem with the resulting order in case of multiple grouping keys,
# so passing 'sort=True' explicitly in this case
# https://github.com/modin-project/modin/issues/6875
sort=pivot_kwargs["sort"] if len(unique_keys) > 1 else False,
),
)

if to_unstack is None:
result = result.transpose()

return qc.__constructor__(result)

@staticmethod
def _pivot_table_from_groupby(
df, dropna, drop_column_level, to_unstack, fill_value, sort=False
):
"""
Convert group by aggregation result to a pivot table.

Parameters
----------
df : pandas.DataFrame
Group by aggregation result.
dropna : bool
Whether to drop NaN columns.
drop_column_level : int or None
An extra columns level to drop.
to_unstack : list of labels or None
Group by keys to pass to ``.unstack()``. Reperent `columns` parameter
for ``.pivot_table()``.
fill_value : bool
Fill value for NaN values.
sort : bool, default: False
Whether to sort the result along index.

Returns
-------
pandas.DataFrame
"""
if df.index.nlevels > 1 and to_unstack is not None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

df = df.unstack(level=to_unstack)
if drop_column_level is not None:
df = df.droplevel(drop_column_level, axis=1)
if dropna:
df = df.dropna(axis=1, how="all")
if fill_value is not None:
df = df.fillna(fill_value, downcast="infer")
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
if sort:
df = df.sort_index(axis=0)
return df

@staticmethod
def _separate_data_from_grouper(qc, values, unique_keys):
    """
    Split `qc` into the columns to aggregate and the key columns to group by.

    Parameters
    ----------
    qc : PandasQueryCompiler
    values : list of labels or None
        List of columns to aggregate. ``None`` means all columns except 'unique_keys'.
    unique_keys : list of labels
        List of key columns to group by.

    Returns
    -------
    to_aggregate : PandasQueryCompiler
    keys_to_group : PandasQueryCompiler
    """
    # NOTE: according to the PR author's review note, this logic
    # was copied from ``PandasQueryCompiler.pivot_table``
    to_aggregate = (
        qc.drop(columns=unique_keys)
        if values is None
        else qc.getitem_column_array(np.unique(values), ignore_order=True)
    )
    keys_to_group = qc.getitem_column_array(unique_keys, ignore_order=True)

    return to_aggregate, keys_to_group
Loading
Loading