Adds option to shuffle input files in readers (#128)
* add todo

* rewrote c4 quality filter to match the official implementation

* add tokenizer_language

* added docstring

* fix actually keeping the document

* simplified a bit sent dedup

* small ad hoc bugfixes

* made sentence dedup s2 parallelizable; bugfixes and other improvements

* reduce mem footprint

* add debug msg

* small perf improvs

* change folder structure

* added line_buffering option for dedup. Changed/optimized sentence dedup

* fix test

* updated example

* perf improvements for sent dedup filter

* remove print

* nit

* bugfixes and perf improvements

* added check

* use compiled struct

* added shuffle_files to readers
guipenedo authored Mar 20, 2024
1 parent 55c6b1c commit 27e2cea
Showing 6 changed files with 72 additions and 5 deletions.
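At a glance, the change threads a new shuffle_files flag from every on-disk reader down to BaseDiskReader, which shuffles each rank's file shard before reading. Below is a minimal usage sketch: the dataset path is made up and the choice of JsonlReader is illustrative (any of the readers touched in this commit accepts the same flag), so treat those details as assumptions rather than part of the diff.

from datatrove.pipeline.readers import JsonlReader

# Hypothetical dataset location; only shuffle_files itself comes from this commit.
reader = JsonlReader(
    "s3://my-bucket/my-dataset/",
    glob_pattern="*.jsonl.gz",
    shuffle_files=True,  # randomize the order of files within this rank's shard
)

# Stream a few documents in randomized file order, e.g. to eyeball a sample.
for i, doc in enumerate(reader.run(rank=0, world_size=1)):
    print(doc.id)
    if i >= 4:
        break

As the new docstrings warn, this is aimed at data-viz style sampling; the flag should stay off in dedup pipelines, presumably because those stages rely on reproducing the same document order across runs.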
7 changes: 7 additions & 0 deletions src/datatrove/pipeline/readers/base.py
@@ -1,3 +1,4 @@
+import random
 from abc import abstractmethod
 from contextlib import nullcontext
 from typing import Callable
@@ -134,6 +135,7 @@ def __init__(
         default_metadata: dict = None,
         recursive: bool = True,
         glob_pattern: str | None = None,
+        shuffle_files: bool = False,
     ):
         """
@@ -147,11 +149,14 @@ def __init__(
             default_metadata: a dictionary with any data that should be added to all sample's metadata
             recursive: whether to search recursively for files
             glob_pattern: pattern that all files must match exactly to be included (relative to data_folder)
+            shuffle_files: shuffle the files within the returned shard. Mostly used for data viz. purposes, do not use
+                with dedup blocks
         """
         super().__init__(limit, progress, adapter, text_key, id_key, default_metadata)
         self.data_folder = get_datafolder(data_folder)
         self.recursive = recursive
         self.glob_pattern = glob_pattern
+        self.shuffle_files = shuffle_files

     def get_document_from_dict(self, data: dict, source_file: str, id_in_file: int):
         document = super().get_document_from_dict(data, source_file, id_in_file)
@@ -219,6 +224,8 @@ def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1
                 raise RuntimeError(f"No files found on {self.data_folder.path}!")
             # otherwise just a warning
             logger.warning(f"No files found on {self.data_folder.path} for {rank=}")
+        if self.shuffle_files:
+            random.shuffle(files_shard)
         for doc in self.read_files_shard(files_shard):
             self.update_doc_stats(doc)
             yield doc
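For readers skimming the diff, here is a standalone sketch of what the new branch in BaseDiskReader.run() does; the file names below are made up. Note that it uses the standard-library global RNG, so the resulting order differs between runs unless random.seed() is called beforehand.

import random

# Made-up shard of files assigned to one rank.
files_shard = [
    "data/part-000.jsonl.gz",
    "data/part-001.jsonl.gz",
    "data/part-002.jsonl.gz",
]

shuffle_files = True  # mirrors the new reader flag
if shuffle_files:
    # Same call as in run(): shuffle the shard in place before reading.
    random.shuffle(files_shard)

print(files_shard)  # e.g. ['data/part-002.jsonl.gz', 'data/part-000.jsonl.gz', ...]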
14 changes: 13 additions & 1 deletion src/datatrove/pipeline/readers/csv.py
@@ -22,6 +22,8 @@ class CsvReader(BaseDiskReader):
         default_metadata: default metadata to add to all documents
         recursive: if True, will read files recursively in subfolders (default: True)
         glob_pattern: a glob pattern to filter files to read (default: None)
+        shuffle_files: shuffle the files within the returned shard. Mostly used for data viz. purposes, do not use
+            with dedup blocks
     """
 
     name = "🔢 Csv"
@@ -38,9 +40,19 @@ def __init__(
         default_metadata: dict = None,
         recursive: bool = True,
         glob_pattern: str | None = None,
+        shuffle_files: bool = False,
     ):
         super().__init__(
-            data_folder, limit, progress, adapter, text_key, id_key, default_metadata, recursive, glob_pattern
+            data_folder,
+            limit,
+            progress,
+            adapter,
+            text_key,
+            id_key,
+            default_metadata,
+            recursive,
+            glob_pattern,
+            shuffle_files,
         )
         self.compression = compression
         self.empty_warning = False
14 changes: 13 additions & 1 deletion src/datatrove/pipeline/readers/ipc.py
@@ -20,6 +20,8 @@ class IpcReader(BaseDiskReader):
         default_metadata: default metadata to add to all documents
         recursive: if True, will read files recursively in subfolders (default: True)
         glob_pattern: a glob pattern to filter files to read (default: None)
+        shuffle_files: shuffle the files within the returned shard. Mostly used for data viz. purposes, do not use
+            with dedup blocks
     """
 
     name = "🪶 Ipc"
@@ -37,9 +39,19 @@ def __init__(
         default_metadata: dict = None,
         recursive: bool = True,
         glob_pattern: str | None = None,
+        shuffle_files: bool = False,
     ):
         super().__init__(
-            data_folder, limit, progress, adapter, text_key, id_key, default_metadata, recursive, glob_pattern
+            data_folder,
+            limit,
+            progress,
+            adapter,
+            text_key,
+            id_key,
+            default_metadata,
+            recursive,
+            glob_pattern,
+            shuffle_files,
         )
         self.stream = stream
         # TODO: add option to disable reading metadata (https://github.com/apache/arrow/issues/13827 needs to be addressed first)
14 changes: 13 additions & 1 deletion src/datatrove/pipeline/readers/jsonl.py
@@ -25,6 +25,8 @@ class JsonlReader(BaseDiskReader):
         default_metadata: default metadata to add to all documents
         recursive: if True, will read files recursively in subfolders (default: True)
         glob_pattern: a glob pattern to filter files to read (default: None)
+        shuffle_files: shuffle the files within the returned shard. Mostly used for data viz. purposes, do not use
+            with dedup blocks
     """
 
     name = "🐿 Jsonl"
@@ -41,9 +43,19 @@ def __init__(
         default_metadata: dict = None,
         recursive: bool = True,
         glob_pattern: str | None = None,
+        shuffle_files: bool = False,
     ):
         super().__init__(
-            data_folder, limit, progress, adapter, text_key, id_key, default_metadata, recursive, glob_pattern
+            data_folder,
+            limit,
+            progress,
+            adapter,
+            text_key,
+            id_key,
+            default_metadata,
+            recursive,
+            glob_pattern,
+            shuffle_files,
         )
         self.compression = compression
 
14 changes: 13 additions & 1 deletion src/datatrove/pipeline/readers/parquet.py
@@ -22,6 +22,8 @@ class ParquetReader(BaseDiskReader):
         default_metadata: default metadata to add to all documents
         recursive: if True, will read files recursively in subfolders (default: True)
         glob_pattern: a glob pattern to filter files to read (default: None)
+        shuffle_files: shuffle the files within the returned shard. Mostly used for data viz. purposes, do not use
+            with dedup blocks
     """
 
     name = "📒 Parquet"
@@ -40,9 +42,19 @@ def __init__(
         default_metadata: dict = None,
         recursive: bool = True,
         glob_pattern: str | None = None,
+        shuffle_files: bool = False,
     ):
         super().__init__(
-            data_folder, limit, progress, adapter, text_key, id_key, default_metadata, recursive, glob_pattern
+            data_folder,
+            limit,
+            progress,
+            adapter,
+            text_key,
+            id_key,
+            default_metadata,
+            recursive,
+            glob_pattern,
+            shuffle_files,
        )
         self.batch_size = batch_size
         self.read_metadata = read_metadata
14 changes: 13 additions & 1 deletion src/datatrove/pipeline/readers/warc.py
@@ -25,6 +25,8 @@ class WarcReader(BaseDiskReader):
         default_metadata: default metadata to add to all documents
         recursive: if True, will read files recursively in subfolders (default: True)
         glob_pattern: a glob pattern to filter files to read (default: None)
+        shuffle_files: shuffle the files within the returned shard. Mostly used for data viz. purposes, do not use
+            with dedup blocks
     """
 
     name = "🕷 Warc"
@@ -42,10 +44,20 @@ def __init__(
         default_metadata: dict = None,
         recursive: bool = True,
         glob_pattern: str | None = None,
+        shuffle_files: bool = False,
     ):
         self.compression = compression
         super().__init__(
-            data_folder, limit, progress, adapter, text_key, id_key, default_metadata, recursive, glob_pattern
+            data_folder,
+            limit,
+            progress,
+            adapter,
+            text_key,
+            id_key,
+            default_metadata,
+            recursive,
+            glob_pattern,
+            shuffle_files,
         )
 
     def read_file(self, filepath: str):
