Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a pagectl tool to recompress image layers #7879

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8745c0d
Add a pagectl tool to recompress image layers
arpad-m May 24, 2024
2d37db2
Add mode to compare multiple files from a tenant
arpad-m Jun 4, 2024
f5baac2
clippy
arpad-m Jun 4, 2024
9850794
Remote layer file after done
arpad-m Jun 6, 2024
554a6bd
Two separate commands
arpad-m Jun 6, 2024
d030cbf
Print number of keys
arpad-m Jun 6, 2024
f132658
Some prints
arpad-m Jun 6, 2024
fce252f
Rename dest_path to tmp_dir
arpad-m Jun 6, 2024
9b74d55
Remove generation suffix
arpad-m Jun 6, 2024
80803ff
Printing tweaks
arpad-m Jun 6, 2024
3182c33
Corrections
arpad-m Jun 6, 2024
0c50045
Print error better
arpad-m Jun 6, 2024
a9963db
Create timeline dir in temp location if not existent
arpad-m Jun 6, 2024
8fcb236
Increase listing limit
arpad-m Jun 6, 2024
14447b9
Yield in between
arpad-m Jun 6, 2024
0e667dc
more yielding
arpad-m Jun 6, 2024
dadbd87
Add percent to output
arpad-m Jun 6, 2024
c824ffe
Add ZstdHigh compression mode
arpad-m Jun 7, 2024
843d996
More precise printing
arpad-m Jun 7, 2024
e6a0e7e
Add zstd with low compression quality
arpad-m Jun 7, 2024
2eb8b42
Also support the generation-less legacy naming scheme
arpad-m Jun 7, 2024
8fcdc22
Add stats info
arpad-m Jun 7, 2024
88b24e1
Move constants out into file
arpad-m Jun 10, 2024
40e7971
Add decompression
arpad-m Jun 12, 2024
07bd0ce
Also measure decompression time
arpad-m Jun 14, 2024
2f70221
Don't forget the flush
arpad-m Jun 14, 2024
2ea8d1b
Shutdown instead of flush
arpad-m Jun 14, 2024
983972f
Add tests for compression
arpad-m Jun 14, 2024
f1bebda
Fix failing test
arpad-m Jun 14, 2024
0fdeca8
Fix tests
arpad-m Jun 14, 2024
d2533c0
Always delete the file, even on error
arpad-m Jun 15, 2024
c5034f0
Fix build
arpad-m Jun 17, 2024
e749e73
Add the compression algo name
arpad-m Jun 17, 2024
66d3bef
More printing and assertions
arpad-m Jun 17, 2024
6b67135
Fix offset stream issue
arpad-m Jun 17, 2024
5e32cce
Add script for plots
arpad-m Jun 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
lz4_flex = "0.11"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
Expand Down
20 changes: 20 additions & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,26 @@ pub enum CompactionAlgorithm {
Tiered,
}

#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
enum_map::Enum,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
ZstdLow,
Zstd,
ZstdHigh,
LZ4,
}

#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
Expand Down
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
lz4_flex.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses
Expand Down
154 changes: 153 additions & 1 deletion pageserver/ctl/src/layers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;

use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
Expand All @@ -8,7 +11,7 @@ use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{delta_layer, image_layer, LayerName};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
Expand All @@ -20,7 +23,12 @@ use pageserver::{
},
virtual_file::VirtualFile,
};
use pageserver_api::models::ImageCompressionAlgorithm;
use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath, RemoteStorageConfig};
use std::fs;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};

Expand Down Expand Up @@ -55,6 +63,17 @@ pub(crate) enum LayerCmd {
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
CompressOne {
dest_path: Utf8PathBuf,
layer_file_path: Utf8PathBuf,
},
CompressMany {
tmp_dir: Utf8PathBuf,
tenant_remote_prefix: String,
tenant_remote_config: String,
layers_dir: Utf8PathBuf,
parallelism: Option<u32>,
},
}

async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
Expand Down Expand Up @@ -240,5 +259,138 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {

anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
LayerCmd::CompressOne {
dest_path,
layer_file_path,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);

let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

let stats =
ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?;
println!(
"Statistics: {stats:#?}\n{}",
serde_json::to_string(&stats).unwrap()
);
Ok(())
}
LayerCmd::CompressMany {
tmp_dir,
tenant_remote_prefix,
tenant_remote_config,
layers_dir,
parallelism,
} => {
pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
pageserver::page_cache::init(100);

let toml_document = toml_edit::Document::from_str(tenant_remote_config)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config");
let storage = remote_storage::GenericRemoteStorage::from_config(&config)?;
let storage = Arc::new(storage);

let cancel = CancellationToken::new();
let path = RemotePath::from_string(tenant_remote_prefix)?;
let max_files = NonZeroU32::new(128_000);
let files_list = storage
.list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel)
.await?;

println!("Listing gave {} keys", files_list.keys.len());

tokio::fs::create_dir_all(&layers_dir).await?;

