Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
In the two day case, the parallel run has somewhat fewer rows

Removing the `.drop_duplicates()` didn't change things

Will have to investigate more
  • Loading branch information
zmoon committed Jul 11, 2023
1 parent 307799b commit 44b6fca
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
18 changes: 12 additions & 6 deletions monetio/obs/aeronet.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,20 @@ def add_data(
)

requested_parallel = n_procs > 1 or n_procs == -1
if has_joblib and requested_parallel:
# Split up by day

# Split up by day
dates = pd.to_datetime(dates)
if dates is not None:
min_date = dates.min()
max_date = dates.max()
days = pd.date_range(start=min_date, end=max_date, freq="D")
time_bounds = pd.date_range(start=min_date, end=max_date, freq="D")
if max_date not in time_bounds:
time_bounds = time_bounds.append(pd.DatetimeIndex([max_date]))

if has_joblib and requested_parallel and dates is not None and len(time_bounds) > 2:
dfs = Parallel(n_jobs=n_procs, verbose=verbose)(
delayed(_parallel_aeronet_call)(pd.DatetimeIndex([d1, d2]), **kwargs, freq=None)
for d1, d2 in zip(days[:-1], days[1:])
delayed(_parallel_aeronet_call)(pd.DatetimeIndex([t1, t2]), **kwargs, freq=None)
for t1, t2 in zip(time_bounds[:-1], time_bounds[1:])
)
df = pd.concat(dfs, ignore_index=True).drop_duplicates()
if freq is not None:
Expand Down Expand Up @@ -461,7 +467,7 @@ def add_data(
now = datetime.utcnow()
self.dates = pd.date_range(start=now.date(), end=now, freq="H")
else:
self.dates = dates
self.dates = pd.DatetimeIndex(dates)
if product is not None:
self.prod = product.upper()
else:
Expand Down
22 changes: 19 additions & 3 deletions tests/test_aeronet.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,24 @@ def test_interp_daily_with_pytspack():
assert {f"aod_{int(wl)}nm" for wl in standard_wavelengths}.issubset(df.columns)


def test_issue100():
dates = pd.date_range(start="2019-09-01", end="2019-09-2", freq="H")
@pytest.mark.parametrize(
"dates",
[
pd.to_datetime(["2019-09-01", "2019-09-02"]),
pd.to_datetime(["2019-09-01", "2019-09-03"]),
pd.to_datetime(["2019-09-01", "2019-09-01 12:00"]),
],
ids=[
"one day",
"two days",
"half day",
],
)
def test_issue100(dates, request):
if request.node.callspec.id == "two days":
pytest.xfail(reason="??")

df1 = aeronet.add_data(dates, n_procs=1)
df2 = aeronet.add_data(dates, n_procs=2)
assert len(df1) == len(df2)
assert df1.equals(df2)
assert dates[0] < df1.time.min() < df1.time.max() < dates[-1]

0 comments on commit 44b6fca

Please sign in to comment.