Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor HTTP tests to be less flaky and more robust #7143

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

275 changes: 88 additions & 187 deletions crates/test-programs/src/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use anyhow::Context;
use anyhow::{Context, Result};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Bytes, service::service_fn, Request, Response};
use std::{
future::Future,
net::{SocketAddr, TcpListener},
sync::{mpsc, OnceLock},
time::Duration,
net::{SocketAddr, TcpStream},
thread::JoinHandle,
};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(50);
use tokio::net::TcpListener;

async fn test(
mut req: Request<hyper::body::Incoming>,
Expand All @@ -26,66 +24,105 @@ async fn test(
.body(Full::<Bytes>::from(buf).boxed())
}

struct ServerHttp1 {
receiver: mpsc::Receiver<anyhow::Result<()>>,
pub struct Server {
addr: SocketAddr,
worker: Option<JoinHandle<Result<()>>>,
}

impl ServerHttp1 {
fn new() -> Self {
tracing::debug!("initializing http1 server");
static CELL_HTTP1: OnceLock<TcpListener> = OnceLock::new();
let listener = CELL_HTTP1.get_or_init(|| {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("preparing tcp listener at localhost:3000");
TcpListener::bind(addr).unwrap()
});
let (sender, receiver) = mpsc::channel::<anyhow::Result<()>>();
std::thread::spawn(move || {
tracing::debug!("dedicated thread to start listening");
match tokio::runtime::Builder::new_current_thread()
impl Server {
fn new<F>(run: impl FnOnce(tokio::net::TcpStream) -> F + Send + Sync + 'static) -> Result<Self>
where
F: Future<Output = Result<()>>,
{
let thread = std::thread::spawn(|| -> Result<_> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => {
tracing::debug!("using tokio runtime");
sender
.send(rt.block_on(async move {
tracing::debug!("preparing to accept connection");
let (stream, _) = listener.accept().map_err(anyhow::Error::from)?;
tracing::trace!("tcp stream {:?}", stream);
.context("failed to start tokio runtime")?;
let listener = rt.block_on(async move {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
TcpListener::bind(addr).await.context("failed to bind")
})?;
Ok((rt, listener))
});
let (rt, listener) = thread.join().unwrap()?;
let addr = listener.local_addr().context("failed to get local addr")?;
let worker = std::thread::spawn(move || {
tracing::debug!("dedicated thread to start listening");
rt.block_on(async move {
tracing::debug!("preparing to accept connection");
let (stream, _) = listener.accept().await.map_err(anyhow::Error::from)?;
run(stream).await
})
});
Ok(Self {
worker: Some(worker),
addr,
})
}

let mut builder = hyper::server::conn::http1::Builder::new();
let http = builder.keep_alive(false).pipeline_flush(true);
let io = tokio::net::TcpStream::from_std(stream)
.map_err(anyhow::Error::from)?;
pub fn http1() -> Result<Self> {
tracing::debug!("initializing http1 server");
Self::new(|io| async move {
let mut builder = hyper::server::conn::http1::Builder::new();
let http = builder.keep_alive(false).pipeline_flush(true);

tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
conn?;
Ok(())
})
}

tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
conn.map_err(anyhow::Error::from)
}))
.expect("value sent from http1 server dedicated thread");
}
Err(e) => {
tracing::debug!("unable to start tokio runtime");
sender.send(Err(anyhow::Error::from(e))).unwrap()
pub fn http2() -> Result<Self> {
tracing::debug!("initializing http2 server");
Self::new(|io| async move {
let mut builder = hyper::server::conn::http2::Builder::new(TokioExecutor);
let http = builder.max_concurrent_streams(20);

tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
if let Err(e) = &conn {
let message = e.to_string();
if message.contains("connection closed before reading preface")
|| message.contains("unspecific protocol error detected")
{
return Ok(());
}
};
});
Self { receiver }
}
conn?;
Ok(())
})
}

fn shutdown(self) -> anyhow::Result<()> {
pub fn addr(&self) -> String {
format!("localhost:{}", self.addr.port())
}
}

impl Drop for Server {
fn drop(&mut self) {
tracing::debug!("shutting down http1 server");
self.receiver
.recv_timeout(DEFAULT_TIMEOUT)
.context("value received from http1 server dedicated thread")?
// Force a connection to happen in case one hasn't happened already.
let _ = TcpStream::connect(&self.addr);

// If the worker fails with an error, report it here but don't panic.
// Some tests don't make a connection so the error will be that the tcp
// stream created above is closed immediately afterwards. Let the test
// independently decide if it failed or not, and this should be in the
// logs to assist with debugging if necessary.
let worker = self.worker.take().unwrap();
if let Err(e) = worker.join().unwrap() {
eprintln!("worker failed with error {e:?}");
}
}
}

#[derive(Clone)]
/// An Executor that uses the tokio runtime.
pub struct TokioExecutor;
struct TokioExecutor;

impl<F> hyper::rt::Executor<F> for TokioExecutor
where
Expand All @@ -96,139 +133,3 @@ where
tokio::task::spawn(fut);
}
}

struct ServerHttp2 {
receiver: mpsc::Receiver<anyhow::Result<()>>,
}

impl ServerHttp2 {
fn new() -> Self {
tracing::debug!("initializing http2 server");
static CELL_HTTP2: OnceLock<TcpListener> = OnceLock::new();
let listener = CELL_HTTP2.get_or_init(|| {
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
tracing::debug!("preparing tcp listener at localhost:3001");
TcpListener::bind(addr).unwrap()
});
let (sender, receiver) = mpsc::channel::<anyhow::Result<()>>();
std::thread::spawn(move || {
tracing::debug!("dedicated thread to start listening");
match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => {
tracing::debug!("using tokio runtime");
sender
.send(rt.block_on(async move {
tracing::debug!("preparing to accept incoming connection");
let (stream, _) = listener.accept().map_err(anyhow::Error::from)?;
tracing::trace!("tcp stream {:?}", stream);

let mut builder =
hyper::server::conn::http2::Builder::new(TokioExecutor);
let http = builder.max_concurrent_streams(20);
let io = tokio::net::TcpStream::from_std(stream)
.map_err(anyhow::Error::from)?;

tracing::debug!("preparing to bind connection to service");
let conn = http.serve_connection(io, service_fn(test)).await;
tracing::trace!("connection result {:?}", conn);
if let Err(e) = &conn {
let message = e.to_string();
if message.contains("connection closed before reading preface")
|| message.contains("unspecific protocol error detected")
{
return Ok(());
}
}
conn.map_err(anyhow::Error::from)
}))
.expect("value sent from http2 server dedicated thread");
}
Err(e) => {
tracing::debug!("unable to start tokio runtime");
sender.send(Err(anyhow::Error::from(e))).unwrap()
}
};
});
Self { receiver }
}

fn shutdown(self) -> anyhow::Result<()> {
tracing::debug!("shutting down http2 server");
self.receiver
.recv_timeout(DEFAULT_TIMEOUT)
.context("value received from http2 server dedicated thread")?
}
}

pub async fn setup_http1(f: impl Future<Output = anyhow::Result<()>>) -> anyhow::Result<()> {
tracing::debug!("preparing http1 server asynchronously");
let server = ServerHttp1::new();

tracing::debug!("running inner function (future)");
let result = f.await;

if let Err(err) = server.shutdown() {
tracing::error!("[host/server] failure {:?}", err);
}
result
}

pub fn setup_http1_sync<F>(f: F) -> anyhow::Result<()>
where
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
tracing::debug!("preparing http1 server synchronously");
let server = ServerHttp1::new();

let (tx, rx) = mpsc::channel::<anyhow::Result<()>>();
tracing::debug!("running inner function in a dedicated thread");
std::thread::spawn(move || {
let _ = tx.send(f());
});
let result = rx
.recv_timeout(DEFAULT_TIMEOUT)
.context("value received from request dedicated thread");

if let Err(err) = server.shutdown() {
tracing::error!("[host/server] failure {:?}", err);
}
result?
}

pub async fn setup_http2(f: impl Future<Output = anyhow::Result<()>>) -> anyhow::Result<()> {
tracing::debug!("preparing http2 server asynchronously");
let server = ServerHttp2::new();

tracing::debug!("running inner function (future)");
let result = f.await;

if let Err(err) = server.shutdown() {
tracing::error!("[host/server] Failure: {:?}", err);
}
result
}

pub fn setup_http2_sync<F>(f: F) -> anyhow::Result<()>
where
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
tracing::debug!("preparing http2 server synchronously");
let server = ServerHttp2::new();

let (tx, rx) = mpsc::channel::<anyhow::Result<()>>();
tracing::debug!("running inner function in a dedicated thread");
std::thread::spawn(move || {
let _ = tx.send(f());
});
let result = rx
.recv_timeout(DEFAULT_TIMEOUT)
.context("value received from request dedicated thread");

if let Err(err) = server.shutdown() {
tracing::error!("[host/server] failure {:?}", err);
}
result?
}
Loading