
[BUG] Categorify with multi-column group fails when NA values are present #1605

Closed
radekosmulski opened this issue Jul 7, 2022 · 3 comments · Fixed by #1652
Labels: bug (Something isn't working), P0
Milestone: Merlin 22.08
@radekosmulski (Contributor)

Describe the bug

Categorify with encode_type='combo' (a multi-column group) fails when the grouped columns contain NA values.

Steps/Code to reproduce bug
The following code

import cudf
import nvtabular as nvt
import numpy as np

gdf = cudf.DataFrame(
    data=[['apple', np.nan], ['apple', 'red'], ['apple', 'green'], ['orange', 'red']],
    columns=['item', 'color'],
)
gdf

output = [['item', 'color']] >> nvt.ops.Categorify(encode_type='combo')

workflow = nvt.Workflow(output)
dataset = nvt.Dataset(gdf)

workflow.fit_transform(dataset).to_ddf().compute()

produces this error message

/opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py:1292: UserWarning: The deep parameter is ignored and is only included for pandas compatibility.
  warnings.warn(
Failed to transform operator <nvtabular.ops.categorify.Categorify object at 0x7f1619751eb0>
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/nvtabular/ops/categorify.py", line 482, in transform
    new_df[name] = encoded
  File "/opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py", line 1123, in __setitem__
    self.insert(len(self._data), arg, value)
  File "/opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py", line 2603, in insert
    self._data.insert(name, value, loc=loc)
  File "/opt/conda/lib/python3.8/site-packages/cudf/core/column_accessor.py", line 297, in insert
    raise ValueError("All columns must be of equal length")
ValueError: All columns must be of equal length

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/nvtabular/workflow/workflow.py", line 519, in _transform_partition
    output_df = node.op.transform(selection, input_df)
  File "/opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/nvtabular/ops/categorify.py", line 484, in transform
    raise RuntimeError(f"Failed to categorical encode column {name}") from e
RuntimeError: Failed to categorical encode column item_color
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File /opt/conda/lib/python3.8/site-packages/nvtabular/ops/categorify.py:482, in Categorify.transform(self, col_selector, df)
    464     encoded = _encode(
    465         use_name,
    466         storage_name,
   (...)
    480         start_index=self.start_index,
    481     )
--> 482     new_df[name] = encoded
    483 except Exception as e:

File /opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
    100 libnvtx_push_range(self.attributes, self.domain.handle)
--> 101 result = func(*args, **kwargs)
    102 libnvtx_pop_range(self.domain.handle)

File /opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py:1123, in DataFrame.__setitem__(self, arg, value)
   1120         else:
   1121             # disc. with pandas here
   1122             # pandas raises key error here
-> 1123             self.insert(len(self._data), arg, value)
   1125 elif can_convert_to_column(arg):

File /opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
    100 libnvtx_push_range(self.attributes, self.domain.handle)
--> 101 result = func(*args, **kwargs)
    102 libnvtx_pop_range(self.domain.handle)

File /opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py:2603, in DataFrame.insert(self, loc, name, value, nan_as_null)
   2601 value = column.as_column(value, nan_as_null=nan_as_null)
-> 2603 self._data.insert(name, value, loc=loc)

File /opt/conda/lib/python3.8/site-packages/cudf/core/column_accessor.py:297, in ColumnAccessor.insert(self, name, value, loc, validate)
    296     if len(value) != self._column_length:
--> 297         raise ValueError("All columns must be of equal length")
    298 else:

ValueError: All columns must be of equal length

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Input In [1], in <cell line: 13>()
     10 workflow = nvt.Workflow(output)
     11 dataset = nvt.Dataset(gdf)
---> 13 workflow.fit_transform(dataset).to_ddf().compute()

File /opt/conda/lib/python3.8/site-packages/nvtabular/workflow/workflow.py:286, in Workflow.fit_transform(self, dataset)
    266 def fit_transform(self, dataset: Dataset) -> Dataset:
    267     """Convenience method to both fit the workflow and transform the dataset in a single
    268     call. Equivalent to calling ``workflow.fit(dataset)`` followed by
    269     ``workflow.transform(dataset)``
   (...)
    284     transform
    285     """
--> 286     self.fit(dataset)
    287     return self.transform(dataset)

File /opt/conda/lib/python3.8/site-packages/nvtabular/workflow/workflow.py:261, in Workflow.fit(self, dataset)
    257         dependencies.difference_update(current_phase)
    259 # This captures the output dtypes of operators like LambdaOp where
    260 # the dtype can't be determined without running the transform
--> 261 self._transform_impl(dataset, capture_dtypes=True).sample_dtypes()
    262 self.graph.construct_schema(dataset.schema, preserve_dtypes=True)
    264 return self

File /opt/conda/lib/python3.8/site-packages/merlin/io/dataset.py:1147, in Dataset.sample_dtypes(self, n, annotate_lists)
   1140 """Return the real dtypes of the Dataset
   1141 
   1142 Use cached metadata if this operation was
   1143 already performed. Otherwise, call down to the
   1144 underlying engine for sampling logic.
   1145 """
   1146 if self._real_meta.get(n, None) is None:
-> 1147     _real_meta = self.engine.sample_data(n=n)
   1148     if self.dtypes:
   1149         _real_meta = _set_dtypes(_real_meta, self.dtypes)

File /opt/conda/lib/python3.8/site-packages/merlin/io/dataset_engine.py:71, in DatasetEngine.sample_data(self, n)
     69 _ddf = self.to_ddf()
     70 for partition_index in range(_ddf.npartitions):
---> 71     _head = _ddf.partitions[partition_index].head(n)
     72     if len(_head):
     73         return _head

File /opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py:1098, in _Frame.head(self, n, npartitions, compute)
   1096 # No need to warn if we're already looking at all partitions
   1097 safe = npartitions != self.npartitions
-> 1098 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py:1132, in _Frame._head(self, n, npartitions, compute, safe)
   1127 result = new_dd_object(
   1128     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1129 )
   1131 if compute:
-> 1132     result = result.compute()
   1133 return result

File /opt/conda/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286     dask.base.compute
    287     """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File /opt/conda/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /opt/conda/lib/python3.8/site-packages/dask/local.py:553, in get_sync(dsk, keys, **kwargs)
    548 """A naive synchronous version of get_async
    549 
    550 Can be useful for debugging.
    551 """
    552 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553 return get_async(
    554     synchronous_executor.submit,
    555     synchronous_executor._max_workers,
    556     dsk,
    557     keys,
    558     **kwargs,
    559 )

File /opt/conda/lib/python3.8/site-packages/dask/local.py:496, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494 while state["waiting"] or state["ready"] or state["running"]:
    495     fire_tasks(chunksize)
--> 496     for key, res_info, failed in queue_get(queue).result():
    497         if failed:
    498             exc, tb = loads(res_info)

File /opt/conda/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /opt/conda/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File /opt/conda/lib/python3.8/site-packages/dask/local.py:538, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    536 fut = Future()
    537 try:
--> 538     fut.set_result(fn(*args, **kwargs))
    539 except BaseException as e:
    540     fut.set_exception(e)

File /opt/conda/lib/python3.8/site-packages/dask/local.py:234, in batch_execute_tasks(it)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File /opt/conda/lib/python3.8/site-packages/dask/local.py:234, in <listcomp>(.0)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File /opt/conda/lib/python3.8/site-packages/dask/local.py:225, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223     failed = False
    224 except BaseException as e:
--> 225     result = pack_exception(e, dumps)
    226     failed = True
    227 return key, result, failed

File /opt/conda/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File /opt/conda/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /opt/conda/lib/python3.8/site-packages/dask/optimization.py:969, in SubgraphCallable.__call__(self, *args)
    967 if not len(args) == len(self.inkeys):
    968     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /opt/conda/lib/python3.8/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /opt/conda/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /opt/conda/lib/python3.8/site-packages/dask/utils.py:37, in apply(func, args, kwargs)
     35 def apply(func, args, kwargs=None):
     36     if kwargs:
---> 37         return func(*args, **kwargs)
     38     else:
     39         return func(*args)

File /opt/conda/lib/python3.8/site-packages/nvtabular/workflow/workflow.py:519, in _transform_partition(root_df, workflow_nodes, additional_columns, capture_dtypes)
    516 try:
    517     # use input_columns to ensure correct grouping (subgroups)
    518     selection = node.input_columns.resolve(node.input_schema)
--> 519     output_df = node.op.transform(selection, input_df)
    521     # Update or validate output_df dtypes
    522     for col_name, output_col_schema in node.output_schema.column_schemas.items():

File /opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
     98 @wraps(func)
     99 def inner(*args, **kwargs):
    100     libnvtx_push_range(self.attributes, self.domain.handle)
--> 101     result = func(*args, **kwargs)
    102     libnvtx_pop_range(self.domain.handle)
    103     return result

File /opt/conda/lib/python3.8/site-packages/nvtabular/ops/categorify.py:484, in Categorify.transform(self, col_selector, df)
    482         new_df[name] = encoded
    483     except Exception as e:
--> 484         raise RuntimeError(f"Failed to categorical encode column {name}") from e
    486 return new_df

RuntimeError: Failed to categorical encode column item_color

Expected behavior
The op should not fail when NA values are present (similar to how Categorify does not fail in this scenario without a multi-column group).
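
For comparison (reusing the same gdf as above), a minimal sketch of the per-column case that, per the expected behavior, does not fail:

# With no multi-column group, each column is encoded independently and the
# row containing the null color is handled without raising.
output_single = ['item', 'color'] >> nvt.ops.Categorify()
nvt.Workflow(output_single).fit_transform(nvt.Dataset(gdf)).to_ddf().compute()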

Environment details (please complete the following information):
nvcr.io/nvidia/merlin/merlin-hugectr:22.06
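
Not part of the original report, but one possible interim workaround, assuming it is acceptable to encode missing values as an explicit category, is to fill the nulls before building the workflow (the placeholder string below is arbitrary):

import cudf
import numpy as np
import nvtabular as nvt

gdf = cudf.DataFrame(
    data=[['apple', np.nan], ['apple', 'red'], ['apple', 'green'], ['orange', 'red']],
    columns=['item', 'color'],
)

# Replace nulls with an explicit placeholder so the combo group sees no NAs.
gdf['color'] = gdf['color'].fillna('missing')

output = [['item', 'color']] >> nvt.ops.Categorify(encode_type='combo')
workflow = nvt.Workflow(output)
workflow.fit_transform(nvt.Dataset(gdf)).to_ddf().compute()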

radekosmulski added the bug (Something isn't working) and P1 labels on Jul 7, 2022
@jershi425

Hi @radekosmulski, is this issue still on track?

radekosmulski added the P0 label and removed the P1 label on Aug 1, 2022
radekosmulski added this to the Merlin 22.08 milestone on Aug 1, 2022
@radekosmulski (Contributor, Author)

Hi @viswa-nvidia -- @jershi425 reports that this is impacting a customer engagement: a HugeCTR tutorial that the customer is expecting. Based on that feedback I raised the priority and added the issue to the nearest milestone, but I'm not sure whether that is the proper way to handle this under our bug-fixing process. Just wanted to raise it for your visibility. Thank you very much for all your help!

@rjzamora (Collaborator)

Thanks for the simple reproducer, @radekosmulski! There is definitely a bug in _write_uniques related to null-value handling of multi-column categories. I'll try to figure out how to fix this in the next few days (if not today).
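
For illustration only (this is plain pandas, not the actual _write_uniques code path): default null handling in a multi-column groupby silently drops any key combination that contains a null, which is the kind of mismatch between the data and the uniques table that can surface as the length error above.

import numpy as np
import pandas as pd

df = pd.DataFrame(
    data=[['apple', np.nan], ['apple', 'red'], ['apple', 'green'], ['orange', 'red']],
    columns=['item', 'color'],
)

# Four distinct (item, color) combinations exist in the data, but with the
# default dropna=True the combination containing the null is dropped.
print(len(df.groupby(['item', 'color']).size()))                # 3
print(len(df.groupby(['item', 'color'], dropna=False).size()))  # 4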
