
Support Dask interface #1674

Merged
merged 15 commits into from
Nov 7, 2017
Conversation

mrocklin
Contributor

@mrocklin mrocklin commented Oct 31, 2017

This integrates the new dask interface methods into XArray, making XArray objects first-class dask collections and helping in particular with newer dask.distributed features.

Builds on work from @jcrist here: dask/dask#2748
Depends on dask/dask#2847
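For context, the dask collection interface this PR implements is a small set of dunder methods. The following is a minimal sketch of a custom collection (the `Wrapped` class is hypothetical, not xarray's actual code) illustrating the protocol that gets added to Variable, DataArray, and Dataset:

```python
import dask
from dask.threaded import get as threaded_get

# Hypothetical minimal dask collection, sketching the protocol this PR
# implements on xarray objects; not the actual xarray implementation.
class Wrapped:
    def __init__(self, key, dsk):
        self._key = key   # the output key of interest
        self._dsk = dsk   # the task graph

    def __dask_graph__(self):
        # Returning a graph marks this object as a dask collection.
        return self._dsk

    def __dask_keys__(self):
        # Keys that must be computed to materialize this object.
        return [self._key]

    @staticmethod
    def __dask_optimize__(dsk, keys, **kwargs):
        # No graph optimization in this sketch; return the graph unchanged.
        return dsk

    # Default scheduler dask.compute uses for this collection type.
    __dask_scheduler__ = staticmethod(threaded_get)

    def __dask_postcompute__(self):
        # (finalize, extra_args): finalize receives the computed results
        # in the same structure as __dask_keys__().
        return (lambda results: results[0]), ()

dsk = {'a': 1, 'b': (lambda x: x + 1, 'a')}
obj = Wrapped('b', dsk)
assert dask.is_dask_collection(obj)
print(dask.compute(obj))  # -> (2,)
```

With these methods in place, generic functions like `dask.compute`, `dask.persist`, and `dask.visualize` work on the object without any type-specific code.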

@mrocklin
Contributor Author

I've only done Variable so far. Hopefully what's here seems straightforward. I'll do DataArray and Dataset next and then look at what legacy code I can clean up within XArray.

I'll be working on this while on a long flight and so may not respond quickly.


def visualize(self, **kwargs):
import dask
return dask.visualize(self, **kwargs)
Member

My inclination would be to leave this out and require using dask.visualize(). My concern is that it could be easily confused with .plot().

Contributor Author

Removed

@mrocklin
Contributor Author

For the distributed work this now also uses dask/distributed#1513

@mrocklin
Contributor Author

I've updated this to cover DataArray and Dataset as well.

@jhamman
Member

jhamman commented Nov 1, 2017

@mrocklin - thanks for getting this started. Curious, does the test suite pass when you combine this with dask/dask#2847?

@mrocklin
Contributor Author

mrocklin commented Nov 1, 2017

Generally yes, things work. There are a few xfailed tests that used mock on functions that are no longer being used. Also xarray/tests/test_dask.py::test_dataarray_pickle is failing. Otherwise everything works well.

@mrocklin
Contributor Author

mrocklin commented Nov 1, 2017

OK, this is now backwards compatible. I'll need to appropriately skip the tests if new enough versions of dask and distributed aren't around, but this change should be innocuous otherwise.

Review from @jhamman or @shoyer would be welcome.

@mrocklin
Contributor Author

mrocklin commented Nov 1, 2017

OK, this is now backwards compatible. Tests should pass.

@mrocklin mrocklin changed the title WIP - Support Dask interface Support Dask interface Nov 1, 2017
Member

@shoyer shoyer left a comment

Generally this looks great, thanks for putting this together!

if dask.is_dask_collection(v) else
(False, k, v) for k, v in self._variables.items()]
return self._dask_postcompute, (info, self._coord_names, self._dims,
self._attrs, self._file_obj, self._encoding)
Member

nit: please indent to match the opening ( on the previous line

Contributor Author

Fixed. Is it possible to add this style concern to the flake8 tests?

return None

def __dask_keys__(self):
return self._data.__dask_keys__()
Member

Is it OK if these methods error (with AttributeError) when self._data is not a dask array?

Contributor Author

Yes, we always check if the object is a dask collection first by calling __dask_graph__
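That convention can be sketched with a hypothetical wrapper class (not xarray's actual code): an object whose `__dask_graph__` returns None is simply not treated as a dask collection, so the other `__dask_*` methods are never reached.

```python
import numpy as np
import dask
import dask.array as da

# Hypothetical wrapper: _data may be a plain numpy array or a dask array.
class MaybeLazy:
    def __init__(self, data):
        self._data = data

    def __dask_graph__(self):
        # Return None when the underlying data is not a dask collection;
        # dask.is_dask_collection() then reports False and no other
        # __dask_*__ method is consulted.
        try:
            return self._data.__dask_graph__()
        except AttributeError:
            return None

assert not dask.is_dask_collection(MaybeLazy(np.arange(3)))
assert dask.is_dask_collection(MaybeLazy(da.arange(3, chunks=2)))
```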

@@ -576,6 +576,33 @@ def reset_coords(self, names=None, drop=False, inplace=False):
dataset[self.name] = self.variable
return dataset

def __dask_graph__(self):
return self._variable.__dask_graph__()
Member

It's actually possible to have multiple dask arrays in an xarray.DataArray, if there are dask arrays in the coordinates. So it would be better to handle DataArray by converting to a Dataset than to a Variable. We use the _to_temp_dataset/_from_temp_dataset as a shortcut for these types of cases, e.g., see the current implementation of DataArray.persist().
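The reason the Dataset route works is that a Dataset-level `__dask_graph__` has to merge the graphs of every dask-backed variable, coordinates included. A rough sketch of that merging with plain dask arrays (an illustration, not xarray's actual code):

```python
import dask.array as da
from dask.core import flatten

# A data variable plus a dask-backed coordinate: a collection holding both
# must expose a single graph that covers both.
data = da.random.random((8, 8), chunks=(4, 4))
coord = da.arange(8, chunks=4)

# Merge the per-variable graphs into one mapping.
merged = {}
for v in (data, coord):
    merged.update(dict(v.__dask_graph__()))

# Every key needed to compute either variable is present in the merged graph.
for v in (data, coord):
    for key in flatten(v.__dask_keys__()):
        assert key in merged
```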

Contributor Author

OK, that will be a bit tricky. My guess is that it might be simpler to just account for all of the possible dask things explicitly, as we do in dataset. Otherwise we're converting to and from datasets in each of the __dask_foo__ methods, and I would not be surprised to run into oddness there. I'm not sure though.

Can you recommend a test case that includes dask arrays in the coordinates?

Contributor Author

For some reason this fails when I use dask arrays for coordinates:

    coord = da.arange(8, chunks=(4,))
    data = da.random.random((8, 8), chunks=(4, 4)) + 1
    array = DataArray(data,
                      coords={'x': coord, 'y': coord},
                      dims=['x', 'y'])

Replacing coords with a numpy array works fine.

Member

@mrocklin - what is the failure you are referring to?

Contributor Author

In [1]: import xarray 

In [2]: import dask.array as da

In [3]:     coord = da.arange(8, chunks=(4,))
   ...:     data = da.random.random((8, 8), chunks=(4, 4)) + 1
   ...:     array = xarray.DataArray(data,
   ...:                       coords={'x': coord, 'y': coord},
   ...:                       dims=['x', 'y'])
   ...: 
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-3-b90a33ebf436> in <module>()
      3 array = xarray.DataArray(data,
      4                   coords={'x': coord, 'y': coord},
----> 5                   dims=['x', 'y'])

/home/mrocklin/workspace/xarray/xarray/core/dataarray.py in __init__(self, data, coords, dims, name, attrs, encoding, fastpath)
    227 
    228             data = as_compatible_data(data)
--> 229             coords, dims = _infer_coords_and_dims(data.shape, coords, dims)
    230             variable = Variable(dims, data, attrs, encoding, fastpath=True)
    231 

/home/mrocklin/workspace/xarray/xarray/core/dataarray.py in _infer_coords_and_dims(shape, coords, dims)
     68     if utils.is_dict_like(coords):
     69         for k, v in coords.items():
---> 70             new_coords[k] = as_variable(v, name=k)
     71     elif coords is not None:
     72         for dim, coord in zip(dims, coords):

/home/mrocklin/workspace/xarray/xarray/core/variable.py in as_variable(obj, name)
     94                             '{}'.format(obj))
     95     elif utils.is_scalar(obj):
---> 96         obj = Variable([], obj)
     97     elif getattr(obj, 'name', None) is not None:
     98         obj = Variable(obj.name, obj)

/home/mrocklin/workspace/xarray/xarray/core/variable.py in __init__(self, dims, data, attrs, encoding, fastpath)
    275         """
    276         self._data = as_compatible_data(data, fastpath=fastpath)
--> 277         self._dims = self._parse_dimensions(dims)
    278         self._attrs = None
    279         self._encoding = None

/home/mrocklin/workspace/xarray/xarray/core/variable.py in _parse_dimensions(self, dims)
    439             raise ValueError('dimensions %s must have the same length as the '
    440                              'number of data dimensions, ndim=%s'
--> 441                              % (dims, self.ndim))
    442         return dims
    443 

ValueError: dimensions () must have the same length as the number of data dimensions, ndim=1

Contributor Author

My objective here is to produce a case where a data array has dask arrays in its coordinates so that I can write code to handle such cases.

Member

@mrocklin - something funny is going on here. I'm going to open a separate issue.

Member

@jhamman jhamman Nov 2, 2017

In the short term, this may help you move forward:

In [21]: x = xr.Variable('x', da.arange(8, chunks=(4,)))
    ...: y = xr.Variable('y', da.arange(8, chunks=(4,)) * 2)
    ...: data = da.random.random((8, 8), chunks=(4, 4)) + 1
    ...: array = xr.DataArray(data,
    ...:                      coords={'xx': x, 'yy': y},
    ...:                      dims=['x', 'y'])
    ...:

In [22]: array
Out[22]:
<xarray.DataArray 'add-a034ba104341d3cca6b28ad7bf059b14' (x: 8, y: 8)>
dask.array<shape=(8, 8), dtype=float64, chunksize=(4, 4)>
Coordinates:
    xx       (x) int64 dask.array<shape=(8,), chunksize=(4,)>
    yy       (y) int64 dask.array<shape=(8,), chunksize=(4,)>
Dimensions without coordinates: x, y

Member

See #1684 for more information and #1685 for the fix.

Contributor Author

Thanks @jhamman , using your suggested code and @shoyer 's suggestion to depend on the DataSet implementation I think that this is now resolved.

if dask.is_dask_collection(v) else
(False, k, v) for k, v in self._variables.items()]
return self._dask_postpersist, (info, self._coord_names, self._dims,
self._attrs, self._file_obj, self._encoding)
Member

nit: please indent

lambda x: x.persist(),
pytest.mark.skipif(LooseVersion(dask.__version__) < '0.16',
lambda x: dask.persist(x)[0],
reason='Need Dask 0.16+')
Member

This is pretty confusing at first glance, unless you already deeply understand how pytest marks work.

I don't really have a suggestion for how to make this better, but maybe a comment is in order?
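For readers unfamiliar with the idiom: a mark attached to a single parametrize entry skips just that variant, which is why the mark and the lambda end up interleaved in the diff. A hedged sketch in the modern `pytest.param(..., marks=...)` spelling (the real test gates on `LooseVersion(dask.__version__) < '0.16'`; a literal `False` stands in here to keep the sketch self-contained, and `test_roundtrip_persist` is a made-up name):

```python
import pytest
import dask

# Two ways to persist a collection; the second variant carries its own
# skipif mark, which only skips that one parametrized case.
persist_variants = [
    lambda x: x.persist(),            # method form, always available
    pytest.param(
        lambda x: dask.persist(x)[0], # free-function form, newer dask only
        marks=pytest.mark.skipif(False, reason='Need Dask 0.16+'),
    ),
]

@pytest.mark.parametrize('persist', persist_variants)
def test_roundtrip_persist(persist):
    import dask.array as da
    x = da.arange(4, chunks=2)
    assert (persist(x).compute() == x.compute()).all()
```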

Contributor Author

Added a comment on the first such use of this parametrization

assert dask.is_dask_collection(y)
assert dask.is_dask_collection(y.var1)
assert dask.is_dask_collection(y.var2)
# assert not dask.is_dask_collection(y.var3) # TODO: avoid chunking unnecessarily in dataset.py::maybe_chunk
Member

We could probably argue about whether .chunk() should chunk variables that don't use the supplied dimension. But the default behavior is chunk everything when given an empty argument, so I think it's actually correct to do it this way (it certainly makes the return value easier to understand).

Probably a better way to do this would be to construct the dataset by hand from dask arrays, e.g.,

ds = Dataset({'foo': ('x', da.arange(3, chunks=(3,))), 'bar': ('x', np.arange(3))})
assert dask.is_dask_collection(ds)
assert dask.is_dask_collection(ds.foo) 
assert not dask.is_dask_collection(ds.bar) 

Contributor Author

In a distributed context there is more cost to this behavior than with the threaded scheduler because we communicate the array around the network, rather than do things immediately/locally.

Member

@shoyer shoyer left a comment

Looks good to me, though it still needs the note in "What's new". This is pretty low-risk in its current state (just adding new methods), so I would be OK including it in v0.10.

assert dask.is_dask_collection(z)
assert dask.is_dask_collection(z.var1)
assert dask.is_dask_collection(z.var2)
# assert not dask.is_dask_collection(z.var3)
Member

remove?

Contributor Author

Removed

@mrocklin
Contributor Author

mrocklin commented Nov 4, 2017

Looks good to me, though it still needs the note in "What's new". This is pretty low-risk in its current state (just adding new methods), so I would be OK including it in v0.10.

So, we're currently labeling the Dask collection interface (what we've implemented here) as experimental and subject to change without a deprecation cycle. I don't foresee much changing, but it might be wise to let people experiment with this in master for a while without putting it in the release. I would not be surprised if we find issues with it after moderate use.

@mrocklin
Contributor Author

mrocklin commented Nov 6, 2017

I just tried things and persisting datasets seems to work well for me in practice.

@mrocklin
Contributor Author

mrocklin commented Nov 7, 2017

Thank you for stepping in @shoyer .

From my perspective this is good to go. However I'm also not in any rush.

@shoyer shoyer merged commit 10495be into pydata:master Nov 7, 2017
@shoyer
Member

shoyer commented Nov 7, 2017

Thanks Matt. I was just waiting for CI to pass.

I've indicated that this is experimental in the release notes, so as long as we keep the messaging consistent on the Dask side I think we have room to change this up if needed.

@mrocklin
Contributor Author

mrocklin commented Nov 7, 2017

Great. I'm glad to see this in. Thanks for the help!
