Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slow performance when storing datasets in gcsfs-backed zarr stores #1770

Closed
rabernat opened this issue Dec 8, 2017 · 11 comments
Closed

slow performance when storing datasets in gcsfs-backed zarr stores #1770

rabernat opened this issue Dec 8, 2017 · 11 comments
Labels
topic-zarr Related to zarr storage library

Comments

@rabernat
Copy link
Contributor

rabernat commented Dec 8, 2017

We are working on integrating zarr with xarray. In the process, we have encountered a performance issue that I am documenting here. At this point, it is not clear if the core issue is in zarr, gcsfs, dask, or xarray. I originally started posting this in zarr, but in the process, I became more convinced the issue was with xarray.

Dask Only

Here is an example using only dask and zarr.

# connect to a local dask scheduler
from dask.distributed import Client
client = Client('tcp://129.236.20.45:8786')

# create a big dask array
import dask.array as dsa
shape = (30, 50, 1080, 2160)
chunkshape = (1, 1, 1080, 2160)
ar = dsa.random.random(shape, chunks=chunkshape)

# connect to gcs and create MutableMapping
import gcsfs
fs = gcsfs.GCSFileSystem(project='pangeo-181919')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/test999', gcs=fs, check=True,
                              create=True)

# create a zarr array to store into
import zarr
za = zarr.create(ar.shape, chunks=chunkshape, dtype=ar.dtype, store=gcsmap)

# write it
ar.store(za, lock=False)

When you do this, it spends a long time serializing stuff before the computation starts.

For a more fine-grained look at the process, one can instead do

delayed_obj = a.store(za, compute=False, lock=False)
%prun future = client.compute(dobj)

This reveals that the pre-compute step takes about 10s. Monitoring the distributed scheduler, I can see that, once the computation starts, it takes about 1:30 to store the array (27 GB). (This is actually not bad!)

Some debugging by @mrocklin revealed the following step is quite slow

import cloudpickle
%time len(cloudpickle.dumps(za))

On my system, this was taking close to 1s. On contrast, when the store passed to gcsmap is not a GCSMap but instead a path, it is in the microsecond territory. So pickling GCSMap objects is relatively slow. I'm not sure whether this pickling happens when we call client.compute or during the task execution.

There is room for improvement here, but overall, zarr + gcsfs + dask seem to integrate well and give decent performance.

Xarray

This get much worse once xarray enters the picture. (Note that this example requires the xarray PR #1528, which has not been merged yet.)

# wrap the dask array in an xarray
import xarray as xr
import numpy as np
ds = xr.DataArray(ar, dims=['time', 'depth', 'lat', 'lon'],
                  coords={'lat': np.linspace(-90, 90, Ny),
                          'lon': np.linspace(0, 360, Nx)}).to_dataset(name='temperature')
# store to a different bucket
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/test1', gcs=fs, check=True, create=True)
ds.to_zarr(store=gcsmap, mode='w')

Now the store step takes 18 minutes. Most of this time, is upfront, during which there is little CPU activity and no network activity. After about 15 minutes or so, it finally starts computing, at which point the writes to gcs proceed more-or-less at the same rate as with the dask-only example.

Profiling the to_zarr with snakeviz reveals that it is spending most of its time waiting for thread locks.

image

I don't understand this, since I specifically eliminated locks when storing the zarr arrays.

@rabernat
Copy link
Contributor Author

rabernat commented Dec 8, 2017

does dask.array.store(..., lock=None) do the same thing as dask.array.store(..., lock=False)?

@mrocklin
Copy link
Contributor

mrocklin commented Dec 8, 2017 via email

@mrocklin
Copy link
Contributor

When pickling the GCS mapping it looks like we're actually pulling down all of the data within it (Zarr has already placed some metadata) instead of serializing the connection information.

@martindurant what information should we safely be passing around when serializing? These tasks would need to remain valid for longer than the standard hour-long short-lived-token.

@martindurant
Copy link
Contributor

I am puzzled that serializing the mapping is pulling the data. GCSMap does not have get/set_state, but the only attributes are the GCSFileSystem and path. Perhaps the __getitem__ gets called? As for the GCSFileSystem, it stores the token with a renewable token, which lives indefinitely, and the refresh API is called upon deserialization. There should probably be a check in _call to ensure that the token hasn't expired.

@mrocklin
Copy link
Contributor

Ah, we can just serialize the .gcs object and leave the rest to the GCSFileSystem.

Perhaps the MutableMapping collections class defines get/setstate differently. I'll play aorund.

@mrocklin
Copy link
Contributor

It looks like serializing GCSFileSystem.dirs can be quite expensive. I think that this is just here for caching and efficiency, is this correct?

@mrocklin
Copy link
Contributor

t

@martindurant
Copy link
Contributor

Yes, dirs exists to prevent the need to query the server for file-lists multiple times. There is an outstanding issue to move to prefix/delimited listing as with s3fs, rather than fetch the complete listing for a bucket. If all the paths are known beforehand, as might be the case for zarr, then it may be of no use at all - but actually I'm not sure then why it would have been populated.

@mrocklin
Copy link
Contributor

fsspec/gcsfs#49

import gcsfs
fs = gcsfs.GCSFileSystem(project='pangeo-181919')
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/test997', gcs=fs, check=True,
                              create=True)

import dask.array as dsa
shape = (30, 50, 1080, 2160)
chunkshape = (1, 1, 1080, 2160)
ar = dsa.random.random(shape, chunks=chunkshape)

import zarr
za = zarr.create(ar.shape, chunks=chunkshape, dtype=ar.dtype, store=gcsmap)

In [2]: import cloudpickle
In [3]: %time len(cloudpickle.dumps(gcsmap))
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 560 µs
Out[3]: 213

@jakirkham
Copy link

Is this still an issue?

@dcherian dcherian added the topic-zarr Related to zarr storage library label Jan 13, 2019
@jhamman
Copy link
Member

jhamman commented Jan 13, 2019

Closing. I think our fixes in xarray and zarr last winter addressed most of the problems here. If others feel differently, please reopen.

@jhamman jhamman closed this as completed Jan 13, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
topic-zarr Related to zarr storage library
Projects
None yet
Development

No branches or pull requests

6 participants