Allowing to pass in a vocab in Categorify #935

Merged: 16 commits, Jul 20, 2021

Changes from all commits
92 changes: 46 additions & 46 deletions .github/workflows/cpu-ci.yml

@@ -1,46 +1,46 @@ (the file is rewritten wholesale; the only change is the blank lines inserted between top-level sections, so only the new version is shown)

name: CPU CI

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  build:
    runs-on: ${{ matrix.os }}
    strategy:
      matrix:
        python-version: [3.8]
        os: [ubuntu-latest]

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install Ubuntu packages
        run: |
          sudo apt-get update -y
          sudo apt-get install -y protobuf-compiler
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip setuptools wheel
          python -m pip install -r requirements.txt pybind11
          python -m pip install -r requirements-dev.txt
      - name: Lint with flake8
        run: |
          flake8 .
      - name: Lint with black
        run: |
          black --check .
      - name: Lint with isort
        run: |
          isort -c .
      - name: Build
        run: |
          python setup.py develop
      - name: Run unittests
        run: |
          python -m pytest -svv tests/unit/test_cpu_workflow.py
2 changes: 2 additions & 0 deletions .gitignore

@@ -71,3 +71,5 @@ ipython_config.py
 .dmypy.json
 dmypy.json
 
+# PyCharm
+.idea
21 changes: 21 additions & 0 deletions nvtabular/dispatch.py

@@ -105,6 +105,10 @@ def _is_cpu_object(x):
     return isinstance(x, (pd.DataFrame, pd.Series))
 
 
+def is_series_or_dataframe_object(maybe_series_or_df):
+    return _is_series_object(maybe_series_or_df) or _is_dataframe_object(maybe_series_or_df)
+
+
 def _hex_to_int(s, dtype=None):
     def _pd_convert_hex(x):
         if pd.isnull(x):

@@ -320,11 +324,28 @@ def _make_df(_like_df=None, device=None):
         return pd.DataFrame(_like_df)
     elif isinstance(_like_df, (cudf.DataFrame, cudf.Series)):
         return cudf.DataFrame(_like_df)
+    elif isinstance(_like_df, dict) and len(_like_df) > 0:
+        is_pandas = all(isinstance(v, pd.Series) for v in _like_df.values())
+
+        return pd.DataFrame(_like_df) if is_pandas else cudf.DataFrame(_like_df)
     if device == "cpu":
         return pd.DataFrame()
     return cudf.DataFrame()
 
 
+def _add_to_series(series, to_add, prepend=True):
+    if isinstance(series, pd.Series):
+        series_to_add = pd.Series(to_add)
+    elif isinstance(series, cudf.Series):
+        series_to_add = cudf.Series(to_add)
+    else:
+        raise ValueError("Unrecognized series, please provide either a pandas or a cudf series")
+
+    series_to_concat = [series_to_add, series] if prepend else [series, series_to_add]
+
+    return _concat(series_to_concat)
 
 
 def _detect_format(data):
     """Utility to detect the format of `data`"""
     from nvtabular import Dataset
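For context, a minimal sketch of how these new helpers behave on the pandas path (the cudf path mirrors it). This is illustrative only: `_make_df` and `_add_to_series` are private helpers, and the sample data below is made up.

import pandas as pd

from nvtabular.dispatch import _add_to_series, _make_df

# _make_df builds a DataFrame from a dict of Series, choosing pandas vs.
# cudf from the value types rather than from the `device` argument.
df = _make_df({"user": pd.Series(["a", "b"]), "item": pd.Series(["x", "y"])})

# _add_to_series prepends (or appends) values to a vocabulary column,
# e.g. reserving a leading null slot for out-of-vocabulary entries.
vocab = _add_to_series(df["user"], [None], prepend=True)
print(vocab.tolist())  # [None, 'a', 'b']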
23 changes: 2 additions & 21 deletions nvtabular/framework_utils/tensorflow/feature_column_utils.py

@@ -13,10 +13,9 @@
 # limitations under the License.
 #
 
-import os
 import warnings
 
-import cudf
+import pandas as pd
 import tensorflow as tf
 from tensorflow.python.feature_column import feature_column_v2 as fc
@@ -227,7 +226,7 @@ def _get_parents(column):
     features += features_replaced_buckets
 
     if len(categorifies) > 0:
-        features += categorifies.keys() >> Categorify()
+        features += categorifies.keys() >> Categorify(vocabs=pd.DataFrame(categorifies))
Comment:

This line fails if features have vocabularies of varying size. In particular, the sample notebook "using-feature-columns.ipynb" fails on this line with the following error. Am I missing anything, or is this a bug here?

I used the Docker image "merlin-tensorflow-training", which had nvtabular 0.6.1 installed:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/tmp/ipykernel_783/4041583971.py in <module>
----> 1 online_workflow, feature_columns = make_feature_column_workflow(feature_columns, "AdoptionSpeed")

/nvtabular/nvtabular/framework_utils/tensorflow/feature_column_utils.py in make_feature_column_workflow(feature_columns, label_name, category_dir)
    227 
    228     if len(categorifies) > 0:
--> 229         features += categorifies.keys() >> Categorify(vocabs=pd.DataFrame(categorifies))
    230 
    231     if len(hashes) > 0:

/usr/local/lib/python3.8/dist-packages/pandas/core/frame.py in __init__(self, data, index, columns, dtype, copy)
    466 
    467         elif isinstance(data, dict):
--> 468             mgr = init_dict(data, index, columns, dtype=dtype)
    469         elif isinstance(data, ma.MaskedArray):
    470             import numpy.ma.mrecords as mrecords

/usr/local/lib/python3.8/dist-packages/pandas/core/internals/construction.py in init_dict(data, index, columns, dtype)
    281             arr if not is_datetime64tz_dtype(arr) else arr.copy() for arr in arrays
    282         ]
--> 283     return arrays_to_mgr(arrays, data_names, index, columns, dtype=dtype)
    284 
    285 

/usr/local/lib/python3.8/dist-packages/pandas/core/internals/construction.py in arrays_to_mgr(arrays, arr_names, index, columns, dtype, verify_integrity)
     76         # figure out the index, if necessary
     77         if index is None:
---> 78             index = extract_index(arrays)
     79         else:
     80             index = ensure_index(index)

/usr/local/lib/python3.8/dist-packages/pandas/core/internals/construction.py in extract_index(data)
    395             lengths = list(set(raw_lengths))
    396             if len(lengths) > 1:
--> 397                 raise ValueError("arrays must all be same length")
    398 
    399             if have_dicts:

ValueError: arrays must all be same length

Contributor:

Thoughts on how to handle this @marcromeyn?

Member:

Thanks for reporting! That looks like a bug to me - I opened #1062 to track it.
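To make the reported failure concrete: a dict of unequal-length lists cannot be passed to the pandas DataFrame constructor, while a dict of Series can, because pandas aligns Series on their index and pads the shorter columns with NaN. A minimal reproduction, independent of NVTabular (the column names and values here are made up):

import pandas as pd

vocabs = {"color": ["red", "green", "blue"], "size": ["S", "M"]}

try:
    pd.DataFrame(vocabs)
except ValueError as err:
    print(err)  # arrays must all be same length

# Wrapping each list in a Series sidesteps the length check: pandas
# aligns on the index and fills the shorter column with NaN.
padded = pd.DataFrame({name: pd.Series(vals) for name, vals in vocabs.items()})
print(padded)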


     if len(hashes) > 0:
         features += hashes.keys() >> HashBucket(hashes)

@@ -282,22 +281,4 @@ def _get_parents(column):

     workflow = nvt.Workflow(features)
 
-    # create stats for Categorify op if we need it
-    if len(categorifies) > 0:
-        if category_dir is None:
-            category_dir = "/tmp/categories"  # nosec
-        if not os.path.exists(category_dir):
-            os.makedirs(category_dir)
-
-        stats = {"categories": {}}
-        for feature_name, categories in categorifies.items():
-            categories.insert(0, None)
-            df = cudf.DataFrame({feature_name: categories})
-
-            save_path = os.path.join(category_dir, f"unique.{feature_name}.parquet")
-            df.to_parquet(save_path)
-            stats["categories"][feature_name] = save_path
-
-        workflow.stats = stats
-
     return workflow, numeric_columns + new_feature_columns
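With the on-disk stats workaround removed, the vocabulary is handed to Categorify directly. A sketch of how the new `vocabs` parameter might be used end to end, assuming an NVTabular build that includes this PR (the column names and values are toy data, not from the source):

import nvtabular as nvt
import pandas as pd
from nvtabular.ops import Categorify

# A hypothetical pre-computed vocabulary: one column per categorical feature.
vocabs = pd.DataFrame({"color": pd.Series(["red", "green", "blue"])})

# With vocabs supplied, Categorify does not need to scan the data for uniques.
features = ["color"] >> Categorify(vocabs=vocabs)
workflow = nvt.Workflow(features)

df = pd.DataFrame({"color": ["green", "red", "purple"]})
out = workflow.fit_transform(nvt.Dataset(df)).to_ddf().compute()
print(out)  # encoded ids; the unseen "purple" falls into the null/OOV slot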