WIP: New DataStore / Encoder / Decoder API for review #1087

Closed · wants to merge 1 commit

Conversation

@shoyer (Member) commented Nov 7, 2016

The goal here is to make something extensible that we can live with for quite
some time, and to clean up the internals of xarray's backend interface.

Most of these are analogues of existing xarray classes with a cleaned up
interface. I have not yet worried about backwards compatibility or tests -- I
would appreciate feedback on the approach here.

Several parts of the logic exist for the sake of dask. I've included the word
"dask" in comments to facilitate inspection by mrocklin.

CC @rabernat, @pwolfram, @jhamman, @mrocklin -- for review

CC @mcgibbon, @JoyMonteiro -- this is relevant to our discussion today about
adding support for appending to netCDF files. Don't let this stop you from
getting started on that with the existing interface, though.

def get_token(self):
    """Return a token identifier suitable for use by dask."""
    return None
Contributor:

This could default to str(uuid.uuid4())
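
For illustration, a minimal sketch of that default on the `get_token` method shown above (the comment about the trade-off is my own reading, not part of the PR):

```python
import uuid


def get_token(self):
    """Return a token identifier suitable for use by dask."""
    # Assumption: when a store has no cheaper identity (e.g. filename + mtime),
    # a random UUID is a safe fallback -- it never collides, at the cost of
    # never letting dask reuse cached results for this store.
    return str(uuid.uuid4())
```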

    return (self.filename, os.path.getmtime(self.filename))

def get_name(self):
    return self.filename
Contributor:

I would think that this would just be 'read-from-disk'

Member Author:

Yeah, maybe that's better than using the full filename.

# Note: this mostly exists for the benefit of future support for partial
# reads -- we don't actually make use of this in the current version of
# xarray.
raise NotImplementedError
Contributor:

Presumably the thing returned by this method is never the result of a task and so doesn't need to be serialized?

Member Author:

This will be passed as a target into da.store, so I think it does need to be serializable.
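
For context, a `da.store` target mostly just needs `__setitem__`, but since it travels with the graph it should also pickle cleanly. A minimal sketch with a hypothetical `InMemoryTarget` (not code from this PR):

```python
import pickle

import dask.array as da
import numpy as np


class InMemoryTarget:
    """Hypothetical write target: anything with __setitem__ works for da.store."""

    def __init__(self, shape, dtype):
        self._data = np.zeros(shape, dtype)
        # shape/dtype advertised so any validation dask does on targets passes
        self.shape = self._data.shape
        self.dtype = self._data.dtype

    def __setitem__(self, key, value):
        self._data[key] = value


target = InMemoryTarget((4, 4), "f8")
pickle.dumps(target)  # must not raise if the target is shipped with the graph
da.store(da.ones((4, 4), chunks=2), target, lock=False)
```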

Contributor:

zarr could definitely use this

"""
# Again, we actually have a use for the region argument? Could be useful
# to ensure writes to zarr are safe.
return None
Contributor:

What behavior does HDF5 allow here? Can we write from multiple threads to non-overlapping blocks of the on-disk array? Is the library safe enough to allow this?

Member Author:

> Can we write from multiple threads to non-overlapping blocks of the on-disk array?

I wish! Unfortunately, my understanding is that this is not the case. HDF5 isn't at all threadsafe -- not even for reading entirely different files at the same time. In the best case scenario, you have compiled HDF5 in "threadsafe" mode which just means they add their own global lock around every API call. So we will need to use some sort of global lock for all HDF5 files.
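
A minimal sketch of the kind of global lock this implies; `HDF5_LOCK` and `store_to_hdf5` are illustrative names, not xarray's actual implementation:

```python
import threading

import dask.array as da

# Assumption: a single process-wide lock shared by every HDF5-backed store,
# since the HDF5 library is not thread-safe (at best it serializes all API
# calls behind its own global lock when built in "threadsafe" mode).
HDF5_LOCK = threading.Lock()


def store_to_hdf5(sources, targets):
    # Every chunk write acquires the same lock, so only one thread touches
    # HDF5 at a time even though dask may schedule many write tasks.
    da.store(sources, targets, lock=HDF5_LOCK)
```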

Contributor:

Another reason to like zarr over HDF. ;)

    self._attributes[name] = copy.deepcopy(value)

def get_write_lock(self, name, region=Ellipsis):
    return self._write_locks[name]
Contributor:

This, presumably, is what @alimanfoo would implement if we wanted to support climate data on Zarr?

Member Author:

Indeed. The main complexity would be mapping region (in array coordinates) to the set of overlapping blocks (each of which probably needs its own lock), but he probably already has such a system.
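
A rough sketch of that mapping, assuming one regular chunk size per dimension; `block_locks` and `locks_for_region` are hypothetical helpers, not zarr's API:

```python
import itertools
import math
import threading
from collections import defaultdict

# Hypothetical bookkeeping: one lazily created lock per block of the target array.
block_locks = defaultdict(threading.Lock)


def locks_for_region(region, chunks):
    """Return the locks of every block overlapping ``region``.

    ``region`` is a tuple of slices in array coordinates and ``chunks`` is the
    block length along each dimension (assumed regular for this sketch).
    """
    block_ranges = []
    for sl, size in zip(region, chunks):
        start = sl.start or 0
        stop = sl.stop  # this sketch assumes an explicit stop
        block_ranges.append(range(start // size, math.ceil(stop / size)))
    # One lock per overlapping block index, e.g. (0, 1), (0, 2), ...
    return [block_locks[idx] for idx in itertools.product(*block_ranges)]
```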

import dask.array as da
# TODO: dask.array.store needs to be able to accept a list of Lock
# objects.
da.store(self.sources, self.targets, lock=self.lock)
Contributor:

Seems doable to me
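
Until `dask.array.store` grows that feature, one possible workaround is a separate deferred `store` call per source/target pair, each with its own lock, computed together at the end. A sketch under that assumption (`store_with_locks` is a hypothetical helper, not part of this PR):

```python
import dask
import dask.array as da


def store_with_locks(sources, targets, locks):
    # One da.store call per (source, target, lock) triple; compute=False
    # returns dask.delayed objects, so the writes still run as one computation.
    writes = [
        da.store(source, target, lock=lock, compute=False)
        for source, target, lock in zip(sources, targets, locks)
    ]
    dask.compute(*writes)
```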

def write_datastore(dataset, store, encode=None, encoding=None,
                    close_on_error=False):
    # TODO: add compute keyword argument to allow for returning futures, like
    # dask.array.store.
Contributor:

I suspect that you don't want to deal with futures directly. Instead you want to expose a dask.graph that the distributed client can collect and replace with futures on its own.

Member Author:

I was thinking of simply returning the dask.delayed object returned by da.store (that's what I meant by "future"). Unless you think this function should be returning a dask graph directly?
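
For reference, `da.store` can already defer the write and hand back a `dask.delayed.Delayed`, which is roughly what a `compute` keyword on `write_datastore` would forward; a small sketch:

```python
import dask.array as da
import numpy as np

source = da.ones((4, 4), chunks=2)
target = np.empty((4, 4))

# compute=False defers the write and returns a Delayed object, which the
# caller (or a distributed client) can compute whenever it likes.
delayed_write = da.store(source, target, lock=False, compute=False)
delayed_write.compute()
```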

@shoyer (Member Author) left a comment:

I'd like to also add in a prototype of consolidated file handling (with an LRU cache and pickle-ability) that DataStores can plug in to. That will be a cleaner solution for dask.distributed.
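
A very rough sketch of that idea, using a plain `functools.lru_cache` keyed on filename. This is only an illustration (`FileCachingStore` is hypothetical, and evicted handles are not closed here), not the file-manager design xarray eventually adopted:

```python
import functools

import netCDF4


@functools.lru_cache(maxsize=128)
def _open_netcdf(filename, mode="r"):
    # At most `maxsize` files are open at once; a real implementation would
    # also need to close handles when they fall out of the cache.
    return netCDF4.Dataset(filename, mode=mode)


class FileCachingStore:
    """Hypothetical store that resolves its file handle through the shared cache."""

    def __init__(self, filename):
        self.filename = filename

    @property
    def ds(self):
        return _open_netcdf(self.filename)

    def __getstate__(self):
        # Pickle only the filename; the handle is re-opened after unpickling,
        # which is what makes the store usable with dask.distributed.
        return {"filename": self.filename}

    def __setstate__(self, state):
        self.filename = state["filename"]
```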

@shoyer (Member Author) commented Nov 29, 2017

CC @alexamici for interest in the backends refactor


def __call__(self, variables, attrs):
    return conventions.decode_cf_variables(
        variables, attrs, **self._kwargs)
Contributor:

This looks great. It would definitely solve some of the encoding challenges with zarr.

@rabernat (Contributor):

Stephan, this looks awesome! It should simplify the backends a lot!

I do worry that it will be painful to refactor the existing backends. But I guess that is the cost of progress.

@rabernat mentioned this pull request Nov 29, 2017
@shoyer (Member Author) commented Nov 30, 2017

OK, I'm going to try to reboot this and finish it up in the form of an API that we'll be happy with going forward. I just discovered two more xarray backends over the past two days (in Unidata's Siphon and something @alexamici and colleagues are writing to read GRIB files), so clearly the demand is here.

One additional change I'd like to make is to try to rewrite the encoding/decoding functions for variables into a series of invertible coding filters that can potentially be chained together in a flexible way (this is somewhat inspired by zarr). This will allow different backends to mix/match filters as necessary, depending on their particular needs. I'll start on that in another PR.
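
To illustrate the idea, a sketch of what such invertible filters might look like; the names below are hypothetical and operate on plain arrays rather than xarray variables:

```python
class ScaleOffsetCoder:
    """Invertible filter: decode applies scale/offset, encode inverts it."""

    def __init__(self, scale_factor=1.0, add_offset=0.0):
        self.scale_factor = scale_factor
        self.add_offset = add_offset

    def decode(self, data):
        return data * self.scale_factor + self.add_offset

    def encode(self, data):
        return (data - self.add_offset) / self.scale_factor


def decode_pipeline(data, coders):
    # Backends pick whatever chain of filters their format needs.
    for coder in coders:
        data = coder.decode(data)
    return data


def encode_pipeline(data, coders):
    # Encoding runs the same filters in reverse, so encoding a decoded array
    # round-trips back to the original (up to floating point).
    for coder in reversed(coders):
        data = coder.encode(data)
    return data
```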

@alimanfoo (Contributor) commented Nov 30, 2017 via email

@shoyer (Member Author) commented Dec 1, 2017

See #1752 for getting started on filters. I had a productive plane ride!

@alimanfoo thanks for the pointer to numcodecs. I'm sure that will come in handy eventually. Most of the "filters" xarray uses here are at a slightly higher level, decoding/encoding metadata. The actual filters themselves are generally quite simple, e.g., just coercing a dtype, but there is lots of metadata to keep track of to know when they are appropriate to use.

@jhamman (Member) left a comment:

Looks really cool. Will make adding/maintaining backends much easier.

# type: (Hashable, Union[Ellipsis, Tuple[slice, ...]]) -> object
"""Return a lock for writing a given variable.

This method may be useful for DataStores that from which data is
Member:

...DataStores for which data...

class InMemoryDataStore(AbstractWritableDataStore):
"""Stores variables and attributes directly in OrderedDicts.

This store exists internal testing purposes, e.g., for integration tests
Member:

...exists for internal...

def __init__(self):
    self._variables = OrderedDict()
    self._attributes = OrderedDict()
    # do we need locks? are writes to NumPy arrays thread-safe?
Member:

I think the answer to this question is yes, we need locks (no, writes are not thread-safe), but I imagine @mrocklin can give the final word.

Contributor:

We never do overlapping writes though, right? I've found that locks are not necessary as long as the underlying data store's chunking doesn't overlap poorly with how we're writing chunks. Given that NumPy arrays are entirely fine-grained this doesn't seem like it would be an issue.
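
A quick illustration of that point: each dask chunk writes to a disjoint slice of the target, so storing into an in-memory NumPy array needs no lock even on the threaded scheduler (a sketch, not xarray code):

```python
import dask.array as da
import numpy as np

source = da.arange(16, chunks=4)          # four chunks -> four write tasks
target = np.empty(16, dtype=source.dtype)

# The write tasks touch non-overlapping slices of `target`, so they can run
# concurrently without any lock.
da.store(source, target, lock=False)
assert (target == np.arange(16)).all()
```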

from xarray.core.pycompat import OrderedDict, dask_array_type

from xarray import Variable

Member:

nit: you need to import xarray and conventions. I'm guessing you are also expecting to put all the Coders in a coders module?

    self.targets.append(target)
    self.locks.append(lock)
else:
    target[...] = source
Member:

This looks great. Nice to get this in a cleaner form.

@alexamici (Collaborator):

@shoyer regarding the xarray-grib-driver (not public yet, sorry): we have been working on the GRIB side lately, and I didn't review this branch until today. Now we are coming back to the xarray side, and I welcome the new "pluggability" of the encoding / decoding engine. Since a lot of the coding work is already done by the ecCodes library, my hope is that most of the complexity will stay outside of xarray anyway.

@jhamman (Member) commented Jul 29, 2019

@shoyer - since I've started work on a related subject in #3166, I'm wondering whether you think this PR could be revived. I think we'll have some time soon to pick this up in one form or another.

@shoyer (Member Author) commented Jul 29, 2019

This has gotten a little stale at this point. Coders did make it in, though we haven't moved over everything from conventions.py yet. This interface should also probably be reconciled with the use of CachingFileManager.
