Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: add bench_ingest #7409

Merged
merged 12 commits into from
Aug 6, 2024
4 changes: 4 additions & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false

[[bench]]
name = "bench_ingest"
harness = false
226 changes: 226 additions & 0 deletions pageserver/benches/bench_ingest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
use std::{env, num::NonZeroUsize};

use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{criterion_group, criterion_main, Criterion};
use pageserver::{
config::PageServerConf,
context::{DownloadBehavior, RequestContext},
l0_flush::{L0FlushConfig, L0FlushGlobalState},
page_cache,
repository::Value,
task_mgr::TaskKind,
tenant::storage_layer::InMemoryLayer,
virtual_file::{self, api::IoEngineKind},
};
use pageserver_api::{key::Key, shard::TenantShardId};
use utils::{
bin_ser::BeSer,
id::{TenantId, TimelineId},
};

// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}

enum KeyLayout {
/// Sequential unique keys
Sequential,
/// Random unique keys
Random,
/// Random keys, but only use the bits from the mask of them
RandomReuse(u32),
}

enum WriteDelta {
Yes,
No,
}

async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) -> anyhow::Result<()> {
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);

let timeline_id = TimelineId::generate();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);

tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
jcsp marked this conversation as resolved.
Show resolved Hide resolved

let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;

let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
jcsp marked this conversation as resolved.
Show resolved Hide resolved
let ctx = RequestContext::new(
pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
pageserver::context::DownloadBehavior::Download,
);

for i in 0..put_count {
lsn += put_size as u64;

match key_layout {
KeyLayout::Sequential => {
// Use sequential order to illustrate the experience a user is likely to have
// when ingesting bulk data.
key.field3 = i as u32;
jcsp marked this conversation as resolved.
Show resolved Hide resolved
}
KeyLayout::Random => {
// Use random-order keys to avoid giving a false advantage to data structures that are
// faster when inserting on the end.
key.field3 = murmurhash32(i as u32);
}
KeyLayout::RandomReuse(mask) => {
// Use low bits only, to limit cardinality
key.field3 = murmurhash32(i as u32) & mask;
}
}

layer.put_value(key, lsn, &data, &ctx).await?;
}
layer.freeze(lsn + 1).await;

if matches!(write_delta, WriteDelta::Yes) {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
let (_desc, path) = layer
.write_to_disk(&ctx, None, l0_flush_state.inner())
.await?
.unwrap();
tokio::fs::remove_file(path).await?;
}

Ok(())
}

/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
if let Err(e) = r {
panic!("{e:?}");
}
});
}

fn criterion_benchmark(c: &mut Criterion) {
jcsp marked this conversation as resolved.
Show resolved Hide resolved
let repo_dir: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap();
let repo_dir = repo_dir.join("bench_data");
eprintln!("Data directory: {repo_dir}");

let conf: &'static PageServerConf = Box::leak(Box::new(
pageserver::config::PageServerConf::dummy_conf(repo_dir),
));
virtual_file::init(16384, IoEngineKind::TokioEpollUring);
page_cache::init(conf.page_cache_size);

{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}

{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);