-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Performance improvements for zarr backend #1800
Changes from 16 commits
a0bea98
afdb254
cc02150
86240cd
9c89ef2
2568d21
d459c66
c59ca57
2dd186a
8f71b31
07b9c21
67fcd92
b38e1a6
47ba8b6
9152b12
26b6bcb
b7681ae
e084e9e
a6aeb36
264b13f
9c03bfc
c92020a
18434f9
9f89c7c
3590d28
69cacee
8d744e0
a8dabdf
7858db7
48bf7ef
53260c9
7ed6bf8
3872da2
e6b7068
c31decf
189d262
07b92e2
96996ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,10 @@ | |
import time | ||
import traceback | ||
import contextlib | ||
from collections import Mapping | ||
from collections import Mapping, OrderedDict | ||
import warnings | ||
|
||
from ..conventions import cf_encoder | ||
from ..conventions import cf_encoder, maybe_encode_as_char_array | ||
from ..core import indexing | ||
from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin | ||
from ..core.pycompat import iteritems, dask_array_type | ||
|
@@ -216,7 +216,11 @@ def store_dataset(self, dataset): | |
|
||
def store(self, variables, attributes, check_encoding_set=frozenset(), | ||
unlimited_dims=None): | ||
# This seems out of place | ||
variables = OrderedDict([(k, maybe_encode_as_char_array(v)) | ||
for k, v in variables.items()]) | ||
self.set_attributes(attributes) | ||
self.set_dimensions(variables, unlimited_dims=unlimited_dims) | ||
self.set_variables(variables, check_encoding_set, | ||
unlimited_dims=unlimited_dims) | ||
|
||
|
@@ -234,12 +238,22 @@ def set_variables(self, variables, check_encoding_set, | |
|
||
self.writer.add(source, target) | ||
|
||
def set_necessary_dimensions(self, variable, unlimited_dims=None): | ||
def set_dimensions(self, variables, unlimited_dims=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add a docstring or comment explaining what this method does? This would help new developers (including myself) come on board with backend development. |
||
if unlimited_dims is None: | ||
unlimited_dims = set() | ||
dims = self.get_dimensions() | ||
for d, l in zip(variable.dims, variable.shape): | ||
if d not in dims: | ||
|
||
existing_dims = self.get_dimensions() | ||
|
||
dims = {} | ||
for v in variables.values(): | ||
dims.update(dict(zip(v.dims, v.shape))) | ||
|
||
for d, l in dims.items(): | ||
|
||
if d in existing_dims and l != existing_dims[d]: | ||
raise ValueError("Unable to update size for existing dimension" | ||
"%r (%d != %d)" % (d, l, existing_dims[d])) | ||
elif d not in existing_dims: | ||
is_unlimited = d in unlimited_dims | ||
self.set_dimension(d, l, is_unlimited) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,13 +43,8 @@ def _ensure_valid_fill_value(value, dtype): | |
return _encode_zarr_attr_value(valid) | ||
|
||
|
||
def _decode_zarr_attr_value(value): | ||
return value | ||
|
||
|
||
def _decode_zarr_attrs(attrs): | ||
return OrderedDict([(k, _decode_zarr_attr_value(v)) | ||
for k, v in attrs.items()]) | ||
return OrderedDict(attrs.asdict()) | ||
|
||
|
||
def _replace_slices_with_arrays(key, shape): | ||
|
@@ -297,9 +292,6 @@ def __init__(self, zarr_group, writer=None): | |
raise KeyError("Zarr group can't be read by xarray because " | ||
"it is missing the `%s` attribute." % | ||
_DIMENSION_KEY) | ||
else: | ||
# initialize hidden dimension attribute | ||
self.ds.attrs[_DIMENSION_KEY] = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the removal of these lines is relevant for the dimension key test error. |
||
|
||
if writer is None: | ||
# by default, we should not need a lock for writing zarr because | ||
|
@@ -315,7 +307,7 @@ def open_store_variable(self, name, zarr_array): | |
data = indexing.LazilyIndexedArray(ZarrArrayWrapper(name, self)) | ||
dimensions, attributes = _get_zarr_dims_and_attrs(zarr_array, | ||
_DIMENSION_KEY) | ||
attributes = _decode_zarr_attrs(attributes) | ||
attributes = _decode_zarr_attrs(attributes.asdict()) | ||
encoding = {'chunks': zarr_array.chunks, | ||
'compressor': zarr_array.compressor, | ||
'filters': zarr_array.filters} | ||
|
@@ -331,29 +323,33 @@ def get_variables(self): | |
for k, v in self.ds.arrays()) | ||
|
||
def get_attrs(self): | ||
_, attributes = _get_zarr_dims_and_attrs(self.ds, _DIMENSION_KEY) | ||
attributes = HiddenKeyDict(self.ds.attrs.asdict(), [_DIMENSION_KEY]) | ||
return _decode_zarr_attrs(attributes) | ||
|
||
def get_dimensions(self): | ||
dimensions, _ = _get_zarr_dims_and_attrs(self.ds, _DIMENSION_KEY) | ||
try: | ||
dimensions = self.ds.attrs[_DIMENSION_KEY].asdict() | ||
except KeyError: | ||
raise KeyError("Zarr object is missing the attribute `%s`, which " | ||
"is required for xarray to determine variable " | ||
"dimensions." % (_DIMENSION_KEY)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should be raising when you delete the dimension key. |
||
return dimensions | ||
|
||
def set_dimension(self, name, length, is_unlimited=False): | ||
if is_unlimited: | ||
def set_dimensions(self, variables, unlimited_dims=None): | ||
if unlimited_dims is not None: | ||
raise NotImplementedError( | ||
"Zarr backend doesn't know how to handle unlimited dimensions") | ||
# consistency check | ||
if name in self.ds.attrs[_DIMENSION_KEY]: | ||
if self.ds.attrs[_DIMENSION_KEY][name] != length: | ||
raise ValueError("Pre-existing array dimensions %r " | ||
"encoded in Zarr attributes are incompatible " | ||
"with newly specified dimension `%s`: %g" % | ||
(self.ds.attrs[_DIMENSION_KEY], name, length)) | ||
self.ds.attrs[_DIMENSION_KEY][name] = length | ||
|
||
def set_attribute(self, key, value): | ||
_, attributes = _get_zarr_dims_and_attrs(self.ds, _DIMENSION_KEY) | ||
attributes[key] = _encode_zarr_attr_value(value) | ||
|
||
dims = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. make this an |
||
for v in variables.values(): | ||
dims.update(dict(zip(v.dims, v.shape))) | ||
|
||
self.ds.attrs.update({_DIMENSION_KEY: dims}) | ||
|
||
def set_attributes(self, attributes): | ||
encoded_attrs = OrderedDict((k, _encode_zarr_attr_value(v)) | ||
for k, v in iteritems(attributes)) | ||
self.ds.attrs.put(encoded_attrs) | ||
|
||
def prepare_variable(self, name, variable, check_encoding=False, | ||
unlimited_dims=None): | ||
|
@@ -363,41 +359,21 @@ def prepare_variable(self, name, variable, check_encoding=False, | |
dtype = variable.dtype | ||
shape = variable.shape | ||
|
||
# TODO: figure out how zarr should deal with unlimited dimensions | ||
self.set_necessary_dimensions(variable, unlimited_dims=unlimited_dims) | ||
|
||
fill_value = _ensure_valid_fill_value(attrs.pop('_FillValue', None), | ||
dtype) | ||
|
||
# TODO: figure out what encoding is needed for zarr | ||
encoding = _extract_zarr_variable_encoding( | ||
variable, raise_on_invalid=check_encoding) | ||
|
||
# arguments for zarr.create: | ||
# zarr.creation.create(shape, chunks=None, dtype=None, | ||
# compressor='default', fill_value=0, order='C', store=None, | ||
# synchronizer=None, overwrite=False, path=None, chunk_store=None, | ||
# filters=None, cache_metadata=True, **kwargs) | ||
if name in self.ds: | ||
zarr_array = self.ds[name] | ||
else: | ||
zarr_array = self.ds.create(name, shape=shape, dtype=dtype, | ||
fill_value=fill_value, **encoding) | ||
# decided not to explicitly enumerate encoding options because we | ||
# risk overriding zarr's defaults (e.g. if we specify | ||
# cache_metadata=None instead of True). Alternative is to have lots of | ||
# logic in _extract_zarr_variable encoding to duplicate zarr defaults. | ||
# chunks=encoding.get('chunks'), | ||
# compressor=encoding.get('compressor'), | ||
# filters=encodings.get('filters'), | ||
# cache_metadata=encoding.get('cache_metadata')) | ||
|
||
encoded_attrs = OrderedDict() | ||
# the magic for storing the hidden dimension data | ||
zarr_array.attrs[_DIMENSION_KEY] = dims | ||
_, attributes = _get_zarr_dims_and_attrs(zarr_array, _DIMENSION_KEY) | ||
|
||
encoded_attrs[_DIMENSION_KEY] = dims | ||
for k, v in iteritems(attrs): | ||
attributes[k] = _encode_zarr_attr_value(v) | ||
encoded_attrs[k] = _encode_zarr_attr_value(v) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. maybe leave this as a #TODO? |
||
|
||
zarr_array = self.ds.create(name, shape=shape, dtype=dtype, | ||
fill_value=fill_value, **encoding) | ||
zarr_array.attrs.put(encoded_attrs) | ||
|
||
return zarr_array, variable.data | ||
|
||
|
@@ -406,29 +382,6 @@ def store(self, variables, attributes, *args, **kwargs): | |
for k, v in iteritems(variables)) | ||
AbstractWritableDataStore.store(self, new_vars, attributes, | ||
*args, **kwargs) | ||
# sync() and close() methods should not be needed with zarr | ||
|
||
|
||
# from zarr docs | ||
|
||
# Zarr arrays can be used as either the source or sink for data in parallel | ||
# computations. Both multi-threaded and multi-process parallelism are | ||
# supported. The Python global interpreter lock (GIL) is released for both | ||
# compression and decompression operations, so Zarr will not block other Python | ||
# threads from running. | ||
# | ||
# A Zarr array can be read concurrently by multiple threads or processes. No | ||
# synchronization (i.e., locking) is required for concurrent reads. | ||
# | ||
# A Zarr array can also be written to concurrently by multiple threads or | ||
# processes. Some synchronization may be required, depending on the way the | ||
# data is being written. | ||
|
||
# If each worker in a parallel computation is writing to a separate region of | ||
# the array, and if region boundaries are perfectly aligned with chunk | ||
# boundaries, then no synchronization is required. However, if region and chunk | ||
# boundaries are not perfectly aligned, then synchronization is required to | ||
# avoid two workers attempting to modify the same chunk at the same time. | ||
|
||
|
||
def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should each backend define encode/decode methods? For string variables, we need to encode before setting the dimensions.