From bd1d78e1fdc5c26feaab31652510a398bdcb673d Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sun, 25 Jun 2023 23:05:49 +1000 Subject: [PATCH] Add `blocking::Client::from_async_client` so that blocking client can be constructed from async client and a handle to tokio runtime. Signed-off-by: Jiahao XU --- src/blocking/client.rs | 63 +++++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/src/blocking/client.rs b/src/blocking/client.rs index e6ec6735a..0a8a09843 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -851,6 +851,14 @@ impl Client { ClientBuilder::new() } + /// Construct a new `Client` using asynchronous [`crate::Client`] and + /// a `handle` to tokio runtime. + pub fn from_async_client(client: async_impl::Client, handle: tokio::runtime::Handle) -> Self { + Self { + inner: ClientHandle::from_async_client(client, handle), + } + } + /// Convenience method to make a `GET` request to a URL. /// /// # Errors @@ -967,17 +975,15 @@ struct InnerClientHandle { impl Drop for InnerClientHandle { fn drop(&mut self) { - let id = self - .thread - .as_ref() - .map(|h| h.thread().id()) - .expect("thread not dropped yet"); - - trace!("closing runtime thread ({:?})", id); - self.tx.take(); - trace!("signaled close for runtime thread ({:?})", id); - self.thread.take().map(|h| h.join()); - trace!("closed runtime thread ({:?})", id); + if let Some(h) = self.thread.take() { + let id = h.thread().id(); + + trace!("closing runtime thread ({:?})", id); + self.tx.take(); + trace!("signaled close for runtime thread ({:?})", id); + let _ = h.join(); + trace!("closed runtime thread ({:?})", id); + } } } @@ -1020,14 +1026,7 @@ impl ClientHandle { return; } - let mut rx = rx; - - while let Some((req, req_tx)) = rx.recv().await { - let req_fut = client.execute(req); - tokio::spawn(forward(req_fut, req_tx)); - } - - trace!("({:?}) Receiver is shutdown", thread::current().id()); + Self::request_process_loop(rx, client).await; }; trace!("({:?}) start runtime::block_on", thread::current().id()); @@ -1056,6 +1055,32 @@ impl ClientHandle { }) } + fn from_async_client(client: async_impl::Client, handle: tokio::runtime::Handle) -> Self { + let (tx, rx) = mpsc::unbounded_channel::<(async_impl::Request, OneshotResponse)>(); + + handle.spawn(Self::request_process_loop(rx, client)); + + Self { + timeout: Timeout(None), + inner: Arc::new(InnerClientHandle { + tx: Some(tx), + thread: None, + }), + } + } + + async fn request_process_loop( + mut rx: mpsc::UnboundedReceiver<(async_impl::Request, OneshotResponse)>, + client: async_impl::Client, + ) { + while let Some((req, req_tx)) = rx.recv().await { + let req_fut = client.execute(req); + tokio::spawn(forward(req_fut, req_tx)); + } + + trace!("({:?}) Receiver is shutdown", thread::current().id()); + } + fn execute_request(&self, req: Request) -> crate::Result { let (tx, rx) = oneshot::channel(); let (req, body) = req.into_async();