[v3] Sync with futures #1804

Merged
merged 12 commits into zarr-developers:v3 from the sync_with_futures branch on Apr 24, 2024

Conversation

@d-v-b (Contributor) commented Apr 21, 2024

Adjusts the sync function. It now creates a Future from a coroutine, waits for that future to complete in another thread, and accesses the result value with the .result() method.

The previous implementation relied on mutating a list initialized to [None], which is ambiguous if the coroutine being awaited happens to return None.

In order to pass pre-commit, I also had to fix type annotations in v3/array.py.
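
Below is a minimal, self-contained sketch of that pattern (not the PR's exact code; the inline background-thread setup stands in for the module-level loop that _get_loop manages):

import asyncio
import threading
from concurrent.futures import wait

# start a dedicated IO event loop in a background thread
io_loop = asyncio.new_event_loop()
threading.Thread(target=io_loop.run_forever, daemon=True).start()

async def add(a: int, b: int) -> int:
    return a + b

def sync(coro, loop):
    # run_coroutine_threadsafe wraps the coroutine in a concurrent.futures.Future
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    # block the calling thread until the future completes
    done, _ = wait([future])
    # .result() returns the coroutine's value, or re-raises its exception
    return list(done)[0].result()

print(sync(add(1, 2), io_loop))  # prints 3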

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/tutorial.rst
  • Changes documented in docs/release.rst
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

@d-v-b requested a review from jhamman on April 21, 2024 16:00
@d-v-b (Contributor Author) commented Apr 21, 2024

Addresses #1803.

if isinstance(return_result, BaseException):
    raise return_result
else:
    return return_result


def _get_loop():
def _get_loop() -> asyncio.AbstractEventLoop | None:
Member

Suggested change
def _get_loop() -> asyncio.AbstractEventLoop | None:
def _get_loop() -> asyncio.AbstractEventLoop:

I suspect you had this here because of some mypy thing but it should be possible to get this to always return a loop. Perhaps an assert loop[0] is not None right before the return is all that is needed?
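
A sketch of what that might look like, assuming loop and iothread are the module-level single-element lists shown further down in the diff (a real implementation would also guard creation with a lock, as fsspec does):

import asyncio
import threading

loop: list[asyncio.AbstractEventLoop | None] = [None]      # cached event loop
iothread: list[threading.Thread | None] = [None]           # dedicated IO thread

def _get_loop() -> asyncio.AbstractEventLoop:
    """Create the dedicated IO event loop on first use, then return the cached one."""
    if loop[0] is None:
        new_loop = asyncio.new_event_loop()
        loop[0] = new_loop
        th = threading.Thread(target=new_loop.run_forever, daemon=True)
        th.start()
        iothread[0] = th
    assert loop[0] is not None  # narrows the Optional for mypy; always true at this point
    return loop[0]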

Member

Might depend on whether it is called in the main thread, the IO thread, or elsewhere.

Comment on lines 75 to 76
done, _ = wait([future])
return_result = list(done)[0].result()
Member

Can you comment on what is happening here?

Some specific questions / ideas:

  1. wait returns done and not_done. Do we need to assert anything about not_done? I guess we're expecting that not_done is always an empty set because we have set return_when=ALL_COMPLETED.
  2. Should we consider specifying the timeout parameter to wait? Would a future that hangs never return here?
  3. Can we check that len(done) is exactly 1 before indexing out the result?

Contributor Author

As I understand it, done, _ = wait([future]) is guaranteed to return tuple[set with one element, the empty set], because we are waiting on a list of futures with 1 element, and wait returns when the futures are all done.

Regarding a timeout, that's a broader question about what error boundaries we want to define for IO operations. At the moment, we basically assume that IO either succeeds or fails after a reasonable amount of time. If we want to include "IO takes forever" as a failure mode, then we would need to start building infrastructure around that higher up in the stack.
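
For reference, a sketch of how the checks from questions 1-3 could look, pulled out into a hypothetical helper (the name _collect_result and the timeout parameter are not from the PR):

from concurrent.futures import Future, wait

def _collect_result(future: Future, timeout: float | None = None):
    """Wait on a single future and return its result, with defensive checks."""
    done, not_done = wait([future], timeout=timeout)  # timeout=None waits indefinitely
    if len(not_done) > 0:
        # only reachable when a finite timeout was given and the coroutine is still running
        raise TimeoutError(f"coroutine did not finish within {timeout} seconds")
    assert len(done) == 1  # exactly one future was passed, and ALL_COMPLETED is the default
    return next(iter(done)).result()  # re-raises the coroutine's exception if it failed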


# From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py

iothread: List[Optional[threading.Thread]] = [None] # dedicated IO thread
loop: List[Optional[asyncio.AbstractEventLoop]] = [
iothread: list[threading.Thread | None] = [None] # dedicated IO thread
Member

So we are targeting py >=3.9?

Member

3.10+ actually!



def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None):
def sync(coro: Coroutine[Any, Any, T], loop: asyncio.AbstractEventLoop | None = None) -> T:
Member

Is there any reason not to directly re-use the same functions in fsspec, since they exist and have a lot of eyes on them? If they need improvement, that would be gladly received. Also, you should make triple sure that if this code appears in multiple places, there are not accidentally multiple loops on multiple async threads.

Contributor Author

I wasn't aware that these functions existed in fsspec! I'm reading https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py and the relevant tests right now.

One reason I can think of to not simply import these functions from fsspec is that it ties zarr-python to fsspec in a rather unexpected way. That's a rather abstract problem, and surely something we could defer solving if necessary.

Is FSSpec the only place where this kind of functionality exists, or are there more places? Surely we aren't the only python users setting synchronous barriers for async code.

Contributor

I had loosely copied the code from fsspec for zarrita.
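
For comparison, the fsspec machinery being discussed lives in fsspec.asyn; to the best of my knowledge its sync helper takes a callable plus arguments (not a coroutine object) and supports a timeout:

import asyncio
from fsspec.asyn import get_loop, sync  # the fsspec functions referenced above

async def fetch(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2

# get_loop() returns fsspec's dedicated IO loop, starting its thread on first use;
# sync(loop, func, *args, timeout=None, **kwargs) runs func(*args) on that loop.
result = sync(get_loop(), fetch, 21)  # -> 42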

event = threading.Event()
asyncio.run_coroutine_threadsafe(_runner(event, coro, result_box), loop)
while True:
    # this loop allows the thread to get interrupted
Member

This comment here explains why the loop was written this way. Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?

Contributor Author

Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?

We don't test for these things at present, so I have no idea! Tests are needed, regardless of the efforts in this PR, and I see that fsspec basically has this covered.

Member

I suggest we open a ticket for finding a test that exercises this concern. I don't think we should hold back this PR though.
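
For reference, one way to keep the waiting thread responsive to signals is to poll with a short timeout instead of blocking indefinitely; a sketch of the general technique, not something this PR implements (the helper name is made up):

from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeoutError

def _wait_interruptible(future: Future, poll: float = 0.05):
    """Block on a future while waking periodically so signals (e.g. Ctrl-C) can be handled."""
    while True:
        try:
            return future.result(timeout=poll)
        except FutureTimeoutError:
            continue  # not done yet; loop again so the main thread stays interruptible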


# _children: List[Union[AsyncArray, AsyncGroup]] = self._sync_iter(
# self._async_group.children()
# )
# return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children]
Member

all of these changes were to make mypy happy.

raise RuntimeError("Loop is not running")
try:
    loop0 = asyncio.events.get_running_loop()
    if loop0 is loop:
        raise NotImplementedError("Calling sync() from within a running loop")
        raise SyncError("Calling sync() from within a running loop")
Member

I believe this was a bug before: since NotImplementedError inherits from RuntimeError, this exception was never raised.
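
A minimal illustration of the hierarchy issue, paraphrasing the fsspec-derived pattern above (the enclosing except clause is an assumption about the surrounding code):

try:
    loop0 = asyncio.events.get_running_loop()  # raises RuntimeError when no loop is running
    if loop0 is loop:
        raise NotImplementedError("Calling sync() from within a running loop")
except RuntimeError:
    # meant to handle "no running event loop", but NotImplementedError is a subclass of
    # RuntimeError, so it is silently swallowed here too; a dedicated SyncError escapes.
    pass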

sync(foo(), loop=loop)


@pytest.mark.filterwarnings("ignore:coroutine.*was never awaited")
Member

Despite this, we're still seeing the warning below. Why?

sys:1: RuntimeWarning: coroutine 'test_sync_raises_if_calling_sync_from_within_a_running_loop.<locals>.foo' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Member

This warning comes after the test has finished, during garbage collection. Maybe doing del on the objects (and the closure) would fix it. Or run the coroutine after the check.
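
A sketch of the second suggestion applied to the test (the exact test body is an assumption; foo, sync, loop, and SyncError are the names used elsewhere in this thread):

coro = foo()  # bind the coroutine object to a name instead of passing foo() inline
with pytest.raises(SyncError):
    sync(coro, loop=loop)
coro.close()  # an explicitly closed coroutine should not warn "was never awaited" at GC time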

@jhamman added the V3 (Affects the v3 branch) label on Apr 22, 2024
@jhamman added this to the 3.0.0.alpha milestone on Apr 22, 2024
@d-v-b (Contributor Author) commented Apr 22, 2024

Thanks to @jhamman, this PR also adds some tests.

I also added a timeout kwarg to sync and made timeout an attribute of SyncConfiguration so that the SyncMixin class can use it, and I added a test that this works as expected.

@martindurant wrote (https://github.com//pull/1804#discussion_r1573863272):

This comment here explains why the loop was written this way. Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?

How would you propose we test for these properties? Since wait blocks the main thread, it's not clear to me what signals we should be looking for. And can you say more about the GC? How would we test this?
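
A rough sketch of the plumbing described at the top of this comment (the attribute and method names are assumptions rather than the PR's exact layout):

from dataclasses import dataclass

@dataclass
class SyncConfiguration:
    # forwarded to sync(); None means wait indefinitely
    timeout: float | None = None

class SyncMixin:
    _sync_configuration: SyncConfiguration

    def _sync(self, coro):
        # forward the configured timeout to the sync() helper
        return sync(coro, timeout=self._sync_configuration.timeout)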

@pep8speaks commented Apr 22, 2024

Hello @d-v-b! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:

There are currently no PEP 8 issues detected in this Pull Request. Cheers! 🍻

Comment last updated at 2024-04-24 03:38:15 UTC

@martindurant (Member)

Meta question: is it actually possible to open multiple groups/arrays concurrently from sync code?

@d-v-b (Contributor Author) commented Apr 22, 2024

Meta question: is it actually possible to open multiple groups/arrays concurrently from sync code?

There should not be a technical barrier to this, since this question translates to "is it possible to concurrently open some JSON documents and apply very lightweight parsing to them", but (to my knowledge) we don't have an API that supports this yet, emphasis on "yet". Bulk hierarchy access is a target for v3, and we are trying to create a foundation for doing this concurrently.

@martindurant (Member)

we don't have an API that supports this yet, emphasis on "yet".

I encourage early planning for this, since data IO access will predominantly come from sync code (as part of data processing). Current zarr 2 already has concurrency on data read (with fsspec) within an array, so this would be a clear benefit of the new approach. We've been talking about it for two years or more :)

@d-v-b (Contributor Author) commented Apr 22, 2024

we don't have an API that supports this yet, emphasis on "yet".

I encourage early planning for this, since data IO access will predominantly come from sync code (as part of data processing). Current zarr 2 already has concurrency on data read (with fsspec) within an array, so this would be a clear benefit of the new approach. We've been talking about it for two years or more :)

IMO this API requires a data structure that represents an un-stored Zarr hierarchy, which zarr-python currently lacks.

The plan is to implement a declarative hierarchy API, which will allow something like this:

g = GroupModel(
  attributes={'foo': 10}, 
  members={
    'group': GroupModel(), 
    'array': ArrayModel(shape=(10,))
    }
  ).to_storage('foo.zarr/path')

where to_storage is a blocking call that waits for the hierarchy (2 groups and 1 array) to be created.

@martindurant (Member)

I was thinking more of data access, such as when xarray wants to eagerly load multiple smallish coordinate arrays at dataset open time.

@d-v-b (Contributor Author) commented Apr 22, 2024

I was thinking more of data access, such as when xarray wants to eagerly load multiple smallish coordinate arrays at dataset open time.

That's a good point. We could do something like this:

nodes: tuple[Array | Group, ...] = open_nodes(
  [
    path_to_array, 
    path_to_group, 
    path_to_array_2
  ], 
  access_options={'array': {...}, 'group': {...}}
)

We could also have open_arrays() and open_groups() that are guaranteed to return collections of arrays or groups (or error).

But I'm just thinking out loud here :)

@martindurant (Member)

Exactly, this is the kind of design I am after. Maybe it should be a separate issue/discussion.

@d-v-b (Contributor Author) commented Apr 22, 2024

Exactly, this is the kind of design I am after. Maybe it should be a separate issue/discussion.

I opened #1805 so we can chat about it.

@d-v-b merged commit 0f755cc into zarr-developers:v3 on Apr 24, 2024
14 checks passed
@d-v-b deleted the sync_with_futures branch on April 24, 2024 11:11
Labels: V3 (Affects the v3 branch)
Projects: Status: Done
5 participants