From 4380b3a155fc08ed7b0eaf9c688b9347e5363d1a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 23 Jun 2023 23:11:16 +0600 Subject: [PATCH] PipelineCall is static (#215) * PipelineCall is static * Fix static lifetimes req --- ntex-io/CHANGES.md | 4 ++++ ntex-io/Cargo.toml | 4 ++-- ntex-io/src/dispatcher.rs | 4 ++-- ntex-service/CHANGES.md | 4 ++++ ntex-service/Cargo.toml | 2 +- ntex-service/src/fn_shutdown.rs | 2 +- ntex-service/src/pipeline.rs | 37 +++++++-------------------------- ntex/CHANGES.md | 4 ++++ ntex/Cargo.toml | 8 +++---- ntex/src/http/client/connect.rs | 3 ++- ntex/src/http/h1/dispatcher.rs | 16 +++++++------- ntex/src/web/test.rs | 4 ++-- ntex/src/ws/client.rs | 2 +- 13 files changed, 43 insertions(+), 51 deletions(-) diff --git a/ntex-io/CHANGES.md b/ntex-io/CHANGES.md index 7d2a21379..bd468d039 100644 --- a/ntex-io/CHANGES.md +++ b/ntex-io/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.3.1] - 2023-06-23 + +* `PipelineCall` is static + ## [0.3.0] - 2023-06-22 * Release v0.3.0 diff --git a/ntex-io/Cargo.toml b/ntex-io/Cargo.toml index 559d44da2..0ea1f5b90 100644 --- a/ntex-io/Cargo.toml +++ b/ntex-io/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-io" -version = "0.3.0" +version = "0.3.1" authors = ["ntex contributors "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] @@ -19,7 +19,7 @@ path = "src/lib.rs" ntex-codec = "0.6.2" ntex-bytes = "0.1.19" ntex-util = "0.3.0" -ntex-service = "1.2.0" +ntex-service = "1.2.1" bitflags = "1.3" log = "0.4" diff --git a/ntex-io/src/dispatcher.rs b/ntex-io/src/dispatcher.rs index b4b67165f..0ef10681c 100644 --- a/ntex-io/src/dispatcher.rs +++ b/ntex-io/src/dispatcher.rs @@ -250,7 +250,7 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); - let fut = shared.service.call(item).into_static(); + let fut = shared.service.call(item); spawn(async move { let result = fut.await; shared.handle_result(result, &shared.io); @@ -276,7 +276,7 @@ where // call service let shared = slf.shared.clone(); shared.inflight.set(shared.inflight.get() + 1); - let fut = shared.service.call(item).into_static(); + let fut = shared.service.call(item); spawn(async move { let result = fut.await; shared.handle_result(result, &shared.io); diff --git a/ntex-service/CHANGES.md b/ntex-service/CHANGES.md index d4884e274..42b1b33bd 100644 --- a/ntex-service/CHANGES.md +++ b/ntex-service/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.2.1] - 2023-06-23 + +* Make `PipelineCall` static + ## [1.2.0] - 2023-06-22 * Rename Container to Pipeline diff --git a/ntex-service/Cargo.toml b/ntex-service/Cargo.toml index 9df4ea426..675504946 100644 --- a/ntex-service/Cargo.toml +++ b/ntex-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-service" -version = "1.2.0" +version = "1.2.1" authors = ["ntex contributors "] description = "ntex service" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-service/src/fn_shutdown.rs b/ntex-service/src/fn_shutdown.rs index bbf1c39f5..59cd67e97 100644 --- a/ntex-service/src/fn_shutdown.rs +++ b/ntex-service/src/fn_shutdown.rs @@ -85,7 +85,7 @@ mod tests { let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone()); - let res = pipe.call(()).await; + let res = pipe.service_call(()).await; assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(()))); assert!(res.is_ok()); assert_eq!(res.unwrap(), "pipe"); diff --git a/ntex-service/src/pipeline.rs b/ntex-service/src/pipeline.rs index 893182cc5..4ed1122cc 100644 --- a/ntex-service/src/pipeline.rs +++ b/ntex-service/src/pipeline.rs @@ -69,9 +69,10 @@ impl Pipeline { /// Call service and create future object that resolves to service result. /// /// Note, this call does not check service readiness. - pub fn call(&self, req: R) -> PipelineCall<'_, S, R> + pub fn call(&self, req: R) -> PipelineCall where - S: Service, + S: Service + 'static, + R: 'static, { let pipeline = self.clone(); let svc_call = pipeline.svc.call(req, ServiceCtx::new(&pipeline.waiters)); @@ -111,41 +112,19 @@ impl Clone for Pipeline { pin_project_lite::pin_project! { #[must_use = "futures do nothing unless polled"] - pub struct PipelineCall<'f, S, R> + pub struct PipelineCall where S: Service, - S: 'f, - R: 'f, + S: 'static, + R: 'static, { #[pin] - fut: S::Future<'f>, + fut: S::Future<'static>, pipeline: Pipeline, } } -impl<'f, S, R> PipelineCall<'f, S, R> -where - S: Service + 'f, - R: 'f, -{ - #[inline] - /// Convert future object to static version. - /// - /// Returned future is suitable for spawning into a async runtime. - /// Note, this call does not check service readiness. - pub fn into_static(self) -> PipelineCall<'static, S, R> { - let svc_call = self.fut; - let pipeline = self.pipeline; - - // SAFETY: `svc_call` has same lifetime same as lifetime of `pipeline.svc` - // Pipeline::svc is heap allocated(Rc), we keep it alive until - // `svc_call` get resolved to result - let fut = unsafe { std::mem::transmute(svc_call) }; - PipelineCall { fut, pipeline } - } -} - -impl<'f, S, R> future::Future for PipelineCall<'f, S, R> +impl future::Future for PipelineCall where S: Service, { diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index e7564c988..0b0dcae36 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.1] - 2023-06-23 + +* `PipelineCall` is static + ## [0.7.0] - 2023-06-22 * Release v0.7.0 diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 1693bf265..15aa1f639 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex" -version = "0.7.0" +version = "0.7.2" authors = ["ntex contributors "] description = "Framework for composable network services" readme = "README.md" @@ -52,13 +52,13 @@ ntex-codec = "0.6.2" ntex-connect = "0.3.0" ntex-http = "0.1.9" ntex-router = "0.5.1" -ntex-service = "1.2.0" +ntex-service = "1.2.1" ntex-macros = "0.1.3" ntex-util = "0.3.0" ntex-bytes = "0.1.19" -ntex-h2 = "0.3.0" +ntex-h2 = "0.3.2" ntex-rt = "0.4.9" -ntex-io = "0.3.0" +ntex-io = "0.3.1" ntex-tls = "0.3.0" ntex-tokio = { version = "0.3.0", optional = true } ntex-glommio = { version = "0.3.0", optional = true } diff --git a/ntex/src/http/client/connect.rs b/ntex/src/http/client/connect.rs index dde1a0a44..f310aceaa 100644 --- a/ntex/src/http/client/connect.rs +++ b/ntex/src/http/client/connect.rs @@ -30,7 +30,8 @@ where ) -> BoxFuture<'_, Result> { Box::pin(async move { // connect to the host - let fut = self.0.call(ClientConnect { + let pl = self.0.clone(); + let fut = pl.service_call(ClientConnect { uri: head.as_ref().uri.clone(), addr, }); diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index e488faf15..20ef5bd39 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -78,10 +78,10 @@ pin_project_lite::pin_project! { where S: 'static, X: 'static { None, - Service { #[pin] fut: PipelineCall<'static, S, Request> }, - ServiceUpgrade { #[pin] fut: PipelineCall<'static, S, Request> }, - Expect { #[pin] fut: PipelineCall<'static, X, Request> }, - Filter { fut: PipelineCall<'static, OnRequest, (Request, IoRef)> } + Service { #[pin] fut: PipelineCall }, + ServiceUpgrade { #[pin] fut: PipelineCall }, + Expect { #[pin] fut: PipelineCall }, + Filter { fut: PipelineCall } } } @@ -479,21 +479,21 @@ where fn service_call(&self, req: Request) -> CallState { // Handle normal requests CallState::Service { - fut: self.config.service.call(req).into_static(), + fut: self.config.service.call(req), } } fn service_filter(&self, req: Request, f: &Pipeline) -> CallState { // Handle filter fut CallState::Filter { - fut: f.call((req, self.io.get_ref())).into_static(), + fut: f.call((req, self.io.get_ref())), } } fn service_expect(&self, req: Request) -> CallState { // Handle normal requests with EXPECT: 100-Continue` header CallState::Expect { - fut: self.config.expect.call(req).into_static(), + fut: self.config.expect.call(req), } } @@ -506,7 +506,7 @@ where ))); // Handle upgrade requests CallState::ServiceUpgrade { - fut: self.config.service.call(req).into_static(), + fut: self.config.service.call(req), } } diff --git a/ntex/src/web/test.rs b/ntex/src/web/test.rs index 17582ceca..57240ef02 100644 --- a/ntex/src/web/test.rs +++ b/ntex/src/web/test.rs @@ -107,7 +107,7 @@ where S: Service, E: std::fmt::Debug, { - app.call(req).await.unwrap() + app.service_call(req).await.unwrap() } /// Helper function that returns a response body of a TestRequest @@ -140,7 +140,7 @@ where S: Service, { let mut resp = app - .call(req) + .service_call(req) .await .unwrap_or_else(|_| panic!("read_response failed at application call")); diff --git a/ntex/src/ws/client.rs b/ntex/src/ws/client.rs index e87be13a4..1dd917aa3 100644 --- a/ntex/src/ws/client.rs +++ b/ntex/src/ws/client.rs @@ -158,7 +158,7 @@ where let msg = Connect::new(head.uri.clone()).set_addr(self.addr); log::trace!("Open ws connection to {:?} addr: {:?}", head.uri, self.addr); - let io = self.connector.call(msg).await?; + let io = self.connector.clone().service_call(msg).await?; // create Framed and send request let codec = h1::ClientCodec::default();