Merge pull request #247 from dandi/gh-198
Download files to temporary directory containing metadata
yarikoptic committed Oct 2, 2020
2 parents 05dff2d + a9a7e87 commit 46237b0
Showing 5 changed files with 164 additions and 21 deletions.
7 changes: 5 additions & 2 deletions dandi/dandiapi.py
@@ -321,10 +321,13 @@ def get_download_file_iter(
f"/dandisets/{dandiset_id}/versions/{version}/assets/{uuid}/download/"
)

def downloader():
def downloader(start_at=0):
lgr.debug("Starting download from %s", url)
headers = None
if start_at > 0:
headers = {"Range": f"bytes={start_at}-"}
result = (self._session if self._session else requests).get(
url, stream=True
url, stream=True, headers=headers
)
# TODO: apparently we might need retries here as well etc
# if result.status_code not in (200, 201):
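
Note: the new `start_at` parameter resumes a partial download by sending an HTTP Range header, so the server returns only the remaining bytes (206 Partial Content). A minimal sketch of the same idea with plain `requests`; the function and file names here are illustrative and not part of the diff:

import requests

def fetch_with_resume(url, dest, start_at=0, chunk_size=1024 * 1024):
    # Ask only for the bytes from `start_at` onward; a server that honours
    # Range requests answers with 206 Partial Content instead of 200.
    headers = {"Range": f"bytes={start_at}-"} if start_at > 0 else None
    with requests.get(url, stream=True, headers=headers) as resp:
        resp.raise_for_status()
        # Append when resuming, otherwise start a fresh file.
        with open(dest, "ab" if start_at > 0 else "wb") as fp:
            for chunk in resp.iter_content(chunk_size=chunk_size):
                fp.write(chunk)
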
96 changes: 91 additions & 5 deletions dandi/download.py
@@ -1,9 +1,11 @@
import hashlib

import json
import os
import os.path as op
from pathlib import Path
import random
import requests
from shutil import rmtree
import sys
import time

@@ -431,13 +433,19 @@ def _download_file(
# TODO: do not do it in-place, but rather into some "hidden" file
for attempt in range(3):
try:
downloaded = 0
if digester:
downloaded_digest = digester() # start empty
warned = False
# I wonder if we could make writing async with downloader
with open(path, "wb") as writer:
for block in downloader():
with DownloadDirectory(path, digests) as dldir:
downloaded = dldir.offset
if size is not None and downloaded == size:
# Exit early when downloaded == size, as making a Range
# request in such a case results in a 416 error from S3.
# Problems will result if `size` is None but we've already
# downloaded everything.
break
for block in downloader(start_at=dldir.offset):
if digester:
downloaded_digest.update(block)
downloaded += len(block)
@@ -455,7 +463,7 @@ def _download_file(
msg["done%"] = 100 * downloaded / size if size else "100"
# TODO: ETA etc
yield msg
writer.write(block)
dldir.append(block)
break
# both girder and we use HttpError
except requests.exceptions.HTTPError as exc:
@@ -503,3 +511,81 @@ def _download_file(
os.utime(path, (time.time(), ensure_datetime(mtime).timestamp()))

yield {"status": "done"}


class DownloadDirectory:
def __init__(self, filepath, digests):
#: The path to which to save the file after downloading
self.filepath = Path(filepath)
#: Expected hashes of the downloaded data, as a mapping from algorithm
#: names to digests
self.digests = digests
#: The working directory in which downloaded data will be temporarily
#: stored
self.dirpath = self.filepath.with_name(self.filepath.name + ".dandidownload")
#: The file in `dirpath` to which data will be written as it is
#: received
self.writefile = self.dirpath / "file"
#: A `fasteners.InterProcessLock` on `dirpath`
self.lock = None
#: An open filehandle to `writefile`
self.fp = None
#: How much of the data has been downloaded so far
self.offset = None

def __enter__(self):
from fasteners import InterProcessLock

self.dirpath.mkdir(parents=True, exist_ok=True)
self.lock = InterProcessLock(str(self.dirpath / "lock"))
if not self.lock.acquire(blocking=False):
raise RuntimeError(f"Could not acquire download lock for {self.filepath}")
chkpath = self.dirpath / "checksum"
try:
with chkpath.open() as fp:
digests = json.load(fp)
except (FileNotFoundError, ValueError):
digests = {}
matching_algs = self.digests.keys() & digests.keys()
if matching_algs and all(
self.digests[alg] == digests[alg] for alg in matching_algs
):
# Pick up where we left off, writing to the end of the file
lgr.debug(
"Download directory exists and has matching checksum; resuming download"
)
self.fp = self.writefile.open("ab")
else:
# Delete the file (if it even exists) and start anew
if not chkpath.exists():
lgr.debug("Starting new download in new download directory")
else:
lgr.debug(
"Download directory found, but digests do not match; starting new download"
)
try:
self.writefile.unlink()
except FileNotFoundError:
pass
self.fp = self.writefile.open("wb")
with chkpath.open("w") as fp:
json.dump(self.digests, fp)
self.offset = self.fp.tell()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.fp.close()
try:
if exc_type is None:
self.writefile.replace(self.filepath)
finally:
self.lock.release()
if exc_type is None:
rmtree(self.dirpath, ignore_errors=True)
self.lock = None
self.fp = None
self.offset = None
return False

def append(self, blob):
self.fp.write(blob)
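
Note: for orientation, a sketch (not part of the diff) of how `_download_file` drives this class, using the `downloader(start_at=...)` iterator and the `size` value that are in scope there; the path and digest value below are placeholders:

digests = {"sha256": "<expected-sha256>"}  # placeholder digest
with DownloadDirectory("sub-RAT123.nwb", digests) as dldir:
    # dldir.offset is the size of any previously downloaded partial file whose
    # recorded checksums match `digests`; otherwise it is 0.
    if size is None or dldir.offset < size:
        for block in downloader(start_at=dldir.offset):
            dldir.append(block)
# On a clean exit the partial file is renamed onto the target path and the
# ".dandidownload" directory is removed; on an error it is left in place so a
# later run can resume from the recorded offset.
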
24 changes: 21 additions & 3 deletions dandi/girder.py
@@ -368,14 +368,32 @@ def get_download_file_iter(self, file_id, chunk_size=MAX_CHUNK_SIZE):
"""
"""

def downloader():
def downloader(start_at=0):
# TODO: make it a common decorator here?
# Will do 3 attempts to avoid some problems due to flaky/overloaded
# connections, see https://github.com/dandi/dandi-cli/issues/87
for attempt in range(3):
try:
return self.downloadFileAsIterator(file_id, chunkSize=chunk_size)
break
path = f"file/{file_id}/download"
if start_at > 0:
headers = {"Range": f"bytes={start_at}-"}
# Range requests result in a 206 response, which the
# Girder client treats as an error (at least until they
# merge girder/girder#3301). Hence, we need to make
# the request directly through `requests`.
import requests

resp = requests.get(
f"{self._server_url}/api/v1/{path}",
stream=True,
headers=headers,
)
resp.raise_for_status()
else:
resp = self.sendRestRequest(
"get", path, stream=True, jsonResp=False
)
return resp.iter_content(chunk_size=chunk_size)
except gcl.HttpError as exc:
if is_access_denied(exc) or attempt >= 2:
raise
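
Note: the "common decorator" TODO above could look roughly like the following hypothetical sketch, which reuses `gcl` and `is_access_denied` from this module and keeps the same three-attempt behaviour; the backoff value is arbitrary:

import time
from functools import wraps

def retry_girder(attempts=3, delay=2):
    # Hypothetical helper: retry a flaky Girder call, sleeping between
    # attempts, but re-raise immediately on access-denied errors or once the
    # attempts are exhausted.
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except gcl.HttpError as exc:
                    if is_access_denied(exc) or attempt >= attempts - 1:
                        raise
                    time.sleep(delay)  # illustrative fixed backoff
        return wrapper
    return decorator
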
57 changes: 46 additions & 11 deletions dandi/tests/test_download.py
@@ -1,3 +1,5 @@
import json
import os
import os.path as op

import time
@@ -23,29 +25,32 @@ def test_download_multiple_files(monkeypatch, tmpdir):
# to this test will test those retries.
# While at it we will also test girder downloadFile to retry at least 3 times
# in case of some errors, and that it sleeps between retries
orig_downloadFileAsIterator = GirderCli.downloadFileAsIterator
orig_sendRestRequest = GirderCli.sendRestRequest

class Mocks:
ntries = 0
sleeps = 0

@staticmethod
def downloadFileAsIterator(self, *args, **kwargs):
Mocks.ntries += 1
if Mocks.ntries < 3:
raise gcl.HttpError(
text="Failing to download", url=url, method="GET", status=500
)
return orig_downloadFileAsIterator(self, *args, **kwargs)
def sendRestRequest(self, *args, **kwargs):
if (
len(args) > 1
and args[1].startswith("file/")
and args[1].endswith("/download")
):
Mocks.ntries += 1
if Mocks.ntries < 3:
raise gcl.HttpError(
text="Failing to download", url=url, method="GET", status=500
)
return orig_sendRestRequest(self, *args, **kwargs)

@staticmethod
def sleep(duration):
Mocks.sleeps += duration
# no actual sleeping

monkeypatch.setattr(
GirderCli, "downloadFileAsIterator", Mocks.downloadFileAsIterator
)
monkeypatch.setattr(GirderCli, "sendRestRequest", Mocks.sendRestRequest)
monkeypatch.setattr(time, "sleep", Mocks.sleep) # to not sleep in the test

ret = download(url, tmpdir)
@@ -140,3 +145,33 @@ def raise_assertion_error(*args, **kwargs):

with TQDMProgressReporter() as pr:
pr.update(10)


@pytest.mark.parametrize("resizer", [lambda sz: 0, lambda sz: sz // 2, lambda sz: sz])
@pytest.mark.parametrize("version", ["0.200721.2222", "draft"])
def test_download_000027_resume(tmp_path, resizer, version):
from ..support.digests import Digester

url = f"https://dandiarchive.org/dandiset/000027/{version}"
digester = Digester()
download(url, tmp_path, get_metadata=False)
dsdir = tmp_path / "000027"
nwb = dsdir / "sub-RAT123" / "sub-RAT123.nwb"
digests = digester(str(nwb))
dldir = nwb.with_name(nwb.name + ".dandidownload")
dldir.mkdir()
dlfile = dldir / "file"
nwb.rename(dlfile)
size = dlfile.stat().st_size
os.truncate(dlfile, resizer(size))
with (dldir / "checksum").open("w") as fp:
json.dump(digests, fp)
download(url, tmp_path, get_metadata=False)
contents = [
op.relpath(op.join(dirpath, entry), dsdir)
for (dirpath, dirnames, filenames) in os.walk(dsdir)
for entry in dirnames + filenames
]
assert sorted(contents) == ["sub-RAT123", op.join("sub-RAT123", "sub-RAT123.nwb")]
assert nwb.stat().st_size == size
assert digester(str(nwb)) == digests
1 change: 1 addition & 0 deletions setup.cfg
@@ -40,6 +40,7 @@ install_requires =
click-didyoumean
email-validator
etelemetry >= 0.2.0
fasteners
joblib
pydantic
pyout != 0.6.0