let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize));

let mut tasks = JoinSet::new();
for (file_idx, file_key) in files_list.keys.iter().enumerate() {
let Some(file_name) = file_key.object_name() else {
continue;
};
match LayerName::from_str(file_name) {
Ok(LayerName::Delta(_)) => continue,
Ok(LayerName::Image(_)) => (),
Err(_e) => {
// Split off the final part. We ensured above that this is not turning a
// generation-less delta layer file name into an image layer file name.
let Some(file_without_generation) = file_name.rsplit_once('-') else {
continue;
};
let Ok(LayerName::Image(_layer_file_name)) =
LayerName::from_str(file_without_generation.0)
else {
// Skipping because it's either not a layer or an image layer
//println!("object {file_name}: not an image layer");
continue;
};
}
}
let json_file_path = layers_dir.join(format!("{file_name}.json"));
if tokio::fs::try_exists(&json_file_path).await? {
//println!("object {file_name}: report already created");
// If we have already created a report for the layer, skip it.
continue;
}
let local_layer_path = layers_dir.join(file_name);
async fn stats(
semaphore: Arc<Semaphore>,
local_layer_path: Utf8PathBuf,
json_file_path: Utf8PathBuf,
tmp_dir: Utf8PathBuf,
storage: Arc<GenericRemoteStorage>,
file_key: RemotePath,
) -> Result<Vec<(Option<ImageCompressionAlgorithm>, u64, u64, u64)>, anyhow::Error>
{
let _permit = semaphore.acquire().await?;
let cancel = CancellationToken::new();
let download = storage.download(&file_key, &cancel).await?;
let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?;
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?;
println!("Downloaded file to {local_layer_path}");
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let stats =
ImageLayer::compression_statistics(&tmp_dir, &local_layer_path, &ctx)
.await?;

let stats_str = serde_json::to_string(&stats).unwrap();
tokio::fs::write(json_file_path, stats_str).await?;

tokio::fs::remove_file(&local_layer_path).await?;
Ok(stats)
}
let semaphore = semaphore.clone();
let file_key = file_key.to_owned();
let storage = storage.clone();
let tmp_dir = tmp_dir.to_owned();
let file_name = file_name.to_owned();
let percent = (file_idx * 100) as f64 / files_list.keys.len() as f64;
tasks.spawn(async move {
let stats = stats(
semaphore,
local_layer_path.to_owned(),
json_file_path.to_owned(),
tmp_dir,
storage,
file_key,
)
.await;
match stats {
Ok(stats) => {
println!("Statistics for {file_name} ({percent:.1}%): {stats:?}\n")
}
Err(e) => eprintln!("Error for {file_name}: {e:?}"),
};
});
}
while let Some(_res) = tasks.join_next().await {}

Ok(())
}
}
}
21 changes: 20 additions & 1 deletion pageserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.

use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use pageserver_api::{models::ImageCompressionAlgorithm, shard::TenantShardId};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
Expand Down Expand Up @@ -55,6 +55,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;

pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
Expand Down Expand Up @@ -95,6 +96,8 @@ pub mod defaults {

pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB

pub const DEFAULT_IMAGE_COMPRESSION: Option<ImageCompressionAlgorithm> = None;

pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;

pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
Expand Down Expand Up @@ -290,6 +293,8 @@ pub struct PageServerConf {

pub validate_vectored_get: bool,

pub image_compression: Option<ImageCompressionAlgorithm>,

/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
Expand Down Expand Up @@ -400,6 +405,8 @@ struct PageServerConfigBuilder {

validate_vectored_get: BuilderValue<bool>,

image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,

ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
}

Expand Down Expand Up @@ -487,6 +494,7 @@ impl PageServerConfigBuilder {
max_vectored_read_bytes: Set(MaxVectoredReadBytes(
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
}
Expand Down Expand Up @@ -672,6 +680,10 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}

pub fn get_image_compression(&mut self, value: Option<ImageCompressionAlgorithm>) {
self.image_compression = BuilderValue::Set(value);
}

pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
Expand Down Expand Up @@ -732,6 +744,7 @@ impl PageServerConfigBuilder {
get_impl,
max_vectored_read_bytes,
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
}
CUSTOM LOGIC
Expand Down Expand Up @@ -1026,6 +1039,9 @@ impl PageServerConf {
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"image_compression" => {
builder.get_image_compression(Some(parse_toml_from_str("image_compression", item)?))
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
Expand Down Expand Up @@ -1110,6 +1126,7 @@ impl PageServerConf {
NonZeroUsize::new(defaults::DEFAULT_MAX_VECTORED_READ_BYTES)
.expect("Invalid default constant"),
),
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
}
Expand Down Expand Up @@ -1350,6 +1367,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
},
"Correct defaults should be used when no config values are provided"
Expand Down Expand Up @@ -1423,6 +1441,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
},
"Should be able to parse all basic config values correctly"
Expand Down
Loading
Loading