Resolve #178
nutjob4life committed Sep 9, 2024
1 parent 10b6a99 commit fc4ceaf
Showing 1 changed file with 55 additions and 23 deletions.
src/pds2/aipgen/registry.py (78 changes: 55 additions & 23 deletions)
@@ -34,12 +34,14 @@
import dataclasses
import hashlib
import logging
+import os.path
import sys
from datetime import datetime
from http import HTTPStatus
from typing import Any
from typing import Iterator
from typing import Union
+from urllib.parse import urlparse

import requests

@@ -69,7 +71,7 @@
# --------------

_apiquerylimit = 50 # Pagination in the PDS API
-_defaultserver = "https://pds.nasa.gov/api/search/1.0/"  # Where to find the PDS API
+_defaultserver = "https://pds.nasa.gov/api/search/1/"  # Where to find the PDS API
_searchkey = "ops:Harvest_Info.ops:harvest_date_time" # How to sort products


@@ -168,7 +170,9 @@ def _getproducts(server_url: str, lidvid: str, allcollections=True) -> Iterator[
If ``allcollections`` is True, then return all collections for LID-only references; otherwise
return just the latest collection for LID-only references (has no effect on full LIDVID-references).
"""
url = f"{server_url}/products/{lidvid}/members/{'all' if allcollections else 'latest'}"
# Commenting out `all` vs. `latest` functionality for now since the API does not support it at this time
# url = f"{server_url}/products/{lidvid}/members/{'all' if allcollections else 'latest'}"
url = f"{server_url}/products/{lidvid}/members"
params = {"sort": _searchkey, "limit": _apiquerylimit}
while True:
_logger.debug('Making request to %s with params %r', url, params)
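
The rest of the loop is elided above; purely for orientation, here is a minimal sketch of such pagination in isolation. The ``start`` offset parameter and the ``data`` response member are assumptions, not the PDS API's documented contract.

import requests

def fetch_all_products(url: str, limit: int = 50):
    """Sketch only: yield every product from a paginated JSON endpoint.

    ASSUMPTIONS: the endpoint accepts a ``start`` offset parameter and
    returns a JSON object whose ``data`` member is a list of products;
    neither is confirmed by this diff.
    """
    params = {"sort": "ops:Harvest_Info.ops:harvest_date_time", "limit": limit, "start": 0}
    while True:
        response = requests.get(url, params=params)
        response.raise_for_status()
        products = response.json().get("data", [])
        yield from products
        if len(products) < limit:
            return  # a short (or empty) page means no more results
        params["start"] += limit
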
@@ -196,15 +200,14 @@ def _addfiles(product: dict, bac: dict):
bac[lidvid] = files # Stash for future use


-def _comprehendregistry(url: str, bundlelidvid: str, allcollections=True) -> tuple[int, dict, str]:
+def _comprehendregistry(url: str, bundlelidvid: str, allcollections=True) -> tuple[dict, str]:
"""Fathom the registry.
Query the PDS API at ``url`` for all information about the PDS ``bundlelidvid`` and return a
comprehension of it. If ``allcollections`` is True, we include every reference from a collection
that's LID-only; if it's False, then we only include the latest reference from a LID-only reference.
A "comprehension of it" means a triple of the common prefix length of all PDS paths referenced
within it, the "B.A.C." (a dict mapping PDS lidvids to sets of ``_File``s), and the title of
the PDS bundle.
A "comprehension of it" means a double of the "B.A.C." (a dict mapping PDS lidvids to sets of ``_File``s),
and the title of the PDS bundle.
If ``allcollections`` is True, we include all collections, meaning that if a bundle references
a collection with LID only (no VID), we include all version IDs of that collection. When this
@@ -224,8 +227,6 @@ def _comprehendregistry(url: str, bundlelidvid: str, allcollections=True) -> tup
raise ValueError(f"🤷‍♀️ The bundle {bundlelidvid} cannot be found in the registry at {url}")
title = bundle.get("title", "«unknown»")
_addfiles(bundle, bac)
-    bundleurl = bundle["metadata"]["label_url"]
-    prefixlen = bundleurl.rfind("/") + 1

# It turns out the PDS registry makes this *trivial* compared to the PDS filesystem version;
# Just understanding that it was all there was the hard part! 😊 THANK YOU! 🙏
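
The ``_File`` dataclass itself lies outside this diff, but from its uses here (``f.url``, ``f.md5``) the shape of the "B.A.C." can be sketched as follows; the field names are inferred from usage, not confirmed by the commit.

import dataclasses

@dataclasses.dataclass(frozen=True)  # frozen → hashable, so instances can live in a set
class _File:  # hypothetical reconstruction; the real dataclass is not shown in this diff
    url: str  # where the registry says the file lives
    md5: str  # the file's MD5 checksum, per the registry

bac: dict[str, set[_File]] = {
    "urn:nasa:pds:example_bundle::1.0": {  # illustrative lidvid
        _File(url="https://atmos.nmsu.edu/pds4/data/airhead/bundle.xml",
              md5="d41d8cd98f00b204e9800998ecf8427e"),  # placeholder MD5
    },
}
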
@@ -235,21 +236,26 @@
_addfiles(product, bac)

# C'est tout 🌊
-    return prefixlen, bac, title
+    return bac, title


-def _writechecksummanifest(fn: str, prefixlen: int, bac: dict) -> tuple[str, int, int]:
+def _urltocommonpath(url: str, pathprefix: str) -> str:
+    """Given a ``url`` and a ``pathprefix`` return the relative path of the path-only part of ``url``."""
+    return os.path.relpath(urlparse(url).path, pathprefix)
+
+
+def _writechecksummanifest(fn: str, pathprefix: str, bac: dict) -> tuple[str, int, int]:
"""Write an AIP "checksum manifest".
This writes an AIP "checksum manifest" to the given ``fn`` PDS filename, stripping ``prefixlen``
characters off paths, and using information from the ``bac``. Return a triple of the MD5
This writes an AIP "checksum manifest" to the given ``fn`` PDS filename, stripping ``pathprefix``
off paths, and using information from the ``bac``. Return a triple of the MD5
of the manifest, its size in bytes, and a count of the number of entries in it.
"""
hashish, size, count = hashlib.new("md5"), 0, 0
with open(fn, "wb") as o:
for files in bac.values():
for f in files:
entry = f"{f.md5}\t{f.url[prefixlen:]}\r\n".encode("utf-8")
entry = f"{f.md5}\t{_urltocommonpath(f.url, pathprefix)}\r\n".encode("utf-8")
o.write(entry)
hashish.update(entry)
size += len(entry)
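
The effect of the new ``_urltocommonpath`` helper is plain stdlib behavior; a quick worked example using the URLs from the ``_findcommonpathprefix`` docstring further down:

import os.path
from urllib.parse import urlparse

url = "https://atmos.nmsu.edu/pds4/data/airhead/collection/collection.csv"
pathprefix = "/pds4/data/airhead"

path = urlparse(url).path                 # '/pds4/data/airhead/collection/collection.csv'
print(os.path.relpath(path, pathprefix))  # → 'collection/collection.csv'
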
@@ -260,19 +266,20 @@ def _writechecksummanifest(fn: str, prefixlen: int, bac: dict) -> tuple[str, int
return hashish.hexdigest(), size, count


-def _writetransfermanifest(fn: str, prefixlen: int, bac: dict) -> tuple[str, int, int]:
+def _writetransfermanifest(fn: str, pathprefix: str, bac: dict) -> tuple[str, int, int]:
"""Write an AIP "transfer manifest".
This writes an AIP "transfer manifest" to the named ``fn`` PDS file, stripping ``prefixlen``
characters off the beginnings of PDS paths, and using info in the ``bac``. Return a triple of
This writes an AIP "transfer manifest" to the named ``fn`` PDS file, stripping ``pathprefix``
off the beginnings of PDS paths, and using info in the ``bac``. Return a triple of
the MD5 of the created manifest, its size in bytes, and a count of its entries.
"""
_logger.debug("⚙️ Writing AIP transfer manifest to %s", fn)
hashish, size, count = hashlib.new("md5"), 0, 0
with open(fn, "wb") as o:
for lidvid, files in bac.items():
for f in files:
entry = f"{lidvid:255}/{f.url[prefixlen:]:254}\r\n".encode("utf-8") # 254 because we hard-code the /
# We use [:254] because we hard code the ``/``:
entry = f"{lidvid:255}/{_urltocommonpath(f.url, pathprefix)[:254]}\r\n".encode("utf-8")
o.write(entry)
hashish.update(entry)
size += len(entry)
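
For clarity on the fixed-width format: ``{lidvid:255}`` left-justifies the lidvid in a 255-character, space-padded field, and the relative path is capped at 254 characters so that the hard-coded ``/`` plus the path fill the second field. A small demonstration with illustrative values:

lidvid = "urn:nasa:pds:example_bundle::1.0"  # illustrative lidvid
relpath = "collection/collection.csv"        # illustrative relative path

entry = f"{lidvid:255}/{relpath[:254]}\r\n"
assert len(f"{lidvid:255}") == 255  # str fields left-justify and space-pad by default
assert entry[255] == "/"            # the separator lands right after the padded field
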
@@ -283,19 +290,19 @@ def _writetransfermanifest(fn: str, prefixlen: int, bac: dict) -> tuple[str, int
return hashish.hexdigest(), size, count


-def _writeaip(bundlelidvid: str, prefixlen: int, bac: dict, ts: datetime) -> str:
+def _writeaip(bundlelidvid: str, pathprefix: str, bac: dict, ts: datetime) -> str:
"""Create the PDS Archive Information Package.
This creates the PDS Archive Information Package for the given ``bundlelidvid``, stripping
-    ``prefixlen`` characters off file paths and using information in the ``bac``. The ``ts``
+    ``pathprefix`` off file paths and using information in the ``bac``. The ``ts``
timestamp tells what metadata to put in the PDS label and the date for generated PDS
filenames. Return a stringified version of the MD5 hash of the *checksum manifest* of the AIP.
"""
_logger.debug("⚙️ Creating AIP for %s", bundlelidvid)
cmfn = _makefilename(bundlelidvid, ts, "checksum_manifest", PDS_TABLE_FILENAME_EXTENSION)
tmfn = _makefilename(bundlelidvid, ts, "transfer_manifest", PDS_TABLE_FILENAME_EXTENSION)
-    cmmd5, cmsize, cmnum = _writechecksummanifest(cmfn, prefixlen, bac)
-    tmmd5, tmsize, tmnum = _writetransfermanifest(tmfn, prefixlen, bac)
+    cmmd5, cmsize, cmnum = _writechecksummanifest(cmfn, pathprefix, bac)
+    tmmd5, tmsize, tmnum = _writetransfermanifest(tmfn, pathprefix, bac)
labelfn = _makefilename(bundlelidvid, ts, "aip", PDS_LABEL_FILENAME_EXTENSION)
lid, vid = _splitlidvid(bundlelidvid)
writeaiplabel(labelfn, f"{lid}_v{vid}", lid, vid, cmfn, cmmd5, cmsize, cmnum, tmfn, tmmd5, tmsize, tmnum, ts)
@@ -333,6 +340,30 @@ def _writesip(bundlelidvid: str, bac: dict, title: str, site: str, ts: datetime,
writesiplabel(lid, vid, title, hashish.hexdigest(), size, count, "MD5", sipfn, site, o, cmmd5, ts)


+def _findcommonpathprefix(bac: dict) -> str:
+    """Given a ``bac``, find the smallest common path prefix of the path parts of all file URLs in it.
+    For example, if the files have URLs like
+    • https://atmos.nmsu.edu/pds4/data/airhead/bundle.xml
+    • https://atmos.nmsu.edu/pds4/data/airhead/collection/collection.csv
+    the path prefix would be ``/pds4/data/airhead``.
+    """
+    def urls(bac):
+        """Generate just the URLs of files in the ``bac``."""
+        for files in bac.values():
+            for file in files:
+                yield file.url
+
+    def paths(urls):
+        """Generate the paths only in the ``urls``."""
+        for url in urls:
+            yield urlparse(url).path
+
+    return os.path.commonpath(paths(urls(bac)))
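
Checking the docstring's example directly against the stdlib:

import os.path

paths = [
    "/pds4/data/airhead/bundle.xml",
    "/pds4/data/airhead/collection/collection.csv",
]
print(os.path.commonpath(paths))  # → '/pds4/data/airhead'
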


def generatedeeparchive(url: str, bundlelidvid: str, site: str, allcollections=True):
"""Make a PDS "deep archive" 🧘 in the current directory.
@@ -347,10 +378,11 @@ def generatedeeparchive(url: str, bundlelidvid: str, site: str, allcollections=T
ts = datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, microsecond=0, tzinfo=None)

# Figure out what we're dealing with
-    prefixlen, bac, title = _comprehendregistry(url, bundlelidvid, allcollections)
+    bac, title = _comprehendregistry(url, bundlelidvid, allcollections)
+    pathprefix = _findcommonpathprefix(bac)

# Make it rain ☔️
-    cmmd5 = _writeaip(bundlelidvid, prefixlen, bac, ts)
+    cmmd5 = _writeaip(bundlelidvid, pathprefix, bac, ts)
_writesip(bundlelidvid, bac, title, site, ts, cmmd5)
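
Putting the pieces together, a caller would now drive the module roughly as follows; the bundle lidvid and site code are illustrative placeholders, not values from this commit:

from pds2.aipgen.registry import generatedeeparchive

generatedeeparchive(
    url="https://pds.nasa.gov/api/search/1",          # mirrors the reworked default endpoint
    bundlelidvid="urn:nasa:pds:example_bundle::1.0",  # hypothetical bundle lidvid
    site="PDS_ATM",                                   # hypothetical site code
    allcollections=True,
)
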


