🎨 Add examples and time/length stats
alexchapeaux committed Jul 24, 2023
1 parent 0d91843 commit 4a93ffa
Showing 2 changed files with 106 additions and 15 deletions.
79 changes: 79 additions & 0 deletions examples/exact_substrings_example.py
@@ -0,0 +1,79 @@
import os

from datatrove.executor.base import PipelineExecutor
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.io import LocalInputDataFolder, LocalOutputDataFolder
from datatrove.pipeline.dedup import DatasetToSequence, DedupReader, MergeSequences
from datatrove.pipeline.extractors import Trafilatura
from datatrove.pipeline.filters import GopherQualityFilter, LanguageFilter
from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.typeshelper import Languages


"""
example on how to run exact-substring deduplication. It also requires using
https://github.com/google-research/deduplicate-text-datasets after stage 1, 2
1) DatasetToSequence maps 1 file into a sequence S. With unique separators at the beginning of each document. It also
saves the bytes offset of where each individual document begins.
2) MergeSequences merges all sequences into a big single sequence. It also saves the bytes offset per file.
---
after stage two you should use deduplicate-text-datasets scripts to create the suffix array and find all the
duplicates. The final output of these scripts should be a .bytearange file with the ranges in bytes wrt the big
sequence
---
3) DedupReader reads from DocumentsPipeline and duplicates ranges at the same time removing the duplicates ranges.
to run stage 1,2 call run_stage_1_2, after you have followed deduplicate-text-datasets instructions in the README you
can call stage 3 with run_stage_3.
"""


def run_stage_1_2():
    pipeline_1 = [
        WarcReader(data_folder=LocalInputDataFolder(path=f"{os.getcwd()}/warc/"), limit=1000),
        Trafilatura(),
        GopherQualityFilter(min_stop_words=0),
        LanguageFilter(language_threshold=0.5, languages=(Languages.english,)),
        JsonlWriter(LocalOutputDataFolder(path=f"{os.getcwd()}/intermediate/")),
        DatasetToSequence(output_folder=LocalOutputDataFolder(path=f"{os.getcwd()}/es/")),
    ]

    pipeline_2 = [
        MergeSequences(
            input_folder=LocalInputDataFolder(path=f"{os.getcwd()}/es"),
            output_folder=LocalOutputDataFolder(path=f"{os.getcwd()}/es/"),
            tasks_stage_1=4,
        )
    ]

    executor_1: PipelineExecutor = LocalPipelineExecutor(
        pipeline=pipeline_1, workers=4, max_concurrent_uploads=1, tasks=4
    )

    executor_2: PipelineExecutor = LocalPipelineExecutor(
        pipeline=pipeline_2, workers=1, max_concurrent_uploads=1, tasks=1
    )

    print(executor_1.run())
    print(executor_2.run())


def run_stage_3():
    pipeline_3 = [
        DedupReader(
            LocalInputDataFolder(path=f"{os.getcwd()}/intermediate/"),
            sequence_folder=LocalInputDataFolder(path=f"{os.getcwd()}/es/"),
            test=False,
        )
    ]

    executor_3: PipelineExecutor = LocalPipelineExecutor(
        pipeline=pipeline_3, workers=4, max_concurrent_uploads=1, tasks=4
    )

    print(executor_3.run())
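

# Hypothetical entry point (not in the original commit): stage 3 can only run after the
# external deduplicate-text-datasets suffix-array step has produced the .bytearange file,
# so the stages are not chained automatically here.
if __name__ == "__main__":
    run_stage_1_2()
    # ... run the deduplicate-text-datasets scripts (see its README) ...
    # run_stage_3()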
42 changes: 27 additions & 15 deletions src/datatrove/pipeline/dedup/exact_substrings.py
@@ -10,7 +10,7 @@
... call deduplicate-text-datasets scripts ...
3) DedupReader
3) DedupReader reads docs and ranges at the same time and removes duplicates.
---
@@ -22,6 +22,7 @@
import numpy as np
import tokenizers
from loguru import logger
from nltk.tokenize import word_tokenize

from datatrove.io import BaseInputDataFolder, BaseOutputDataFolder, InputDataFile
from datatrove.pipeline.base import DocumentsPipeline, PipelineStep
Expand All @@ -44,11 +45,12 @@ class DatasetToSequence(PipelineStep):
type = "🫂 - DEDUP"
name = "🪞 - exact-substrings stage 1"

doc_lens = []

def __init__(self, output_folder=BaseOutputDataFolder, tokenizer_name: str = "gpt2", **kwargs):
super().__init__(**kwargs)
self.output_folder = output_folder
self.tokenizer = tokenizers.Tokenizer.from_pretrained(tokenizer_name)
self.doc_lens = []

def set_up_dl_locks(self, dl_lock, up_lock):
self.output_folder.set_lock(up_lock)
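
This hunk moves doc_lens from a class attribute into __init__. A likely motivation (an inference, not stated in the commit message): a mutable class attribute is shared by every instance of DatasetToSequence, so lengths recorded by one run would leak into the next. A minimal sketch of the difference, independent of datatrove:

class Shared:
    lens = []  # a single list object shared by all instances


class PerInstance:
    def __init__(self):
        self.lens = []  # a fresh list per instance


a, b = Shared(), Shared()
a.lens.append(1)
assert b.lens == [1]  # the shared-state pitfall

c, d = PerInstance(), PerInstance()
c.lens.append(1)
assert d.lens == []  # no leakage between instances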
@@ -134,23 +136,26 @@ class DedupReader(JsonlReader):
    type = "🫂 - DEDUP"
    name = "🪞 - exact-substrings stage 3"

    bytes_offset = None
    bytes_ranges = None
    rank = None
    exhausted_ranges = False
    bytes_counter = 0
    idx = 0

    def __init__(
        self,
        data_folder: BaseInputDataFolder,
        sequence_folder: BaseInputDataFolder,
        gzip: bool = True,
        tokenizer_name: str = "gpt2",
        min_doc_words: int = 50,
        **kwargs,
    ):
        super().__init__(data_folder=data_folder, gzip=gzip, **kwargs)
        self.sequence_folder = sequence_folder
        self.tokenizer = tokenizers.Tokenizer.from_pretrained(tokenizer_name)
        self.bytes_offset = None
        self.bytes_ranges = None
        self.rank = None
        self.exhausted_ranges = False
        self.bytes_counter = 0
        self.idx = 0
        self.min_doc_words = min_doc_words

    def read_bytes_offset(self):
        offset_array_file: InputDataFile = self.sequence_folder.list_files(extension=EH.stage_2_bytes_offset)[0]
@@ -259,7 +264,11 @@ def remove_duplicate(self, doc, bytes_content):
            doc.content = text

        self.bytes_counter += len(bytes_content)
        return doc

        if len(word_tokenize(doc.content)) < self.min_doc_words:
            return False

        return True

    def __call__(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        self.rank = rank
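
The new min_doc_words check (using nltk's word_tokenize, imported above) drops documents that end up with too few words once their duplicate ranges have been removed. A standalone sketch of that check, assuming NLTK's punkt tokenizer data is available (e.g. via nltk.download("punkt")); the 50-word threshold mirrors the constructor default above:

import nltk
from nltk.tokenize import word_tokenize

nltk.download("punkt", quiet=True)  # tokenizer models required by word_tokenize


def keep_after_dedup(text: str, min_doc_words: int = 50) -> bool:
    # discard documents that became too short after duplicate spans were cut out
    return len(word_tokenize(text)) >= min_doc_words


print(keep_after_dedup("a short snippet"))  # False with the default threshold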
@@ -269,12 +278,15 @@ def __call__(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1)
            data = self.read_files_shard(self.data_folder.get_files_shard(rank, world_size))

        for doc, doc_content in zip(data, sequence_reader(sequence_file, size_file)):
            # we check that the two generators are synced.
            assert doc.content == self.tokenizer.decode(
                read_bytes(doc_content)
            ), f"{doc.content}\n\n{self.tokenizer.decode(read_bytes(doc_content))}"

            yield self.remove_duplicate(doc, doc_content)
            with self.stats.time_manager:
                # we check that the two generators are synced.
                assert doc.content == self.tokenizer.decode(
                    read_bytes(doc_content)
                ), f"{doc.content}\n\n{self.tokenizer.decode(read_bytes(doc_content))}"
                to_yield = self.remove_duplicate(doc, doc_content)
                if to_yield:
                    self.stats.doc_len.update(len(doc.content))
                    yield doc

        # we check bytes counter matches with the offset of the following rank
        assert (
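
The stats.time_manager / stats.doc_len calls above are the "time/length stats" from the commit title: each document's processing is timed, and the length of every document that survives deduplication is recorded. A generic, library-independent sketch of that pattern (datatrove's actual stats objects are not reproduced here):

import time
from contextlib import contextmanager


class MiniStats:
    def __init__(self):
        self.total_time = 0.0
        self.doc_lens = []

    @contextmanager
    def time_manager(self):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.total_time += time.perf_counter() - start


stats = MiniStats()
for text in ["kept document", "another kept document"]:
    with stats.time_manager():
        stats.doc_lens.append(len(text))  # length stat for documents we keep
print(stats.total_time, stats.doc_lens)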
