added num_proc parameter for parallel data loading #974

Merged 3 commits on Jul 1, 2024

Changes from all commits

prepare/cards/wikitq.py: 5 changes (4 additions, 1 deletion)

@@ -9,7 +9,10 @@
 from unitxt.test_utils.card import test_card

 card = TaskCard(
-    loader=LoadHF(path="wikitablequestions", data_classification_policy=["public"]),
+    # Adjust the num_proc value according to the number of CPU cores available for faster loading
+    loader=LoadHF(
+        path="wikitablequestions", data_classification_policy=["public"], num_proc=10
+    ),
     preprocess_steps=[
         Set({"context_type": "table"}),
         ## truncate only if needed as it can impact evaluation results.
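
A minimal sketch of how the new parameter looks from user code, assuming only the unitxt API shown in this diff (LoadHF and its new num_proc field); the value 4 is illustrative, not from the PR, and should be tuned to the machine's CPU count:

    from unitxt.loaders import LoadHF

    # num_proc > 1 asks the underlying Hugging Face datasets library to
    # prepare the dataset with multiple worker processes; leaving it as
    # None keeps the previous single-process behavior.
    loader = LoadHF(
        path="wikitablequestions",
        data_classification_policy=["public"],
        num_proc=4,  # illustrative value, tune per machine
    )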

src/unitxt/catalog/cards/wikitq.json: 3 changes (2 additions, 1 deletion)

@@ -5,7 +5,8 @@
         "path": "wikitablequestions",
         "data_classification_policy": [
             "public"
-        ]
+        ],
+        "num_proc": 10
     },
     "preprocess_steps": [
         {

src/unitxt/loaders.py: 6 changes (6 additions, 0 deletions)

@@ -74,10 +74,12 @@ class Loader(SourceOperator):
     Args:
         loader_limit: Optional integer to specify a limit on the number of records to load.
         streaming: Bool indicating if streaming should be used.
+        num_proc: Optional integer to specify the number of processes to use for parallel dataset loading. Adjust the value according to the number of CPU cores available and the specific needs of your processing task.
     """

     loader_limit: int = None
     streaming: bool = False
+    num_proc: int = None

     def get_limit(self):
         if settings.global_loader_limit is not None and self.loader_limit is not None:
@@ -151,6 +153,7 @@ class LoadHF(Loader):
         data_files: Optional specification of particular data files to load.
         streaming: Bool indicating if streaming should be used.
         filtering_lambda: A lambda function for filtering the data after loading.
+        num_proc: Optional integer to specify the number of processes to use for parallel dataset loading.

     Example:
         Loading glue's mrpc dataset
@@ -169,6 +172,7 @@ class LoadHF(Loader):
     ] = None
     streaming: bool = True
     filtering_lambda: Optional[str] = None
+    num_proc: Optional[int] = None
    _cache: dict = InternalField(default=None)
    requirements_list: List[str] = OptionalField(default_factory=list)

@@ -199,6 +203,7 @@ def stream_dataset(self):
                 cache_dir=None if self.streaming else dir_to_be_deleted,
                 split=self.split,
                 trust_remote_code=settings.allow_unverified_code,
+                num_proc=self.num_proc,
             )
         except ValueError as e:
             if "trust_remote_code" in str(e):
@@ -234,6 +239,7 @@ def load_dataset(self):
                 cache_dir=dir_to_be_deleted,
                 split=self.split,
                 trust_remote_code=settings.allow_unverified_code,
+                num_proc=self.num_proc,
             )
         except ValueError as e:
             if "trust_remote_code" in str(e):
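
For context, the forwarded argument ends up in Hugging Face's datasets.load_dataset, which accepts num_proc to download and prepare dataset shards in parallel; the default num_proc=None preserves single-process loading, which is why the new field is backward compatible. A standalone sketch of that underlying call, using the same dataset as this PR:

    from datasets import load_dataset

    # Prepare the dataset with 10 worker processes. Parallelism helps most
    # when the dataset is split across many files/shards.
    ds = load_dataset("wikitablequestions", split="train", num_proc=10)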