[0.2.1] - 2024-06-24

nkarasiak committed Jun 24, 2024
2 parents 542ff32 + 1a517d9 commit b379ce8

Showing 7 changed files with 99 additions and 47 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md

@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.2.1] - 2024-06-24
+
+### Changed
+
+- `search` is now parallelized using datetime.
+
 ## [0.2.0] - 2024-06-19
 
 ### Added
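The headline change is that `search` now splits long datetime ranges and runs the sub-queries in parallel threads (see the earthdaily/earthdatastore/__init__.py diff below). A hedged sketch of the user-facing call — the `Auth` entry point, the `geometry` value, and passing `intersects` to `search` are illustrative assumptions, not taken from this diff:

    import earthdaily

    # Assumed client entry point; credentials come from the environment.
    eds = earthdaily.earthdatastore.Auth()

    # Illustrative area of interest.
    geometry = {
        "type": "Polygon",
        "coordinates": [[[5.0, 44.0], [5.1, 44.0], [5.1, 44.1],
                         [5.0, 44.1], [5.0, 44.0]]],
    }

    # A range longer than ~10 days is split into ~10 windows searched in
    # parallel threads; shorter ranges fall back to a single query.
    items = eds.search(
        "sentinel-2-l2a",
        intersects=geometry,
        datetime="2022-01-01/2022-12-31",
    )
    print(f"{len(items)} items found")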
2 changes: 1 addition & 1 deletion earthdaily/__init__.py

@@ -5,4 +5,4 @@
 # to hide warnings from rioxarray or nano seconds conversion
 # warnings.filterwarnings("ignore")
 
-__version__ = "0.2.0"
+__version__ = "0.2.1"
12 changes: 6 additions & 6 deletions earthdaily/accessor/whittaker/__init__.py

@@ -33,8 +33,8 @@ def whittaker(dataset, beta=10000.0, weights=None, time="time"):
     """
 
-    resampled = dataset.resample(time="1D").interpolate("linear")
-    weights_binary = np.in1d(resampled.time.dt.date, dataset.time.dt.date)
+    resampled = dataset.resample({time: "1D"}).interpolate("linear")
+    weights_binary = np.in1d(resampled[time].dt.date, dataset[time].dt.date)
     if weights is not None:
         weights_advanced = np.copy(weights_binary.astype(float))
         weights_advanced[weights_advanced == 1.0] = weights
@@ -43,21 +43,21 @@ def whittaker(dataset, beta=10000.0, weights=None, time="time"):
 
     # _core_dims = [dim for dim in dataset.dims if dim != "time"]
     # _core_dims.extend([time])
-    m = resampled.time.size
+    m = resampled[time].size
     ab_mat = _ab_mat(m, beta)
 
     dataset_w = xr.apply_ufunc(
         _whitw_pixel,
         resampled,
-        input_core_dims=[["time"]],
-        output_core_dims=[["time"]],
+        input_core_dims=[[time]],
+        output_core_dims=[[time]],
         output_dtypes=[float],
         dask="parallelized",
         vectorize=True,
         kwargs=dict(weights=weights_advanced, ab_mat=ab_mat),
     )
 
-    return dataset_w.isel(time=weights_binary)
+    return dataset_w.isel({time: weights_binary})
 
 
 def _ab_mat(m, beta):
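In short: the smoother previously hardcoded the "time" dimension name; it now honors the function's `time` argument throughout, so datasets with a differently named temporal dimension work too. A minimal sketch under that reading (the toy dataset is illustrative, and the import path follows the file path above):

    import numpy as np
    import pandas as pd
    import xarray as xr
    from earthdaily.accessor.whittaker import whittaker

    # Toy cube whose temporal dimension is named "t", not "time".
    dates = pd.date_range("2024-01-01", periods=12, freq="5D")
    ds = xr.Dataset(
        {"ndvi": (("t", "y", "x"), np.random.rand(12, 4, 4))},
        coords={"t": dates},
    )

    # Before this commit, this call required a dimension literally named "time".
    smoothed = whittaker(ds, beta=10000.0, time="t")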
74 changes: 68 additions & 6 deletions earthdaily/earthdatastore/__init__.py

@@ -4,14 +4,16 @@
 import os
 import warnings
 import time
-import geopandas as gpd
+import pandas as pd
+import geopandas as gpd
 import psutil
 import requests
 import xarray as xr
 import numpy as np
 from pystac.item_collection import ItemCollection
 from pystac_client.item_search import ItemSearch
 from pystac_client import Client
+from itertools import chain
 from odc import stac
 from . import _scales_collections, cube_utils, mask
 from .cube_utils import datacube, metacube, _datacubes, asset_mapper
@@ -21,6 +23,59 @@
 logging.getLogger("earthdaily-earthdatastore")
 
 
+class NoItems(Warning):
+    pass
+
+
+_no_item_msg = NoItems("No item has been found for your query.")
+
+
+def _datetime_split(datetime):
+    datetime_ = ItemSearch(url=None)._format_datetime(datetime).split("/")
+    datetime = [pd.Timestamp(datetime_[0]), pd.Timestamp(datetime_[1])]
+
+    s = pd.Timestamp(datetime[0])
+    e = pd.Timestamp(datetime[1])
+    days = (e - s).days
+    days_per_query = days // 10
+    if days_per_query == 0:
+        return None
+    datetimes = []
+    for d in range(0, days, days_per_query):
+        start = s + pd.Timedelta(days=d)
+        end = start + pd.Timedelta(days=days_per_query)
+        end = end if e > end else e
+        datetimes.append([start, end])
+
+    return datetimes
+
+
+def _parallel_search(func):
+    def _search(*args, **kwargs):
+        from joblib import Parallel, delayed
+
+        datetime = kwargs.get("datetime", None)
+        need_parallel = False
+        if datetime:
+            datetimes = _datetime_split(datetime)
+            need_parallel = True if datetimes else False
+            if need_parallel:
+                kwargs.pop("datetime")
+                kwargs["raise_no_items"] = False
+                items = Parallel(n_jobs=10, backend="threading")(
+                    delayed(func)(*args, datetime=datetime, **kwargs)
+                    for datetime in datetimes
+                )
+                items = ItemCollection(chain(*items))
+                if len(items) == 0:
+                    raise _no_item_msg
+        if not need_parallel:
+            items = func(*args, **kwargs)
+        return items
+
+    return _search
+
+
 def post_query_items(items, query):
     """Applies query to items fetched from the STAC catalog.
@@ -644,7 +699,6 @@ def datacube(
         if intersects is not None:
             intersects = cube_utils.GeometryManager(intersects).to_geopandas()
             self.intersects = intersects
-
         if mask_with:
             if mask_with not in mask._available_masks:
                 raise NotImplementedError(
@@ -656,14 +710,20 @@ def datacube(
                 search_kwargs = self._update_search_kwargs_for_ag_cloud_mask(
                     search_kwargs, collections[0], key="eda:ag_cloud_mask_available"
                 )
                 mask_with = "ag_cloud_mask"
-            elif mask_with in ["cloud_mask", "cloudmask"]:
+            elif mask_with in [
+                "cloud_mask",
+                "cloudmask",
+                "cloud_mask_ag_version",
+                "cloudmask_ag_version",
+            ]:
                 search_kwargs = self._update_search_kwargs_for_ag_cloud_mask(
                     search_kwargs,
                     collections[0],
                     key="eda:cloud_mask_available",
                     target_param="post_query",
                 )
-
                 mask_with = "cloud_mask"
+
             else:
                 mask_with = mask._native_mask_def_mapping.get(collections[0], None)
                 sensor_mask = mask._native_mask_asset_mapping.get(collections[0], None)
@@ -830,6 +890,7 @@ def _update_search_for_assets(self, assets):
         fields["include"].extend([f"assets.{asset}" for asset in assets])
         return fields
 
+    @_parallel_search
     def search(
         self,
         collections: str | list,
@@ -839,6 +900,7 @@ def search(
         prefer_alternate=None,
         add_default_scale_factor=False,
         assets=None,
+        raise_no_items=True,
         **kwargs,
     ):
         """
@@ -975,8 +1037,8 @@ def search(
         )
         if post_query:
             items_collection = post_query_items(items_collection, post_query)
-        if len(items_collection) == 0:
-            raise Warning("No item has been found.")
+        if len(items_collection) == 0 and raise_no_items:
+            raise _no_item_msg
         return items_collection
 
     def find_cloud_mask_items(self, items_collection, cloudmask="ag_cloud_mask"):
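To make the new machinery concrete: `_datetime_split` carves the requested range into windows of (total_days // 10) days, and the `_parallel_search` decorator fans those windows out over joblib threads (n_jobs=10), chaining the per-window results into one ItemCollection; `raise_no_items=False` is forwarded so only the merged result, if empty, raises NoItems. A standalone sketch that mirrors the splitting logic — the ItemSearch-based datetime parsing is dropped so the snippet runs without a STAC client:

    import pandas as pd

    def split_range(start, end):
        # Mirrors _datetime_split: ~10 windows of (days // 10) days each;
        # ranges shorter than 10 days are not split (single query instead).
        s, e = pd.Timestamp(start), pd.Timestamp(end)
        days = (e - s).days
        days_per_query = days // 10
        if days_per_query == 0:
            return None
        windows = []
        for d in range(0, days, days_per_query):
            w_start = s + pd.Timedelta(days=d)
            w_end = min(w_start + pd.Timedelta(days=days_per_query), e)
            windows.append([w_start, w_end])
        return windows

    for w_start, w_end in split_range("2022-01-01", "2022-12-31"):
        print(w_start.date(), "->", w_end.date())

Consecutive windows share a boundary day, so in principle an item timestamped exactly on a boundary could be returned by two sub-queries; the chained ItemCollection is not deduplicated here.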
36 changes: 7 additions & 29 deletions earthdaily/earthdatastore/mask/__init__.py

@@ -17,7 +17,10 @@
     "ag_cloud_mask",
     "cloud_mask",
     "ag-cloud-mask",
-    "cloudmask" "scl",
+    "cloud_mask_ag_version",
+    "cloudmask_ag_version",
+    "cloudmask",
+    "scl",
 ]
 _native_mask_def_mapping = {
     "sentinel-2-l2a": "scl",
@@ -33,17 +36,6 @@
 }
 
 
-def _bool_or_int_to_njobs(var):
-    if isinstance(var, bool):
-        if var:
-            arg = 1
-        else:
-            arg = False
-    else:
-        arg = var
-    return arg
-
-
 class Mask:
     def __init__(self, dataset: xr.Dataset, intersects=None, bbox=None):
         self._obj = dataset
@@ -67,7 +59,6 @@ def cloud_mask(
                 "ag_cloud_mask",
                 1,
                 labels_are_clouds=False,
-                n_jobs=_bool_or_int_to_njobs(mask_statistics),
             )
         return self._obj
 
@@ -80,11 +71,7 @@ def ag_cloud_mask(
         self._obj = self._obj.where(self._obj["ag_cloud_mask"] == 1)
         if mask_statistics:
             self.compute_clear_coverage(
-                self._obj["ag_cloud_mask"],
-                "ag_cloud_mask",
-                1,
-                labels_are_clouds=False,
-                n_jobs=_bool_or_int_to_njobs(mask_statistics),
+                self._obj["ag_cloud_mask"], "ag_cloud_mask", 1, labels_are_clouds=False
             )
         return self._obj
 
@@ -113,11 +100,7 @@ def cloudmask_from_asset(
         )
         if mask_statistics:
             self.compute_clear_coverage(
-                cloud_layer,
-                cloud_asset,
-                labels,
-                labels_are_clouds=labels_are_clouds,
-                n_jobs=_bool_or_int_to_njobs(mask_statistics),
+                cloud_layer, cloud_asset, labels, labels_are_clouds=labels_are_clouds
            )
         return self._obj
 
@@ -142,12 +125,7 @@ def venus_detailed_cloud_mask(self, mask_statistics=False):
         )
 
     def compute_clear_coverage(
-        self,
-        cloudmask_array,
-        cloudmask_name,
-        labels,
-        labels_are_clouds=True,
-        n_jobs=1,
+        self, cloudmask_array, cloudmask_name, labels, labels_are_clouds=True
     ):
         if self._obj.attrs.get("usable_pixels", None) is None:
             self.compute_available_pixels()
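Two things worth noting. First, the opening hunk fixes a real bug: `"cloudmask" "scl",` is implicit Python string concatenation, so _available_masks previously contained the single bogus entry "cloudmaskscl" rather than "cloudmask" and "scl". Second, the new *_ag_version aliases line up with the datacube() change above; a hedged usage sketch, reusing the assumed `eds` client and `geometry` from the search example:

    # "cloud_mask_ag_version" / "cloudmask_ag_version" route to the same
    # cloud_mask handling, with "eda:cloud_mask_available" enforced via a
    # post_query.
    cube = eds.datacube(
        "sentinel-2-l2a",
        intersects=geometry,
        datetime="2022-06-01/2022-06-30",
        assets=["red", "green", "blue"],
        mask_with="cloud_mask_ag_version",
        clear_cover=50,
    )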
14 changes: 10 additions & 4 deletions examples/field_evolution.py

@@ -32,18 +32,24 @@
 pivot_cube = eds.datacube(
     "sentinel-2-l2a",
     intersects=pivot,
-    datetime=["2023-07-01","2023-07-31"],
-    assets=["red", "green", "blue"],
+    datetime=["2022-01","2022-03"],
+    assets=["red", "green", "blue", "nir"],
     mask_with=["native"],
     clear_cover=50,
 )
 pivot_cube.clear_percent.plot.scatter(x="time")
 
+##############################################################################
+# Add spectral indices using spyndex from earthdaily accessor
+# ------------------------------------------------------------
+
+pivot_cube = pivot_cube.ed.add_indices(['NDVI'])
+
 ##############################################################################
 # Plots cube with SCL with at least 50% of clear data
 # ----------------------------------------------------
 pivot_cube = pivot_cube.load()
-pivot_cube.ed.plot_rgb(col_wrap=3)
+pivot_cube.ed.plot_rgb(col_wrap=3, vmin=0, vmax=.2)
 plt.title("Pivot evolution masked with native cloudmasks")
 plt.show()
 
@@ -55,6 +61,6 @@
 zonal_stats = pivot_cube.ed.zonal_stats(pivot, ['mean','max','min'])
 
 zonal_stats.isel(feature=0).to_array(dim="band").plot.line(
-    x="time", col="band", hue="stats"
+    x="time", col="band", hue="stats", col_wrap=3
 )
 plt.show()
2 changes: 1 addition & 1 deletion examples/venus_cube_mask.py

@@ -41,7 +41,7 @@
 
 ##############################################################################
 # .. note::
-#    We specify prefer_http=True because we didn't set any s3 credentials.
+#    We specify prefer_http="download" because we didn't set any s3 credentials.
 
 
 print(f"{theia_location} venus location has {len(items)} items.")
