diff --git a/ntex-async-std/src/io.rs b/ntex-async-std/src/io.rs index c8116c9cc..fb2a603b1 100644 --- a/ntex-async-std/src/io.rs +++ b/ntex-async-std/src/io.rs @@ -124,7 +124,7 @@ impl Future for WriteTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().get_mut(); + let this = self.as_mut().get_mut(); match this.st { IoWriteState::Processing(ref mut delay) => { @@ -432,7 +432,7 @@ mod unixstream { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().get_mut(); + let this = self.as_mut().get_mut(); match this.st { IoWriteState::Processing(ref mut delay) => { diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index bd468d039..3255e9d3f 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.2] - 2023-08-10 + +* Replace `PipelineCall` with `ServiceCall<'static, S, R>` + ## [0.3.1] - 2023-06-23 * `PipelineCall` is static diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 0ea1f5b90..7c3aa37a6 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.1" +version = "0.3.2" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -19,7 +19,7 @@ path = "src/lib.rs" ntex-codec = "0.6.2" ntex-bytes = "0.1.19" ntex-util = "0.3.0" -ntex-service = "1.2.1" +ntex-service = "1.2.3" bitflags = "1.3" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index 0ef10681c..884b68db0 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -250,9 +250,8 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); - let fut = shared.service.call(item); spawn(async move { - let result = fut.await; + let result = shared.service.call(item).await; shared.handle_result(result, &shared.io); }); } @@ -276,9 +275,8 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); - let fut = shared.service.call(item); spawn(async move { - let result = fut.await; + let result = shared.service.call(item).await; shared.handle_result(result, &shared.io); }); } diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index ecbefc45a..d2885454e 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -468,7 +468,7 @@ impl Future for WriteTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().get_mut(); + let this = self.as_mut().get_mut(); match this.st { IoWriteState::Processing(ref mut delay) => { diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index a74e8b2f4..662792669 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.2.3] - 2023-08-10 + +* Check readiness for pipeline calls + ## [1.2.2] - 2023-06-24 * Added `ServiceCall::advance_to_call` diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index a0f654785..4b4a909a0 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.2.2" +version = "1.2.3" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/apply.rs b/ntex-service/src/apply.rs index 5a1109705..72632c9cc 100644 --- a/ntex-service/src/apply.rs +++ b/ntex-service/src/apply.rs @@ -56,7 +56,7 @@ impl ApplyService { where S: Service, { - self.service.service_call(req) + self.service.call(req) } } @@ -85,7 +85,6 @@ where type Error = Err; type Future<'f> = R where Self: 'f, In: 'f, R: 'f; - crate::forward_poll_ready!(service); crate::forward_poll_shutdown!(service); #[inline] diff --git a/ntex-service/src/ctx.rs b/ntex-service/src/ctx.rs index dc11252d4..fcc007aae 100644 --- a/ntex-service/src/ctx.rs +++ b/ntex-service/src/ctx.rs @@ -1,6 +1,6 @@ use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task}; -use crate::Service; +use crate::{Pipeline, Service}; pub struct ServiceCtx<'a, S: ?Sized> { idx: usize, @@ -82,7 +82,7 @@ impl Drop for Waiters { } } -impl<'a, S: ?Sized> ServiceCtx<'a, S> { +impl<'a, S> ServiceCtx<'a, S> { pub(crate) fn new(waiters: &'a Waiters) -> Self { Self { idx: waiters.index, @@ -107,7 +107,7 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> { /// Wait for service readiness and then call service pub fn call(&self, svc: &'a T, req: R) -> ServiceCall<'a, T, R> where - T: Service + ?Sized, + T: Service, R: 'a, { ServiceCall { @@ -125,7 +125,7 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> { /// Call service, do not check service readiness pub fn call_nowait(&self, svc: &'a T, req: R) -> T::Future<'a> where - T: Service + ?Sized, + T: Service, R: 'a, { svc.call( @@ -139,9 +139,9 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> { } } -impl<'a, S: ?Sized> Copy for ServiceCtx<'a, S> {} +impl<'a, S> Copy for ServiceCtx<'a, S> {} -impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> { +impl<'a, S> Clone for ServiceCtx<'a, S> { #[inline] fn clone(&self) -> Self { Self { @@ -157,8 +157,6 @@ pin_project_lite::pin_project! { pub struct ServiceCall<'a, S, Req> where S: Service, - S: 'a, - S: ?Sized, Req: 'a, { #[pin] @@ -166,33 +164,11 @@ pin_project_lite::pin_project! { } } -impl<'a, S, Req> ServiceCall<'a, S, Req> -where - S: Service, - S: 'a, - S: ?Sized, - Req: 'a, -{ - pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> { - match self.state { - ServiceCallState::Ready { .. } => {} - ServiceCallState::Call { .. } | ServiceCallState::Empty => { - panic!( - "`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`" - ) - } - } - ServiceCallToCall { state: self.state } - } -} - pin_project_lite::pin_project! { #[project = ServiceCallStateProject] enum ServiceCallState<'a, S, Req> where S: Service, - S: 'a, - S: ?Sized, Req: 'a, { Ready { req: Option, @@ -200,14 +176,46 @@ pin_project_lite::pin_project! { idx: usize, waiters: &'a WaitersRef, }, + ReadyPl { req: Option, + svc: &'a Pipeline, + pl: Pipeline, + }, Call { #[pin] fut: S::Future<'a> }, Empty, } } +impl<'a, S, Req> ServiceCall<'a, S, Req> +where + S: Service, + Req: 'a, +{ + pub(crate) fn call_pipeline(req: Req, svc: &'a Pipeline) -> Self { + ServiceCall { + state: ServiceCallState::ReadyPl { + req: Some(req), + pl: svc.clone(), + svc, + }, + } + } + + pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> { + match self.state { + ServiceCallState::Ready { .. } | ServiceCallState::ReadyPl { .. } => {} + ServiceCallState::Call { .. } | ServiceCallState::Empty => { + panic!( + "`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`" + ) + } + } + ServiceCallToCall { state: self.state } + } +} + impl<'a, S, Req> Future for ServiceCall<'a, S, Req> where - S: Service + ?Sized, + S: Service, { type Output = Result; @@ -243,7 +251,21 @@ where task::Poll::Pending } }, - ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| { + ServiceCallStateProject::ReadyPl { req, svc, pl } => { + task::ready!(pl.poll_ready(cx))?; + + let ctx = ServiceCtx::new(&svc.waiters); + let svc_call = svc.get_ref().call(req.take().unwrap(), ctx); + + // SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc` + // Pipeline::svc is heap allocated(Rc), we keep it alive until + // `svc_call` get resolved to result + let fut = unsafe { std::mem::transmute(svc_call) }; + + this.state.set(ServiceCallState::Call { fut }); + self.poll(cx) + } + ServiceCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| { this.state.set(ServiceCallState::Empty); r }), @@ -259,8 +281,6 @@ pin_project_lite::pin_project! { pub struct ServiceCallToCall<'a, S, Req> where S: Service, - S: 'a, - S: ?Sized, Req: 'a, { #[pin] @@ -270,7 +290,7 @@ pin_project_lite::pin_project! { impl<'a, S, Req> Future for ServiceCallToCall<'a, S, Req> where - S: Service + ?Sized, + S: Service, { type Output = Result, S::Error>; @@ -306,6 +326,12 @@ where task::Poll::Pending } }, + ServiceCallStateProject::ReadyPl { req, svc, pl } => { + task::ready!(pl.poll_ready(cx))?; + + let ctx = ServiceCtx::new(&svc.waiters); + task::Poll::Ready(Ok(svc.get_ref().call(req.take().unwrap(), ctx))) + } ServiceCallStateProject::Call { .. } => { unreachable!("`ServiceCallToCall` can only be constructed in `Ready` state") } @@ -387,13 +413,13 @@ mod tests { let data1 = data.clone(); ntex::rt::spawn(async move { let _ = poll_fn(|cx| srv1.poll_ready(cx)).await; - let i = srv1.call("srv1").await.unwrap(); + let i = srv1.call_nowait("srv1").await.unwrap(); data1.borrow_mut().push(i); }); let data2 = data.clone(); ntex::rt::spawn(async move { - let i = srv2.service_call("srv2").await.unwrap(); + let i = srv2.call_static("srv2").await.unwrap(); data2.borrow_mut().push(i); }); time::sleep(time::Millis(50)).await; @@ -417,7 +443,7 @@ mod tests { let con = condition::Condition::new(); let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); - let mut fut = srv.service_call("test").advance_to_call(); + let mut fut = srv.call("test").advance_to_call(); let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; con.notify(); @@ -432,7 +458,7 @@ mod tests { let con = condition::Condition::new(); let srv = Pipeline::from(Srv(cnt.clone(), con.wait())); - let mut fut = srv.service_call("test"); + let mut fut = srv.call("test"); let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await; con.notify(); diff --git a/ntex-service/src/fn_shutdown.rs b/ntex-service/src/fn_shutdown.rs index 59cd67e97..bbf1c39f5 100644 --- a/ntex-service/src/fn_shutdown.rs +++ b/ntex-service/src/fn_shutdown.rs @@ -85,7 +85,7 @@ mod tests { let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone()); - let res = pipe.service_call(()).await; + let res = pipe.call(()).await; assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(res.is_ok()); assert_eq!(res.unwrap(), "pipe"); diff --git a/ntex-service/src/lib.rs b/ntex-service/src/lib.rs index 887d83ddd..a1f3e19b6 100644 --- a/ntex-service/src/lib.rs +++ b/ntex-service/src/lib.rs @@ -261,7 +261,7 @@ pub trait ServiceFactory { impl<'a, S, Req> Service for &'a S where - S: Service + ?Sized, + S: Service, { type Response = S::Response; type Error = S::Error; @@ -285,7 +285,7 @@ where impl Service for Box where - S: Service + ?Sized, + S: Service, { type Response = S::Response; type Error = S::Error; diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 4ed1122cc..9921566a0 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -1,15 +1,14 @@ -use std::{cell::Cell, future, pin::Pin, rc::Rc, task::Context, task::Poll}; +use std::{cell::Cell, future, pin::Pin, rc::Rc, task, task::Context, task::Poll}; -use crate::ctx::{ServiceCall, ServiceCtx, Waiters}; -use crate::{Service, ServiceFactory}; +use crate::{ctx::ServiceCall, ctx::Waiters, Service, ServiceCtx, ServiceFactory}; /// Container for a service. /// /// Container allows to call enclosed service and adds support of shared readiness. pub struct Pipeline { svc: Rc, - waiters: Waiters, pending: Cell, + pub(crate) waiters: Waiters, } impl Pipeline { @@ -55,33 +54,53 @@ impl Pipeline { self.svc.poll_shutdown(cx) } + #[deprecated(since = "1.2.3", note = "Use Pipeline::call() instead")] + #[doc(hidden)] #[inline] /// Wait for service readiness and then create future object /// that resolves to service result. - pub fn service_call<'a, R>(&'a self, req: R) -> ServiceCall<'a, S, R> + pub fn service_call(&self, req: R) -> ServiceCall<'_, S, R> where S: Service, { - ServiceCtx::<'a, S>::new(&self.waiters).call(self.svc.as_ref(), req) + ServiceCall::call_pipeline(req, self) + } + + #[inline] + /// Wait for service readiness and then create future object + /// that resolves to service result. + pub fn call(&self, req: R) -> ServiceCall<'_, S, R> + where + S: Service, + { + ServiceCall::call_pipeline(req, self) + } + + #[inline] + /// Wait for service readiness and then create future object + /// that resolves to service result. + pub fn call_static(&self, req: R) -> PipelineCall + where + S: Service + 'static, + { + PipelineCall { + state: PipelineCallState::Ready { req: Some(req) }, + pipeline: self.clone(), + } } #[inline] /// Call service and create future object that resolves to service result. /// /// Note, this call does not check service readiness. - pub fn call(&self, req: R) -> PipelineCall + pub fn call_nowait(&self, req: R) -> PipelineCall where S: Service + 'static, - R: 'static, { - let pipeline = self.clone(); - let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters)); - - // SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc` - // Pipeline::svc is heap allocated(Rc), we keep it alive until - // `svc_call` get resolved to result - let fut = unsafe { std::mem::transmute(svc_call) }; - PipelineCall { fut, pipeline } + PipelineCall { + state: PipelineCallState::new_call(self, req), + pipeline: self.clone(), + } } /// Extract service if container hadnt been cloned before. @@ -119,11 +138,43 @@ pin_project_lite::pin_project! { R: 'static, { #[pin] - fut: S::Future<'static>, + state: PipelineCallState, pipeline: Pipeline, } } +pin_project_lite::pin_project! { + #[project = PipelineCallStateProject] + enum PipelineCallState + where + S: Service, + S: 'static, + Req: 'static, + { + Ready { req: Option }, + Call { #[pin] fut: S::Future<'static> }, + Empty, + } +} + +impl PipelineCallState +where + S: Service + 'static, + R: 'static, +{ + fn new_call(pl: &Pipeline, req: R) -> Self { + let ctx = ServiceCtx::new(&pl.waiters); + let svc_call = pl.get_ref().call(req, ctx); + + // SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc` + // Pipeline::svc is heap allocated(Rc), we keep it alive until + // `svc_call` get resolved to result + let fut = unsafe { std::mem::transmute(svc_call) }; + + PipelineCallState::Call { fut } + } +} + impl future::Future for PipelineCall where S: Service, @@ -131,8 +182,25 @@ where type Output = Result; #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut().project(); + + match this.state.as_mut().project() { + PipelineCallStateProject::Ready { req } => { + task::ready!(this.pipeline.poll_ready(cx))?; + + let st = PipelineCallState::new_call(this.pipeline, req.take().unwrap()); + this.state.set(st); + self.poll(cx) + } + PipelineCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| { + this.state.set(PipelineCallState::Empty); + r + }), + PipelineCallStateProject::Empty => { + panic!("future must not be polled after it returned `Poll::Ready`") + } + } } } diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index c52a3bcbe..91ef00144 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -128,7 +128,7 @@ impl Future for WriteTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().get_mut(); + let this = self.as_mut().get_mut(); match this.st { IoWriteState::Processing(ref mut delay) => { @@ -523,7 +523,7 @@ mod unixstream { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().get_mut(); + let this = self.as_mut().get_mut(); match this.st { IoWriteState::Processing(ref mut delay) => { diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index 0b0dcae36..7c7ea4164 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.3] - 2023-08-10 + +* Update ntex-service + ## [0.7.1] - 2023-06-23 * `PipelineCall` is static diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 15aa1f639..b681aead8 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.2" +version = "0.7.3" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -52,13 +52,13 @@ ntex-codec = "0.6.2" ntex-connect = "0.3.0" ntex-http = "0.1.9" ntex-router = "0.5.1" -ntex-service = "1.2.1" +ntex-service = "1.2.3" ntex-macros = "0.1.3" ntex-util = "0.3.0" ntex-bytes = "0.1.19" ntex-h2 = "0.3.2" ntex-rt = "0.4.9" -ntex-io = "0.3.1" +ntex-io = "0.3.2" ntex-tls = "0.3.0" ntex-tokio = { version = "0.3.0", optional = true } ntex-glommio = { version = "0.3.0", optional = true } diff --git a/ntex/src/http/client/connect.rs b/ntex/src/http/client/connect.rs index f310aceaa..dde1a0a44 100644 --- a/ntex/src/http/client/connect.rs +++ b/ntex/src/http/client/connect.rs @@ -30,8 +30,7 @@ where ) -> BoxFuture<'_, Result> { Box::pin(async move { // connect to the host - let pl = self.0.clone(); - let fut = pl.service_call(ClientConnect { + let fut = self.0.call(ClientConnect { uri: head.as_ref().uri.clone(), addr, }); diff --git a/ntex/src/http/client/pool.rs b/ntex/src/http/client/pool.rs index 47d597754..6fd505559 100644 --- a/ntex/src/http/client/pool.rs +++ b/ntex/src/http/client/pool.rs @@ -6,7 +6,7 @@ use ntex_h2::{self as h2}; use crate::http::uri::{Authority, Scheme, Uri}; use crate::io::{types::HttpProtocol, IoBoxed}; -use crate::service::{Pipeline, Service, ServiceCall, ServiceCtx}; +use crate::service::{Pipeline, PipelineCall, Service, ServiceCtx}; use crate::time::{now, Millis}; use crate::util::{ready, BoxFuture, ByteString, HashMap, HashSet}; use crate::{channel::pool, rt::spawn, task::LocalWaker}; @@ -150,7 +150,7 @@ where trace!("Connecting to {:?}", req.uri); let uri = req.uri.clone(); let (tx, rx) = waiters.borrow_mut().pool.channel(); - OpenConnection::spawn(key, tx, uri, inner, self.connector.clone(), req); + OpenConnection::spawn(key, tx, uri, inner, &self.connector, req); match rx.await { Err(_) => Err(ConnectError::Disconnected(None)), @@ -368,7 +368,7 @@ where tx, uri, this.inner.clone(), - this.connector.clone(), + &this.connector, connect, ); } @@ -385,12 +385,12 @@ where } pin_project_lite::pin_project! { - struct OpenConnection<'f, T: Service> - where T: 'f + struct OpenConnection> + where T: 'static { key: Key, #[pin] - fut: ServiceCall<'f, T, Connect>, + fut: PipelineCall, uri: Uri, tx: Option, guard: Option, @@ -399,7 +399,7 @@ pin_project_lite::pin_project! { } } -impl<'f, T> OpenConnection<'f, T> +impl OpenConnection where T: Service + 'static, { @@ -408,19 +408,20 @@ where tx: Waiter, uri: Uri, inner: Rc>, - pipeline: Pipeline, + pipeline: &Pipeline, msg: Connect, ) { + let fut = pipeline.call_static(msg); let disconnect_timeout = inner.borrow().disconnect_timeout; #[allow(clippy::redundant_async_block)] spawn(async move { OpenConnection:: { - fut: pipeline.service_call(msg), tx: Some(tx), key: key.clone(), inner: inner.clone(), guard: Some(OpenGuard::new(key, inner)), + fut, uri, disconnect_timeout, } @@ -429,7 +430,7 @@ where } } -impl<'f, T> Future for OpenConnection<'f, T> +impl Future for OpenConnection where T: Service, { diff --git a/ntex/src/http/client/request.rs b/ntex/src/http/client/request.rs index 4734193ba..47e0c3ad4 100644 --- a/ntex/src/http/client/request.rs +++ b/ntex/src/http/client/request.rs @@ -537,7 +537,7 @@ impl ClientRequest { if https { slf = slf.set_header_if_none(header::ACCEPT_ENCODING, HTTPS_ENCODING) } else { - #[cfg(any(feature = "compress"))] + #[cfg(feature = "compress")] { slf = slf.set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate") } diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 20ef5bd39..a3f8ea138 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -479,21 +479,21 @@ where fn service_call(&self, req: Request) -> CallState { // Handle normal requests CallState::Service { - fut: self.config.service.call(req), + fut: self.config.service.call_nowait(req), } } fn service_filter(&self, req: Request, f: &Pipeline) -> CallState { // Handle filter fut CallState::Filter { - fut: f.call((req, self.io.get_ref())), + fut: f.call_nowait((req, self.io.get_ref())), } } fn service_expect(&self, req: Request) -> CallState { // Handle normal requests with EXPECT: 100-Continue` header CallState::Expect { - fut: self.config.expect.call(req), + fut: self.config.expect.call_nowait(req), } } @@ -506,7 +506,7 @@ where ))); // Handle upgrade requests CallState::ServiceUpgrade { - fut: self.config.service.call(req), + fut: self.config.service.call_nowait(req), } } diff --git a/ntex/src/server/builder.rs b/ntex/src/server/builder.rs index d85d69bbd..bd9605c2c 100644 --- a/ntex/src/server/builder.rs +++ b/ntex/src/server/builder.rs @@ -242,7 +242,7 @@ impl ServerBuilder { Ok(self) } - #[cfg(all(unix))] + #[cfg(unix)] /// Add new unix domain service to the server. pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where @@ -266,7 +266,7 @@ impl ServerBuilder { self.listen_uds(name, lst, factory) } - #[cfg(all(unix))] + #[cfg(unix)] /// Add new unix domain service to the server. /// Useful when running as a systemd service and /// a socket FD can be acquired using the systemd crate. diff --git a/ntex/src/server/socket.rs b/ntex/src/server/socket.rs index 1fcc130d7..703b7fc5c 100644 --- a/ntex/src/server/socket.rs +++ b/ntex/src/server/socket.rs @@ -174,7 +174,7 @@ mod tests { } #[test] - #[cfg(all(unix))] + #[cfg(unix)] fn uds() { use std::os::unix::net::UnixListener; diff --git a/ntex/src/server/worker.rs b/ntex/src/server/worker.rs index c5bf440a1..babdba721 100644 --- a/ntex/src/server/worker.rs +++ b/ntex/src/server/worker.rs @@ -255,9 +255,11 @@ impl Worker { self.services.iter_mut().for_each(|srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopped; - let svc = srv.service.clone(); + let fut = srv + .service + .call_static((None, ServerMessage::ForceShutdown)); spawn(async move { - let _ = svc.call((None, ServerMessage::ForceShutdown)).await; + let _ = fut.await; }); } }); @@ -267,9 +269,11 @@ impl Worker { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopping; - let svc = srv.service.clone(); + let fut = srv + .service + .call_static((None, ServerMessage::Shutdown(timeout))); spawn(async move { - let _ = svc.call((None, ServerMessage::Shutdown(timeout))).await; + let _ = fut.await; }); } }); @@ -490,11 +494,11 @@ impl Future for Worker { self.factories[srv.factory].name(msg.token) ); } - let srv = srv.service.clone(); + let fut = srv + .service + .call_static((Some(guard), ServerMessage::Connect(msg.io))); spawn(async move { - let _ = srv - .call((Some(guard), ServerMessage::Connect(msg.io))) - .await; + let _ = fut.await; }); } else { return Poll::Ready(()); diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 57240ef02..17582ceca 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -107,7 +107,7 @@ where S: Service, E: std::fmt::Debug, { - app.service_call(req).await.unwrap() + app.call(req).await.unwrap() } /// Helper function that returns a response body of a TestRequest @@ -140,7 +140,7 @@ where S: Service, { let mut resp = app - .service_call(req) + .call(req) .await .unwrap_or_else(|_| panic!("read_response failed at application call")); diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index 1dd917aa3..e87be13a4 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -158,7 +158,7 @@ where let msg = Connect::new(head.uri.clone()).set_addr(self.addr); log::trace!("Open ws connection to {:?} addr: {:?}", head.uri, self.addr); - let io = self.connector.clone().service_call(msg).await?; + let io = self.connector.call(msg).await?; // create Framed and send request let codec = h1::ClientCodec::default();