Add DataLoader.epochs() method and Dataset.to_iter(epochs=) argument (#1147)

* add epochs arg to dataloaders

* fix test

* add dedicated method that returns a multi-epoch dataloader

* remove stale epochs kwargs

* improve test
rjzamora committed Oct 20, 2021
1 parent b94f6b4 commit a8f15a2
Showing 3 changed files with 88 additions and 17 deletions.
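A minimal usage sketch of the two entry points this commit adds, `Dataset.to_iter(epochs=...)` and `DataLoader.epochs()`, using a hypothetical parquet path and placeholder column names (it mirrors the new unit test below rather than documenting a definitive recipe):

```python
from nvtabular.io.dataset import Dataset
from nvtabular.loader.backend import DataLoader

ds = Dataset("data.parquet", engine="parquet")  # hypothetical file path

# to_iter(epochs=N) yields every ddf partition N times, so the reported
# row count of the iterator scales with N.
assert len(ds.to_iter(epochs=2)) == 2 * len(ds.to_iter())

# epochs(N) returns a copy of the loader whose chunk queue reads the
# dataset N times before stopping.
loader = DataLoader(
    ds,
    cat_names=["name-cat"],   # placeholder column names
    cont_names=["x", "y"],
    label_names=["label"],
    batch_size=128,
    shuffle=False,
)
three_epoch_loader = loader.epochs(3)
```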
30 changes: 19 additions & 11 deletions nvtabular/io/dataset.py
@@ -587,7 +587,9 @@ def merge(cls, left, right, **kwargs):
)
)

def to_iter(self, columns=None, indices=None, shuffle=False, seed=None, use_file_metadata=None):
def to_iter(
self, columns=None, indices=None, shuffle=False, seed=None, use_file_metadata=None, epochs=1
):
"""Convert `Dataset` object to a `cudf.DataFrame` iterator.
Note that this method will use `to_ddf` to produce a
@@ -619,6 +621,9 @@ def to_iter(self, columns=None, indices=None, shuffle=False, seed=None, use_file
optimization will only be used if the current Dataset is
backed by a file-based engine. Otherwise, it is possible
that an intermediate transform has modified the row-count.
epochs : int
Number of dataset passes to include within a single iterator.
This option is used for multi-epoch data-loading. Default is 1.
"""
if isinstance(columns, str):
columns = [columns]
@@ -645,6 +650,7 @@ def to_iter(self, columns=None, indices=None, shuffle=False, seed=None, use_file
self.to_ddf(columns=columns, shuffle=shuffle, seed=seed),
indices=indices,
partition_lens=partition_lens_meta,
epochs=epochs,
)

def to_parquet(
@@ -1174,28 +1180,30 @@ def _set_dtypes(chunk, dtypes):


class DataFrameIter:
def __init__(self, ddf, columns=None, indices=None, partition_lens=None):
def __init__(self, ddf, columns=None, indices=None, partition_lens=None, epochs=1):
self.indices = indices if isinstance(indices, list) else range(ddf.npartitions)
self._ddf = ddf
self.columns = columns
self.partition_lens = partition_lens
self.epochs = epochs

def __len__(self):
if self.partition_lens:
# Use metadata-based partition-size information
# if/when it is available. Note that this metadata
# will not be correct if rows were added or dropped
# after IO (within Ops).
return sum(self.partition_lens[i] for i in self.indices)
return sum(self.partition_lens[i] for i in self.indices) * self.epochs
if len(self.indices) < self._ddf.npartitions:
return len(self._ddf.partitions[self.indices])
return len(self._ddf)
return len(self._ddf.partitions[self.indices]) * self.epochs
return len(self._ddf) * self.epochs

def __iter__(self):
for i in self.indices:
part = self._ddf.get_partition(i)
if self.columns:
yield part[self.columns].compute(scheduler="synchronous")
else:
yield part.compute(scheduler="synchronous")
for epoch in range(self.epochs):
for i in self.indices:
part = self._ddf.get_partition(i)
if self.columns:
yield part[self.columns].compute(scheduler="synchronous")
else:
yield part.compute(scheduler="synchronous")
part = None
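For readers skimming the `DataFrameIter` changes above, here is a self-contained toy sketch (plain Python, not NVTabular code) of the pattern the class now follows: wrap the partition loop in an outer epoch loop and scale the reported length by `epochs`.

```python
class MultiEpochIter:
    """Toy stand-in for DataFrameIter: `parts` plays the role of ddf partitions."""

    def __init__(self, parts, epochs=1):
        self.parts = parts
        self.epochs = epochs

    def __len__(self):
        # one full pass over the rows per epoch
        return sum(len(p) for p in self.parts) * self.epochs

    def __iter__(self):
        for _ in range(self.epochs):
            for part in self.parts:
                yield part


parts = [[0, 1, 2], [3, 4]]             # two "partitions" with 3 and 2 rows
it = MultiEpochIter(parts, epochs=3)
assert len(it) == 3 * 5                 # length counts rows across all epochs
assert len(list(it)) == 3 * len(parts)  # iteration repeats every partition
```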
43 changes: 37 additions & 6 deletions nvtabular/loader/backend.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import math
import queue
import threading
@@ -50,14 +51,14 @@ class ChunkQueue:
before checking for errors and trying again
"""

def __init__(self, dataloader, qsize, num_parts=1, shuffle=False, put_wait=1e-6):
def __init__(self, dataloader, qsize, num_parts=1, shuffle=False, put_wait=1e-6, epochs=1):
self.num_parts = num_parts
self.shuffle = shuffle
self.put_wait = put_wait
self.q_out = queue.Queue(qsize)
self._stop_event = threading.Event()
indices = dataloader._gather_indices_for_dev(0)
self.itr = dataloader.data.to_iter(indices=indices)
self.itr = dataloader.data.to_iter(indices=indices, epochs=epochs)
self.dataloader = dataloader

def __len__(self):
@@ -198,6 +199,7 @@ def __init__(
self.sparse_as_dense = sparse_as_dense
self.global_size = global_size or 1
self.global_rank = global_rank or 0
self._epochs = 1

self.cat_names = cat_names or dataset.schema.select_by_tag(Tags.CATEGORICAL).column_names
self.cont_names = cont_names or dataset.schema.select_by_tag(Tags.CONTINUOUS).column_names
@@ -216,13 +218,42 @@ def __init__(

self.num_rows_processed = 0

# we set size of chunk queue to 1 we only want one chunk in queue at a time.
self._buff = ChunkQueue(self, 1, num_parts=parts_per_chunk, shuffle=shuffle)
# run once instead of every time len called
self._buff_len = len(self._buff)
self.parts_per_chunk = parts_per_chunk
self.shuffle = shuffle
self.__buff = None
self.__buff_len = None
self._batch_itr = None
self._workers = None

@property
def _buff(self):
if self.__buff is None:
# set the chunk queue size to 1 because we only want one chunk in the queue at a time.
self.__buff = ChunkQueue(
self, 1, num_parts=self.parts_per_chunk, shuffle=self.shuffle, epochs=self._epochs
)
return self.__buff

@property
def _buff_len(self):
if self.__buff_len is None:
# run once instead of every time len is called
self.__buff_len = len(self._buff)
return self.__buff_len

def epochs(self, epochs=1):
if epochs == self._epochs:
return self
new_dataloader = copy.copy(self)
new_dataloader._set_epochs(epochs)
return new_dataloader

def _set_epochs(self, epochs):
self.stop()
self.__buff = None
self.__buff_len = None
self._epochs = epochs

def __len__(self):
batches = _num_steps(self._buff_len, self.batch_size)
if self.drop_last and self._buff_len % self.batch_size > 0:
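The new `DataLoader.epochs()` method leans on two things visible in the diff above: the chunk-queue buffer is now built lazily via the `_buff` property, and `epochs()` takes a shallow `copy.copy` of the loader and resets that cached buffer so it gets rebuilt with the new epoch count. A rough standalone sketch of that copy-and-reset pattern follows (a simplified stand-in, not the actual DataLoader; the real `_set_epochs` also stops any running worker threads first):

```python
import copy


class LazyBufferedLoader:
    """Toy illustration of the lazy-buffer plus copy-and-reset pattern."""

    def __init__(self, data):
        self.data = data
        self._epochs = 1
        self.__buff = None  # built on first access, so a copy can reset it cheaply

    @property
    def _buff(self):
        if self.__buff is None:
            # stands in for constructing a ChunkQueue over `epochs` dataset passes
            self.__buff = [self.data] * self._epochs
        return self.__buff

    def epochs(self, epochs=1):
        if epochs == self._epochs:
            return self
        new_loader = copy.copy(self)    # shallow copy shares the underlying data...
        new_loader._set_epochs(epochs)  # ...but invalidates the cached buffer
        return new_loader

    def _set_epochs(self, epochs):
        self.__buff = None
        self._epochs = epochs


loader = LazyBufferedLoader(data=[1, 2, 3])
assert len(loader.epochs(4)._buff) == 4 * len(loader._buff)
```

The shallow copy keeps the original loader usable for single-epoch iteration, while the returned loader rebuilds its buffer to read the same dataset multiple times.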
32 changes: 32 additions & 0 deletions tests/unit/loader/test_dataloader_backend.py
@@ -16,8 +16,10 @@
import cupy
import pytest

from nvtabular.dispatch import _concat
from nvtabular.io.dataset import Dataset
from nvtabular.loader.backend import DataLoader
from tests.conftest import assert_eq


@pytest.mark.parametrize("engine", ["parquet"])
@@ -120,3 +122,33 @@ def test_dataloader_empty_error(datasets, engine, batch_size):
assert "Neither Categorical or Continuous columns were found by the dataloader. " in str(
exc_info.value
)


@pytest.mark.parametrize("engine", ["parquet"])
@pytest.mark.parametrize("batch_size", [128])
@pytest.mark.parametrize("epochs", [1, 5])
def test_dataloader_epochs(datasets, engine, batch_size, epochs):
dataset = Dataset(str(datasets["parquet"]), engine=engine)
cont_names = ["x", "y", "id"]
cat_names = ["name-string", "name-cat"]
label_name = ["label"]

data_loader = DataLoader(
dataset,
cat_names=cat_names,
cont_names=cont_names,
batch_size=batch_size,
label_names=label_name,
shuffle=False,
)

# Convert to iterators and then to DataFrames
df1 = _concat(list(data_loader._buff.itr))
df2 = _concat(list(data_loader.epochs(epochs)._buff.itr))

# Check that the DataFrame sizes and rows make sense
assert len(df2) == epochs * len(df1)
assert_eq(
_concat([df1 for i in range(epochs)]).reset_index(drop=True),
df2.reset_index(drop=True),
)
