
Threadlocking in DataArray calculations for zarr data depending on where it's loaded from (S3 vs local) #6033

Closed
adair-kovac opened this issue Nov 26, 2021 · 8 comments

@adair-kovac

adair-kovac commented Nov 26, 2021

What happened:

I am consistently seeing an issue where, if I open the same dataset from a particular zarr archive on S3, calculations are slowed down by 3 seconds or more compared to when the same data is loaded locally. (I make sure to load all the data into the datasets before trying the calculations.) These same calculations are subsecond on the locally loaded data, and it's literally just the same data copied from S3 using the AWS CLI.

Profiling shows that this is a threadlocking issue.

I have been able to reproduce it:

  • Using DataArray.mean or min as the calculation
  • On different machines, macOS and Linux, on different networks
  • In different versions of Python, xarray, dask, and zarr
  • Loading the full Dataset with coordinates or just the data variable in question (they're in different directories for this archive)
  • On different zarr arrays in the same archive

What you expected to happen:

No major threadlocking issues; also, calculations on the same data should perform the same regardless of where it was loaded from.

Minimal Complete Verifiable Example:

See the attached Jupyter Notebook (thread_locking.ipynb.zip), which has the cell magics for timing and profiling the operations.

import s3fs 
import xarray as xr

s3 = s3fs.S3FileSystem(anon=True)
def lookup(path):
    return s3fs.S3Map(path, s3=s3)

path_forecast = "hrrrzarr/sfc/20211124/20211124_00z_fcst.zarr/surface/PRES"
ds_from_s3 = xr.open_zarr(lookup(f"{path_forecast}/surface")) 
_ = ds_from_s3.PRES.values
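# (%%time and %%prun are cell magics, so in the notebook the lines below run in a separate cell)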
%%time
%%prun -l 2

_ = ds_from_s3.PRES.mean(dim="time").values

This takes over 3 seconds, most of it in {method 'acquire' of '_thread.lock' objects}.

The same mean with the data in question downloaded via aws s3 cp --recursive and then opened locally is 10x faster. The thread lock is still where most of the time is spent, but it's much less.
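For comparison, the local run looks like this (a minimal sketch; the local path is a placeholder for wherever aws s3 cp put the copy):

# Placeholder path: wherever `aws s3 cp --recursive` put the local copy of the PRES data
ds_local = xr.open_zarr("local_copy/PRES/surface")
_ = ds_local.PRES.values
_ = ds_local.PRES.mean(dim="time").values  # roughly 10x faster than the S3-backed version above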

Environment:

Tested with these and more:

Python: 3.10.0
xarray: 0.20.1
dask: 2021.11.2
zarr: 2.10.3
s3fs: 2021.11.0

Python: 3.9.6
xarray: 0.19.0
dask: 2021.07.1
zarr: 2.8.3
s3fs: 2021.07.0

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.10.0 | packaged by conda-forge | (default, Nov 20 2021, 02:25:38) [Clang 11.1.0 ]
python-bits: 64
OS: Darwin
OS-release: 18.7.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: None
LOCALE: (None, 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 0.20.1
pandas: 1.3.4
numpy: 1.21.4
scipy: None
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.10.3
cftime: None
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2021.11.2
distributed: 2021.11.2
matplotlib: None
cartopy: None
seaborn: None
numbagg: None
fsspec: 2021.11.0
cupy: None
pint: None
sparse: None
setuptools: 59.2.0
pip: 21.3.1
conda: None
pytest: None
IPython: 7.29.0
sphinx: None

@max-sixty
Collaborator

Thanks @adair-kovac.

To what extent is this the time to download the data? How big is the dataset? What's the absolute difference for a very small dataset? Or for a large dataset including the time to download the data first?

The threading issue may be threading contention, or it could be the main thread waiting for another thread to complete the download (others will know more here).

@adair-kovac
Author

adair-kovac commented Nov 27, 2021

@max-sixty There shouldn't be any download happening by the time I'm seeing this issue. If you check the notebook (also here if it's easier to read), I check that the data is downloaded (by looking at the dataset's nbytes) before attempting the computation and verify it hasn't changed afterward. Wait, never mind, that doesn't actually work: I just verified that nbytes returns the same size even when I've only just opened the dataset. Is there a way to check what is and isn't downloaded?

But in any case, I call .values on the data beforehand, and it has the same issue if I run the method a second (third, fourth, fifth) time. Unless it's repeatedly re-downloading the same data for some reason, downloading doesn't seem to be the problem.

The dataset is about 350 MB and has 48 x 150 x 150 chunks. I haven't tried creating smaller or larger datasets and posting them to S3 to see if it happens with them, too.

@max-sixty
Collaborator

Is there a way to check what is and isn't downloaded?

What is the time difference between the approach you've tried vs. before anything is downloaded?

@adair-kovac
Author

@max-sixty Okay, yeah, that's the problem, it's re-downloading the data every time the values are accessed. Apparently this is the default behavior given that zarr is a chunked format.

Adding cache=True:

  • Fixes the problem in open_dataset
  • Throws an error in open_zarr
  • Doesn't have any noticeable effect in open_mfdataset

My data archive can't normally be usefully read without open_mfdataset, and it's small enough to easily fit in memory, so this behavior isn't ideal.

I guess I had assumed that the data would get stored on disk temporarily even if it wasn't kept in memory, so it's an unexpected limitation that the choices are to either cache it in memory or re-read it from S3 every time you access the data. It also seems odd that the default caching logic only takes into account whether the data is chunked, not how big (or small) it is, how slow accessing the store is, or whether the data is being repeatedly accessed.
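A minimal sketch of the open_dataset variant described above, reusing lookup and path_forecast from the original example (behavior as reported in this thread, not re-verified):

# Sketch: cache=True with the zarr engine keeps decoded data in memory after first access,
# so repeated computations don't have to re-read the chunks from S3.
ds_cached = xr.open_dataset(lookup(f"{path_forecast}/surface"), engine="zarr", cache=True)
_ = ds_cached.PRES.values                   # first access reads from S3
_ = ds_cached.PRES.mean(dim="time").values  # reuses the cached values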

@Illviljan
Contributor

If you think the data would fit in memory, maybe #5704 would be enough?

@jhamman
Member

jhamman commented Jan 20, 2022

It is worth mentioning that, specifically when using Zarr with fsspec, you have multiple layers of caching available.

  1. You can ask fsspec to cache locally:
import xarray as xr

path = 's3://hrrrzarr/sfc/20211124/20211124_00z_fcst.zarr/surface/PRES'
ds = xr.open_zarr('simplecache::' + path)

(more details on configuration: https://filesystem-spec.readthedocs.io/en/latest/features.html#caching-files-locally)

  2. You can ask Zarr to cache chunks as they are read:
import fsspec
from zarr.storage import LRUStoreCache

mapper = fsspec.get_mapper(path)
store = LRUStoreCache(mapper, max_size=1e9)
ds = xr.open_zarr(store)

(more details on configuration here: https://zarr.readthedocs.io/en/stable/api/storage.html#zarr.storage.LRUStoreCache)

  3. Configure a more complex mapper/cache using 3rd-party mappers (e.g. Zict)

perhaps @martindurant has more to add here?

@martindurant
Contributor

If you want to dig further, it would be interesting to turn on s3fs logging to see the access pattern:

import fsspec

fsspec.utils.setup_logging(logger_name="s3fs")

In particular, I am interested in whether xarray is loading chunk by chunk serially versus concurrently. It would be good to know your chunk size versus total array size.

The dask version is interesting:

xr.open_zarr(lookup(f"{path_forecast}/surface"), chunks={}) # uses dask

where the dask partition size will be the same as the underlying chunk size. If you find a lot of latency (small chunks), you can sometimes get an order of magnitude download performance increase by specifying the chunksize along some dimension(s) to be a multiple of the on-disk size. I wouldn't normally recommend Dask just for loading the data into memory, but feel free to experiment.
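A hedged illustration of that suggestion; the dimension name "x" and the factor of 2 are placeholders (the on-disk chunks are 48 x 150 x 150 per the earlier comment):

# Placeholder example: ask dask for partitions spanning a multiple of the stored chunk shape
# ("x" stands in for the real dimension name; 300 = 2 x the 150-element stored chunks).
ds_dask = xr.open_zarr(lookup(f"{path_forecast}/surface"), chunks={"x": 300})
_ = ds_dask.PRES.mean(dim="time").compute()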

@dcherian
Contributor

There doesn't seem to be anything actionable here. Closing.
