an option to download only added files for a given dataset version #1100

Open · wants to merge 2 commits into master
108 changes: 98 additions & 10 deletions clearml/datasets/dataset.py
@@ -909,8 +909,9 @@ def is_final(self):
return self._task.get_status() not in (
Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed)

def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None):
# type: (bool, Optional[int], Optional[int], bool, Optional[int]) -> str
def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True, max_workers=None,
ignore_parent_datasets=False):
# type: (bool, Optional[int], Optional[int], bool, Optional[int], bool) -> str
"""
Return a base folder with a read-only (immutable) local copy of the entire dataset
download and copy / soft-link, files from all the parent dataset versions. The dataset needs to be finalized
@@ -930,6 +931,7 @@ def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_o
:param raise_on_error: If True, raise exception if dataset merging failed on any file
:param max_workers: Number of threads to be spawned when getting the dataset copy. Defaults
to the number of logical cores.
:param ignore_parent_datasets: If True, ignore all the parent datasets and download only files added to the latest version

:return: A base folder for the entire dataset
"""
@@ -942,14 +944,22 @@ def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_o
raise ValueError("Cannot get a local copy of a dataset that was not finalized/closed")
max_workers = max_workers or psutil.cpu_count()

# now let's merge the parents
target_folder = self._merge_datasets(
use_soft_links=use_soft_links,
raise_on_error=raise_on_error,
part=part,
num_parts=num_parts,
max_workers=max_workers,
)
if ignore_parent_datasets:
# merge only added files, ignoring the parents
if part is not None or num_parts is not None:
LoggerRoot.get_base_logger().info("Getting only added files, ignoring parents")
target_folder = self._merge_diff(
max_workers=max_workers,
)
else:
# now let's merge the parents
target_folder = self._merge_datasets(
use_soft_links=use_soft_links,
raise_on_error=raise_on_error,
part=part,
num_parts=num_parts,
max_workers=max_workers,
)
return target_folder
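
A minimal usage sketch of the flag this PR adds, assuming a finalized dataset that has parent versions (the dataset ID is a placeholder):

    from clearml import Dataset

    # placeholder ID: any finalized dataset created with parent_datasets
    ds = Dataset.get(dataset_id="<your finalized dataset id>")

    # default behaviour: merge all parent versions and return the full dataset
    full_copy = ds.get_local_copy()

    # with ignore_parent_datasets=True: skip parents and fetch only files added in this version
    diff_copy = ds.get_local_copy(ignore_parent_datasets=True)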

def get_mutable_local_copy(
@@ -2320,6 +2330,7 @@ def _download_link(link, target_path):
LoggerRoot.get_base_logger().info(log_string)
else:
link.size = Path(target_path).stat().st_size

if not max_workers:
for relative_path, link in links.items():
target_path = os.path.join(target_folder, relative_path)
Expand Down Expand Up @@ -2454,6 +2465,51 @@ def _get_next_data_artifact_name(self, last_artifact_name=None):
numbers = sorted([int(a[prefix_len:]) for a in data_artifact_entries if a.startswith(prefix)])
return '{}{:03d}'.format(prefix, numbers[-1]+1 if numbers else 1)

def _merge_diff(self, max_workers=None):
# type: (Optional[int]) -> str
"""
Download only the added files of the dataset, ignoring all the data from parents

:param max_workers: Number of threads to be spawned when merging datasets. Defaults to the number
of logical cores.

:return: the target folder
"""

max_workers = max_workers or psutil.cpu_count()

# just create the dataset target folder
target_base_folder, _ = self._create_ds_target_folder(
part=None, num_parts=None, lock_target_folder=True)

# check if target folder is not empty, see if it contains everything we need
if target_base_folder and next(target_base_folder.iterdir(), None):
if self._verify_diff_folder(target_base_folder):
target_base_folder.touch()
self._release_lock_ds_target_folder(target_base_folder)
return target_base_folder.as_posix()
else:
LoggerRoot.get_base_logger().info('Dataset diff needs refreshing, downloading added files')
# we should delete the entire cache folder
shutil.rmtree(target_base_folder.as_posix())
# make sure we recreate the dataset target folder
target_base_folder.mkdir(parents=True, exist_ok=True)

self._get_dataset_files(
force=True,
selected_chunks=None,
cleanup_target_folder=True,
target_folder=target_base_folder,
max_workers=max_workers
)

# update target folder timestamp
target_base_folder.touch()

# we are done: release the lock and return the target folder
self._release_lock_ds_target_folder(target_base_folder)
return target_base_folder.absolute().as_posix()

def _merge_datasets(self, use_soft_links=None, raise_on_error=True, part=None, num_parts=None, max_workers=None):
# type: (bool, bool, Optional[int], Optional[int], Optional[int]) -> str
"""
@@ -3210,6 +3266,38 @@ def copy_file(file_entry):
raise ValueError("Dataset merging failed: {}".format([e for e in errors if e is not None]))
pool.close()


def _verify_diff_folder(self, target_base_folder):
# type: (Path) -> bool
target_base_folder = Path(target_base_folder)
# check the file size for the added portion of the dataset, regardless of parents
verified = True

datasets = self._dependency_graph[self._id]
unified_list = set()
for ds_id in datasets:
dataset = self.get(dataset_id=ds_id)
unified_list |= set(dataset._dataset_file_entries.values())
Collaborator:

@kirillfish This line seems to break:

    unified_list |= set(dataset._dataset_file_entries.values())
    TypeError: unhashable type: 'FileEntry'

Did you mean to construct the set based on the ids? Like:

    unified_list |= {entry.id for entry in dataset._dataset_file_entries.values()}
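
For illustration, one possible shape of that fix is to compare hashable keys instead of the entry objects, e.g. relative_path, which this method already uses for the size check; a sketch only, not the PR's code, and it would treat a file modified in place (same path, new content) as not added:

    # collect the relative paths known to any parent dataset (hashable keys)
    parent_paths = set()
    for ds_id in self._dependency_graph[self._id]:
        dataset = self.get(dataset_id=ds_id)
        parent_paths |= {e.relative_path for e in dataset._dataset_file_entries.values()}
        parent_paths |= {e.relative_path for e in dataset._dataset_link_entries.values()}

    # files/links whose path does not appear in any parent were added in this version
    added_list = [
        f
        for f in list(self._dataset_file_entries.values()) + list(self._dataset_link_entries.values())
        if f.relative_path not in parent_paths
    ]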


@kirillfish btw, here's a simple test snippet (you'll need to adapt it a little) to check that it works properly.

from pathlib import Path
import shutil

from clearml import Dataset, StorageManager



def main():
    manager = StorageManager()

    print("STEP1 : Downloading mnist dataset")
    mnist_dataset = Path(manager.get_local_copy(
        remote_url="https://allegro-datasets.s3.amazonaws.com/datasets/MNIST.zip", name="MNIST"))
    mnist_dataset_train = mnist_dataset / "TRAIN"
    mnist_dataset_test = mnist_dataset / "TEST"

    print("STEP2 : Creating the training dataset")
    mnist_dataset = Dataset.create(
        dataset_project="dataset_examples", dataset_name="MNIST Training Dataset")
    mnist_dataset.add_files(path=mnist_dataset_train, dataset_path="TRAIN")
    mnist_dataset.upload()
    mnist_dataset.finalize()

    print("STEP3 : Create a child dataset of mnist dataset using TEST Dataset")
    child_dataset = Dataset.create(
        dataset_project="dataset_examples", dataset_name="MNIST Complete Dataset", parent_datasets=[mnist_dataset.id])
    child_dataset.add_files(path=mnist_dataset_test, dataset_path="TEST")
    child_dataset.upload()
    child_dataset.finalize()

    print("We are done, have a great day :)")
    
main()
dataset_path = Dataset.get("<replace with child dataset id, i.e. MNIST Complete Dataset ID>").get_local_copy()
print(list(Path(dataset_path).glob("*")))

shutil.rmtree(dataset_path)

dataset_path = Dataset.get("<replace with child dataset id, i.e. MNIST Complete Dataset ID>").get_local_copy(ignore_parent_datasets=True)
print(list(Path(dataset_path).glob("*")))

Member:

@kirillfish any news on this?

Member:

Hi @kirillfish, any update?

unified_list |= set(dataset._dataset_link_entries.values())

added_list = [
f
for f in list(self._dataset_file_entries.values()) + list(self._dataset_link_entries.values())
if f not in unified_list
]
# noinspection PyBroadException
try:
for f in set(added_list):

# check if the local size and the stored size match (faster than comparing hash)
if (target_base_folder / f.relative_path).stat().st_size != f.size:
verified = False
break
except Exception:
verified = False

return verified

def _verify_dataset_folder(self, target_base_folder, part, chunk_selection):
# type: (Path, Optional[int], Optional[dict]) -> bool
target_base_folder = Path(target_base_folder)
Expand Down