Skip to content

Commit

Permalink
Refactor pipeline call impl (#219)
Browse files Browse the repository at this point in the history
* Refactor pipeline call impl
  • Loading branch information
fafhrd91 committed Aug 10, 2023
1 parent 2e66b4b commit 594bf0a
Show file tree
Hide file tree
Showing 25 changed files with 217 additions and 110 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
fail-fast: false
matrix:
version:
- 1.66.0 # MSRV
- 1.67.0 # MSRV
- stable
- nightly

Expand Down
4 changes: 2 additions & 2 deletions ntex-async-std/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl Future for WriteTask {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
let this = self.as_mut().get_mut();

match this.st {
IoWriteState::Processing(ref mut delay) => {
Expand Down Expand Up @@ -432,7 +432,7 @@ mod unixstream {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
let this = self.as_mut().get_mut();

match this.st {
IoWriteState::Processing(ref mut delay) => {
Expand Down
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.2] - 2023-08-10

* Replace `PipelineCall` with `ServiceCall<'static, S, R>`

## [0.3.1] - 2023-06-23

* `PipelineCall` is static
Expand Down
4 changes: 2 additions & 2 deletions ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.1"
version = "0.3.2"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand All @@ -19,7 +19,7 @@ path = "src/lib.rs"
ntex-codec = "0.6.2"
ntex-bytes = "0.1.19"
ntex-util = "0.3.0"
ntex-service = "1.2.1"
ntex-service = "1.2.3"

bitflags = "1.3"
log = "0.4"
Expand Down
6 changes: 2 additions & 4 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,8 @@ where
// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item);
spawn(async move {
let result = fut.await;
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
}
Expand All @@ -276,9 +275,8 @@ where
// call service
let shared = slf.shared.clone();
shared.inflight.set(shared.inflight.get() + 1);
let fut = shared.service.call(item);
spawn(async move {
let result = fut.await;
let result = shared.service.call(item).await;
shared.handle_result(result, &shared.io);
});
}
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl Future for WriteTask {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().get_mut();
let this = self.as_mut().get_mut();

match this.st {
IoWriteState::Processing(ref mut delay) => {
Expand Down
4 changes: 4 additions & 0 deletions ntex-service/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [1.2.3] - 2023-08-10

* Check readiness for pipeline calls

## [1.2.2] - 2023-06-24

* Added `ServiceCall::advance_to_call`
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "1.2.2"
version = "1.2.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
Expand Down
3 changes: 1 addition & 2 deletions ntex-service/src/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<S> ApplyService<S> {
where
S: Service<R>,
{
self.service.service_call(req)
self.service.call(req)
}
}

Expand Down Expand Up @@ -85,7 +85,6 @@ where
type Error = Err;
type Future<'f> = R where Self: 'f, In: 'f, R: 'f;

crate::forward_poll_ready!(service);
crate::forward_poll_shutdown!(service);

#[inline]
Expand Down
104 changes: 65 additions & 39 deletions ntex-service/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{cell::UnsafeCell, future::Future, marker, pin::Pin, rc::Rc, task};

use crate::Service;
use crate::{Pipeline, Service};

pub struct ServiceCtx<'a, S: ?Sized> {
idx: usize,
Expand Down Expand Up @@ -82,7 +82,7 @@ impl Drop for Waiters {
}
}

impl<'a, S: ?Sized> ServiceCtx<'a, S> {
impl<'a, S> ServiceCtx<'a, S> {
pub(crate) fn new(waiters: &'a Waiters) -> Self {
Self {
idx: waiters.index,
Expand All @@ -107,7 +107,7 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
/// Wait for service readiness and then call service
pub fn call<T, R>(&self, svc: &'a T, req: R) -> ServiceCall<'a, T, R>
where
T: Service<R> + ?Sized,
T: Service<R>,
R: 'a,
{
ServiceCall {
Expand All @@ -125,7 +125,7 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
/// Call service, do not check service readiness
pub fn call_nowait<T, R>(&self, svc: &'a T, req: R) -> T::Future<'a>
where
T: Service<R> + ?Sized,
T: Service<R>,
R: 'a,
{
svc.call(
Expand All @@ -139,9 +139,9 @@ impl<'a, S: ?Sized> ServiceCtx<'a, S> {
}
}

impl<'a, S: ?Sized> Copy for ServiceCtx<'a, S> {}
impl<'a, S> Copy for ServiceCtx<'a, S> {}

impl<'a, S: ?Sized> Clone for ServiceCtx<'a, S> {
impl<'a, S> Clone for ServiceCtx<'a, S> {
#[inline]
fn clone(&self) -> Self {
Self {
Expand All @@ -157,57 +157,65 @@ pin_project_lite::pin_project! {
pub struct ServiceCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
#[pin]
state: ServiceCallState<'a, S, Req>,
}
}

impl<'a, S, Req> ServiceCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> {
match self.state {
ServiceCallState::Ready { .. } => {}
ServiceCallState::Call { .. } | ServiceCallState::Empty => {
panic!(
"`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`"
)
}
}
ServiceCallToCall { state: self.state }
}
}

pin_project_lite::pin_project! {
#[project = ServiceCallStateProject]
enum ServiceCallState<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
Ready { req: Option<Req>,
svc: &'a S,
idx: usize,
waiters: &'a WaitersRef,
},
ReadyPl { req: Option<Req>,
svc: &'a Pipeline<S>,
pl: Pipeline<S>,
},
Call { #[pin] fut: S::Future<'a> },
Empty,
}
}

impl<'a, S, Req> ServiceCall<'a, S, Req>
where
S: Service<Req>,
Req: 'a,
{
pub(crate) fn call_pipeline(req: Req, svc: &'a Pipeline<S>) -> Self {
ServiceCall {
state: ServiceCallState::ReadyPl {
req: Some(req),
pl: svc.clone(),
svc,
},
}
}

pub fn advance_to_call(self) -> ServiceCallToCall<'a, S, Req> {
match self.state {
ServiceCallState::Ready { .. } | ServiceCallState::ReadyPl { .. } => {}
ServiceCallState::Call { .. } | ServiceCallState::Empty => {
panic!(
"`ServiceCall::advance_to_call` must be called before `ServiceCall::poll`"
)
}
}
ServiceCallToCall { state: self.state }
}
}

impl<'a, S, Req> Future for ServiceCall<'a, S, Req>
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Output = Result<S::Response, S::Error>;

Expand Down Expand Up @@ -243,7 +251,21 @@ where
task::Poll::Pending
}
},
ServiceCallStateProject::Call { fut } => fut.poll(cx).map(|r| {
ServiceCallStateProject::ReadyPl { req, svc, pl } => {
task::ready!(pl.poll_ready(cx))?;

let ctx = ServiceCtx::new(&svc.waiters);
let svc_call = svc.get_ref().call(req.take().unwrap(), ctx);

// SAFETY: `svc_call` has same lifetime same as lifetime of `pl.svc`
// Pipeline::svc is heap allocated(Rc<S>), we keep it alive until
// `svc_call` get resolved to result
let fut = unsafe { std::mem::transmute(svc_call) };

this.state.set(ServiceCallState::Call { fut });
self.poll(cx)
}
ServiceCallStateProject::Call { fut, .. } => fut.poll(cx).map(|r| {
this.state.set(ServiceCallState::Empty);
r
}),
Expand All @@ -259,8 +281,6 @@ pin_project_lite::pin_project! {
pub struct ServiceCallToCall<'a, S, Req>
where
S: Service<Req>,
S: 'a,
S: ?Sized,
Req: 'a,
{
#[pin]
Expand All @@ -270,7 +290,7 @@ pin_project_lite::pin_project! {

impl<'a, S, Req> Future for ServiceCallToCall<'a, S, Req>
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Output = Result<S::Future<'a>, S::Error>;

Expand Down Expand Up @@ -306,6 +326,12 @@ where
task::Poll::Pending
}
},
ServiceCallStateProject::ReadyPl { req, svc, pl } => {
task::ready!(pl.poll_ready(cx))?;

let ctx = ServiceCtx::new(&svc.waiters);
task::Poll::Ready(Ok(svc.get_ref().call(req.take().unwrap(), ctx)))
}
ServiceCallStateProject::Call { .. } => {
unreachable!("`ServiceCallToCall` can only be constructed in `Ready` state")
}
Expand Down Expand Up @@ -387,13 +413,13 @@ mod tests {
let data1 = data.clone();
ntex::rt::spawn(async move {
let _ = poll_fn(|cx| srv1.poll_ready(cx)).await;
let i = srv1.call("srv1").await.unwrap();
let i = srv1.call_nowait("srv1").await.unwrap();
data1.borrow_mut().push(i);
});

let data2 = data.clone();
ntex::rt::spawn(async move {
let i = srv2.service_call("srv2").await.unwrap();
let i = srv2.call_static("srv2").await.unwrap();
data2.borrow_mut().push(i);
});
time::sleep(time::Millis(50)).await;
Expand All @@ -417,7 +443,7 @@ mod tests {
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));

let mut fut = srv.service_call("test").advance_to_call();
let mut fut = srv.call("test").advance_to_call();
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
con.notify();

Expand All @@ -432,7 +458,7 @@ mod tests {
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));

let mut fut = srv.service_call("test");
let mut fut = srv.call("test");
let _ = lazy(|cx| Pin::new(&mut fut).poll(cx)).await;
con.notify();

Expand Down
2 changes: 1 addition & 1 deletion ntex-service/src/fn_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ mod tests {

let pipe = Pipeline::new(chain(srv).and_then(on_shutdown).clone());

let res = pipe.service_call(()).await;
let res = pipe.call(()).await;
assert_eq!(lazy(|cx| pipe.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert!(res.is_ok());
assert_eq!(res.unwrap(), "pipe");
Expand Down
4 changes: 2 additions & 2 deletions ntex-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub trait ServiceFactory<Req, Cfg = ()> {

impl<'a, S, Req> Service<Req> for &'a S
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -285,7 +285,7 @@ where

impl<S, Req> Service<Req> for Box<S>
where
S: Service<Req> + ?Sized,
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
Expand Down
Loading

0 comments on commit 594bf0a

Please sign in to comment.