Skip to content

Commit

Permalink
feat: add flamegraphs to benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades committed Sep 11, 2024
1 parent 091f0d8 commit 66c09f2
Show file tree
Hide file tree
Showing 5 changed files with 587 additions and 26 deletions.
13 changes: 10 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ readme = "README.md"
repository = "https://github.com/valorem-labs-inc/hyper-server"
version = "0.7.0"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.6", optional = true }

[dependencies]
async-stream = "0.3.5"
bytes = "1.7.1"
Expand All @@ -31,18 +34,22 @@ tokio-stream = { version = "0.1.16", features = ["net"] }
tokio-util = "0.7.12"
tower = { version = "0.5.1", features = ["util"] }
tracing = "0.1.40"
jemallocator = { version = "0.5", optional = true }
signature = "2.3.0-pre.4"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
hyper = { version = "1.4.1", features = ["client"] }
tokio = { version = "1.40", features = ["rt", "net", "test-util"] }
tokio-util = { version = "0.7", features = ["compat"] }
tokio-util = { version = "0.7.12", features = ["compat"] }
tracing-subscriber = "0.3.18"
num_cpus = "1.16.0"
pprof = { version = "0.13.0", features = ["flamegraph"] }
ring = "0.17.8"
rcgen = "0.13.1"

[[bench]]
name = "hello_world_tower_hyper_tls_tcp"
harness = false

[features]
jemalloc = ["jemallocator"]
jemalloc = ["tikv-jemallocator"]
103 changes: 82 additions & 21 deletions benches/hello_world_tower_hyper_tls_tcp.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,75 @@
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{fs::File, os::raw::c_int, path::Path};

use bytes::Bytes;
use criterion::profiler::Profiler;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use http::{Request, Response, StatusCode, Uri};
use http_body_util::{BodyExt, Empty, Full};
use hyper::body::Incoming;
use hyper_rustls::HttpsConnectorBuilder;
use hyper_server::{load_certs, load_private_key, serve_http_with_shutdown};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder as HttpConnectionBuilder;
use hyper_util::service::TowerToHyperService;
use pprof::ProfilerGuard;
use rustls::server::ServerSessionMemoryCache;
use rustls::{ClientConfig, RootCertStore, ServerConfig};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpSocket;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::time::{Duration, Instant};
use tokio_stream::wrappers::TcpListenerStream;
use tracing::info;

use hyper_server::{load_certs, load_private_key, serve_http_with_shutdown};

/// Custom profiler that creates a flamegraph for each benchmark
pub struct FlamegraphProfiler<'a> {
frequency: c_int,
active_profiler: Option<ProfilerGuard<'a>>,
}

impl<'a> FlamegraphProfiler<'a> {
pub fn new(frequency: c_int) -> Self {
FlamegraphProfiler {
frequency,
active_profiler: None,
}
}
}

impl<'a> Profiler for FlamegraphProfiler<'a> {
fn start_profiling(&mut self, _benchmark_id: &str, _benchmark_dir: &Path) {
self.active_profiler = Some(ProfilerGuard::new(self.frequency).unwrap());
}

fn stop_profiling(&mut self, _benchmark_id: &str, benchmark_dir: &Path) {
std::fs::create_dir_all(benchmark_dir).unwrap();
let flamegraph_path = benchmark_dir.join("flamegraph.svg");
let flamegraph_file = File::create(&flamegraph_path)
.expect("File system error while creating flamegraph.svg");
if let Some(profiler) = self.active_profiler.take() {
profiler
.report()
.build()
.unwrap()
.flamegraph(flamegraph_file)
.expect("Error writing flamegraph");
}
}
}

fn create_optimized_runtime(thread_count: usize) -> io::Result<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(thread_count)
.max_blocking_threads(thread_count * 2)
.enable_all()
.build()
}

