# Ordered `send_recv` pattern for RPCs (#7480)
Labels: asyncio, core, enhancement, feature
We are currently supporting two ways to trigger RPC calls:

1. Pooled `send_recv` patterns. Every request uses a dedicated connection that is pooled and potentially reused after successful completion of the RPC. [1]
2. Batched stream comms, which are written to and read continuously in order.

A notable difference between the two methods is that 2.) guarantees message ordering while 1.) does not.
For many applications ordering is essential, e.g. to implement consistent distributed transactions.
It is very easy for developers to fall back to the pooled RPC approach, which does not provide ordering, whenever a response is needed. If both a response and ordering are required, there is currently no proper way to implement this.
This issue intends to discuss the possibility of introducing standard functionality to support this use case, i.e. to provide an ordered `send_recv` functionality.

[1] Using a dedicated connection is primarily useful for submitting large amounts of payload data s.t. the stream is not blocked for other, smaller messages while the large payload is being submitted.
## Naive approach w/out additional infrastructure
It is possible to write a `send_recv`-like pattern using the batched stream by breaking a coroutine into two: one half sends the request over the stream, and a separate stream handler receives the eventual response and resumes the waiting caller. This approach has a couple of downsides.
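The original example snippet did not survive the export. A toy sketch of this two-coroutine split might look as follows — all names are hypothetical, and `asyncio.Queue` pairs stand in for the real comms:

```python
import asyncio


class NaiveOrderedClient:
    """One logical RPC split into two halves: a fire-and-forget send over
    the ordered stream, plus a separate stream handler that resumes the
    caller once the response message arrives. Hypothetical sketch only."""

    def __init__(self, to_server):
        self.to_server = to_server
        self._pending = None  # only ONE in-flight request is supported

    async def do_thing(self, x):
        # First half: send the request over the ordered stream and park.
        self._pending = asyncio.get_running_loop().create_future()
        await self.to_server.put({"op": "do_thing", "x": x})
        return await self._pending  # resumed by the handler below

    def handle_do_thing_response(self, msg):
        # Second half: a dedicated stream handler delivers the response.
        self._pending.set_result(msg["result"])


async def toy_server(to_server, to_client):
    # Stands in for Server.handle_stream: reads messages in order.
    while True:
        msg = await to_server.get()
        if msg["op"] == "do_thing":
            await to_client.put({"op": "do_thing_response",
                                 "result": msg["x"] * 2})


async def client_stream(client, to_client):
    # The client-side read loop dispatching response messages.
    while True:
        client.handle_do_thing_response(await to_client.get())


async def main():
    to_server, to_client = asyncio.Queue(), asyncio.Queue()
    client = NaiveOrderedClient(to_server)
    tasks = [asyncio.create_task(toy_server(to_server, to_client)),
             asyncio.create_task(client_stream(client, to_client))]
    result = await client.do_thing(21)
    for t in tasks:
        t.cancel()
    return result


print(asyncio.run(main()))  # → 42
```

Even this toy version exposes some of the downsides: every operation needs its own hand-written response handler, and as written only a single request can be in flight at a time.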
## Suggested approach
Ideally, the batched, ordered communication would allow a similar access pattern as the `PooledRPCCall` and construct the message behind the scenes. Further, any returned results would be sent back to the caller.
Effectively, stream handlers and comm handlers would be identical and there would no longer be the need to distinguish the two.
To enable such an API we would need to change both sender and receiver of the stream to deal with responses.
### Sender
On the sender side we are striving to provide a similar API to what `PooledRPCCall` is currently offering.

A notable difference between the `PooledRPCCall` and this suggested approach is that the batched comm we are using here is already used/read in `Server.handle_stream`, i.e. we cannot wait for the response ourselves. Instead, we can introduce a new handler that listens to all responses (or special-case/inline this into `handle_stream`).

The sender generates a unique ID for every request (this could be simply a monotonic counter) and attaches it to every request. It also creates an `asyncio.Future` (or some other synchronization primitive) that is mapped to that request ID and can be awaited to receive the result once available. We then use a dedicated stream handler `send_recv_batched` that accepts the response and sets the result on the future. The receiver side subsequently submits the response to this dedicated response handler with the original `request_id`.

### Receiver
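The sender and receiver snippets likewise did not survive the export. A combined end-to-end sketch of both sides, again with hypothetical names and `asyncio.Queue` pairs standing in for comms, could look like this:

```python
import asyncio
import itertools


class OrderedSender:
    """Sender side (hypothetical sketch): requests travel over the ordered
    batched stream tagged with a request_id; the dedicated stream handler
    send_recv_batched resolves the matching future once a response arrives."""

    def __init__(self, to_remote):
        self.to_remote = to_remote
        self._counter = itertools.count()  # monotonic request IDs
        self._responses = {}               # request_id -> asyncio.Future

    async def send_recv(self, op, **kwargs):
        request_id = next(self._counter)
        fut = asyncio.get_running_loop().create_future()
        self._responses[request_id] = fut
        # Requests are appended to the stream in call order, so ordering holds.
        await self.to_remote.put({"op": op, "request_id": request_id, **kwargs})
        try:
            return await fut
        finally:
            del self._responses[request_id]

    def send_recv_batched(self, msg):
        # Dedicated handler listening to all responses.
        self._responses[msg["request_id"]].set_result(msg["result"])


async def remote_handle_stream(to_remote, to_sender, handlers):
    """Receiver side: like Server.handle_stream, dispatch each message to
    its handler and ship the return value back with the original request_id."""
    while True:
        msg = await to_remote.get()
        kwargs = {k: v for k, v in msg.items() if k not in ("op", "request_id")}
        result = handlers[msg["op"]](**kwargs)
        await to_sender.put({"request_id": msg["request_id"], "result": result})


async def sender_response_stream(sender, to_sender):
    # The sender-side read loop feeding the dedicated response handler.
    while True:
        sender.send_recv_batched(await to_sender.get())


async def main():
    to_remote, to_sender = asyncio.Queue(), asyncio.Queue()
    sender = OrderedSender(to_remote)
    tasks = [
        asyncio.create_task(remote_handle_stream(
            to_remote, to_sender, {"square": lambda x: x * x})),
        asyncio.create_task(sender_response_stream(sender, to_sender)),
    ]
    # Several concurrent calls: the IDs keep responses matched to callers.
    results = await asyncio.gather(
        *(sender.send_recv("square", x=i) for i in range(5)))
    for t in tasks:
        t.cancel()
    return results


print(asyncio.run(main()))  # → [0, 1, 4, 9, 16]
```

With this shape, stream handlers and comm handlers converge: the receiver simply returns a value, and the plumbing routes it back to the awaiting future.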
## Caveats
The above pseudo code is obviously not complete. I believe the most critical missing component is dealing with dead remotes. The current `send_recv` will naturally fail since the used comm is closed once the remote is dead. Since we are not awaiting the `comm.read` ourselves, we will need to handle this case explicitly in the `handle_stream` try/except/finally.

cc @hendrikmakait @graingert
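One possible shape for that explicit handling — a hedged sketch, not the actual `Server.handle_stream` implementation — is to fail every pending response future when the read loop exits, so callers get an error instead of hanging forever:

```python
import asyncio


class OrderedSender:
    """Minimal pending-response table; names are hypothetical."""

    def __init__(self):
        self._responses = {}  # request_id -> asyncio.Future

    def fail_pending(self, exc):
        # Called when the remote dies: wake up every waiter with an error
        # instead of leaving it awaiting a response that can never arrive.
        for fut in self._responses.values():
            if not fut.done():
                fut.set_exception(exc)
        self._responses.clear()


async def handle_stream(sender, comm_read):
    # Stand-in for Server.handle_stream's read loop over the batched comm.
    try:
        while True:
            msg = await comm_read()  # raises once the remote is dead
            # ... dispatch msg to its handler as usual ...
    finally:
        # We are the only reader of this comm; once it is closed, no
        # pending response can ever be delivered, so fail them explicitly.
        sender.fail_pending(ConnectionError("remote closed the stream"))


async def main():
    sender = OrderedSender()
    fut = asyncio.get_running_loop().create_future()
    sender._responses[0] = fut  # one request is still awaiting its response

    async def dead_read():
        raise ConnectionError("remote is dead")

    try:
        await handle_stream(sender, dead_read)
    except ConnectionError:
        pass  # the read loop itself failed, as expected
    try:
        await fut  # the parked caller now fails instead of hanging
    except ConnectionError as e:
        return str(e)


print(asyncio.run(main()))  # → remote closed the stream
```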