Skip to content

Commit

Permalink
Upgrade gRPC example deps (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcches authored Oct 4, 2024
1 parent 3a38ee6 commit cfc6316
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 72 deletions.
18 changes: 8 additions & 10 deletions examples/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,19 @@ version = "0.1.0"
edition = "2021"
publish = false


[dependencies]
tonic = "0.12"
prost = "0.13"
hyper = "1.4"
async-stream = "0.3"
turmoil = { path = "../.." }
hyper = "1"
hyper-util = { version = "0.1", features = ["tokio"] }
prost = "0.13"
tokio = "1"
tonic = "0.12"
tower = "0.4"
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1"
tower = "0.5"
hyper-util = "0.1"
turmoil = { path = "../.." }

[build-dependencies]
tonic-build = "0.12"
protox = "0.7"
prost = "0.13"
prost-build = "0.13"
protox = "0.7.0"
13 changes: 6 additions & 7 deletions examples/grpc/build.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use prost::Message;
use std::path::PathBuf;

fn main() {
fn main() -> std::io::Result<()> {
let file_descriptors = protox::compile(["helloworld.proto"], ["."]).unwrap();
let file_descriptor_path = PathBuf::from(std::env::var_os("OUT_DIR").expect("OUT_DIR not set"))
.join("file_descriptor_set.bin");
std::fs::write(&file_descriptor_path, file_descriptors.encode_to_vec()).unwrap();

let mut config = prost_build::Config::new();
config
.file_descriptor_set_path(&file_descriptor_path)
.skip_protoc_run();

tonic_build::configure()
.compile_with_config(config, &["helloworld.proto"], &["."])
.file_descriptor_set_path(&file_descriptor_path)
.skip_protoc_run()
.compile_protos(&["helloworld.proto"], &["."])
.unwrap();

Ok(())
}
97 changes: 44 additions & 53 deletions examples/grpc/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
use connector::connector;
use proto::greeter_client::GreeterClient;
use proto::greeter_server::{Greeter, GreeterServer};
use proto::{HelloReply, HelloRequest};
use std::net::{IpAddr, Ipv4Addr};
use tonic::transport::{Endpoint, Server};
use tonic::Status;
use tonic::{Request, Response};
use tracing::{info_span, Instrument};
use turmoil::net::TcpListener;
use turmoil::Builder;

#[allow(non_snake_case)]
mod proto {
tonic::include_proto!("helloworld");
}

use crate::connector::{TurmoilTcpConnector, TurmoilTcpStream};
use crate::proto::greeter_client::GreeterClient;
use proto::greeter_server::{Greeter, GreeterServer};
use proto::{HelloReply, HelloRequest};
use turmoil::net::TcpListener;

fn main() {
configure_tracing();

Expand All @@ -33,11 +32,10 @@ fn main() {
.serve_with_incoming(async_stream::stream! {
let listener = TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| TurmoilTcpStream(s));
yield listener.accept().await.map(|(s, _)| incoming::Accepted(s));
}
})
.await
.unwrap();
.await?;

Ok(())
}
Expand All @@ -48,7 +46,7 @@ fn main() {
"client",
async move {
let ch = Endpoint::new("http://server:9999")?
.connect_with_connector(TurmoilTcpConnector)
.connect_with_connector(connector())
.await?;
let mut greeter_client = GreeterClient::new(ch);

Expand Down Expand Up @@ -109,88 +107,81 @@ impl Greeter for MyGreeter {
}
}

mod connector {
use hyper::Uri;
use hyper_util::client::legacy::connect::Connected;
use hyper_util::rt::TokioIo;
mod incoming {
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tokio::io;

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::server::TcpConnectInfo;
use tower::Service;
use tonic::transport::server::{Connected, TcpConnectInfo};
use turmoil::net::TcpStream;

#[derive(Clone)]
pub struct TurmoilTcpConnector;

impl Service<Uri> for TurmoilTcpConnector {
type Response = TokioIo<TurmoilTcpStream>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
pub struct Accepted(pub TcpStream);

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, uri: Uri) -> Self::Future {
Box::pin(async move {
let stream = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok(TokioIo::new(TurmoilTcpStream(stream)))
})
}
}

pub struct TurmoilTcpStream(pub TcpStream);

impl hyper_util::client::legacy::connect::Connection for TurmoilTcpStream {
fn connected(&self) -> Connected {
Connected::new()
}
}

impl tonic::transport::server::Connected for TurmoilTcpStream {
impl Connected for Accepted {
type ConnectInfo = TcpConnectInfo;

fn connect_info(&self) -> Self::ConnectInfo {
TcpConnectInfo {
Self::ConnectInfo {
local_addr: self.0.local_addr().ok(),
remote_addr: self.0.peer_addr().ok(),
}
}
}

impl AsyncRead for TurmoilTcpStream {
impl AsyncRead for Accepted {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), io::Error>> {
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}

impl AsyncWrite for TurmoilTcpStream {
impl AsyncWrite for Accepted {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}
}

mod connector {
use std::{future::Future, pin::Pin};

use hyper::Uri;
use hyper_util::rt::TokioIo;

use tower::Service;
use turmoil::net::TcpStream;

type Fut = Pin<Box<dyn Future<Output = Result<TokioIo<TcpStream>, std::io::Error>> + Send>>;

pub fn connector(
) -> impl Service<Uri, Response = TokioIo<TcpStream>, Error = std::io::Error, Future = Fut> + Clone
{
tower::service_fn(|uri: Uri| {
Box::pin(async move {
let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok::<_, std::io::Error>(TokioIo::new(conn))
}) as Fut
})
}
}
4 changes: 2 additions & 2 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl TcpStream {
)
}

/// Has no effect in turmoil. API parity with
/// Has no effect in turmoil. API parity with
/// https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay
pub fn set_nodelay(&self, _nodelay: bool) -> Result<()> {
Ok(())
Expand Down Expand Up @@ -430,4 +430,4 @@ impl Drop for WriteHalf {
world.current_host_mut().tcp.close_stream_half(*self.pair);
})
}
}
}

0 comments on commit cfc6316

Please sign in to comment.