diff --git a/dandi/cli/cmd_download.py b/dandi/cli/cmd_download.py index f9e92718f..373461542 100644 --- a/dandi/cli/cmd_download.py +++ b/dandi/cli/cmd_download.py @@ -25,6 +25,13 @@ default="refresh", show_default=True, ) +@click.option( + "-f", + "--format", + help="Choose the format/frontend for output. TODO: support all of the ls", + type=click.Choice(["pyout", "debug"]), + default="pyout", +) @click.option( "-J", "--jobs", @@ -48,12 +55,16 @@ ) @click.argument("url", nargs=-1) @map_to_click_exceptions -def download(url, output_dir, existing, jobs=6, develop_debug=False): +def download(url, output_dir, existing, jobs, format): """Download a file or entire folder from DANDI""" # First boring attempt at click commands being merely an interface to # Python function from ..download import download return download( - url, output_dir, existing=existing, jobs=jobs, develop_debug=develop_debug + url, + output_dir, + existing=existing, + format=format, + jobs=jobs, # develop_debug=develop_debug ) diff --git a/dandi/cli/cmd_ls.py b/dandi/cli/cmd_ls.py index 2737053a7..7ed7e3f46 100644 --- a/dandi/cli/cmd_ls.py +++ b/dandi/cli/cmd_ls.py @@ -82,7 +82,7 @@ def ls(paths, fields=None, format="auto", recursive=False): if fields and fields[0] != "path": # we must always have path - our "id" fields = ["path"] + fields - out = PYOUTFormatter(files=files, fields=fields) + out = PYOUTFormatter(fields=fields) elif format == "json": out = JSONFormatter() elif format == "json_pp": diff --git a/dandi/cli/formatter.py b/dandi/cli/formatter.py index 4ce134b8f..dd851531b 100644 --- a/dandi/cli/formatter.py +++ b/dandi/cli/formatter.py @@ -55,8 +55,7 @@ def __call__(self, rec): class PYOUTFormatter(pyout.Tabular): - def __init__(self, files, fields): - # max_filename_len = max(map(lambda x: len(op.basename(x)), files)) + def __init__(self, fields): PYOUT_STYLE = pyouts.get_style(hide_if_missing=not fields) kw = dict(style=PYOUT_STYLE) diff --git a/dandi/consts.py b/dandi/consts.py index 2f9d45567..eab3d3fbe 100644 --- a/dandi/consts.py +++ b/dandi/consts.py @@ -4,6 +4,8 @@ # A list of metadata fields which dandi extracts from .nwb files. # Additional fields (such as `number_of_*`) might be added by the # get_metadata` +import os + metadata_nwb_file_fields = ( "experiment_description", "experimenter", @@ -73,23 +75,31 @@ dandiset_metadata_file = "dandiset.yaml" dandiset_identifier_regex = "^[0-9]{6}$" -dandi_instance = namedtuple("dandi_instance", ("girder", "gui", "redirector")) +dandi_instance = namedtuple("dandi_instance", ("girder", "gui", "redirector", "api")) known_instances = { "local-girder-only": dandi_instance( - "http://localhost:8080", None, None + "http://localhost:8080", None, None, None ), # just pure girder # Redirector: TODO https://github.com/dandi/dandiarchive/issues/139 "local-docker": dandi_instance( - "http://localhost:8080", "http://localhost:8085", None + "http://localhost:8080", + "http://localhost:8085", + None, + "http://localhost:9000", # ATM it is minio, not sure where /api etc + # may be https://github.com/dandi/dandi-publish/pull/71 would help ), "local-docker-tests": dandi_instance( - "http://localhost:8081", "http://localhost:8086", "http://localhost:8079" + "http://localhost:8081", + "http://localhost:8086", + "http://localhost:8079", + None, # TODO: https://github.com/dandi/dandi-cli/issues/164 ), "dandi": dandi_instance( "https://girder.dandiarchive.org", "https://gui.dandiarchive.org", "https://dandiarchive.org", + "https://publish.dandiarchive.org/api", # ? might become api. 
), } # to map back url: name @@ -100,6 +110,14 @@ file_operation_modes = ["dry", "simulate", "copy", "move", "hardlink", "symlink"] +# +# Download (upload?) specific constants +# +# Chunk size when iterating a download (and upload) body. Taken from girder-cli +# TODO: should we make them smaller for download than for upload? +# ATM used only in download +MAX_CHUNK_SIZE = int(os.environ.get("DANDI_MAX_CHUNK_SIZE", 1024 * 1024 * 8)) # 64 + # # Some routes # TODO: possibly centralize in dandi-common from our redirection service diff --git a/dandi/dandiapi.py b/dandi/dandiapi.py new file mode 100644 index 000000000..930c56794 --- /dev/null +++ b/dandi/dandiapi.py @@ -0,0 +1,335 @@ +from contextlib import contextmanager + +import requests + +from . import get_logger +from .utils import ensure_datetime +from .consts import MAX_CHUNK_SIZE + +lgr = get_logger() + + +# Following class is loosely based on GirderClient, with authentication etc +# being stripped. +# TODO: add copyright/license info +class RESTFullAPIClient(object): + """A base class for REST clients""" + + def __init__(self, api_url): + self.api_url = api_url + self._session = None + + @contextmanager + def session(self, session=None): + """ + Use a :class:`requests.Session` object for all outgoing requests. + If `session` isn't passed into the context manager + then one will be created and yielded. Session objects are useful for enabling + persistent HTTP connections as well as partially applying arguments to many + requests, such as headers. + + Note: `session` is closed when the context manager exits, regardless of who + created it. + + .. code-block:: python + + with client.session() as session: + session.headers.update({'User-Agent': 'myapp 1.0'}) + + for item in items: + client.downloadItem(item, fh) + + In the above example, each request will be executed with the User-Agent header + while reusing the same TCP connection. + + :param session: An existing :class:`requests.Session` object, or None. + """ + self._session = session if session else requests.Session() + + try: + yield self._session + finally: + # close only if we started a new one + if not session: + self._session.close() + self._session = None + + def _request_func(self, method): + if self._session is not None: + return getattr(self._session, method.lower()) + else: + return getattr(requests, method.lower()) + + def send_request( + self, + method, + path, + parameters=None, + data=None, + files=None, + json=None, + headers=None, + json_resp=True, + **kwargs, + ): + """ + This method looks up the appropriate method, constructs a request URL + from the base URL, path, and parameters, and then sends the request. If + the method is unknown or if the path is not found, an exception is + raised, otherwise a JSON object is returned with the response. + + This is a convenience method to use when making basic requests that do + not involve multipart file data that might need to be specially encoded + or handled differently. + + :param method: The HTTP method to use in the request (GET, POST, etc.) + :type method: str + :param path: A string containing the path elements for this request. + Note that the path string should not begin or end with the path separator, '/'. + :type path: str + :param parameters: A dictionary mapping strings to strings, to be used + as the key/value pairs in the request parameters. + :type parameters: dict + :param data: A dictionary, bytes or file-like object to send in the body. 
+ :param files: A dictionary of 'name' => file-like-objects for multipart encoding upload. + :type files: dict + :param json: A JSON object to send in the request body. + :type json: dict + :param headers: If present, a dictionary of headers to encode in the request. + :type headers: dict + :param json_resp: Whether the response should be parsed as JSON. If False, the raw + response object is returned. To get the raw binary content of the response, + use the ``content`` attribute of the return value, e.g. + + .. code-block:: python + + resp = client.get('my/endpoint', json_resp=False) + print(resp.content) # Raw binary content + print(resp.headers) # Dict of headers + + :type json_resp: bool + """ + if not parameters: + parameters = {} + + # Look up the HTTP method we need + f = self._request_func(method) + + url = self.get_url(path) + + # Make the request, passing parameters and authentication info + _headers = headers or {} + + if json_resp and "accept" not in _headers: + _headers["accept"] = "application/json" + + result = f( + url, + params=parameters, + data=data, + files=files, + json=json, + headers=_headers, + **kwargs, + ) + + # If success, return the json object. Otherwise throw an exception. + if not result.ok: + raise requests.HTTPError( + f"Error {result.status_code} while sending {method} request to {url}", + response=result, + ) + + if json_resp: + return result.json() + else: + return result + + def get_url(self, path): + # Construct the url + if self.api_url.endswith("/") and path.startswith("/"): + path = path[1:] + url = self.api_url + path + return url + + def get(self, path, parameters=None, json_resp=True): + """ + Convenience method to call :py:func:`send_request` with the 'GET' HTTP method. + """ + return self.send_request("GET", path, parameters, json_resp=json_resp) + + def post( + self, + path, + parameters=None, + files=None, + data=None, + json=None, + headers=None, + json_resp=True, + ): + """ + Convenience method to call :py:func:`send_request` with the 'POST' HTTP method. + """ + return self.send_request( + "POST", + path, + parameters, + files=files, + data=data, + json=json, + headers=headers, + json_resp=json_resp, + ) + + def put(self, path, parameters=None, data=None, json=None, json_resp=True): + """ + Convenience method to call :py:func:`send_request` with the 'PUT' + HTTP method. + """ + return self.send_request( + "PUT", path, parameters, data=data, json=json, json_resp=json_resp + ) + + def delete(self, path, parameters=None, json_resp=True): + """ + Convenience method to call :py:func:`send_request` with the 'DELETE' HTTP method. + """ + return self.send_request("DELETE", path, parameters, json_resp=json_resp) + + def patch(self, path, parameters=None, data=None, json=None, json_resp=True): + """ + Convenience method to call :py:func:`send_request` with the 'PATCH' HTTP method. 
+        """
+        return self.send_request(
+            "PATCH", path, parameters, data=data, json=json, json_resp=json_resp
+        )
+
+
+class DandiAPIClient(RESTFullAPIClient):
+    def get_asset(self, dandiset_id, version, uuid):
+        """
+
+        /dandisets/{version__dandiset__pk}/versions/{version__version}/assets/{uuid}/
+
+        Parameters
+        ----------
+        dandiset_id
+          dandiset identifier (e.g. "000027")
+        version
+          dandiset version (e.g. "0.200721.2222" or "draft")
+        uuid
+          UUID of the asset
+
+        Returns
+        -------
+        dict
+          asset record as returned by the API
+        """
+        return self.get(f"/dandisets/{dandiset_id}/versions/{version}/assets/{uuid}/")
+
+    def get_dandiset(self, dandiset_id, version):
+        return self.get(f"/dandisets/{dandiset_id}/versions/{version}/")
+
+    def get_dandiset_assets(self, dandiset_id, version, location=None, page_size=None):
+        """A generator to provide asset records
+        """
+        if location is not None:
+            raise NotImplementedError(
+                "location specific query. See https://github.com/dandi/dandi-publish/issues/77"
+            )
+            # although we could just provide an ad-hoc implementation here for now. TODO
+        if page_size is not None:
+            raise NotImplementedError("paginated query is not supported yet")
+        page_size = 1000000
+        resp = self.get(
+            f"/dandisets/{dandiset_id}/versions/{version}/assets/",
+            parameters={"page_size": page_size},
+        )
+        try:
+            assert not resp.get(
+                "next"
+            ), "ATM we do not support pagination and the result should not have been paginated"
+            assert not resp.get("prev")
+            results = resp.get("results", [])
+            assert len(results) == resp.get("count")
+            # Just some sanity checks for now, but might change, see
+            # https://github.com/dandi/dandi-publish/issues/79
+            assert all(
+                r.get("version", {}).get("dandiset", {}).get("identifier")
+                == dandiset_id
+                for r in results
+            )
+            assert all(r.get("version", {}).get("version") == version for r in results)
+        except AssertionError:
+            lgr.error(
+                f"Some expectations on returned /assets/ for {dandiset_id}@{version} are violated"
+            )
+            raise
+        # Things might change, so let's just return only "relevant" ATM information
+        # under the assumption that assets belong to the current version of the dataset requested
+        # results_ = [
+        #     {k: r[k] for k in ("path", "uuid", "size", "sha256", "metadata") if k in r}
+        #     for r in results
+        # ]
+        for r in results:
+            # a check for the paranoid Yarik, given the current multitude of checksums:
+            # r['sha256'] is what "dandi-publish" computed, but then
+            # metadata could contain multiple digests computed upon upload
+            metadata = r.get("metadata")
+            if (
+                "sha256" in r
+                and "sha256" in metadata
+                and metadata["sha256"] != r["sha256"]
+            ):
+                lgr.warning("sha256 mismatch for %s" % str(r))
+            # There is no "modified" time stamp and "updated" also shows something
+            # completely different, so if "modified" is not there -- we will try to
+            # get it from metadata
+            if "modified" not in r and metadata:
+                uploaded_mtime = metadata.get("uploaded_mtime")
+                if uploaded_mtime:
+                    r["modified"] = ensure_datetime(uploaded_mtime)
+            yield r
+
+    def get_dandiset_and_assets(self, dandiset_id, version, location=None):
+        """This is pretty much an adapter to provide "harmonized" output in both
+        girder and DANDI API clients.
+
+        Harmonization should happen toward the DANDI API, but AFAIK it is still in flux
+        """
+        # Fun begins!
+        location_ = "/" + location if location else ""
+        lgr.info(f"Traversing {dandiset_id}{location_} (version: {version})")
+
+        # TODO: get all assets
+        # 1. includes sha256, created, updated but those are of "girder" level
+        #    so lack "uploaded_mtime" and uploaded_nwb_object_id forbidding logic for
+        #    deducing necessity to update/move.
But we still might want to rely on its + # sha256 instead of metadata since older uploads would not have that metadata + # in them + # 2. there is no API to list assets given a location + # + # Get dandiset information + dandiset = self.get_dandiset(dandiset_id, version) + # TODO: location + assets = self.get_dandiset_assets(dandiset_id, version, location=location) + return dandiset, assets + + def get_download_file_iter( + self, dandiset_id, version, uuid, chunk_size=MAX_CHUNK_SIZE + ): + url = self.get_url( + f"/dandisets/{dandiset_id}/versions/{version}/assets/{uuid}/download/" + ) + + def downloader(): + lgr.debug("Starting download from %s", url) + result = (self._session if self._session else requests).get( + url, stream=True + ) + # TODO: apparently we might need retries here as well etc + # if result.status_code not in (200, 201): + result.raise_for_status() + + for chunk in result.raw.stream(chunk_size, decode_content=False): + if chunk: # could be some "keep alive"? + yield chunk + + return downloader diff --git a/dandi/download.py b/dandi/download.py index 40b7ce1f5..6c39d7b2a 100644 --- a/dandi/download.py +++ b/dandi/download.py @@ -1,12 +1,29 @@ +import hashlib + +import os import os.path as op +import random import re import requests +import sys +import time + +from urllib.parse import unquote as urlunquote +from .dandiapi import DandiAPIClient from . import girder, get_logger -from .consts import dandiset_metadata_file, known_instances, metadata_digests +from .consts import ( + dandiset_metadata_file, + known_instances, + known_instances_rev, + metadata_digests, +) from .dandiset import Dandiset from .exceptions import FailedToConnectError, NotFoundError, UnknownURLError -from .utils import flatten, flattened, Parallel, delayed, get_instance +from .utils import flattened, is_same_time, get_instance + +import humanize +from .support.pyout import naturalsize lgr = get_logger() @@ -16,6 +33,7 @@ class _dandi_url_parser: # into module space, and later we might end up with classes for those anyways id_regex = "[a-f0-9]{24}" id_grp = f"(?P{id_regex})" + dandiset_id_grp = "(?P[0-9]{6})" server_grp = "(?P(?Phttps?)://(?P[^/]+)/)" known_urls = { # Those we first redirect and then handle the redirected URL @@ -24,6 +42,12 @@ class _dandi_url_parser: # handle_redirect: # - 'pass' - would continue with original url if no redirect happen # - 'only' - would interrupt if no redirection happens + # server_type: + # - 'girder' - the default/old + # - 'api' - the "new" (as of 20200715 state of various PRs) + # rewrite: + # - callable -- which would rewrite that "URI" + "DANDI:": {"rewrite": lambda x: "https://identifiers.org/" + x}, "https?://dandiarchive.org/.*": {"handle_redirect": "pass"}, "https?://identifiers.org/DANDI:.*": {"handle_redirect": "pass"}, "https?://[^/]*dandiarchive-org.netlify.app/.*": {"map_instance": "dandi"}, @@ -35,16 +59,30 @@ class _dandi_url_parser: f"{server_grp}#/folder/{id_regex}/selected(?P(/item\\+{id_grp})+)$": {}, # Direct girder urls to items f"{server_grp}api/v1/(?Pitem)/{id_grp}/download$": {}, + # New DANDI API + # https://deploy-preview-341--gui-dandiarchive-org.netlify.app/#/dandiset/000006/0.200714.1807 + # https://deploy-preview-341--gui-dandiarchive-org.netlify.app/#/dandiset/000006/0.200714.1807/files + # https://deploy-preview-341--gui-dandiarchive-org.netlify.app/#/dandiset/000006/0.200714.1807/files?location=%2Fsub-anm369962%2F + # But for drafts files navigator it is a different beast: + # 
https://deploy-preview-341--gui-dandiarchive-org.netlify.app/#/dandiset/000027/draft/files?_id=5f176583f63d62e1dbd06943&_modelType=folder + f"{server_grp}#.*/(?Pdandiset)/{dandiset_id_grp}" + "/(?P([.0-9]{5,}|draft))" + "(/files(\\?location=(?P.*)?)?)?" + f"(/files(\\?_id={id_grp}(&_modelType=folder)?)?)?" + "$": {"server_type": "api"}, + # https://deploy-preview-341--gui-dandiarchive-org.netlify.app/#/dandiset/000006/draft + # (no API yet) "https?://.*": {"handle_redirect": "only"}, } # We might need to remap some assert_types map_asset_types = {"dandiset": "folder"} # And lets create our mapping into girder instances from known_instances: - map_to_girder = {} - for girder, *_ in known_instances.values(): # noqa: F402 - for h in _: + map_to = {"girder": {}, "api": {}} + for girder, gui, redirector, api in known_instances.values(): # noqa: F402 + for h in (gui, redirector): if h: - map_to_girder[h] = girder + map_to["girder"][h] = girder + map_to["api"][h] = api @classmethod def parse(cls, url, *, map_instance=True): @@ -83,7 +121,7 @@ def parse(cls, url, *, map_instance=True): Returns ------- - server, asset_type, asset_id + server_type, server, asset_type, asset_id asset_type is either asset_id or folder ATM. asset_id might be a list in case of multiple files @@ -96,8 +134,14 @@ def parse(cls, url, *, map_instance=True): match = re.match(regex, url) if not match: continue + rewrite = settings.get("rewrite", False) handle_redirect = settings.get("handle_redirect", False) - if handle_redirect: + if rewrite: + assert not handle_redirect + assert not settings.get("map_instance") + new_url = rewrite(url) + return cls.parse(new_url) + elif handle_redirect: assert handle_redirect in ("pass", "only") new_url = cls.follow_redirect(url) if new_url != url: @@ -114,16 +158,22 @@ def parse(cls, url, *, map_instance=True): ) elif settings.get("map_instance"): if map_instance: - server, *_ = cls.parse(url, map_instance=False) + server_type, server, *_ = cls.parse(url, map_instance=False) if settings["map_instance"] not in known_instances: raise ValueError( "Unknown instance {}. 
Known are: {}".format( settings["map_instance"], ", ".join(known_instances) ) ) - return (get_instance(settings["map_instance"]).girder,) + tuple(_) + known_instance = get_instance(settings["map_instance"]) + # for consistency, add + server = getattr(known_instance, server_type) + if not server.endswith("/"): + server += "/" + return (server_type, server) + tuple(_) continue # in this run we ignore an match further else: + server_type = settings.get("server_type", "girder") break if not match: @@ -137,22 +187,58 @@ def parse(cls, url, *, map_instance=True): ) groups = match.groupdict() - girder_server = cls.map_to_girder.get( - groups["server"].rstrip("/"), groups["server"] - ) - if not girder_server.endswith("/"): - girder_server += "/" # we expected '/' to be there so let it be - - if "multiitem" not in groups: - # we must be all set - asset_ids = [groups["id"]] - asset_type = groups["asset_type"] - asset_type = cls.map_asset_types.get(asset_type, asset_type) + url_server = groups["server"] + server = cls.map_to[server_type].get(url_server.rstrip("/"), url_server) + + if not server.endswith("/"): + server += "/" # we expected '/' to be there so let it be + + if server_type == "girder": + if "multiitem" not in groups: + # we must be all set + asset_ids = [groups["id"]] + asset_type = groups["asset_type"] + asset_type = cls.map_asset_types.get(asset_type, asset_type) + else: + # we need to split/parse them and return a list + asset_ids = [ + i.split("+")[1] for i in groups["multiitem"].split("/") if i + ] + asset_type = "item" + elif server_type == "api": + asset_type = groups.get("asset_type") + dandiset_id = groups.get("dandiset_id") + version = groups.get("version") + location = groups.get("location") + if location: + location = urlunquote(location) + # ATM carries leading '/' which IMHO is not needed/misguiding somewhat, so + # I will just strip it + location = location.lstrip("/") + if not (asset_type == "dandiset" and dandiset_id): + raise ValueError(f"{url} does not point to a dandiset") + if not version: + raise NotImplementedError( + f"{url} does not point to a specific version (or draft). DANDI ppl should " + f"decide what should be a behavior in such cases" + ) + # Let's just return a structured record for the requested asset + asset_ids = {"dandiset_id": dandiset_id, "version": version} + # if location is not degenerate -- it would be a folder or a file + if location: + if location.endswith("/"): + asset_type = "folder" + else: + asset_type = "item" + asset_ids["location"] = location + # TODO: remove whenever API supports "draft" and this type of url + if groups.get("id"): + assert version == "draft" + asset_ids["folder_id"] = groups["id"] + asset_type = "folder" else: - # we need to split/parse them and return a list - asset_ids = [i.split("+")[1] for i in groups["multiitem"].split("/") if i] - asset_type = "item" - ret = girder_server, asset_type, asset_ids + raise RuntimeError(f"must not happen. We got {server_type}") + ret = server_type, server, asset_type, asset_ids lgr.debug("Parsed into %s", ret) return ret @@ -177,17 +263,75 @@ def follow_redirect(url): follow_redirect = _dandi_url_parser.follow_redirect -def download( +def download(urls, output_dir, *, format="pyout", existing="error", jobs=1): + # TODO: unduplicate with upload. 
For now stole from that one + # We will again use pyout to provide a neat table summarizing our progress + # with upload etc + import pyout + from .support import pyout as pyouts + + # dandi.cli.formatters are used in cmd_ls to provide switchable + pyout_style = pyouts.get_style(hide_if_missing=False) + + rec_fields = ("path", "size", "done", "done%", "checksum", "status", "message") + out = pyout.Tabular(style=pyout_style, columns=rec_fields, max_workers=jobs) + + out_helper = PYOUTHelper() + pyout_style["done"] = pyout_style["size"].copy() + pyout_style["size"]["aggregate"] = out_helper.agg_size + pyout_style["done"]["aggregate"] = out_helper.agg_done + + # I thought I was making a beautiful flower but ended up with cacti + # which never blooms... All because assets are looped through inside download_generator + # TODO: redo + kw = dict(assets_it=out_helper.it) + if jobs > 1 and format == "pyout": + # It could handle delegated to generator downloads + kw["yield_generator_for_fields"] = rec_fields[1:] # all but path + + gen_ = download_generator(urls, output_dir, existing=existing, **kw) + + # TODOs: + # - redo frontends similarly to how command_ls did it + # - have a single loop with analysis of `rec` to either any file + # has failed to download. If any was: exception should probably be + # raised. API discussion for Python side of API: + # + if format == "debug": + for rec in gen_: + print(rec) + sys.stdout.flush() + elif format == "pyout": + with out: + for rec in gen_: + out(rec) + else: + raise ValueError(format) + + +def download_generator( urls, output_dir, *, + assets_it=None, + yield_generator_for_fields=None, existing="error", - jobs=6, - develop_debug=False, - authenticate=False, # Seems to work just fine for public stuff - recursive=True, ): - """Download a file or entire folder from DANDI""" + """A generator for downloads of files, folders, or entire dandiset from DANDI + (as identified by URL) + + This function is a generator which would yield records on ongoing activities. + Activites include traversal of the remote resource (DANDI archive), download of + individual assets while yielding records (TODO: schema) while validating their + checksums "on the fly", etc. + + Parameters + ---------- + assets_it: IteratorWithAggregation + which will be set .gen to assets. Purpose is to make it possible to get + summary statistics while already downloading. TODO: reimplement properly! + + """ urls = flattened([urls]) if len(urls) > 1: raise NotImplementedError("multiple URLs not supported") @@ -197,112 +341,442 @@ def download( # on which instance it exists! Thus ATM we would do nothing but crash raise NotImplementedError("No URLs were provided. Cannot download anything") url = urls[0] - girder_server_url, asset_type, asset_id = parse_dandi_url(url) + server_type, server_url, asset_type, asset_id = parse_dandi_url(url) # We could later try to "dandi_authenticate" if run into permission issues. # May be it could be not just boolean but the "id" to be used? - client = girder.get_client( - girder_server_url, - authenticate=authenticate, - progressbars=True, # TODO: redo all this - ) - - lgr.info(f"Downloading {asset_type} with id {asset_id} from {girder_server_url}") - - # there might be multiple asset_ids, e.g. 
if multiple files were selected etc, - # so we will traverse all of them - files = flatten( - _get_asset_files( - asset_id_, asset_type, output_dir, client, authenticate, existing, recursive + # TODO: remove whenever API starts to support drafts in an unknown version + if server_type == "api" and asset_id.get("version") == "draft": + asset_id, asset_type, client, server_type = _map_to_girder(url) + args = asset_id, asset_type + elif server_type == "girder": + client = girder.get_client( + server_url, authenticate=False, progressbars=True # TODO: redo all this ) - for asset_id_ in set(flattened([asset_id])) - ) - - Parallel(n_jobs=jobs, backend="threading")( - delayed(client.download_file)( - file["id"], - op.join(output_dir, file["path"]), - existing=existing, - attrs=file["attrs"], - # TODO: make it less "fluid" to not breed a bug where we stop verifying - # for e.g. digests move - digests={ - d: file.get("metadata")[d] - for d in metadata_digests - if d in file.get("metadata", {}) - }, + args = asset_id, asset_type + elif server_type == "api": + client = DandiAPIClient(server_url) + args = (asset_id["dandiset_id"], asset_id["version"], asset_id.get("location")) + else: + raise NotImplementedError( + f"Download from server of type {server_type} is not yet implemented" ) - for file in files - ) + with client.session(): + dandiset, assets = client.get_dandiset_and_assets( + *args + ) # , recursive=recursive) + if assets_it: + assets_it.gen = assets + assets = assets_it + dandiset_path = ( + op.join(output_dir, dandiset["dandiset"]["identifier"]) + if dandiset + else None + ) + # TODO: if we are ALREADY in a dandiset - we can validate that it is the + # same dandiset and use that dandiset path as the one to download under -def _get_asset_files( - asset_id, asset_type, output_dir, client, authenticate, existing, recursive -): - # asset_rec = client.getResource(asset_type, asset_id) - # lgr.info("Working with asset %s", str(asset_rec)) - # In principle Girder's client already has ability to download any - # resource (collection/folder/item/file). But it seems that "mending" it - # with custom handling (e.g. later adding filtering to skip some files, - # or add our own behavior on what to do when files exist locally, etc) would - # not be easy. So we will reimplement as a two step (kinda) procedure. - # Return a generator which would be traversing girder and yield records - # of encountered resources. 
- # TODO later: may be look into making it async - # First we access top level records just to sense what we are working with - top_entities = None - while True: - try: - # this one should enhance them with "fullpath" - top_entities = list( - client.traverse_asset(asset_id, asset_type, recursive=False) + # TODO: do analysis of assets for early detection of needed renames etc + # to avoid any need for late treatment of existing and also for + # more efficient download if files are just renamed etc + + # Handle our so special dandiset.yaml + if dandiset: + for resp in _populate_dandiset_yaml( + dandiset_path, + dandiset.get("metadata", {}).get("dandiset", {}), + existing == "overwrite", + ): + yield dict(path=dandiset_metadata_file, **resp) + + for asset in assets: + # unavoidable ugliness since girder and API have different "scopes" for + # identifying an asset + digests_from_metadata = { + d: asset.get("metadata")[d] + for d in metadata_digests + if d in asset.get("metadata", {}) + } + if server_type == "girder": + down_args = (asset["id"],) + digests = digests_from_metadata + elif server_type == "api": + # Even worse to get them from the asset record which also might have its return + # record still changed, https://github.com/dandi/dandi-publish/issues/79 + down_args = args[:2] + (asset["uuid"],) + if "sha256" not in asset: + lgr.warning("For some reason - there no sha256 in %s", str(asset)) + digests = digests_from_metadata + else: + digests = {"sha256": asset["sha256"]} + if ( + "sha256" in digests_from_metadata + and asset["sha256"] != digests_from_metadata["sha256"] + ): + lgr.warning( + "Metadata seems to be outdated since API returned different " + "sha256 for %(path)s", + asset, + ) + + path = asset["path"].lstrip("/") # make into relative path + path = download_path = op.normpath(path) + if dandiset_path: # place under dandiset directory + download_path = op.join(dandiset_path, path) + else: + download_path = op.join(output_dir, path) + + downloader = client.get_download_file_iter(*down_args) + + # Get size from the metadata, although I guess it could be returned directly + # by server while establishing downloader... but it seems that girder itself + # does get it from the "file" resource, not really from direct URL. So I guess + # we will just follow. For now we must find it in "attrs" + _download_generator = _download_file( + downloader, + download_path, + # size and modified generally should be there but better to redownload + # than to crash + size=asset.get("size"), + mtime=asset.get("modified"), + existing=existing, + digests=digests, ) - break - except girder.gcl.HttpError as exc: - if not authenticate and girder.is_access_denied(exc): - lgr.warning("unauthenticated access denied, let's authenticate") - client.dandi_authenticate() - continue - raise - entity_type = list(set(e["type"] for e in top_entities)) - if len(entity_type) > 1: - raise ValueError( - f"Please point to a single type of entity - either dandiset(s)," - f" folder(s) or file(s). 
Got: {entity_type}" + + if yield_generator_for_fields: + yield {"path": path, yield_generator_for_fields: _download_generator} + else: + for resp in _download_generator: + yield dict(resp, path=path) + + +class ItemsSummary: + """A helper "structure" to accumulate information about assets to be downloaded + + To be used as a callback to IteratorWithAggregation + """ + + def __init__(self): + self.files = 0 + # TODO: get rid of needing it + self.t0 = None # when first record is seen + self.size = 0 + self.has_unknown_sizes = False + + def as_dict(self): + return {a: getattr(self, a) for a in ("files", "size", "has_unknown_sizes")} + + def __call__(self, rec, prior=None): + assert prior in (None, self) + if not self.files: + self.t0 = time.time() + self.files += 1 + size = rec.get("size") + if size is not None: + self.size += size + elif rec.get("path", "") == "dandiset.yaml": + # again -- so special. TODO: make it a proper file + pass + else: + self.has_unknown_sizes = True + return self + + +class PYOUTHelper: + """Helper for PYOUT styling + + Provides aggregation callbacks for PyOUT and also an iterator to be wrapped around + iterating over assets, so it would get "totals" as soon as they are available. + """ + + def __init__(self): + # Establish "fancy" download while still possibly traversing the dandiset + # functionality. + from .support.iterators import IteratorWithAggregation + + self.items_summary = ItemsSummary() + self.it = IteratorWithAggregation( + # unfortunately Yarik missed the point that we need to wrap + # "assets" generator within downloader_generator + # so we do not have assets here! Ad-hoc solution for now is to + # pass this beast so it could get .gen set within downloader_generator + None, # download_generator(urls, output_dir, existing=existing), + self.items_summary, ) - entity_type = entity_type[0] - if entity_type in ("dandiset", "folder"): - # redo recursively - lgr.info( - "Traversing remote %ss (%s) recursively and downloading them " "locally", - entity_type, - ", ".join(e["name"] for e in top_entities), + + def agg_files(self, *ignored): + ret = str(self.items_summary.files) + if not self.it.finished: + ret += "+" + return ret + + def agg_size(self, sizes): + """Formatter for "size" column where it would show + + how much is "active" (or done) + +how much yet to be "shown". + """ + active = sum(sizes) + if (active, self.items_summary.size) == (0, 0): + return "" + v = [naturalsize(active)] + if not self.it.finished or ( + active != self.items_summary.size or self.items_summary.has_unknown_sizes + ): + extra = self.items_summary.size - active + if extra < 0: + lgr.debug("Extra size %d < 0 -- must not happen", extra) + else: + extra_str = "+%s" % naturalsize(extra) + if not self.it.finished: + extra_str = ">" + extra_str + if self.items_summary.has_unknown_sizes: + extra_str += "+?" + v.append(extra_str) + return v + + def agg_done(self, done_sizes): + """Formatter for "DONE" column + """ + done = sum(done_sizes) + if self.it.finished and done == 0 and self.items_summary.size == 0: + # even with 0s everywhere consider it 100% + r = 1.0 + elif self.items_summary.size: + r = done / self.items_summary.size + else: + r = 0 + pref = "" + if not self.it.finished: + pref += "<" + if self.items_summary.has_unknown_sizes: + pref += "?" 
+ v = [naturalsize(done), "%s%.2f%%" % (pref, 100 * r)] + if ( + done + and self.items_summary.t0 is not None + and r + and self.items_summary.size != 0 + ): + dt = time.time() - self.items_summary.t0 + more_time = dt / r if r != 1 else 0 + more_time_str = humanize.naturaldelta(more_time) + if not self.it.finished: + more_time_str += "<" + if self.items_summary.has_unknown_sizes: + more_time_str += "+?" + if more_time: + v.append("ETA: %s" % more_time_str) + return v + + +def _map_to_girder(url): + """ + "draft" datasets are not yet supported through our DANDI API. So we need to + perform special handling for now: discover girder_id for it and then proceed + with girder + """ + # This is a duplicate call from above but it is cheap, so decided to just redo + # it here instead of passing all the variables + url + server_type, server_url, asset_type, asset_id = parse_dandi_url(url) + server_url = known_instances[known_instances_rev[server_url.rstrip("/")]].girder + server_type = "girder" + client = girder.get_client(server_url, authenticate=False, progressbars=True) + # TODO: RF if https://github.com/dandi/dandiarchive/issues/316 gets addressed + # A hybrid UI case not yet adjusted for drafts API. + # TODO: remove whenever it is gone in an unknown version + if asset_id.get("folder_id"): + asset_type = "folder" + asset_id = [asset_id.get("folder_id")] + else: + girder_path = op.join("drafts", asset_id["dandiset_id"]) + asset_type = "folder" + if asset_id.get("location"): + # Not implemented by UI ATM but might come + girder_path = op.join(girder_path, asset_id["location"]) + try: + girder_rec = girder.lookup(client, girder_path) + except BaseException: + lgr.warning(f"Failed to lookup girder information for {girder_path}") + girder_rec = None + if not girder_rec: + raise RuntimeError(f"Cannot download from {url}") + asset_id = girder_rec.get("_id") + return asset_id, asset_type, client, server_type + + +def _skip_file(msg): + return {"status": "skipped", "message": str(msg)} + + +def _populate_dandiset_yaml(dandiset_path, metadata, overwrite): + if not metadata: + lgr.warning( + "Got completely empty metadata record for dandiset, not producing dandiset.yaml" ) - entities = client.traverse_asset(asset_id, asset_type, recursive=recursive) - # TODO: special handling for a dandiset -- we might need to - # generate dandiset.yaml out of the metadata record - # we care only about files ATM - files = (e for e in entities if e["type"] == "file") - elif entity_type == "file": - files = top_entities + return + dandiset_yaml = op.join(dandiset_path, dandiset_metadata_file) + yield {"message": "updating"} + lgr.debug(f"Updating {dandiset_metadata_file} from obtained dandiset " f"metadata") + if op.lexists(dandiset_yaml) and not overwrite: + yield _skip_file("already exists") + return else: - raise ValueError(f"Unexpected entity type {entity_type}") - if entity_type == "dandiset": - for e in top_entities: - dandiset_path = op.join(output_dir, e["path"]) - dandiset_yaml = op.join(dandiset_path, dandiset_metadata_file) - lgr.info( - f"Updating {dandiset_metadata_file} from obtained dandiset " f"metadata" + dandiset = Dandiset(dandiset_path, allow_empty=True) + dandiset.path_obj.mkdir(exist_ok=True) # exist_ok in case of parallel race + old_metadata = dandiset.metadata + dandiset.update_metadata(metadata) + yield { + "status": "done", + "message": "updated" if metadata != old_metadata else "same", + } + + +def _download_file( + downloader, path, size=None, mtime=None, existing="error", digests=None +): + 
"""Common logic for downloading a single file + + Generator downloader: + + TODO: describe expected records it should yield + - progress + - error + - completion + + Parameters + ---------- + downloader: callable returning a generator + A backend (girder or api) specific fixture for downloading some file into + path. It should be a generator yielding downloaded blocks. + size: int, optional + Target size if known + digests: dict, optional + possible checksums or other digests provided for the file. Only one + will be used to verify download + """ + if op.lexists(path): + block = f"File {path!r} already exists" + if existing == "error": + raise FileExistsError(block) + elif existing == "skip": + yield _skip_file("already exists") + return + elif existing == "overwrite": + pass + elif existing == "refresh": + if mtime is None: + lgr.warning( + f"{path!r} - no mtime or ctime in the record, redownloading" + ) + else: + stat = os.stat(op.realpath(path)) + same = [] + if is_same_time(stat.st_mtime, mtime): + same.append("mtime") + if size is not None and stat.st_size == size: + same.append("size") + # TODO: use digests if available? or if e.g. size is identical + # but mtime is different + if same == ["mtime", "size"]: + # TODO: add recording and handling of .nwb object_id + yield _skip_file("same time and size") + return + lgr.debug(f"{path!r} - same attributes: {same}. Redownloading") + + if size is not None: + yield {"size": size} + + destdir = op.dirname(path) + os.makedirs(destdir, exist_ok=True) + + yield {"status": "downloading"} + + algo, digester, digest, downloaded_digest = None, None, None, None + if digests: + # choose first available for now. + # TODO: reuse that sorting based on speed + for algo, digest in digests.items(): + digester = getattr(hashlib, algo, None) + if digester: + break + if not digester: + lgr.warning("Found no digests in hashlib for any of %s", str(digests)) + + # TODO: how do we discover the total size???? + # TODO: do not do it in-place, but rather into some "hidden" file + for attempt in range(3): + try: + downloaded = 0 + if digester: + downloaded_digest = digester() # start empty + warned = False + # I wonder if we could make writing async with downloader + with open(path, "wb") as writer: + for block in downloader(): + if digester: + downloaded_digest.update(block) + downloaded += len(block) + # TODO: yield progress etc + msg = {"done": downloaded} + if size: + if downloaded > size and not warned: + warned = True + # Yield ERROR? + lgr.warning( + "Downloaded %d bytes although size was told to be just %d", + downloaded, + size, + ) + msg["done%"] = 100 * downloaded / size if size else "100" + # TODO: ETA etc + yield msg + writer.write(block) + break + # both girder and we use HttpError + except requests.exceptions.HTTPError as exc: + # TODO: actually we should probably retry only on selected codes, and also + # respect Retry-After + if attempt >= 2 or exc.response.status_code not in ( + 400, # Bad Request, but happened with gider: + # https://github.com/dandi/dandi-cli/issues/87 + 503, # Service Unavailable + ): + lgr.debug("Download failed: %s", exc) + yield {"status": "error", "message": str(exc)} + return + # if is_access_denied(exc) or attempt >= 2: + # raise + # sleep a little and retry + lgr.debug( + "Failed to download on attempt#%d: %s, will sleep a bit and retry", + attempt, + exc, ) - if op.lexists(dandiset_yaml): - if existing != "overwrite": - lgr.info( - f"{dandiset_yaml} already exists. 
Set 'existing' " - f"to overwrite if you want it to be redownloaded. " - f"Skipping" - ) - continue - dandiset = Dandiset(dandiset_path, allow_empty=True) - dandiset.path_obj.mkdir(exist_ok=True) # exist_ok in case of parallel race - dandiset.update_metadata(e.get("metadata", {}).get("dandiset", {})) - return files + time.sleep(random.random() * 5) + + if downloaded_digest: + downloaded_digest = downloaded_digest.hexdigest() # we care only about hex + if digest != downloaded_digest: + msg = f"{algo}: downloaded {downloaded_digest} != {digest}" + yield {"checksum": "differs", "status": "error", "message": msg} + lgr.debug("%s is different: %s.", path, msg) + return + else: + yield {"checksum": "ok"} + lgr.debug("Verified that %s has correct %s %s", path, algo, digest) + else: + # shouldn't happen with more recent metadata etc + yield { + "checksum": "-", + # "message": "no digests were provided" + } + + # It seems that girder might not care about setting either mtime, so we will do if we know + # TODO: dissolve attrs and pass specific mtime? + if mtime: + yield {"status": "setting mtime"} + os.utime(path, (time.time(), mtime.timestamp())) + + yield {"status": "done"} diff --git a/dandi/girder.py b/dandi/girder.py index 069a05cd7..2aefdf66b 100644 --- a/dandi/girder.py +++ b/dandi/girder.py @@ -14,9 +14,8 @@ from . import get_logger from .exceptions import LockingError -from .utils import ensure_datetime, is_same_time -from .consts import known_instances_rev, metadata_digests -from .support.digests import Digester +from .utils import ensure_datetime, flattened, flatten, remap_dict +from .consts import known_instances_rev, MAX_CHUNK_SIZE lgr = get_logger() @@ -354,103 +353,144 @@ def _list_folder(self, folder_id, folder_type="folder", types=None): if len(children) < gcl.DEFAULT_PAGE_LIMIT: break - def download_file(self, file_id, path, existing="error", attrs=None, digests=None): + def get_download_file_iter(self, file_id, chunk_size=MAX_CHUNK_SIZE): """ + """ + + def downloader(): + # TODO: make it a common decorator here? + # Will do 3 attempts to avoid some problems due to flaky/overloaded + # connections, see https://github.com/dandi/dandi-cli/issues/87 + for attempt in range(3): + try: + return self.downloadFileAsIterator(file_id, chunkSize=chunk_size) + break + except gcl.HttpError as exc: + if is_access_denied(exc) or attempt >= 2: + raise + # sleep a little and retry + lgr.debug( + "Failed to download on attempt#%d, will sleep a bit and retry", + attempt, + ) + time.sleep((1 + random.random()) * 5) + + return downloader + + def _get_asset_recs(self, asset_id, asset_type, authenticate=False, recursive=True): + """ + Parameters ---------- - digests: dict, optional - possible checksums or other digests provided for the file. 
Only one - will be used to verify download + asset_id + asset_type + authenticate + recursive + + Returns + ------- + dandiset_rec, files_rec + dandiset_rec will be None if asset_id is not pointing to a dandiset """ - if op.lexists(path): - msg = f"File {path!r} already exists" - if existing == "error": - raise FileExistsError(msg) - elif existing == "skip": - lgr.info(msg + " skipping") - return - elif existing == "overwrite": - pass - elif existing == "refresh": - remote_file_mtime = self._get_file_mtime(attrs) - if remote_file_mtime is None: - lgr.warning( - f"{path!r} - no mtime or ctime in the record, redownloading" - ) - else: - stat = os.stat(op.realpath(path)) - same = [] - if is_same_time(stat.st_mtime, remote_file_mtime): - same.append("mtime") - if "size" in attrs and stat.st_size == attrs["size"]: - same.append("size") - if same == ["mtime", "size"]: - # TODO: add recording and handling of .nwb object_id - lgr.info(f"{path!r} - same time and size, skipping") - return - lgr.debug(f"{path!r} - same attributes: {same}. Redownloading") - - destdir = op.dirname(path) - os.makedirs(destdir, exist_ok=True) - # suboptimal since - # 1. downloads into TMPDIR which might lack space etc. If anything, we - # might tune up setting/TMPDIR at the - # level of download so it goes alongside with the target path - # (e.g. under .FILENAME.dandi-download). That would speed things up - # when finalizing the download, possibly avoiding `mv` across partitions - # 2. unlike upload it doesn't use a callback but relies on a context - # manager to be called with an .update. also it uses only filename - # in the progressbar label - # For starters we would do this implementation but later RF - # when RF - do not forget to remove progressReporterCls in __init__ - - # Will do 3 attempts to avoid some problems due to flaky/overloaded - # connections, see https://github.com/dandi/dandi-cli/issues/87 - for attempt in range(3): + # asset_rec = client.getResource(asset_type, asset_id) + # lgr.info("Working with asset %s", str(asset_rec)) + # In principle Girder's client already has ability to download any + # resource (collection/folder/item/file). But it seems that "mending" it + # with custom handling (e.g. later adding filtering to skip some files, + # or add our own behavior on what to do when files exist locally, etc) would + # not be easy. So we will reimplement as a two step (kinda) procedure. + # Return a generator which would be traversing girder and yield records + # of encountered resources. 
+ # TODO later: may be look into making it async + # First we access top level records just to sense what we are working with + top_entities = None + files = None + dandiset = None + + while True: try: - self.downloadFile(file_id, path) + # this one should enhance them with "fullpath" + top_entities = list( + self.traverse_asset(asset_id, asset_type, recursive=False) + ) break except gcl.HttpError as exc: - if is_access_denied(exc) or attempt >= 2: - raise - # sleep a little and retry - lgr.debug( - "Failed to download on attempt#%d, will sleep a bit and retry", - attempt, - ) - time.sleep((1 + random.random()) * 5) - # It seems that above call does not care about setting either mtime - if attrs: - mtime = self._get_file_mtime(attrs) - if mtime: - os.utime(path, (time.time(), mtime.timestamp())) - if digests: - # Pick the first one (ordered according to speed of computation) - for algo in metadata_digests: - if algo in digests: - break - else: - algo = list(digests)[:1] # first available - digest = Digester([algo])(path)[algo] - if digests[algo] != digest: - lgr.warning( - "%s %s is different: downloaded %s, should have been %s.", - path, - algo, - digest, - digests[algo], + if not authenticate and is_access_denied(exc): + lgr.warning("unauthenticated access denied, let's authenticate") + self.dandi_authenticate() + continue + raise + entity_type = list(set(e["type"] for e in top_entities)) + if len(entity_type) > 1: + raise ValueError( + f"Please point to a single type of entity - either dandiset(s)," + f" folder(s) or file(s). Got: {entity_type}" + ) + entity_type = entity_type[0] + if entity_type in ("dandiset", "folder"): + # redo recursively + lgr.info( + "Traversing remote %ss (%s) recursively", + entity_type, + ", ".join(e["name"] for e in top_entities), + ) + entities = self.traverse_asset(asset_id, asset_type, recursive=recursive) + # TODO: special handling for a dandiset -- we might need to + # generate dandiset.yaml out of the metadata record + # we care only about files ATM + files = (e for e in entities if e["type"] == "file") + elif entity_type == "file": + files = top_entities + else: + raise ValueError(f"Unexpected entity type {entity_type}") + # TODO: move -- common and has nothing to do with getting a list of assets + if entity_type == "dandiset": + if len(top_entities) > 1: + raise NotImplementedError( + "A single dandiset at a time only supported ATM, got %d: %s" + % (len(top_entities), top_entities) ) - else: - lgr.debug("Verified that %s has correct %s %s", path, algo, digest) + dandiset = top_entities[0] + + return dandiset, files + + def get_dandiset_and_assets( + self, asset_id, asset_type, recursive=True, authenticate=False + ): + lgr.debug(f"Traversing {asset_type} with id {asset_id}") + # there might be multiple asset_ids, e.g. if multiple files were selected etc, + # so we will traverse all of them + dandiset_asset_recs = [ + self._get_asset_recs( + asset_id_, asset_type, authenticate=authenticate, recursive=recursive + ) + for asset_id_ in set(flattened([asset_id])) + ] + + dandiset = None - @staticmethod - def _get_file_mtime(attrs): - if not attrs: - return None - # We would rely on uploaded_mtime from metadata being stored as mtime. 
-        # If that one was not provided, the best we know is the "ctime"
-        # for the file, use that one
-        return ensure_datetime(attrs.get("mtime", attrs.get("ctime", None)))
+        if not dandiset_asset_recs:
+            lgr.warning("Got empty listing for %s %s", asset_type, asset_id)
+            return
+        elif (
+            len(dandiset_asset_recs) > 1
+        ):  # had multiple asset_ids, should not be dandisets
+            if any(r[0] for r in dandiset_asset_recs):
+                raise NotImplementedError("Got multiple ids for dandisets")
+        else:
+            dandiset = dandiset_asset_recs[0][0]
+
+        # Return while harmonizing
+        if dandiset:
+            dandiset = _harmonize_girder_dandiset_to_dandi_api(dandiset)
+
+        return (
+            dandiset,
+            (
+                _harmonize_girder_asset_to_dandi_api(a)
+                for a in flatten(r[1] for r in dandiset_asset_recs)
+            ),
+        )

     @contextlib.contextmanager
     def lock_dandiset(self, dandiset_identifier: str):
@@ -476,6 +516,191 @@ def lock_dandiset(self, dandiset_identifier: str):
         )


+def _harmonize_girder_dandiset_to_dandi_api(rec):
+    """
+    Compare API (on a released version):
+
+{'count': 1,
+ 'created': '2020-07-21T22:22:15.396171Z',
+ 'dandiset': {'created': '2020-07-21T22:22:14.732729Z',
+              'identifier': '000027',
+              'updated': '2020-07-21T22:22:14.732762Z'},
+ 'metadata': {'dandiset': {...}},
+ 'updated': '2020-07-21T22:22:15.396295Z',
+ 'version': '0.200721.2222'}
+
+    to Girder (on drafts):
+
+{'attrs': {'ctime': '2020-07-08T21:54:42.543000+00:00',
+           'mtime': '2020-07-21T22:02:34.918000+00:00',
+           'size': 0},
+ 'id': '5f0640a2ab90ac46c4561e4f',
+ 'metadata': {'dandiset': {...}},
+ 'name': '000027',
+ 'path': '000027',
+ 'type': 'dandiset'}
+
+So we will place some girder specific ones under 'girder' and populate 'dandiset', e.g.
+(there is an absence of clarity about what datetimes the API returns:
+https://github.com/dandi/dandi-publish/issues/107
+so we will assume that my take was more or less correct and then we would have them
+correspond in case of a draft, as it is served by girder ATM):
+
+{# 'count': 1,  # no count
+ 'created': '2020-07-21T22:22:15.396171Z',  # attrs.ctime
+ 'dandiset': {'created': '2020-07-08T21:54:42.543000+00:00',  # attrs.ctime
+              'identifier': '000027',  # name
+              'updated': '2020-07-21T22:02:34.918000+00:00'},  # attrs.mtime
+ 'metadata': {'dandiset': {...}},
+ 'updated': '2020-07-21T22:02:34.918000+00:00'}  # attrs.mtime
+
+
+    Parameters
+    ----------
+    rec
+
+    Returns
+    -------
+    dict
+    """
+    # ATM it is just a simple remapping but might become more sophisticated later on
+    return remap_dict(
+        rec,
+        {
+            "metadata": "metadata",  # 1-to-1 for now
+            "dandiset.created": "attrs.ctime",
+            "created": "attrs.ctime",
+            "dandiset.updated": "attrs.mtime",
+            "updated": "attrs.mtime",
+            "dandiset.identifier": "name",
+        },
+    )
+
+
+def _get_file_mtime(attrs):
+    if not attrs:
+        return None
+    # We would rely on uploaded_mtime from metadata being stored as mtime.
+    # If that one was not provided, the best we know is the "ctime"
+    # for the file, use that one
+    return ensure_datetime(attrs.get("mtime", attrs.get("ctime", None)))
+
+
+def _harmonize_girder_asset_to_dandi_api(rec):
+    """
+
+    girder rec:
+
+*(Pdb) pprint(_a[0])
+{'attrs': {'ctime': '2020-07-21T22:00:36.362000+00:00',
+           'mtime': '2020-07-21T17:31:55.283394-04:00',
+           'size': 18792},
+ 'id': '5f176584f63d62e1dbd06946',
+ 'metadata': {...
identical at this level + 'uploaded_by': 'dandi 0.5.0+12.gd4ef762.dirty', + 'uploaded_datetime': '2020-07-21T18:00:36.703727-04:00', + 'uploaded_mtime': '2020-07-21T17:31:55.283394-04:00', + 'uploaded_size': 18792}, + 'name': 'sub-RAT123.nwb', + 'path': '000027/sub-RAT123/sub-RAT123.nwb', + 'type': 'file'} + + and API (lacking clear "modified" so needs tuning too): + + { + "version": { + "dandiset": { + "identifier": "000027", + "created": "2020-07-21T22:22:14.732729Z", + "updated": "2020-07-21T22:22:14.732762Z" + }, + "version": "0.200721.2222", + "created": "2020-07-21T22:22:15.396171Z", + "updated": "2020-07-21T22:22:15.396295Z", + "count": 1 + }, + "uuid": "bca53c42-7fc2-41b6-b836-5ed102ba8447", + "path": "/sub-RAT123/sub-RAT123.nwb", + "size": 18792, + "sha256": "1a765509384ea96b7b12136353d9c5b94f23d764ad0431e049197f7875eb352c", + "created": "2020-07-21T22:22:16.882594Z", + "updated": "2020-07-21T22:22:16.882641Z", + "metadata": { +... + "sha256": "1a765509384ea96b7b12136353d9c5b94f23d764ad0431e049197f7875eb352c", +... + "uploaded_size": 18792, + "uploaded_mtime": "2020-07-21T17:31:55.283394-04:00", + "uploaded_datetime": "2020-07-21T18:00:36.703727-04:00", +... + } + } + + + Parameters + ---------- + rec + + Returns + ------- + + """ + rec = rec.copy() # we will modify in place + + metadata = rec.get("metadata", {}) + size = rec["size"] = rec.get("attrs", {}).get("size") + # we will add messages leading to decision that metadata is outdated and thus should not be used + metadata_outdated = [] + uploaded_size = metadata.get("uploaded_size") + if size is None: + lgr.debug("Found no size in attrs from girder!") + if uploaded_size is not None: + lgr.debug("Taking 'uploaded_size' of %d", uploaded_size) + rec["size"] = uploaded_size + else: + if uploaded_size is not None and size != uploaded_size: + metadata_outdated.append( + f"uploaded_size of {uploaded_size} != size of {size}" + ) + + # duplication but ok for now + modified = rec["modified"] = _get_file_mtime(rec.get("attrs")) + uploaded_mtime = metadata.get("uploaded_mtime") + if uploaded_mtime: + uploaded_mtime = ensure_datetime(uploaded_mtime) + if modified is None: + lgr.debug("Found no mtime (modified) among girder attrs") + if uploaded_mtime is not None: + rec["modified"] = uploaded_mtime + else: + if uploaded_mtime is not None and modified != uploaded_mtime: + metadata_outdated.append( + f"uploaded_mtime of {uploaded_mtime} != mtime of {modified}" + ) + + if metadata_outdated: + lgr.warning( + "Found discrepnancies in girder record and metadata: %s", + ", ".join(metadata_outdated), + ) + + # we need to strip off the leading dandiset identifier from the path + path = rec["path"] + if path.startswith("00"): + # leading / is for consistency with API although yoh dislikes it + # https://github.com/dandi/dandi-publish/issues/109 + # Girder client returned paths are OS specific. 
+ path = "/" + path.split(op.sep, 1)[1] + else: + lgr.debug( + "Unexpected: an asset path did not have leading dandiset identifier: %s", + path, + ) + rec["path"] = path + + return rec + + # TODO: our adapter on top of the Girder's client to simplify further def get_client(server_url, authenticate=True, progressbars=False): """Simple authenticator which would store credential (api key) via keyring diff --git a/dandi/support/iterators.py b/dandi/support/iterators.py new file mode 100644 index 000000000..f08c756b2 --- /dev/null +++ b/dandi/support/iterators.py @@ -0,0 +1,117 @@ +"""Various helpful iterators""" + +from queue import Queue, Empty +from threading import Thread + + +class IteratorWithAggregation: + """ + An iterable over an iterable which also makes an aggregate of the values available asap + + It iterates over the iterable in a separate thread. + + A use case is a generator which collects information about resources, + which might be relatively fast but still take time. While we are iterating over it, + we could perform other operations on yielded records, but we would also like to have access to + the "summary" object as soon as that iterator completes but while we might still be + iterating over items in the outside loop. + + Use case: iterate over remote resource for downloads, and get "Total" size/number as + soon as it becomes known inside the underlying iterator. + + TODO: probably could be more elegant etc if implemented via async/coroutines. + + Attributes + ---------- + .total: + Aggregated value as known to the moment. None if nothing was aggregated. + It is a final value if `finished` is True. + .finished: bool + Set to True upon completion of iteration + .exc: BaseException or None + If not None -- the exception which was raised + + Example + ------- + + Very simplistic example, since typically (not range) it would be taking some time to + iterate for the nested iteration:: + + it = IteratorWithAggregation(range(3), lambda v, t=0: v+t) + for v in it: + print(it.total, it.finished, v) + sleep(0.02) # doing smth heavy, but we would know .total as soon as it is known + + would produce (so 3 is known right away, again since it is just range) + + 3 True 0 + 3 True 1 + 3 True 2 + + """ + + def __init__(self, gen, agg, reraise_immediately=False): + """ + + Parameters + ---------- + gen: iterable + Generator (but could be any iterable, but it would not make much sense) + to yield from + agg: callable + A callable with two args: new_value[, total=None] which should return adjusted + total. 
Upon first iteration, no prior `total` is provided + reraise_immediately: bool, optional + If True, it would stop yielding values as soon as it detects that some + exception has occurred (although there might still be values in the queue to be yielded + which were collected before the exception was raised) + """ + self.gen = gen + self.agg = agg + self.reraise_immediately = reraise_immediately + + self.total = None + self.finished = None + self._exc = None + + def __iter__(self): + self.finished = False + self._exc = None + + queue = Queue() + + def worker(): + """That is the one which interrogates gen and places total + into queue_total upon completion""" + total = None + try: + for value in self.gen: + queue.put(value) + self.total = total = ( + self.agg(value, total) if total is not None else self.agg(value) + ) + except BaseException as e: + self._exc = e + finally: + self.finished = True + + t = Thread(target=worker) + t.start() + + # yield from the queue (.total and .finished could be accessed meanwhile) + while True: + if self.reraise_immediately and self._exc is not None: + break + + # race condition HERE between checking for self.finished and + if self.finished and queue.empty(): + break + # in general queue should not be empty, but if it is, e.g. due to race + # condition with above check + try: + yield queue.get(timeout=0.001) + except Empty: + continue + t.join() + if self._exc is not None: + raise self._exc diff --git a/dandi/support/pyout.py b/dandi/support/pyout.py index dc385f8c1..d14833ef2 100644 --- a/dandi/support/pyout.py +++ b/dandi/support/pyout.py @@ -106,6 +106,13 @@ def __call__(self, values): def get_style(hide_if_missing=True): + progress_style = dict( # % done + transform=lambda f: "%d%%" % f, + align="right", + color=dict( + interval=[[0, 10, "red"], [10, 100, "yellow"], [100, None, "green"]] + ), + ) STYLE = { "summary_": {"bold": True}, "header_": dict( @@ -167,10 +174,16 @@ def get_style(hide_if_missing=True): ), aggregate=counts, ), - "upload": dict( # % done - transform=lambda f: "%d%%" % f, + "upload": progress_style, + "done%": progress_style, + "checksum": dict( + align="center", color=dict( - interval=[[0, 10, "red"], [10, 100, "yellow"], [100, None, "green"]] + re_lookup=[ + ["ok", "green"], + ["^(-|NA|N/A)", "yellow"], + ["^(differ|failed|error|ERROR)", "red"], + ] ), ), } diff --git a/dandi/support/tests/test_iterators.py b/dandi/support/tests/test_iterators.py new file mode 100644 index 000000000..15549b0e4 --- /dev/null +++ b/dandi/support/tests/test_iterators.py @@ -0,0 +1,51 @@ +from time import sleep + +from ..iterators import IteratorWithAggregation + +import pytest + + +def sleeping_range(n, secs=0.01, thr=None): + """Fast generator""" + for i in range(n): + yield i + sleep(secs) + if thr and i >= thr: + raise ValueError(i) + + +def test_IteratorWithAggregation(): + def sumup(v, t=0): + return v + t + + it = IteratorWithAggregation(sleeping_range(3, 0.0001), agg=sumup) + # we should get our summary available after 2nd iteration and before it finishes + for t, i in enumerate(it): + sleep(0.01) # 0.0003 should be sufficient but to deal with Windows failures, + # making it longer + assert t == i # it is just a range after all + if i: + assert it.finished + + # If there is an exception thrown, it would be raised only by the end + it = IteratorWithAggregation(sleeping_range(5, 0.0001, thr=2), agg=sumup) + got = [] + with pytest.raises(ValueError): + for i in it: + got.append(i) + sleep(0.001) + assert got == [0, 1, 2] + assert it.finished + + # If 
there is an exception thrown, it would be raised only by the end + it = IteratorWithAggregation( + sleeping_range(5, 0.0001, thr=2), agg=sumup, reraise_immediately=True + ) + got = [] + with pytest.raises(ValueError): + for i in it: + got.append(i) + # 0.005 should be more than enough, but Windows is still lagging + sleep(0.02) + assert got == [0] + assert it.finished diff --git a/dandi/tests/test_download.py b/dandi/tests/test_download.py index 46868af0a..5ae5732d5 100644 --- a/dandi/tests/test_download.py +++ b/dandi/tests/test_download.py @@ -1,31 +1,51 @@ +import os.path as op + import time import tqdm from ..download import download, follow_redirect, parse_dandi_url from ..exceptions import NotFoundError from ..tests.skip import mark +from ..consts import known_instances from ..girder import GirderCli, gcl, TQDMProgressReporter import pytest +def _assert_parse_girder_url(url): + st, s, a, aid = parse_dandi_url(url) + assert st == "girder" + assert s == known_instances["dandi"].girder + "/" + return a, aid + + +def _assert_parse_api_url(url): + st, s, a, aid = parse_dandi_url(url) + assert st == "api" + assert s == known_instances["dandi"].api + "/" + return a, aid + + def test_parse_dandi_url(): + # ATM we point to drafts, so girder + assert _assert_parse_girder_url("DANDI:000027") == ( + "folder", + ["5f0640a2ab90ac46c4561e4f"], + ) + # user - s, a, aid = parse_dandi_url( + # we do not care about user -- we care about folder id + assert _assert_parse_girder_url( "https://girder.dandiarchive.org/" "#user/5da4b8fe51c340795cb18fd0/folder/5e5593cc1a343161ff7c5a92" - ) - # we do not care about user -- we care about folder id - assert (a, aid) == ("folder", ["5e5593cc1a343161ff7c5a92"]) + ) == ("folder", ["5e5593cc1a343161ff7c5a92"]) # dandiset # folder - s, a, aid = parse_dandi_url( + assert _assert_parse_girder_url( "https://gui.dandiarchive.org/#/folder/5e5593cc1a343161ff7c5a92" - ) - assert s == "https://girder.dandiarchive.org/" - assert (a, aid) == ("folder", ["5e5593cc1a343161ff7c5a92"]) + ) == ("folder", ["5e5593cc1a343161ff7c5a92"]) # selected folder(s) # "https://gui.dandiarchive.org/#/folder/5d978d9ecc10d1bc31040bca/selected/folder+5e8672e06cce296e2e817318" @@ -34,24 +54,65 @@ def test_parse_dandi_url(): # "https://gui.dandiarchive.org/#/folder/5d978d9ecc10d1bc31040bca/selected/item+5e8674dc6cce296e2e8173c5/folder+5e8672e06cce296e2e817318" # Selected multiple items - s, a, aid = parse_dandi_url( + assert _assert_parse_girder_url( "https://gui.dandiarchive.org/#/folder/5e7b9e43529c28f35128c745/selected/" "item+5e7b9e44529c28f35128c747/item+5e7b9e43529c28f35128c746" - ) - assert s == "https://girder.dandiarchive.org/" - assert (a, aid) == ( - "item", - ["5e7b9e44529c28f35128c747", "5e7b9e43529c28f35128c746"], - ) + ) == ("item", ["5e7b9e44529c28f35128c747", "5e7b9e43529c28f35128c746"]) # new (v1? not yet tagged) web UI, and as it comes from a PR, # so we need to provide yet another mapping to stock girder - s, a, aid = parse_dandi_url( + assert _assert_parse_girder_url( "https://refactor--gui-dandiarchive-org.netlify.app/#/file-browser" "/folder/5e9f9588b5c9745bad9f58fe" + ) == ("folder", ["5e9f9588b5c9745bad9f58fe"]) + + # New DANDI web UI driven by DANDI API. Again no version assigned/planned! 
+ # see https://github.com/dandi/dandiarchive/pull/341 + url1 = ( + "https://deploy-preview-341--gui-dandiarchive-org.netlify.app/" + "#/dandiset/000006/0.200714.1807" + ) + assert _assert_parse_api_url(url1) == ( + "dandiset", + {"dandiset_id": "000006", "version": "0.200714.1807"}, + ) + assert _assert_parse_api_url(url1 + "/files") == ( + "dandiset", + {"dandiset_id": "000006", "version": "0.200714.1807"}, + ) + assert _assert_parse_api_url(url1 + "/files?location=%2F") == ( + "dandiset", + {"dandiset_id": "000006", "version": "0.200714.1807"}, + ) + assert _assert_parse_api_url(url1 + "/files?location=%2Fsub-anm369962%2F") == ( + "folder", + { + "dandiset_id": "000006", + "version": "0.200714.1807", + "location": "sub-anm369962/", + }, + ) + # no trailing / - Yarik considers it to be an item (file) + assert _assert_parse_api_url(url1 + "/files?location=%2Fsub-anm369962") == ( + "item", + { + "dandiset_id": "000006", + "version": "0.200714.1807", + "location": "sub-anm369962", + }, + ) + # And the hybrid for "drafts" where it still goes by girder ID + assert _assert_parse_api_url( + "https://deploy-preview-341--gui-dandiarchive-org.netlify.app/#/dandiset/000027" + "/draft/files?_id=5f176583f63d62e1dbd06943&_modelType=folder" + ) == ( + "folder", + { + "dandiset_id": "000027", + "version": "draft", + "folder_id": "5f176583f63d62e1dbd06943", + }, ) - assert s == "https://girder.dandiarchive.org" - assert (a, aid) == ("folder", ["5e9f9588b5c9745bad9f58fe"]) @mark.skipif_no_network @@ -60,9 +121,10 @@ def test_parse_dandi_url_redirect(): with pytest.raises(NotFoundError): parse_dandi_url("https://dandiarchive.org/dandiset/999999") # Is there ATM - s, a, aid = parse_dandi_url("https://dandiarchive.org/dandiset/000003") - assert s == "https://girder.dandiarchive.org/" - assert a, aid == ("dandiset-meta", "5e6eb2b776569eb93f451f8d") + assert _assert_parse_girder_url("https://dandiarchive.org/dandiset/000003") == ( + "folder", + ["5e6eb2b776569eb93f451f8d"], + ) # And this one would point to a folder assert ( follow_redirect("https://bit.ly/dandi12") @@ -77,30 +139,34 @@ def test_download_multiple_files(monkeypatch, tmpdir): "item+5e70d3173da50caa9adaf335/item+5e70d3183da50caa9adaf336" ) + # In 0.6 RF of download we stopped using girder's downloadFile. + # But we still do up to 3 tries also while getting the downloadFileAsIterator, + # to this test will test those retries. 
# While at it we will also test girder downloadFile to retry at least 3 times # in case of some errors, and that it sleeps between retries - - orig_downloadFile = GirderCli.downloadFile + orig_downloadFileAsIterator = GirderCli.downloadFileAsIterator class Mocks: ntries = 0 sleeps = 0 @staticmethod - def downloadFile(self, *args, **kwargs): + def downloadFileAsIterator(self, *args, **kwargs): Mocks.ntries += 1 if Mocks.ntries < 3: raise gcl.HttpError( text="Failing to download", url=url, method="GET", status=500 ) - return orig_downloadFile(self, *args, **kwargs) + return orig_downloadFileAsIterator(self, *args, **kwargs) @staticmethod def sleep(duration): Mocks.sleeps += duration # no actual sleeping - monkeypatch.setattr(GirderCli, "downloadFile", Mocks.downloadFile) + monkeypatch.setattr( + GirderCli, "downloadFileAsIterator", Mocks.downloadFileAsIterator + ) monkeypatch.setattr(time, "sleep", Mocks.sleep) # to not sleep in the test ret = download(url, tmpdir) @@ -117,6 +183,44 @@ def sleep(duration): assert all(x.lstat().size > 1e5 for x in tmpdir.listdir()) # all bigish files +# both urls point to 000027 (lean test dataset), and both draft and "released" +# version have only a single file ATM +@pytest.mark.parametrize( + "url", + [ # Should go through API + "https://deploy-preview-341--gui-dandiarchive-org.netlify.app/" + "#/dandiset/000027/0.200721.2222", + # Good old girder (draft) + "https://gui.dandiarchive.org/#/dandiset/5f0640a2ab90ac46c4561e4f", + ], +) +def test_download_000027(url, tmpdir): + ret = download(url, tmpdir) + assert not ret # we return nothing ATM, might want to "generate" + dsdir = tmpdir / "000027" + downloads = (x.relto(dsdir) for x in dsdir.visit()) + assert sorted(downloads) == [ + "dandiset.yaml", + "sub-RAT123", + op.join("sub-RAT123", "sub-RAT123.nwb"), + ] + # and checksum should be correct as well + from ..support.digests import Digester + + assert ( + Digester(["md5"])(dsdir / "sub-RAT123" / "sub-RAT123.nwb")["md5"] + == "33318fd510094e4304868b4a481d4a5a" + ) + # redownload - since already exist there should be an exception + with pytest.raises(FileExistsError): + download(url, tmpdir) + + # TODO: somehow get that status report about what was downloaded and what not + download(url, tmpdir, existing="skip") # TODO: check that skipped + download(url, tmpdir, existing="overwrite") # TODO: check that redownloaded + download(url, tmpdir, existing="refresh") # TODO: check that skipped (the same) + + def test_girder_tqdm(monkeypatch): # smoke test to ensure we do not blow up def raise_assertion_error(*args, **kwargs): diff --git a/dandi/tests/test_utils.py b/dandi/tests/test_utils.py index 0048c9a89..db4d6270d 100644 --- a/dandi/tests/test_utils.py +++ b/dandi/tests/test_utils.py @@ -16,6 +16,7 @@ get_utcnow_datetime, is_same_time, on_windows, + remap_dict, ) @@ -128,6 +129,27 @@ def test_flatten(): ] +@pytest.mark.parametrize( + "from_,revmapping,to", + [ + ({"1": 2}, {"1": "1"}, {"1": 2}), + ({1: 2}, {(1,): [1]}, {1: 2}), # if path must not be string, use list or tuple + ( + {1: 2}, + {"sub.key": (1,)}, + {"sub": {"key": 2}}, + ), # if path must not be string, use list or tuple + ( + {1: 2, "a": {"b": [1]}}, + {"sub.key": (1,), "sub.key2.blah": "a.b"}, + {"sub": {"key": 2, "key2": {"blah": [1]}}}, + ), + ], +) +def test_remap_dict(from_, revmapping, to): + assert remap_dict(from_, revmapping) == to + + @responses.activate def test_get_instance_dandi(): responses.add( @@ -149,6 +171,7 @@ def test_get_instance_dandi(): girder="https://girder.dandi", 
gui="https://gui.dandi", redirector="https://dandiarchive.org", + api="https://publish.dandi/api", ) @@ -173,6 +196,7 @@ def test_get_instance_url(): girder="https://girder.dandi", gui="https://gui.dandi", redirector="https://example.dandi/", + api="https://publish.dandi/api", ) diff --git a/dandi/utils.py b/dandi/utils.py index b6b26e55a..d0e0e3170 100644 --- a/dandi/utils.py +++ b/dandi/utils.py @@ -211,6 +211,39 @@ def updated(d, update): return d +def remap_dict(rec, revmapping): + """Remap nested dicts according to mapping + + Parameters + ---------- + revmapping: dict + (to, from) + + TODO: document and test more + """ + out = {} + + def split(path): + # map path from key.subkey if given in a string (not tuple) form + return path.split(".") if isinstance(path, str) else path + + for to, from_ in revmapping.items(): + in_v = rec + for p in split(from_): + if p not in in_v: + continue # it is not there -- cannot map + in_v = in_v[p] + + # and now set + out_v = out + t_split = split(to) + for p in t_split[:-1]: + out_v[p] = out_v.get(p, {}) # container for the next field + out_v = out_v[p] + out_v[t_split[-1]] = in_v # and the last one gets the in_v + return out + + # # Paths and files # @@ -556,4 +589,5 @@ def get_instance(dandi_instance_id): girder=server_info["services"]["girder"]["url"], gui=server_info["services"]["webui"]["url"], redirector=redirector_url, + api=server_info["services"]["api"]["url"], ) diff --git a/setup.cfg b/setup.cfg index 76cd5f92a..797e4e7db 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ install_requires = click-didyoumean etelemetry >= 0.2.0 joblib - pyout + pyout != 0.6.0 humanize requests ~= 2.20 ruamel.yaml >=0.15, <1
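
Illustrative usage sketch (not part of the diff itself): how the new
IteratorWithAggregation from dandi/support/iterators.py could drive a download
loop that learns the total size while assets are still being listed, per the
use case described in its docstring. `list_assets()` and `download_one()` are
hypothetical stand-ins for the real asset-listing generator and per-file
download helper.

    from dandi.support.iterators import IteratorWithAggregation

    def download_all(list_assets, download_one):
        # Aggregate the total byte count as records are yielded by the lister.
        assets = IteratorWithAggregation(
            list_assets(),
            agg=lambda asset, total=0: total + (asset.get("size") or 0),
        )
        for asset in assets:
            # assets.total reflects everything aggregated so far and is final
            # once assets.finished is True -- handy for a progress display.
            download_one(asset, known_total=assets.total)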