FEAT-#7100: Add range-partitioning impl for 'nunique()' #7101

Merged
merged 3 commits into from
Mar 19, 2024

Conversation

dchigarev (Collaborator) commented Mar 18, 2024

What do these changes do?

Adds a range-partitioning implementation for the .nunique() method. Unfortunately, range-partitioning in .nunique() can only be used for 1D data (pd.Series and single-column DataFrames). The reason is that every column would have to be the 'key' in range-partitioning, which is not possible even if several columns are used to build the bins:

initial df:
   a   b
0  0   1
1  0  10
2  0  20

---> building bins for range-parts:
bins: {
    0: [a<=0 and b<=10],
    1: [a<=0 and b>10]
}

---> splitting in range-parts:
bin0:
   a   b
0  0   1
1  0  10
bin1:
   a   b
2  0  20

---> reduction phase:
bin0.nunique():   a 1, b 2
bin1.nunique():   a 1, b 1
sum(bin0, bin1):  a 2 (incorrect), b 3

This is not a problem for other methods, since there we want to extract unique rows rather than unique values in each column independently.
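The counting problem from the diagram can be reproduced without Modin. The sketch below is a plain-Python illustration, not Modin's implementation: the helper names `nunique_per_column` and `range_partition_1d` and the pivot values are made up for the example. It shows that summing per-bin counts overcounts column `a` when bins are built from both columns, while the same sum is correct for 1D data, where the single column is the partitioning key.

```python
from bisect import bisect_right

# Toy frame from the diagram, stored as rows of (a, b).
rows = [(0, 1), (0, 10), (0, 20)]


def nunique_per_column(part):
    """Count distinct values in each column of a list of (a, b) rows."""
    return {
        "a": len({a for a, _ in part}),
        "b": len({b for _, b in part}),
    }


# Bins from the diagram: bin0 holds rows with b <= 10, bin1 holds rows with b > 10
# (the condition a <= 0 is true for every row here).
bins = [[r for r in rows if r[1] <= 10], [r for r in rows if r[1] > 10]]

per_bin = [nunique_per_column(part) for part in bins]
summed = {col: sum(counts[col] for counts in per_bin) for col in ("a", "b")}
expected = nunique_per_column(rows)
# 'summed' reports 2 unique values for column "a", but 'expected' has only 1:
# the value a == 0 lives in both bins and is counted twice.


def range_partition_1d(values, pivots):
    """Hypothetical 1D range-partitioning: equal values always share a bin."""
    parts = [[] for _ in range(len(pivots) + 1)]
    for v in values:
        parts[bisect_right(pivots, v)].append(v)
    return parts


# 1D case: the column itself is the key, so bins hold disjoint value ranges
# and the per-bin unique counts can simply be summed.
values = [0, 5, 5, 12, 20, 20, 20]
parts = range_partition_1d(values, pivots=[5, 12])
nunique_1d = sum(len(set(p)) for p in parts)
```

In the 1D case every distinct value falls into exactly one bin, which is why the reduction step can be a plain sum there.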

perf measurements

[image: perf measurement results]

script to measure
import modin.pandas as pd
import numpy as np
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
import pandas

cfg.CpuCount.put(16)

def get_data(nrows, dtype):
    if dtype == int:
        return np.arange(nrows)
    elif dtype == float:
        return np.arange(nrows).astype(float)
    elif dtype == str:
        return np.array([f"value{i}" for i in range(nrows)])
    else:
        raise NotImplementedError(dtype)

# Warm-up call (presumably to initialize the execution engine before any timing starts)
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()

nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
dtypes = [int, str]
use_range_part = [True, False]

columns = pandas.MultiIndex.from_product([dtypes, duplicate_rate, use_range_part], names=["dtype", "duplicate rate", "use range-part"])
result = pandas.DataFrame(index=nrows, columns=columns)

i = 0
total_its = len(nrows) * len(duplicate_rate) * len(dtypes) * len(use_range_part)

for dt in dtypes:
    for nrow in nrows:
        data = get_data(nrow, dt)
        np.random.shuffle(data)
        for dpr in duplicate_rate:
            data_c = data.copy()
            dupl_val = data_c[0]

            num_duplicates = int(dpr * nrow)
            dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
            data_c[dupl_indices] = dupl_val

            for impl in use_range_part:
                print(f"{round((i / total_its) * 100, 2)}%")
                i += 1
                cfg.RangePartitioning.put(impl)

                sr = pd.Series(data_c)
                execute(sr)

                t1 = timer()
                # returns a scalar, so no need for materialization
                res = sr.nunique()
                tm = timer() - t1
                print(nrow, dpr, dt, impl, tm)
                result.loc[nrow, (dt, dpr, impl)] = tm
                result.to_excel("nunique.xlsx")
  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Add range-partitioning implementation for .nunique() #7100
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
@pytest.mark.parametrize("axis", axis_values, ids=axis_keys)
@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("dropna", [True, False])
def test_nunique(data, axis, dropna):
Collaborator Author

Tests for df.nunique() were missing

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/pandas/test/test_series.py -k "test_nunique"
Collaborator Author

This section has become the common place for range-partitioning tests since #7091

dchigarev marked this pull request as ready for review March 18, 2024 14:51
unsupported_message = ""
if axis != 0:
    unsupported_message += (
        "Range-partitioning 'nunique()' is only supported for 'axis=0'.\n"
Collaborator

Is it difficult to support axis=1?

Collaborator Author

With the current implementation of the range-partitioning mechanism, it's impossible. Rewriting the whole mechanism just for this method doesn't seem reasonable.

if len(unsupported_message) > 0:
    message = (
        f"Can't use range-partitioning implementation for 'nunique' because:\n{unsupported_message}"
        + "Falling back to a full-axis implementation."
Collaborator

Suggested change
-         + "Falling back to a full-axis implementation."
+         + "Falling back to a Reduce implementation."

Collaborator Author

changed 'full-axis' -> 'full-axis reduce'
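The fallback logic discussed in this thread can be sketched roughly as follows. This is an illustration only: `choose_nunique_impl` and its `ncols` parameter are hypothetical, the wording of the 1D-data message is an assumption (only the `axis=0` message appears in the diff), and the real code in Modin may be structured differently.

```python
import warnings


def choose_nunique_impl(axis, ncols):
    """Hypothetical helper: pick which 'nunique' implementation to run."""
    unsupported_message = ""
    if axis != 0:
        unsupported_message += (
            "Range-partitioning 'nunique()' is only supported for 'axis=0'.\n"
        )
    if ncols > 1:
        # Assumed wording; the PR description only states the 1D restriction.
        unsupported_message += (
            "Range-partitioning 'nunique()' is only supported for 1D data.\n"
        )
    if len(unsupported_message) > 0:
        warnings.warn(
            "Can't use range-partitioning implementation for 'nunique' because:\n"
            + unsupported_message
            + "Falling back to a full-axis reduce implementation."
        )
        return "full-axis reduce"
    return "range-partitioning"
```

For example, `choose_nunique_impl(axis=1, ncols=1)` would warn and return `"full-axis reduce"`, while `choose_nunique_impl(axis=0, ncols=1)` returns `"range-partitioning"` silently.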

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
YarShev (Collaborator) commented Mar 19, 2024

> This is not a problem for other methods, since there we want to extract unique rows rather than unique values in each column independently.

Would this be a problem if there were identical rows in different bins?

dchigarev (Collaborator, Author)

> Would this be a problem if there were identical rows in different bins?

It would be a problem, but this can't happen by design. Identical rows (not identical values of a column) will always be in the same bin.
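A small sketch of why identical rows cannot end up in different bins: the bin is a deterministic function of the row's key values, so two equal rows always map to the same bin. The code below assumes, for illustration only, that rows are compared lexicographically as tuples against hypothetical pivot rows; the thread does not show how Modin actually builds its bins.

```python
from bisect import bisect_right


def bin_of(row, pivots):
    """Return the bin index of a row; depends only on the row's values."""
    return bisect_right(pivots, row)


rows = [(0, 1), (0, 20), (0, 1), (3, 7), (0, 20), (3, 7)]
pivots = [(0, 10), (2, 0)]  # hypothetical split points between bins

# Collect the distinct rows seen in each bin.
bins = {}
for row in rows:
    bins.setdefault(bin_of(row, pivots), set()).add(row)

# Each distinct row lives in exactly one bin, so per-bin counts of
# unique rows add up to the global number of unique rows.
unique_rows_total = sum(len(part) for part in bins.values())
```

This is the property that lets methods working on unique rows sum their per-bin results, unlike the per-column counting in `.nunique()`.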

YarShev merged commit f98f050 into modin-project:master Mar 19, 2024
36 checks passed
Successfully merging this pull request may close these issues.

Add range-partitioning implementation for .nunique()
2 participants