From 74eda0b0b75194a3e35f735b619cce60676f4dae Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 17 Apr 2024 13:28:07 +0100 Subject: [PATCH 1/8] pageserver: make bench'able methods public --- pageserver/src/tenant/storage_layer/inmemory_layer.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index f9010ae8a649..15c0bc8b1c3d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -482,8 +482,7 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - - pub(crate) async fn put_value( + pub async fn put_value( &self, key: Key, lsn: Lsn, @@ -579,7 +578,7 @@ impl InMemoryLayer { /// if there are no matching keys. /// /// Returns a new delta layer with all the same data as this in-memory layer - pub(crate) async fn write_to_disk( + pub async fn write_to_disk( &self, timeline: &Arc, ctx: &RequestContext, From 137cbb4db454e9a355c07efdcf445ba35875ca4b Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 17 Apr 2024 12:48:03 +0100 Subject: [PATCH 2/8] pageserver: refactor DeltaLayerWriter to not need a Timeline --- pageserver/src/l0_flush.rs | 4 +- .../src/tenant/storage_layer/delta_layer.rs | 47 ++++++++----------- .../tenant/storage_layer/inmemory_layer.rs | 18 +++---- pageserver/src/tenant/timeline.rs | 11 +++-- pageserver/src/tenant/timeline/compaction.rs | 43 +++++++++-------- .../src/tenant/timeline/detach_ancestor.rs | 6 ++- 6 files changed, 64 insertions(+), 65 deletions(-) diff --git a/pageserver/src/l0_flush.rs b/pageserver/src/l0_flush.rs index 8945e5accdbf..10187f2ba309 100644 --- a/pageserver/src/l0_flush.rs +++ b/pageserver/src/l0_flush.rs @@ -24,7 +24,7 @@ impl Default for L0FlushConfig { #[derive(Clone)] pub struct L0FlushGlobalState(Arc); -pub(crate) enum Inner { +pub enum Inner { PageCached, Direct { semaphore: tokio::sync::Semaphore }, } @@ -40,7 +40,7 @@ impl L0FlushGlobalState { } } - pub(crate) fn inner(&self) -> &Arc { + pub fn inner(&self) -> &Arc { &self.0 } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f9becf53ff02..ba5d291694b0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -36,13 +36,13 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi use crate::tenant::disk_btree::{ DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection, }; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState}; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ BlobFlag, MaxVectoredReadBytes, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner, }; -use crate::tenant::{PageReconstructError, Timeline}; +use crate::tenant::PageReconstructError; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; @@ -73,8 +73,7 @@ use utils::{ }; use super::{ - AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ResidentLayer, - ValuesReconstructState, + AsLayerDesc, LayerAccessStats, LayerName, PersistentLayerDesc, ValuesReconstructState, }; /// @@ -373,7 +372,6 @@ impl DeltaLayer { /// 3. Call `finish`. /// struct DeltaLayerWriterInner { - conf: &'static PageServerConf, pub path: Utf8PathBuf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, @@ -417,7 +415,6 @@ impl DeltaLayerWriterInner { let tree_builder = DiskBtreeBuilder::new(block_buf); Ok(Self { - conf, path, timeline_id, tenant_shard_id, @@ -488,11 +485,10 @@ impl DeltaLayerWriterInner { async fn finish( self, key_end: Key, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let temp_path = self.path.clone(); - let result = self.finish0(key_end, timeline, ctx).await; + let result = self.finish0(key_end, ctx).await; if result.is_err() { tracing::info!(%temp_path, "cleaning up temporary file after error during writing"); if let Err(e) = std::fs::remove_file(&temp_path) { @@ -505,9 +501,8 @@ impl DeltaLayerWriterInner { async fn finish0( self, key_end: Key, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { let index_start_blk = ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; @@ -572,11 +567,9 @@ impl DeltaLayerWriterInner { // fsync the file file.sync_all().await?; - let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?; - - trace!("created delta layer {}", layer.local_path()); + trace!("created delta layer {}", self.path); - Ok(layer) + Ok((desc, self.path)) } } @@ -677,14 +670,9 @@ impl DeltaLayerWriter { pub(crate) async fn finish( mut self, key_end: Key, - timeline: &Arc, ctx: &RequestContext, - ) -> anyhow::Result { - self.inner - .take() - .unwrap() - .finish(key_end, timeline, ctx) - .await + ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + self.inner.take().unwrap().finish(key_end, ctx).await } } @@ -1669,8 +1657,9 @@ pub(crate) mod test { use super::*; use crate::repository::Value; use crate::tenant::harness::TIMELINE_ID; + use crate::tenant::storage_layer::{Layer, ResidentLayer}; use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner; - use crate::tenant::Tenant; + use crate::tenant::{Tenant, Timeline}; use crate::{ context::DownloadBehavior, task_mgr::TaskKind, @@ -1964,9 +1953,8 @@ pub(crate) mod test { res?; } - let resident = writer - .finish(entries_meta.key_range.end, &timeline, &ctx) - .await?; + let (desc, path) = writer.finish(entries_meta.key_range.end, &ctx).await?; + let resident = Layer::finish_creating(harness.conf, &timeline, desc, &path)?; let inner = resident.get_as_delta(&ctx).await?; @@ -2155,7 +2143,8 @@ pub(crate) mod test { .await .unwrap(); - let copied_layer = writer.finish(Key::MAX, &branch, ctx).await.unwrap(); + let (desc, path) = writer.finish(Key::MAX, ctx).await.unwrap(); + let copied_layer = Layer::finish_creating(tenant.conf, &branch, desc, &path).unwrap(); copied_layer.get_as_delta(ctx).await.unwrap(); @@ -2283,7 +2272,9 @@ pub(crate) mod test { for (key, lsn, value) in deltas { writer.put_value(key, lsn, value, ctx).await?; } - let delta_layer = writer.finish(key_end, tline, ctx).await?; + + let (desc, path) = writer.finish(key_end, ctx).await?; + let delta_layer = Layer::finish_creating(tenant.conf, tline, desc, &path)?; Ok::<_, anyhow::Error>(delta_layer) } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 15c0bc8b1c3d..706a2c30551a 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -12,9 +12,10 @@ use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::storage_layer::ValueReconstructResult; use crate::tenant::timeline::GetVectoredError; -use crate::tenant::{PageReconstructError, Timeline}; +use crate::tenant::PageReconstructError; use crate::{l0_flush, page_cache, walrecord}; use anyhow::{anyhow, ensure, Result}; +use camino::Utf8PathBuf; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; @@ -34,7 +35,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize}; use tokio::sync::{RwLock, RwLockWriteGuard}; use super::{ - DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState, + DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValueReconstructState, ValuesReconstructState, }; @@ -580,10 +581,10 @@ impl InMemoryLayer { /// Returns a new delta layer with all the same data as this in-memory layer pub async fn write_to_disk( &self, - timeline: &Arc, ctx: &RequestContext, key_range: Option>, - ) -> Result> { + l0_flush_global_state: &l0_flush::Inner, + ) -> Result> { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the // write lock on it, so we shouldn't block anyone. There's one exception @@ -595,9 +596,8 @@ impl InMemoryLayer { // rare though, so we just accept the potential latency hit for now. let inner = self.inner.read().await; - let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone(); use l0_flush::Inner; - let _concurrency_permit = match &*l0_flush_global_state { + let _concurrency_permit = match l0_flush_global_state { Inner::PageCached => None, Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await), }; @@ -627,7 +627,7 @@ impl InMemoryLayer { ) .await?; - match &*l0_flush_global_state { + match l0_flush_global_state { l0_flush::Inner::PageCached => { let ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) @@ -692,7 +692,7 @@ impl InMemoryLayer { } // MAX is used here because we identify L0 layers by full key range - let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?; + let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?; // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``. // @@ -704,6 +704,6 @@ impl InMemoryLayer { // we dirtied when writing to the filesystem have been flushed and marked !dirty. drop(_concurrency_permit); - Ok(Some(delta_layer)) + Ok(Some((desc, path))) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 37ebeded660e..abda60bb1126 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4181,12 +4181,14 @@ impl Timeline { let frozen_layer = Arc::clone(frozen_layer); let ctx = ctx.attached_child(); let work = async move { - let Some(new_delta) = frozen_layer - .write_to_disk(&self_clone, &ctx, key_range) + let Some((desc, path)) = frozen_layer + .write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner()) .await? else { return Ok(None); }; + let new_delta = Layer::finish_creating(self_clone.conf, &self_clone, desc, &path)?; + // The write_to_disk() above calls writer.finish() which already did the fsync of the inodes. // We just need to fsync the directory in which these inodes are linked, // which we know to be the timeline directory. @@ -5797,9 +5799,8 @@ impl Timeline { for (key, lsn, val) in deltas.data { delta_layer_writer.put_value(key, lsn, val, ctx).await?; } - let delta_layer = delta_layer_writer - .finish(deltas.key_range.end, self, ctx) - .await?; + let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?; { let mut guard = self.layers.write().await; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 61d662d25d66..1e2df7a90ab3 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1006,14 +1006,16 @@ impl Timeline { || contains_hole { // ... if so, flush previous layer and prepare to write new one - new_layers.push( - writer - .take() - .unwrap() - .finish(prev_key.unwrap().next(), self, ctx) - .await - .map_err(CompactionError::Other)?, - ); + let (desc, path) = writer + .take() + .unwrap() + .finish(prev_key.unwrap().next(), ctx) + .await + .map_err(CompactionError::Other)?; + let new_delta = Layer::finish_creating(self.conf, self, desc, &path) + .map_err(CompactionError::Other)?; + + new_layers.push(new_delta); writer = None; if contains_hole { @@ -1076,12 +1078,13 @@ impl Timeline { prev_key = Some(key); } if let Some(writer) = writer { - new_layers.push( - writer - .finish(prev_key.unwrap().next(), self, ctx) - .await - .map_err(CompactionError::Other)?, - ); + let (desc, path) = writer + .finish(prev_key.unwrap().next(), ctx) + .await + .map_err(CompactionError::Other)?; + let new_delta = Layer::finish_creating(self.conf, self, desc, &path) + .map_err(CompactionError::Other)?; + new_layers.push(new_delta); } // Sync layers @@ -1853,9 +1856,11 @@ impl Timeline { for (key, lsn, val) in deltas { delta_layer_writer.put_value(key, lsn, val, ctx).await?; } - let delta_layer = delta_layer_writer - .finish(delta_key.key_range.end, tline, ctx) + + let (desc, path) = delta_layer_writer + .finish(delta_key.key_range.end, ctx) .await?; + let delta_layer = Layer::finish_creating(tline.conf, tline, desc, &path)?; Ok(Some(FlushDeltaResult::CreateResidentLayer(delta_layer))) } @@ -2262,9 +2267,9 @@ impl CompactionJobExecutor for TimelineAdaptor { )) }); - let new_delta_layer = writer - .finish(prev.unwrap().0.next(), &self.timeline, ctx) - .await?; + let (desc, path) = writer.finish(prev.unwrap().0.next(), ctx).await?; + let new_delta_layer = + Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?; self.new_deltas.push(new_delta_layer); Ok(()) diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index ee5f8cd52a49..645b5ad2bfd2 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -488,10 +488,12 @@ async fn copy_lsn_prefix( // reuse the key instead of adding more holes between layers by using the real // highest key in the layer. let reused_highest_key = layer.layer_desc().key_range.end; - let copied = writer - .finish(reused_highest_key, target_timeline, ctx) + let (desc, path) = writer + .finish(reused_highest_key, ctx) .await .map_err(CopyDeltaPrefix)?; + let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path) + .map_err(CopyDeltaPrefix)?; tracing::debug!(%layer, %copied, "new layer produced"); From ae7d635098a7dd8c8ce428d83296df4382f684ed Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 1 Aug 2024 14:27:02 +0000 Subject: [PATCH 3/8] pageserver: add ingest bench --- pageserver/Cargo.toml | 4 + pageserver/benches/bench_ingest.rs | 226 +++++++++++++++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 pageserver/benches/bench_ingest.rs diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 43976250a449..0e748ee3db7c 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -108,3 +108,7 @@ harness = false [[bench]] name = "bench_walredo" harness = false + +[[bench]] +name = "bench_ingest" +harness = false diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs new file mode 100644 index 000000000000..7d1f69ae0ba1 --- /dev/null +++ b/pageserver/benches/bench_ingest.rs @@ -0,0 +1,226 @@ +use std::{env, num::NonZeroUsize}; + +use bytes::Bytes; +use camino::Utf8PathBuf; +use criterion::{criterion_group, criterion_main, Criterion}; +use pageserver::{ + config::PageServerConf, + context::{DownloadBehavior, RequestContext}, + l0_flush::{L0FlushConfig, L0FlushGlobalState}, + page_cache, + repository::Value, + task_mgr::TaskKind, + tenant::storage_layer::InMemoryLayer, + virtual_file::{self, api::IoEngineKind}, +}; +use pageserver_api::{key::Key, shard::TenantShardId}; +use utils::{ + bin_ser::BeSer, + id::{TenantId, TimelineId}, +}; + +// A very cheap hash for generating non-sequential keys. +fn murmurhash32(mut h: u32) -> u32 { + h ^= h >> 16; + h = h.wrapping_mul(0x85ebca6b); + h ^= h >> 13; + h = h.wrapping_mul(0xc2b2ae35); + h ^= h >> 16; + h +} + +enum KeyLayout { + /// Sequential unique keys + Sequential, + /// Random unique keys + Random, + /// Random keys, but only use the bits from the mask of them + RandomReuse(u32), +} + +enum WriteDelta { + Yes, + No, +} + +async fn ingest( + conf: &'static PageServerConf, + put_size: usize, + put_count: usize, + key_layout: KeyLayout, + write_delta: WriteDelta, +) -> anyhow::Result<()> { + let mut lsn = utils::lsn::Lsn(1000); + let mut key = Key::from_i128(0x0); + + let timeline_id = TimelineId::generate(); + let tenant_id = TenantId::generate(); + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + + tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?; + + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + + let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?; + + let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?; + let ctx = RequestContext::new( + pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler, + pageserver::context::DownloadBehavior::Download, + ); + + for i in 0..put_count { + lsn += put_size as u64; + + match key_layout { + KeyLayout::Sequential => { + // Use sequential order to illustrate the experience a user is likely to have + // when ingesting bulk data. + key.field3 = i as u32; + } + KeyLayout::Random => { + // Use random-order keys to avoid giving a false advantage to data structures that are + // faster when inserting on the end. + key.field3 = murmurhash32(i as u32); + } + KeyLayout::RandomReuse(mask) => { + // Use low bits only, to limit cardinality + key.field3 = murmurhash32(i as u32) & mask; + } + } + + layer.put_value(key, lsn, &data, &ctx).await?; + } + layer.freeze(lsn + 1).await; + + if matches!(write_delta, WriteDelta::Yes) { + let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct { + max_concurrency: NonZeroUsize::new(1).unwrap(), + }); + let (_desc, path) = layer + .write_to_disk(&ctx, None, l0_flush_state.inner()) + .await? + .unwrap(); + tokio::fs::remove_file(path).await?; + } + + Ok(()) +} + +/// Wrapper to instantiate a tokio runtime +fn ingest_main( + conf: &'static PageServerConf, + put_size: usize, + put_count: usize, + key_layout: KeyLayout, + write_delta: WriteDelta, +) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + runtime.block_on(async move { + let r = ingest(conf, put_size, put_count, key_layout, write_delta).await; + if let Err(e) = r { + panic!("{e:?}"); + } + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + let repo_dir: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap(); + let repo_dir = repo_dir.join("bench_data"); + eprintln!("Data directory: {repo_dir}"); + + let conf: &'static PageServerConf = Box::leak(Box::new( + pageserver::config::PageServerConf::dummy_conf(repo_dir), + )); + virtual_file::init(16384, IoEngineKind::TokioEpollUring); + page_cache::init(conf.page_cache_size); + + { + let mut group = c.benchmark_group("ingest-small-values"); + let put_size = 100usize; + let put_count = 128 * 1024 * 1024 / put_size; + group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64)); + group.sample_size(10); + group.bench_function("ingest 128MB/100b seq", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/100b rand", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Random, + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/100b rand-1024keys", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::RandomReuse(0x3ff), + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/100b seq, no delta", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::No, + ) + }) + }); + } + + { + let mut group = c.benchmark_group("ingest-big-values"); + let put_size = 8192usize; + let put_count = 128 * 1024 * 1024 / put_size; + group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64)); + group.sample_size(10); + group.bench_function("ingest 128MB/8k seq", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::Yes, + ) + }) + }); + group.bench_function("ingest 128MB/8k seq, no delta", |b| { + b.iter(|| { + ingest_main( + conf, + put_size, + put_count, + KeyLayout::Sequential, + WriteDelta::No, + ) + }) + }); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 5dcfe1c4b8422c58d28bd1bc87536b7cbad308eb Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 17 Apr 2024 15:58:11 +0100 Subject: [PATCH 4/8] pageserver: downgrade an assertion to debug --- .../src/tenant/storage_layer/inmemory_layer.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 706a2c30551a..d5fa51d93baa 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -548,8 +548,6 @@ impl InMemoryLayer { /// Records the end_lsn for non-dropped layers. /// `end_lsn` is exclusive pub async fn freeze(&self, end_lsn: Lsn) { - let inner = self.inner.write().await; - assert!( self.start_lsn < end_lsn, "{} >= {}", @@ -567,9 +565,13 @@ impl InMemoryLayer { }) .expect("frozen_local_path_str set only once"); - for vec_map in inner.index.values() { - for (lsn, _pos) in vec_map.as_slice() { - assert!(*lsn < end_lsn); + #[cfg(debug_assertions)] + { + let inner = self.inner.write().await; + for vec_map in inner.index.values() { + for (lsn, _pos) in vec_map.as_slice() { + assert!(*lsn < end_lsn); + } } } } From a8be0f3376546fd2f797cbf5ff4a5476a6750f39 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 5 Aug 2024 12:18:06 +0000 Subject: [PATCH 5/8] add a doc comment --- pageserver/benches/bench_ingest.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 7d1f69ae0ba1..cb908798933a 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -128,6 +128,13 @@ fn ingest_main( }); } +/// Declare a series of benchmarks for the Pageserver's ingest write path. +/// +/// This benchmark does not include WAL decode: it starts at InMemoryLayer::put_value, and ends either +/// at freezing the ephemeral layer, or writing the ephemeral layer out to an L0 (depending on whether WriteDelta is set). +/// +/// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on +/// a fast disk, CPU is the bottleneck at time of writing. fn criterion_benchmark(c: &mut Criterion) { let repo_dir: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap(); let repo_dir = repo_dir.join("bench_data"); From d152a57c2954b7a5275a1f509641afdbce3a9580 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 5 Aug 2024 12:23:15 +0000 Subject: [PATCH 6/8] s/field3/field6/ --- pageserver/benches/bench_ingest.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index cb908798933a..3eb1814a34d0 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -72,20 +72,22 @@ async fn ingest( for i in 0..put_count { lsn += put_size as u64; + // Generate lots of keys within a single relation, which simulates the typical bulk ingest case: people + // usually care the most about write performance when they're blasting a huge batch of data into a huge table. match key_layout { KeyLayout::Sequential => { // Use sequential order to illustrate the experience a user is likely to have // when ingesting bulk data. - key.field3 = i as u32; + key.field6 = i as u32; } KeyLayout::Random => { // Use random-order keys to avoid giving a false advantage to data structures that are // faster when inserting on the end. - key.field3 = murmurhash32(i as u32); + key.field6 = murmurhash32(i as u32); } KeyLayout::RandomReuse(mask) => { // Use low bits only, to limit cardinality - key.field3 = murmurhash32(i as u32) & mask; + key.field6 = murmurhash32(i as u32) & mask; } } From c2d5395a0050f2e42464fa0af9cb3669f107a8be Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 5 Aug 2024 12:36:15 +0000 Subject: [PATCH 7/8] clean up temp dir --- pageserver/benches/bench_ingest.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 3eb1814a34d0..af2b6934c610 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -138,12 +138,12 @@ fn ingest_main( /// Genuine disk I/O is used, so expect results to differ depending on storage. However, when running on /// a fast disk, CPU is the bottleneck at time of writing. fn criterion_benchmark(c: &mut Criterion) { - let repo_dir: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap(); - let repo_dir = repo_dir.join("bench_data"); - eprintln!("Data directory: {repo_dir}"); + let temp_dir_parent: Utf8PathBuf = env::current_dir().unwrap().try_into().unwrap(); + let temp_dir = camino_tempfile::tempdir_in(temp_dir_parent).unwrap(); + eprintln!("Data directory: {}", temp_dir.path()); let conf: &'static PageServerConf = Box::leak(Box::new( - pageserver::config::PageServerConf::dummy_conf(repo_dir), + pageserver::config::PageServerConf::dummy_conf(temp_dir.path().to_path_buf()), )); virtual_file::init(16384, IoEngineKind::TokioEpollUring); page_cache::init(conf.page_cache_size); From bf3e767b35abd9a1b36c07bef6116ae59235499b Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 5 Aug 2024 17:43:08 +0000 Subject: [PATCH 8/8] update split_writer for merge --- pageserver/src/tenant/storage_layer/split_writer.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/split_writer.rs b/pageserver/src/tenant/storage_layer/split_writer.rs index a966775f9ea3..d7bfe48c60f6 100644 --- a/pageserver/src/tenant/storage_layer/split_writer.rs +++ b/pageserver/src/tenant/storage_layer/split_writer.rs @@ -4,6 +4,7 @@ use bytes::Bytes; use pageserver_api::key::{Key, KEY_SIZE}; use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId}; +use crate::tenant::storage_layer::Layer; use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline}; use super::{DeltaLayerWriter, ImageLayerWriter, ResidentLayer}; @@ -173,8 +174,9 @@ impl SplitDeltaLayerWriter { ) .await?; let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer); - self.generated_layers - .push(prev_delta_writer.finish(key, tline, ctx).await?); + let (desc, path) = prev_delta_writer.finish(key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + self.generated_layers.push(delta_layer); } self.inner.put_value(key, lsn, val, ctx).await } @@ -190,7 +192,10 @@ impl SplitDeltaLayerWriter { inner, .. } = self; - generated_layers.push(inner.finish(end_key, tline, ctx).await?); + + let (desc, path) = inner.finish(end_key, ctx).await?; + let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?; + generated_layers.push(delta_layer); Ok(generated_layers) }