From adde0ecfe03ff2e352650c2b807bcef4d8a2dc49 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 4 Jul 2024 18:59:19 +0200
Subject: [PATCH] Flatten compression algorithm setting (#8265)

This flattens the compression algorithm setting, removing the `Option<_>`
wrapping layer and making handling of the setting easier.

It also adds a specific setting for *disabled* compression with the
continued ability to read compressed data, giving us the option to more
easily back out of a compression rollout, should the need arise; the lack
of such an escape hatch was one of the limitations of #8238.

Implements my suggestion from
https://github.com/neondatabase/neon/pull/8238#issuecomment-2206181594 ,
inspired by Christian's review in
https://github.com/neondatabase/neon/pull/8238#pullrequestreview-2156460268 .

Part of #5431
---
 libs/pageserver_api/src/models.rs                 | 15 ++++++++++++++-
 pageserver/src/config.rs                          | 11 ++++++-----
 pageserver/src/tenant/blob_io.rs                  | 18 +++++++++++++-----
 .../src/tenant/storage_layer/delta_layer.rs       |  4 ++--
 pageserver/src/tenant/storage_layer/layer.rs      |  2 +-
 5 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index ad65602f54d9..ecc543917e56 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -450,9 +450,22 @@ pub enum CompactionAlgorithm {
 )]
 #[strum(serialize_all = "kebab-case")]
 pub enum ImageCompressionAlgorithm {
+    /// Disabled for writes, and never decompress during reading.
+    /// Never set this after you've enabled compression once!
+    DisabledNoDecompress,
+    // Disabled for writes, support decompressing during read path
+    Disabled,
     /// Zstandard compression. Level 0 means and None mean the same (default level). Levels can be negative as well.
     /// For details, see the [manual](http://facebook.github.io/zstd/zstd_manual.html).
-    Zstd { level: Option<i8> },
+    Zstd {
+        level: Option<i8>,
+    },
+}
+
+impl ImageCompressionAlgorithm {
+    pub fn allow_decompression(&self) -> bool {
+        !matches!(self, ImageCompressionAlgorithm::DisabledNoDecompress)
+    }
 }
 
 #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index fa7f7d8d97c0..b7c9af224404 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -91,7 +91,8 @@ pub mod defaults {
 
     pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
 
-    pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;
+    pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
+        ImageCompressionAlgorithm::DisabledNoDecompress;
 
     pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
 
@@ -288,7 +289,7 @@ pub struct PageServerConf {
 
     pub validate_vectored_get: bool,
 
-    pub image_compression: Option<ImageCompressionAlgorithm>,
+    pub image_compression: ImageCompressionAlgorithm,
 
     /// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
     /// is exceeded, we start proactively closing ephemeral layers to limit the total amount
@@ -402,7 +403,7 @@ struct PageServerConfigBuilder {
 
     validate_vectored_get: BuilderValue<bool>,
 
-    image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,
+    image_compression: BuilderValue<ImageCompressionAlgorithm>,
 
     ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
 
@@ -680,7 +681,7 @@ impl PageServerConfigBuilder {
         self.validate_vectored_get = BuilderValue::Set(value);
     }
 
-    pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
+    pub fn get_image_compression(&mut self, value: ImageCompressionAlgorithm) {
         self.image_compression = BuilderValue::Set(value);
     }
 
@@ -1028,7 +1029,7 @@ impl PageServerConf {
                 builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
             }
             "image_compression" => {
-                builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
+                builder.get_image_compression(parse_toml_from_str("image_compression", item)?)
             }
             "ephemeral_bytes_per_memory_kb" => {
                 builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 1a6a5702f19b..0705182d5db2 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -273,7 +273,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         srcbuf: B,
         ctx: &RequestContext,
     ) -> (B::Buf, Result<u64, Error>) {
-        self.write_blob_maybe_compressed(srcbuf, ctx, None).await
+        self.write_blob_maybe_compressed(
+            srcbuf,
+            ctx,
+            ImageCompressionAlgorithm::DisabledNoDecompress,
+        )
+        .await
     }
 
     /// Write a blob of data. Returns the offset that it was written to,
@@ -282,7 +287,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
         &mut self,
         srcbuf: B,
         ctx: &RequestContext,
-        algorithm: Option<ImageCompressionAlgorithm>,
+        algorithm: ImageCompressionAlgorithm,
     ) -> (B::Buf, Result<u64, Error>) {
         let offset = self.offset;
 
@@ -314,7 +319,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
             );
         }
         let (high_bit_mask, len_written, srcbuf) = match algorithm {
-            Some(ImageCompressionAlgorithm::Zstd { level }) => {
+            ImageCompressionAlgorithm::Zstd { level } => {
                 let mut encoder = if let Some(level) = level {
                     async_compression::tokio::write::ZstdEncoder::with_quality(
                         Vec::new(),
@@ -335,7 +340,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
                     (BYTE_UNCOMPRESSED, len, slice.into_inner())
                 }
             }
-            None => (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner()),
+            ImageCompressionAlgorithm::Disabled
+            | ImageCompressionAlgorithm::DisabledNoDecompress => {
+                (BYTE_UNCOMPRESSED, len, srcbuf.slice_full().into_inner())
+            }
         };
         let mut len_buf = (len_written as u32).to_be_bytes();
         assert_eq!(len_buf[0] & 0xf0, 0);
@@ -414,7 +422,7 @@ mod tests {
                 wtr.write_blob_maybe_compressed(
                     blob.clone(),
                     &ctx,
-                    Some(ImageCompressionAlgorithm::Zstd { level: Some(1) }),
+                    ImageCompressionAlgorithm::Zstd { level: Some(1) },
                 )
                 .await
             } else {
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index e6a4d6d5c45a..685f6dce60e7 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -49,7 +49,7 @@ use camino::{Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
 use itertools::Itertools;
 use pageserver_api::keyspace::KeySpace;
-use pageserver_api::models::LayerAccessKind;
+use pageserver_api::models::{ImageCompressionAlgorithm, LayerAccessKind};
 use pageserver_api::shard::TenantShardId;
 use rand::{distributions::Alphanumeric, Rng};
 use serde::{Deserialize, Serialize};
@@ -453,7 +453,7 @@ impl DeltaLayerWriterInner {
     ) -> (Vec<u8>, anyhow::Result<()>) {
         assert!(self.lsn_range.start <= lsn);
         // We don't want to use compression in delta layer creation
-        let compression = None;
+        let compression = ImageCompressionAlgorithm::DisabledNoDecompress;
         let (val, res) = self
             .blob_writer
             .write_blob_maybe_compressed(val, ctx, compression)
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index d1f5cc8f43a7..afd11780e77d 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -1685,7 +1685,7 @@ impl DownloadedLayer {
                 lsn,
                 summary,
                 Some(owner.conf.max_vectored_read_bytes),
-                owner.conf.image_compression.is_some(),
+                owner.conf.image_compression.allow_decompression(),
                 ctx,
             )
             .await
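
Note (illustration only, not part of the patch): with this change the
`image_compression` entry in `pageserver.toml` takes the flattened enum
directly instead of an optional value, parsed via `parse_toml_from_str` as
shown above. Assuming the kebab-case `strum` serialization of the variant
names applies to the config string, selecting the "disabled for writes, but
still able to decompress on reads" mode might look roughly like this:

    # Sketch: write new image layer blobs uncompressed, but keep the read
    # path able to decompress blobs written while compression was enabled.
    image_compression = "disabled"

The default remains the no-decompression variant, which matches the current
behaviour; the `Disabled` variant is what the commit message describes as the
rollback option once compressed data may already exist on disk.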