async fn echo(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/") => Ok(Response::new(Full::new(Bytes::from("Hello, World!")))),
Expand All @@ -41,14 +89,20 @@ async fn setup_server() -> Result<
(TcpListenerStream, SocketAddr, Arc<ServerConfig>),
Box<dyn std::error::Error + Send + Sync>,
> {
// Socket configuration
let addr = SocketAddr::from(([127, 0, 0, 1], 0)); // Listen on all interfaces
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let socket = TcpSocket::new_v4()?;

// Optimize TCP parameters
socket.set_send_buffer_size(262_144)?; // 256 KB
socket.set_recv_buffer_size(262_144)?; // 256 KB
socket.set_nodelay(true)?; // Disable Nagle's algorithm
socket.set_nodelay(true)?;
socket.set_reuseaddr(true)?;
socket.set_reuseport(true)?;
socket.set_keepalive(true)?;

socket.bind(addr)?;
let listener = socket.listen(8192)?; // Increase backlog for high-traffic scenarios
let listener = socket.listen(8192)?; // Increased backlog for high-traffic scenarios

let server_addr = listener.local_addr()?;
let incoming = TcpListenerStream::new(listener);

Expand Down Expand Up @@ -125,18 +179,20 @@ async fn concurrent_benchmark(
num_requests: usize,
) -> (Duration, Vec<Duration>, usize) {
let start = Instant::now();
let mut futures = FuturesUnordered::new();
let mut handles = Vec::with_capacity(num_requests);

for _ in 0..num_requests {
let client = client.clone();
let url = url.clone();
futures.push(async move { send_request(&client, url).await });
let handle = tokio::spawn(async move { send_request(&client, url).await });
handles.push(handle);
}

let mut request_times = Vec::with_capacity(num_requests);
let mut total_bytes = 0;
while let Some(result) = futures.next().await {
if let Ok((duration, bytes)) = result {

for handle in handles {
if let Ok(Ok((duration, bytes))) = handle.await {
request_times.push(duration);
total_bytes += bytes;
}
Expand All @@ -147,8 +203,9 @@ async fn concurrent_benchmark(
}

fn bench_server(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let (server_addr, shutdown_tx, client) = rt.block_on(async {
let server_runtime = Arc::new(create_optimized_runtime(num_cpus::get() / 2).unwrap());

let (server_addr, shutdown_tx, client) = server_runtime.block_on(async {
let (server_addr, shutdown_tx) = start_server().await.expect("Failed to start server");
info!("Server started on {}", server_addr);

Expand Down Expand Up @@ -191,7 +248,8 @@ fn bench_server(c: &mut Criterion) {
group.bench_function("latency", |b| {
let client = client.clone();
let url = url.clone();
b.to_async(&rt)
let client_runtime = create_optimized_runtime(num_cpus::get() / 2).unwrap();
b.to_async(client_runtime)
.iter(|| async { send_request(&client, url.clone()).await.unwrap().0 });
});

Expand All @@ -200,12 +258,13 @@ fn bench_server(c: &mut Criterion) {
group.bench_function("throughput", |b| {
let client = client.clone();
let url = url.clone();
b.to_async(&rt)
let client_runtime = create_optimized_runtime(num_cpus::get() / 2).unwrap();
b.to_async(client_runtime)
.iter(|| async { send_request(&client, url.clone()).await.unwrap() });
});

// Concurrency stress test
let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // log sequence
let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987];
for &num_requests in &concurrent_requests {
group.throughput(Throughput::Elements(num_requests as u64));
group.bench_with_input(
Expand All @@ -214,7 +273,8 @@ fn bench_server(c: &mut Criterion) {
|b, &num_requests| {
let client = client.clone();
let url = url.clone();
b.to_async(&rt).iter(|| async {
let client_runtime = create_optimized_runtime(num_cpus::get() / 2).unwrap();
b.to_async(client_runtime).iter(|| async {
concurrent_benchmark(&client, url.clone(), num_requests).await
});
},
Expand All @@ -223,7 +283,7 @@ fn bench_server(c: &mut Criterion) {

group.finish();

rt.block_on(async {
server_runtime.block_on(async {
shutdown_tx.send(()).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
});
Expand All @@ -234,7 +294,8 @@ criterion_group! {
config = Criterion::default()
.sample_size(10)
.measurement_time(Duration::from_secs(20))
.warm_up_time(Duration::from_secs(5));
.warm_up_time(Duration::from_secs(5))
.with_profiler(FlamegraphProfiler::new(100));
targets = bench_server
}

Expand Down
Loading

0 comments on commit 66c09f2

Please sign in to comment.