diff --git a/pyterrier/_artifact.py b/pyterrier/_artifact.py
index db8b251e..cb2c857c 100644
--- a/pyterrier/_artifact.py
+++ b/pyterrier/_artifact.py
@@ -147,6 +147,168 @@ def from_url(cls, url: str, *, expected_sha256: Optional[str] = None) -> 'Artifa
         return cls.load(path)
 
+    def _package_files(self) -> Iterator[Tuple[str, Union[str, io.BytesIO]]]:
+        has_pt_meta_file = False
+        for root, dirs, files in os.walk(self.path):
+            rel_root = os.path.relpath(root, start=self.path)
+            for file in sorted(files):
+                file_full_path = os.path.join(root, file)
+                file_rel_path = os.path.join(rel_root, file) if rel_root != '.' else file
+                yield file_rel_path, file_full_path
+                if file_rel_path == 'pt_meta.json':
+                    has_pt_meta_file = True
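+        # If the artifact directory does not already contain a pt_meta.json,
+        # synthesize one in memory so that every package ships with metadata.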
+ """ + if package_path is None: + package_path = str(self.path) + '.tar.lz4' + + metadata = { + 'expected_sha256': None, + 'total_size': 0, + 'contents': [], + } + + chunk_num = 0 + chunk_start_offset = 0 + def manage_maxsize(_: None): + nonlocal raw_fout + nonlocal chunk_num + nonlocal chunk_start_offset + if max_file_size is not None and raw_fout.tell() >= max_file_size: + raw_fout.flush() + chunk_start_offset += raw_fout.tell() + raw_fout.close() + if chunk_num == 0: + metadata['segments'] = [] + metadata['segments'].append({'idx': chunk_num, 'offset': 0}) + chunk_num += 1 + if verbose: + print(f'starting segment {chunk_num}') + metadata['segments'].append({'idx': chunk_num, 'offset': chunk_start_offset}) + raw_fout = stack.enter_context(pt.io.finalized_open(f'{package_path}.{chunk_num}', 'b')) + sha256_fout.replace_writer(raw_fout) + + with contextlib.ExitStack() as stack: + raw_fout = stack.enter_context(pt.io.finalized_open(f'{package_path}.{chunk_num}', 'b')) + sha256_fout = stack.enter_context(pt.io.HashWriter(raw_fout)) + lz4_fout = stack.enter_context(LZ4FrameFile(sha256_fout, 'wb')) + tarout = stack.enter_context(tarfile.open(fileobj=lz4_fout, mode='w')) + + added_dirs = set() + for rel_path, file in self._package_files(): + path_dir, name = os.path.split(rel_path) + if path_dir and path_dir not in added_dirs: + tar_record = tarfile.TarInfo(path_dir) + tar_record.type = tarfile.DIRTYPE + tarout.addfile(tar_record) + added_dirs.add(path_dir) + lz4_fout.flush() # flush before each file, allowing seeking directly to this file within the archive + tar_record = tarfile.TarInfo(rel_path) + if isinstance(file, io.BytesIO): + tar_record.size = file.getbuffer().nbytes + else: + tar_record.size = os.path.getsize(file) + metadata['contents'].append({ + 'path': rel_path, + 'size': tar_record.size, + 'offset': chunk_start_offset + raw_fout.tell(), + }) + metadata['total_size'] += tar_record.size + if verbose: + print(f'adding {rel_path} [{pt.utils.byte_count_to_human_readable(tar_record.size)}]') + + if isinstance(file, io.BytesIO): + file.seek(0) + if rel_path == 'pt_meta.json' and metadata_out is not None: + metadata_out.update(json.load(file)) + file.seek(0) + with pt.io.CallbackReader(file, manage_maxsize) as fin: + tarout.addfile(tar_record, fin) + else: + with open(file, 'rb') as fin, \ + pt.io.CallbackReader(fin, manage_maxsize) as fin: + tarout.addfile(tar_record, fin) + if rel_path == 'pt_meta.json' and metadata_out is not None: + with open(file, 'rb') as fin: + metadata_out.update(json.load(fin)) + + tarout.close() + lz4_fout.close() + + metadata['expected_sha256'] = sha256_fout.hexdigest() + + metadata_outf = stack.enter_context(pt.io.finalized_open(f'{package_path}.json', 't')) + json.dump(metadata, metadata_outf) + metadata_outf.write('\n') + + if chunk_num == 0: + # no chunking was actually done, can use provided name directly + os.rename(f'{package_path}.{chunk_num}', package_path) + + return package_path + + @classmethod + def from_dataset(cls, dataset: str, variant: str, *, expected_sha256: Optional[str] = None) -> 'Artifact': + """Load an artifact from a PyTerrier dataset. + + Args: + dataset: The name of the dataset. + variant: The variant of the dataset. + expected_sha256: The expected SHA-256 hash of the artifact. If provided, the downloaded artifact will be + verified against this hash and an error will be raised if the hash does not match. 
+ """ + return cls.from_hf( + repo='pyterrier/from-dataset', + branch=f'{dataset}.{variant}', + expected_sha256=expected_sha256) + + # ------------------------------------------------- + # HuggingFace Datasets Integration + # - from_hf() + # - to_hf() + # ------------------------------------------------- + @classmethod def from_hf(cls, repo: str, branch: str = None, *, expected_sha256: Optional[str] = None) -> 'Artifact': """Load an artifact from Hugging Face Hub. @@ -209,112 +371,6 @@ def to_hf(self, repo: str, *, branch: str = None, pretty_name: Optional[str] = N sys.stderr.write(f"\nArtifact uploaded to {path}\nConsider editing the README.md to help explain this " "artifact to others.\n") - @classmethod - def from_dataset(cls, dataset: str, variant: str, *, expected_sha256: Optional[str] = None) -> 'Artifact': - """Load an artifact from a PyTerrier dataset. - - Args: - dataset: The name of the dataset. - variant: The variant of the dataset. - expected_sha256: The expected SHA-256 hash of the artifact. If provided, the downloaded artifact will be - verified against this hash and an error will be raised if the hash does not match. - """ - return cls.from_hf( - repo='pyterrier/from-dataset', - branch=f'{dataset}.{variant}', - expected_sha256=expected_sha256) - - def to_zenodo(self, *, pretty_name: Optional[str] = None, sandbox: bool = False) -> None: - """Upload this artifact to Zenodo. - - Args: - pretty_name: The human-readable name of the artifact. - sandbox: Whether to perform a test upload to the Zenodo sandbox. - """ - if sandbox: - base_url = 'https://sandbox.zenodo.org/api' - else: - base_url = 'https://zenodo.org/api' - - access_token = os.environ.get('ZENODO_TOKEN') - params = {'access_token': access_token} - - with tempfile.TemporaryDirectory() as d: - r = requests.post(f'{base_url}/deposit/depositions', params=params, json={}) - r.raise_for_status() - deposit_data = r.json() - sys.stderr.write("Created {}\n".format(deposit_data['links']['html'])) - try: - metadata = {} - sys.stderr.write("Building package.\n") - self.build_package(os.path.join(d, 'artifact.tar.lz4'), metadata_out=metadata) - z_meta = { - 'metadata': self._zenodo_metadata( - pretty_name=pretty_name, - zenodo_id=deposit_data['id'], - metadata=metadata, - ), - } - r = requests.put(deposit_data['links']['latest_draft'], params=params, json=z_meta) - r.raise_for_status() - sys.stderr.write("Uploading...\n") - for file in sorted(os.listdir(d)): - file_path = os.path.join(d, file) - with open(file_path, 'rb') as fin, \ - pt.io.TqdmReader(fin, total=os.path.getsize(file_path), desc=file) as fin: - r = requests.put( - '{}/{}'.format(deposit_data['links']['bucket'], file), - params={'access_token': access_token}, - data=fin) - r.raise_for_status() - except: - sys.stderr.write("Discarding {}\n".format(deposit_data['links']['html'])) - requests.post(deposit_data['links']['discard'], params=params, json={}) - raise - sys.stderr.write("Upload complete. Please complete the form at {} to publish this artifact. (Note that " - "publishing to Zenodo cannot be undone.)\n".format(deposit_data['links']['html'])) - - @classmethod - def from_zenodo(cls, zenodo_id: str, *, expected_sha256: Optional[str] = None) -> 'Artifact': - """Load an artifact from Zenodo. - - Args: - zenodo_id: The Zenodo record ID of the artifact. - expected_sha256: The expected SHA-256 hash of the artifact. If provided, the downloaded artifact will be - verified against this hash and an error will be raised if the hash does not match. 
- """ - return cls.from_url(f'zenodo:{zenodo_id}', expected_sha256=expected_sha256) - - def _package_files(self) -> Iterator[Tuple[str, Union[str, io.BytesIO]]]: - has_pt_meta_file = False - for root, dirs, files in os.walk(self.path): - rel_root = os.path.relpath(root, start=self.path) - for file in sorted(files): - file_full_path = os.path.join(root, file) - file_rel_path = os.path.join(rel_root, file) if rel_root != '.' else file - yield file_rel_path, file_full_path - if file_rel_path == 'pt_meta.json': - has_pt_meta_file = True - if not has_pt_meta_file: - metadata = self._build_metadata() - if metadata is not None: - yield 'pt_meta.json', io.BytesIO(json.dumps(metadata).encode()) - - def _build_metadata(self) -> Optional[Dict[str, Any]]: - metadata = {} - - try: - metadata['type'], metadata['format'] = pt.inspect.artifact_type_format(self) - except TypeError: - pass # couldn't identify type and format - - if hasattr(self, 'ARTIFACT_PACKAGE_HINT'): - metadata['package_hint'] = self.ARTIFACT_PACKAGE_HINT - else: - metadata['package_hint'] = self.__class__.__module__.split('.')[0] - - return metadata - def _hf_readme(self, *, repo: str, @@ -378,6 +434,73 @@ def _hf_readme(self, ``` ''' + # ------------------------------------------------- + # Zenodo Integration + # - from_zenodo() + # - to_zenodo() + # ------------------------------------------------- + + @classmethod + def from_zenodo(cls, zenodo_id: str, *, expected_sha256: Optional[str] = None) -> 'Artifact': + """Load an artifact from Zenodo. + + Args: + zenodo_id: The Zenodo record ID of the artifact. + expected_sha256: The expected SHA-256 hash of the artifact. If provided, the downloaded artifact will be + verified against this hash and an error will be raised if the hash does not match. + """ + return cls.from_url(f'zenodo:{zenodo_id}', expected_sha256=expected_sha256) + + def to_zenodo(self, *, pretty_name: Optional[str] = None, sandbox: bool = False) -> None: + """Upload this artifact to Zenodo. + + Args: + pretty_name: The human-readable name of the artifact. + sandbox: Whether to perform a test upload to the Zenodo sandbox. + """ + if sandbox: + base_url = 'https://sandbox.zenodo.org/api' + else: + base_url = 'https://zenodo.org/api' + + access_token = os.environ.get('ZENODO_TOKEN') + params = {'access_token': access_token} + + with tempfile.TemporaryDirectory() as d: + r = requests.post(f'{base_url}/deposit/depositions', params=params, json={}) + r.raise_for_status() + deposit_data = r.json() + sys.stderr.write("Created {}\n".format(deposit_data['links']['html'])) + try: + metadata = {} + sys.stderr.write("Building package.\n") + self.build_package(os.path.join(d, 'artifact.tar.lz4'), metadata_out=metadata) + z_meta = { + 'metadata': self._zenodo_metadata( + pretty_name=pretty_name, + zenodo_id=deposit_data['id'], + metadata=metadata, + ), + } + r = requests.put(deposit_data['links']['latest_draft'], params=params, json=z_meta) + r.raise_for_status() + sys.stderr.write("Uploading...\n") + for file in sorted(os.listdir(d)): + file_path = os.path.join(d, file) + with open(file_path, 'rb') as fin, \ + pt.io.TqdmReader(fin, total=os.path.getsize(file_path), desc=file) as fin: + r = requests.put( + '{}/{}'.format(deposit_data['links']['bucket'], file), + params={'access_token': access_token}, + data=fin) + r.raise_for_status() + except: + sys.stderr.write("Discarding {}\n".format(deposit_data['links']['html'])) + requests.post(deposit_data['links']['discard'], params=params, json={}) + raise + sys.stderr.write("Upload complete. 
+                for file in sorted(os.listdir(d)):
+                    file_path = os.path.join(d, file)
+                    with open(file_path, 'rb') as fin, \
+                         pt.io.TqdmReader(fin, total=os.path.getsize(file_path), desc=file) as fin:
+                        r = requests.put(
+                            '{}/{}'.format(deposit_data['links']['bucket'], file),
+                            params={'access_token': access_token},
+                            data=fin)
+                        r.raise_for_status()
+            except:
+                sys.stderr.write("Discarding {}\n".format(deposit_data['links']['html']))
+                requests.post(deposit_data['links']['discard'], params=params, json={})
+                raise
+            sys.stderr.write("Upload complete. Please complete the form at {} to publish this artifact. (Note that "
+                             "publishing to Zenodo cannot be undone.)\n".format(deposit_data['links']['html']))
+
     def _zenodo_metadata(self, *, zenodo_id: str, pretty_name: Optional[str] = None, metadata: Dict) -> Optional[str]:
         description = f'''
 
 Description
 
@@ -429,117 +552,6 @@ def _zenodo_metadata(self, *, zenodo_id: str, pretty_name: Optional[str] = None,
             metadata['title'] = pretty_name
         return metadata
 
-    def build_package(
-        self,
-        package_path: Optional[str] = None,
-        *,
-        max_file_size: Optional[float] = None,
-        metadata_out: Optional[Dict[str, Any]] = None,
-        verbose: bool = True,
-    ) -> str:
-        """Builds a package for this artifact.
-
-        Packaged artifacts are useful for distributing an artifact as a single file, such as from Artifact.from_url().
-        A separate metadata file is also generated, which gives information about the package's contents, including
-        file sizes and an expected hash for the package.
-
-        Args:
-            package_path: The path of the package to create. Defaults to the artifact path with a .tar.lz4 extension.
-            max_file_size: the (approximate) maximum size of each file.
-            metadata_out: A dictionary that is updated with the metadata of the artifact (if provided).
-            verbose: Whether to display a progress bar when packaging.
-
-        Returns:
-            The path of the package created.
-        """
-        if package_path is None:
-            package_path = str(self.path) + '.tar.lz4'
-
-        metadata = {
-            'expected_sha256': None,
-            'total_size': 0,
-            'contents': [],
-        }
-
-        chunk_num = 0
-        chunk_start_offset = 0
-        def manage_maxsize(_: None):
-            nonlocal raw_fout
-            nonlocal chunk_num
-            nonlocal chunk_start_offset
-            if max_file_size is not None and raw_fout.tell() >= max_file_size:
-                raw_fout.flush()
-                chunk_start_offset += raw_fout.tell()
-                raw_fout.close()
-                if chunk_num == 0:
-                    metadata['segments'] = []
-                    metadata['segments'].append({'idx': chunk_num, 'offset': 0})
-                chunk_num += 1
-                if verbose:
-                    print(f'starting segment {chunk_num}')
-                metadata['segments'].append({'idx': chunk_num, 'offset': chunk_start_offset})
-                raw_fout = stack.enter_context(pt.io.finalized_open(f'{package_path}.{chunk_num}', 'b'))
-                sha256_fout.replace_writer(raw_fout)
-
-        with contextlib.ExitStack() as stack:
-            raw_fout = stack.enter_context(pt.io.finalized_open(f'{package_path}.{chunk_num}', 'b'))
-            sha256_fout = stack.enter_context(pt.io.HashWriter(raw_fout))
-            lz4_fout = stack.enter_context(LZ4FrameFile(sha256_fout, 'wb'))
-            tarout = stack.enter_context(tarfile.open(fileobj=lz4_fout, mode='w'))
-
-            added_dirs = set()
-            for rel_path, file in self._package_files():
-                path_dir, name = os.path.split(rel_path)
-                if path_dir and path_dir not in added_dirs:
-                    tar_record = tarfile.TarInfo(path_dir)
-                    tar_record.type = tarfile.DIRTYPE
-                    tarout.addfile(tar_record)
-                    added_dirs.add(path_dir)
-                lz4_fout.flush()  # flush before each file, allowing seeking directly to this file within the archive
-                tar_record = tarfile.TarInfo(rel_path)
-                if isinstance(file, io.BytesIO):
-                    tar_record.size = file.getbuffer().nbytes
-                else:
-                    tar_record.size = os.path.getsize(file)
-                metadata['contents'].append({
-                    'path': rel_path,
-                    'size': tar_record.size,
-                    'offset': chunk_start_offset + raw_fout.tell(),
-                })
-                metadata['total_size'] += tar_record.size
-                if verbose:
-                    print(f'adding {rel_path} [{pt.utils.byte_count_to_human_readable(tar_record.size)}]')
-
-                if isinstance(file, io.BytesIO):
-                    file.seek(0)
-                    if rel_path == 'pt_meta.json' and metadata_out is not None:
-                        metadata_out.update(json.load(file))
-                        file.seek(0)
-                    with pt.io.CallbackReader(file, manage_maxsize) as fin:
-                        tarout.addfile(tar_record, fin)
-                else:
-                    with open(file, 'rb') as fin, \
-                         pt.io.CallbackReader(fin, manage_maxsize) as fin:
-                        tarout.addfile(tar_record, fin)
-                    if rel_path == 'pt_meta.json' and metadata_out is not None:
-                        with open(file, 'rb') as fin:
-                            metadata_out.update(json.load(fin))
-
-            tarout.close()
-            lz4_fout.close()
-
-            metadata['expected_sha256'] = sha256_fout.hexdigest()
-
-            metadata_outf = stack.enter_context(pt.io.finalized_open(f'{package_path}.json', 't'))
-            json.dump(metadata, metadata_outf)
-            metadata_outf.write('\n')
-
-        if chunk_num == 0:
-            # no chunking was actually done, can use provided name directly
-            os.rename(f'{package_path}.{chunk_num}', package_path)
-
-        return package_path
-
 
 def _load_metadata(path: str) -> Dict:
     """Load the metadata file for the artifact at the specified path.
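
Usage sketch of the APIs grouped by this reorganization, for orientation. The dataset name, variant, pretty name, and record ID below are hypothetical placeholders, and this assumes the class is exposed as pt.Artifact; only build_package(), from_dataset(), to_zenodo(), and from_zenodo() are taken from the code above.

    import pyterrier as pt

    # Load an artifact attached to a PyTerrier dataset. Under the hood this fetches
    # branch '<dataset>.<variant>' of the pyterrier/from-dataset Hugging Face repo.
    artifact = pt.Artifact.from_dataset('some_dataset', 'some_variant')

    # Package it as a single .tar.lz4 with a .json metadata sidecar. Passing
    # max_file_size (in bytes) splits the package into numbered segment files.
    package_path = artifact.build_package(max_file_size=1_000_000_000)

    # Upload to Zenodo (reads ZENODO_TOKEN from the environment). The deposit is
    # left as a draft; publishing must be completed via the Zenodo web form.
    artifact.to_zenodo(pretty_name='Some Artifact', sandbox=True)

    # Anyone can then load the published artifact by its Zenodo record ID.
    artifact2 = pt.Artifact.from_zenodo('1234567')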