Reading data from KNMI Data Platform #166

Merged: 34 commits, May 26, 2023
Commits (34)
bcddf30
Add first methods to get data from knmi data platform
rubencalje Apr 14, 2023
48fe72f
Improve knmi_data_platform
rubencalje Apr 14, 2023
a1d65a2
from this day onward, I pledge to use type hints when making new func…
rubencalje Apr 14, 2023
41015d8
minor change to make os.path.isdir work
rubencalje Apr 14, 2023
46bd163
Read multiple files at once and add tests
rubencalje Apr 14, 2023
4ec8bc7
create method to get api_key
martinvonk Apr 15, 2023
4dd96e0
Update knmi_data_platform.py
martinvonk Apr 15, 2023
d135281
add h5netcdf as dependency
martinvonk Apr 16, 2023
5c804d2
add functions to read hdf5 files
martinvonk Apr 16, 2023
49de03a
update read_h5 and read_nc functions
martinvonk Apr 16, 2023
ddcf8e2
add read_grib_knmi function
martinvonk Apr 16, 2023
7ae146f
Update knmi_data_platform.py
martinvonk Apr 17, 2023
dbc642e
update reading h5 file
martinvonk Apr 17, 2023
6e05e44
isort
martinvonk Apr 17, 2023
93f8872
Update knmi_data_platform.py
martinvonk Apr 17, 2023
8d007be
don't provide engine when reading netcdf
martinvonk Apr 17, 2023
bbdb42d
Update knmi_data_platform.py
martinvonk Apr 17, 2023
ef634fe
Update knmi_data_platform.py
martinvonk Apr 17, 2023
7645657
Delete HA40_N55_202304130000_00600_GB
martinvonk Apr 17, 2023
7285001
Update knmi_data_platform.py
martinvonk Apr 17, 2023
aa6f8c7
Update test_018_knmi_data_platform.py
martinvonk Apr 17, 2023
b7501d2
Create KNMI_Data_Platform_GRIB.tar
martinvonk Apr 17, 2023
84f552e
Create KNMI_Data_Platform_H5.zip
martinvonk Apr 17, 2023
b74d51d
move hour check to read_dataset
martinvonk Apr 17, 2023
dfd8e8d
Update knmi_data_platform.py
martinvonk Apr 17, 2023
82791b6
Merge branch 'dev' into knmi_data_platform
martinvonk May 2, 2023
d884737
full install in ci
martinvonk May 25, 2023
ebecab1
resolve merge conflicts setup.py?
martinvonk May 25, 2023
ca163e3
Merge branch 'dev' into knmi_data_platform
martinvonk May 25, 2023
f62db61
add dependencies for knmi_data_platform
martinvonk May 25, 2023
62adbf2
separate download and file reading
martinvonk May 25, 2023
5d9c7e1
upload correct dataset
martinvonk May 25, 2023
caaa084
fix codacy stuff and add simple docstring
martinvonk May 25, 2023
1588b0c
fix meta update bug
martinvonk May 25, 2023
16 changes: 8 additions & 8 deletions .github/workflows/ci.yml
@@ -21,40 +21,40 @@ jobs:

steps:
- uses: actions/checkout@v3

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[ci]

- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings.
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=80 --statistics

- name: Download executables needed for tests
shell: bash -l {0}
run: |
python -c "import nlmod; nlmod.util.download_mfbinaries()"

- name: Run notebooks
if: ${{ github.event_name == 'push' }}
run: |
py.test ./tests -m "not notebooks"

- name: Run tests only
if: ${{ github.event_name == 'pull_request' }}
run: |
py.test ./tests -m "not notebooks"


- name: Run codacy-coverage-reporter
uses: codacy/codacy-coverage-reporter-action@master
with:
1 change: 1 addition & 0 deletions nlmod/read/__init__.py
@@ -5,6 +5,7 @@
geotop,
jarkus,
knmi,
knmi_data_platform,
meteobase,
regis,
rws,
305 changes: 305 additions & 0 deletions nlmod/read/knmi_data_platform.py
@@ -0,0 +1,305 @@
import logging
import os
import re
import tarfile
from io import FileIO
from tempfile import TemporaryDirectory
from typing import Any, Dict, List, Optional, Tuple, Union
from zipfile import ZipFile

import requests
import xarray as xr
from h5py import Dataset as h5Dataset
from h5py import File as h5File
from numpy import arange, array, ndarray
from pandas import Timedelta, Timestamp, read_html
from tqdm import tqdm

logger = logging.getLogger(__name__)

# base_url = "https://api.dataplatform.knmi.nl/dataset-content/v1/datasets"
base_url = "https://api.dataplatform.knmi.nl/open-data"


def get_anonymous_api_key() -> Union[str, None]:
try:
url = "https://developer.dataplatform.knmi.nl/get-started"
tables = read_html(url) # get all tables from url
for table in tables:
for coln in table.columns:
if "KEY" in coln.upper(): # look for columns with key
api_key_str = table.iloc[0].loc[coln] # get entry with key (first row)
api_key = max(api_key_str.split(), key=len) # get key based on str length
logger.info(f"Retrieved anonymous API Key from {url}")
return api_key
except Exception as exc:
if Timestamp.today() < Timestamp("2024-07-01"):
logger.info("Retrieved anonymous API Key from memory")
api_key = (
"eyJvcmciOiI1ZTU1NGUxOTI3NGE5NjAwMDEyYTNlYjEiLCJpZCI6ImE1OGI5"
"NGZmMDY5NDRhZDNhZjFkMDBmNDBmNTQyNjBkIiwiaCI6Im11cm11cjEyOCJ9"
)
return api_key
else:
logger.error(
f"Could not retrieve anonymous API Key from {url}, please"
" create your own at https://developer.dataplatform.knmi.nl/"
)
raise exc
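
# Usage sketch: the hard-coded fallback key above is only valid until
# 2024-07-01; after that a personal key from
# https://developer.dataplatform.knmi.nl/ is required:
#   api_key = get_anonymous_api_key()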


def get_list_of_files(
dataset_name: str,
dataset_version: str,
api_key: Optional[str] = None,
max_keys: int = 500,
start_after_filename: Optional[str] = None,
timeout: int = 120,
) -> List[str]:
"""Download list of files from KNMI data platform"""
if api_key is None:
api_key = get_anonymous_api_key()
files = []
is_truncated = True
while is_truncated:
url = f"{base_url}/datasets/{dataset_name}/versions/{dataset_version}/files"
params = {"maxKeys": f"{max_keys}"}
if start_after_filename is not None:
params["startAfterFilename"] = start_after_filename
r = requests.get(
url, params=params, headers={"Authorization": api_key}, timeout=timeout
)
rjson = r.json()
files.extend([x["filename"] for x in rjson["files"]])
is_truncated = rjson["isTruncated"]
start_after_filename = files[-1]
logger.debug(f"Listed files untill {start_after_filename}")
return files
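
# Usage sketch (dataset name and version are hypothetical); the loop above
# keeps requesting pages via startAfterFilename until isTruncated is False:
#   fnames = get_list_of_files("radar_forecast", "2.0", max_keys=500)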


def download_file(
dataset_name: str,
dataset_version: str,
fname: str,
dirname: str = ".",
api_key: Optional[str] = None,
timeout: int = 120,
) -> None:
"""Download file from KNMI data platform"""
if api_key is None:
api_key = get_anonymous_api_key()
url = (
f"{base_url}/datasets/{dataset_name}/versions/"
f"{dataset_version}/files/{fname}/url"
)
r = requests.get(url, headers={"Authorization": api_key}, timeout=timeout)
if not os.path.isdir(dirname):
os.makedirs(dirname)
logger.info(f"Download {fname} to {dirname}")
fname = os.path.join(dirname, fname)
data = r.json()
if "temporaryDownloadUrl" not in data:
raise FileNotFoundError(f"{fname} not found")
with requests.get(data["temporaryDownloadUrl"], stream=True, timeout=timeout) as r:
r.raise_for_status()
with open(fname, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)


def download_files(
dataset_name: str,
dataset_version: str,
fnames: List[str],
dirname: str = ".",
api_key: Optional[str] = None,
timeout: int = 120,
) -> None:
"""Download multiple files from KNMI data platform"""
for fname in tqdm(fnames):
download_file(
dataset_name=dataset_name,
dataset_version=dataset_version,
fname=fname,
dirname=dirname,
api_key=api_key,
timeout=timeout,
)
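
# Usage sketch, continuing the hypothetical example above:
#   download_files("radar_forecast", "2.0", fnames[:10], dirname="download")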


def read_nc(fo: Union[str, FileIO], **kwargs: dict) -> xr.Dataset:
"""Read netcdf (.nc) file to xarray Dataset"""
# could help to provide argument: engine="h5netcdf"
return xr.open_dataset(fo, **kwargs)


def get_timestamp_from_fname(fname: str) -> Union[Timestamp, None]:
"""Get the Timestamp from a filename (with some assumptions about the formatting)"""
datestr = re.search("(_[0-9]{12})", fname) # assumes YYYYMMDDHHMM
if datestr is not None:
match = datestr.group(0).replace("_", "")
year = int(match[0:4])
month = int(match[4:6])
day = int(match[6:8])
hour = int(match[8:10])
minute = int(match[10:12])
if hour == 24:
dtime = Timestamp(
year=year, month=month, day=day, hour=0, minute=minute
) + Timedelta(days=1)
else:
dtime = Timestamp(year=year, month=month, day=day, hour=hour, minute=minute)
return dtime
else:
raise FileNotFoundError(
"Could not find filename with timestamp formatted as YYYYMMDDHHMM"
)
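
# Example with the GRIB file name used in the tests:
#   get_timestamp_from_fname("HA40_N55_202304130000_00600_GB")
# matches "_202304130000" and returns Timestamp("2023-04-13 00:00").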


def add_h5_meta(meta: Dict[str, Any], h5obj: Any, orig_ky: str = "") -> Dict[str, Any]:
"""Read metadata from hdf5 (.h5) file and add to existing metadata dictionary"""

def cleanup(val: Any) -> Any:
if isinstance(val, (ndarray, list)):
if len(val) == 1:
val = val[0]

if isinstance(val, (bytes, bytearray)):
val = str(val, encoding="utf-8")

return val

if hasattr(h5obj, "attrs"):
attrs = getattr(h5obj, "attrs")
submeta = {f"{orig_ky}/{ky}": cleanup(val) for ky, val in attrs.items()}
meta.update(submeta)

return meta


class MultipleDatasetsFound(Exception):
pass


def read_h5_contents(h5fo: h5File) -> Tuple[ndarray, Dict[str, Any]]:
"""Read contents from a hdf5 (.h5) file"""
data = None
meta = {}
for ky in h5fo:
group = h5fo[ky]
meta = add_h5_meta(meta, group, f"{ky}")
for gky in group:
member = group[gky]
meta = add_h5_meta(meta, member, f"{ky}/{gky}")
if isinstance(member, h5Dataset):
if data is None:
data = member[:]
else:
raise MultipleDatasetsFound("h5 contains multiple datasets")
return data, meta


def read_h5(fo: Union[str, FileIO]) -> xr.Dataset:
"""Read hdf5 (.h5) file to xarray Dataset"""
with h5File(fo) as h5fo:
data, meta = read_h5_contents(h5fo)

cols = meta["geographic/geo_number_columns"]
dx = meta["geographic/geo_pixel_size_x"]
rows = meta["geographic/geo_number_rows"]
dy = meta["geographic/geo_pixel_size_y"]
x = arange(0 + dx / 2, cols + dx / 2, dx)
y = arange(rows + dy / 2, 0 + dy / 2, dy)
t = Timestamp(meta["overview/product_datetime_start"])

ds = xr.Dataset(
data_vars={"data": (["y", "x"], array(data, dtype=float))},
coords={"x": x, "y": y, "time": t},
attrs=meta,
)
return ds


def read_grib(
fo: Union[str, FileIO], filter_by_keys=None, **kwargs: dict
) -> xr.Dataset:
"""Read GRIB file to xarray Dataset"""

if filter_by_keys is not None:
if "backend_kwargs" not in kwargs:
kwargs["backend_kwargs"] = {}
kwargs["backend_kwargs"]["filter_by_keys"] = filter_by_keys
if "errors" not in kwargs["backend_kwargs"]:
kwargs["backend_kwargs"]["errors"] = "ignore"

return xr.open_dataset(fo, engine="cfgrib", **kwargs)
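
# Usage sketch (the filter values are hypothetical); filter_by_keys is
# passed to the cfgrib engine to select a single GRIB hypercube:
#   ds = read_grib("HA40_N55_202304130000_00600_GB",
#                  filter_by_keys={"stepType": "instant", "shortName": "2t"})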


def read_dataset_from_zip(
fname: str, hour: Optional[int] = None, **kwargs: dict
) -> xr.Dataset:
"""Read KNMI data platfrom .zip file to xarray Dataset"""
if fname.endswith(".zip"):
with ZipFile(fname) as zipfo:
fnames = sorted([x for x in zipfo.namelist() if not x.endswith("/")])
ds = read_dataset(fnames=fnames, zipfo=zipfo, hour=hour, **kwargs)

elif fname.endswith(".tar"):
with tarfile.open(fname) as tarfo:
tempdir = TemporaryDirectory()
logger.info(f"Created temporary dir {tempdir}")
tarfo.extractall(tempdir.name)
fnames = sorted(
[
os.path.join(tempdir.name, x)
for x in tarfo.getnames()
if not x.endswith("/")
]
)
ds = read_dataset(fnames=fnames, zipfo=tarfo, hour=hour, **kwargs)
else:
raise ValueError(f"Unsupported file type for {fname}; expected .zip or .tar")
return ds


def read_dataset(
fnames: List[str],
zipfo: Union[None, ZipFile, tarfile.TarFile] = None,
hour: Optional[int] = None,
**kwargs: dict,
) -> xr.Dataset:
"""Read xarray dataset from different file types; .nc, .h5 or grib file"""
if hour is not None:
if hour == 24:
hour = 0
fnames = [x for x in fnames if get_timestamp_from_fname(x).hour == hour]

data = []
for file in tqdm(fnames):
if isinstance(zipfo, ZipFile):
fo = zipfo.open(file)
else:
fo = file
if file.endswith(".nc"):
data.append(read_nc(fo, **kwargs))
elif file.endswith(".h5"):
data.append(read_h5(fo, **kwargs))
elif "_GB" in file:
if isinstance(zipfo, tarfile.TarFile):
# memb = zipfo.getmember(file)
# fo = zipfo.extractfile(memb)
# yields TypeError: 'ExFileObject' object is not subscriptable
# alternative is to unpack in temporary directory
data.append(read_grib(file, **kwargs))
elif isinstance(zipfo, ZipFile):
data.append(read_grib(fo, **kwargs))
else:
raise ValueError(f"Can't read/handle file {file}")

return xr.concat(data, dim="time")
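
A minimal end-to-end sketch of the new module (the dataset name and version are hypothetical; the archive path matches the test data added in this PR):

from nlmod.read import knmi_data_platform

fnames = knmi_data_platform.get_list_of_files("radar_forecast", "2.0")
knmi_data_platform.download_files("radar_forecast", "2.0", fnames[:2], dirname="download")
ds = knmi_data_platform.read_dataset_from_zip("tests/data/KNMI_Data_Platform_H5.zip")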
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -35,6 +35,7 @@ dependencies = [
"matplotlib",
"dask",
"colorama",
"h5netcdf",
]
keywords = ["hydrology", "groundwater", "modeling", "Modflow 6", "flopy"]
classifiers = [
@@ -55,7 +56,7 @@ repository = "https://github.com/ArtesiaWater/nlmod"
documentation = "https://nlmod.readthedocs.io/en/latest/"

[project.optional-dependencies]
full = ["gdown", "geocube", "bottleneck", "contextily"]
full = ["gdown", "geocube", "bottleneck", "contextily", "cfgrib", "ecmwflibs"]
test = ["pytest>=7", "pytest-cov", "pytest-dependency"]
nbtest = ["nbformat", "nbconvert>6.4.5"]
lint = ["flake8", "isort", "black[jupyter]"]
Binary file added tests/data/KNMI_Data_Platform_GRIB.tar
Binary file added tests/data/KNMI_Data_Platform_H5.zip
Binary file added tests/data/KNMI_Data_Platform_NETCDF.zip