[FEA] Abstract PyTorch Data loader to provide Sparse Tensors for list columns #500

Closed
gabrielspmoreira opened this issue Dec 16, 2020 · 0 comments · Fixed by #793

gabrielspmoreira commented Dec 16, 2020

Is your feature request related to a problem? Please describe.

When working with list columns in parquet files, the PyTorch data loader returns them in a specific tuple representation, with length 2, described as follows:

  1. The concatenated values of the lists from all samples
  2. The offsets, which indicate where to split the values back into the list corresponding to each sample
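For illustration, the (values, offsets) pair can be split back into per-sample lists like this (a minimal pure-Python sketch with made-up data; the actual loader returns tensors):

```python
# Hypothetical batch of 3 samples whose list values are [[1, 2], [3], [4, 5, 6]]
values = [1, 2, 3, 4, 5, 6]   # concatenated values from all samples
offsets = [0, 2, 3]           # start index of each sample's list within values

# Append the total length so the last sample's slice has an end bound,
# then slice between consecutive bounds to recover the per-sample lists
bounds = offsets + [len(values)]
lists = [values[bounds[i]:bounds[i + 1]] for i in range(len(offsets))]
print(lists)  # [[1, 2], [3], [4, 5, 6]]
```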

That representation is useful if the feature is categorical and the values will be fed directly to a torch.nn.EmbeddingBag() to look up the corresponding embeddings, as shown here:

self.embedding_names = [i for i in embedding_table_shapes.keys()]
self.embedding_layers = torch.nn.ModuleList(
    [
        torch.nn.EmbeddingBag(*embedding_table_shapes[key], mode=mode)
        for key in self.embedding_names
    ]
)

But if the list columns are not categorical (e.g. floats), or if you don't want to use EmbeddingBag for your categorical features, then you need to manually reconstruct the list columns into a PyTorch sparse tensor.

Describe the solution you'd like
It would be better if the NVTabular data loader provided both the offsets representation (for EmbeddingBag) and also a sparse tensor representation for general usage.

I have implemented a data loader extension that converts the current offsets representation of list columns to a sparse or dense tensor, so that it is easier to use in PyTorch pipelines.

import torch

from nvtabular.loader.torch import TorchAsyncItr as NVTDataLoader
from nvtabular import Dataset as NVTDataset


class NVTDataLoaderWrapper(NVTDataLoader):
    def __init__(self, *args, **kwargs):
        self.default_seq_features_len = None
        if 'default_seq_features_len' in kwargs:
            self.default_seq_features_len = kwargs.pop('default_seq_features_len')
        else:
            raise ValueError('NVTabular data loader requires the "default_seq_features_len" argument '
                             'to create the sparse tensors for list columns')
        
        super(NVTDataLoaderWrapper, self).__init__(*args, **kwargs)

    def __enter__(self):
        return None

    def __exit__(self, type, value, traceback):
        return None

    def __next__(self):
        cat_features, cont_features, label_features = super(NVTDataLoaderWrapper, self).__next__()
        
        cat_sequence_features_transf = {}
        if cat_features is not None:
            cat_single_features, cat_sequence_features = cat_features
            cat_sequence_features_transf = {fname: self.get_sparse_tensor_list_column(cat_sequence_features[fname], 
                                                                                    'categorical') \
                                        for fname in cat_sequence_features}

        cont_sequence_features_transf = {}
        if cont_features is not None:
            cont_single_features, cont_sequence_features = cont_features

            cont_sequence_features_transf = {fname: self.get_sparse_tensor_list_column(cont_sequence_features[fname], 
                                                                                    'continuous') \
                                            for fname in cont_sequence_features}

        inputs = {**cat_sequence_features_transf, **cont_sequence_features_transf}
        return inputs

    def get_sparse_tensor_list_column(self, values_offset, feature_group):
        values = values_offset[0].flatten()
        offsets = values_offset[1].flatten()
        num_rows = len(offsets)

        # Append the values length to the end of the offsets vector, to be able to compute the length of the last sequence
        offsets = torch.cat([offsets, torch.LongTensor([len(values)]).to(offsets.device)])
        # Compute the difference between consecutive offsets, to get the sequence lengths
        diff_offsets = offsets[1:] - offsets[:-1]
        # Infer the number of columns based on the maximum sequence length
        max_seq_len = int(diff_offsets.max())
        default_seq_features_len = self.default_seq_features_len
        if max_seq_len > default_seq_features_len:
            raise ValueError('The default sequence length has been configured to {}, but the '
                             'largest sequence in this batch has length {}'.format(default_seq_features_len,
                                                                                   max_seq_len))

        # Build the indices needed to reconstruct the sparse tensor
        row_ids = torch.arange(len(offsets)-1).to(offsets.device)
        row_ids_repeated = torch.repeat_interleave(row_ids, diff_offsets)
        row_offset_repeated = torch.repeat_interleave(offsets[:-1], diff_offsets)
        col_ids = torch.arange(len(row_offset_repeated)).to(offsets.device) - row_offset_repeated.to(offsets.device)
        indices = torch.cat([row_ids_repeated.unsqueeze(-1), col_ids.unsqueeze(-1)], axis=1)

        if feature_group == 'categorical':
            sparse_tensor_class = torch.sparse.LongTensor
        elif feature_group == 'continuous':
            sparse_tensor_class = torch.sparse.FloatTensor
        else:
            raise NotImplementedError('Invalid feature group from NVTabular: {}'.format(feature_group))

        sparse_tensor = sparse_tensor_class(indices.T, values, torch.Size([num_rows, default_seq_features_len]))
        return sparse_tensor
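The index arithmetic in get_sparse_tensor_list_column can be traced in pure Python (a sketch of the same logic on made-up data, without torch, to show how the (row, col) indices are derived from the offsets):

```python
values = [1, 2, 3, 4, 5, 6]
offsets = [0, 2, 3]

# Append the total length so the last sequence length can be computed
bounds = offsets + [len(values)]
# Sequence length per row = difference of consecutive offsets
seq_lens = [bounds[i + 1] - bounds[i] for i in range(len(offsets))]

# Row index repeated once per element of that row (the repeat_interleave step)
row_ids = [r for r, n in enumerate(seq_lens) for _ in range(n)]
# Column index = global position of the element minus its row's starting offset
col_ids = [i - bounds[r] for i, r in enumerate(row_ids)]

# (row, col) pairs addressing each value in the [num_rows, max_len] sparse tensor
indices = list(zip(row_ids, col_ids))
print(indices)  # [(0, 0), (0, 1), (1, 0), (2, 0), (2, 1), (2, 2)]
```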

An example using this extended data loader:

from nvtabular.loader.torch import TorchAsyncItr as NVTDataLoader
from nvtabular import Dataset as NVTDataset

data_loader_config = {
    "cats": categ_features,
    "conts": continuous_features,
    "labels": [],
    "devices": [0],
}

FEATURES_SEQ_LEN = 20
train_set = NVTDataset(train_data_path, engine="parquet", part_mem_fraction=0.1)
train_loader = NVTDataLoaderWrapper(train_set, 
                                    default_seq_features_len=FEATURES_SEQ_LEN, 
                                    batch_size=training_args.per_device_train_batch_size, 
                                    shuffle=False, **data_loader_config)

The data loader will return a dict whose keys are feature names and whose values are dense tensors (with lists padded with 0 up to the configured maximum length). It could also return the intermediate sparse tensor representation, for pipelines that can use it.
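The padded dense output can be sketched without torch (hypothetical values; the real loader produces tensors, and `default_seq_features_len` plays the role of the configured maximum length):

```python
values = [1, 2, 3, 4, 5, 6]
offsets = [0, 2, 3]
default_seq_features_len = 4  # configured maximum sequence length

bounds = offsets + [len(values)]
# Each row is the sample's list, right-padded with zeros up to the configured length
dense = [
    values[bounds[i]:bounds[i + 1]]
    + [0] * (default_seq_features_len - (bounds[i + 1] - bounds[i]))
    for i in range(len(offsets))
]
print(dense)  # [[1, 2, 0, 0], [3, 0, 0, 0], [4, 5, 6, 0]]
```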

P.S. This class currently does not return "simple" (non-list) columns, because there is currently no way to know the column names of the "simple" features (see #499). As soon as that is fixed, this class could also include the "simple" columns and their corresponding tensors in the returned dict.
