Upload initdb results to S3 #5390

Merged: 36 commits, Nov 23, 2023

Commits
3c89fb5
Compress the initdb result with zstd and upload it to S3
arpad-m Sep 26, 2023
6caace3
Compress directly into zstd without buffering
arpad-m Sep 26, 2023
b14076e
Strip path prefix
arpad-m Sep 28, 2023
a864126
Introduce YieldingVec to do yields repeatedly
arpad-m Sep 28, 2023
c43faff
Docs
arpad-m Sep 28, 2023
5637f73
Move inside
arpad-m Oct 3, 2023
5715e08
Use append_path_with_name instead
arpad-m Oct 3, 2023
17a1be8
Fix yielding logic to provide signal to wakers
arpad-m Oct 4, 2023
32f655f
Use long distance matching
arpad-m Oct 4, 2023
fba067a
Merge remote-tracking branch 'origin/main' into arpad/upload_initdb_r…
arpad-m Nov 3, 2023
b0b6f3f
Make create_tar_zst take a camino path
arpad-m Nov 3, 2023
642249f
Merge remote-tracking branch 'origin/main' into arpad/upload_initdb_r…
arpad-m Nov 9, 2023
724ee2e
Merge remote-tracking branch 'origin/main' into arpad/upload_initdb_r…
arpad-m Nov 11, 2023
7622444
Remove poll_write_vectored
arpad-m Nov 11, 2023
4e18f95
Rename variables and improve docs
arpad-m Nov 11, 2023
0595740
Add some docs
arpad-m Nov 11, 2023
0aaa277
Retry uploads
arpad-m Nov 11, 2023
04099cc
run cargo hakari generate
arpad-m Nov 11, 2023
5d23f74
Allow "failed creation" warning
arpad-m Nov 14, 2023
e7b8618
Add test for recreation of timeline
arpad-m Nov 14, 2023
3e58f8b
Add warning for too large initdb.tar.zst
arpad-m Nov 14, 2023
c88d37d
WIP add test_wal_restore_initdb test
arpad-m Nov 16, 2023
ebe2cd5
Better name and also don't run initdb
arpad-m Nov 22, 2023
9206b0d
Seek to zero
arpad-m Nov 22, 2023
8d820dc
flush the zstd archive
arpad-m Nov 22, 2023
7abfa1f
Add empty dirs
arpad-m Nov 22, 2023
b2424a2
Use the correct mode that postgres likes for directory creation
arpad-m Nov 23, 2023
13f48a8
Properly shutdown, not flush the writer
arpad-m Nov 23, 2023
eb82f45
Print original and restored lsns
arpad-m Nov 23, 2023
e08c7eb
Fix wal restore script
arpad-m Nov 23, 2023
465021e
Merge remote-tracking branch 'origin/main' into arpad/upload_initdb_r…
arpad-m Nov 23, 2023
86ec4c2
fix format
arpad-m Nov 23, 2023
d121ad6
Use pre-saved initial tenant/timeline values
arpad-m Nov 23, 2023
ac1baf4
Address review comments
arpad-m Nov 23, 2023
2444429
Format
arpad-m Nov 23, 2023
1f571b1
Address review comments and improve comments
arpad-m Nov 23, 2023
5 changes: 5 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -37,7 +37,7 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
-async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
+async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.16"
azure_identity = "0.16"
azure_storage = "0.16"
12 changes: 10 additions & 2 deletions control_plane/src/bin/neon_local.rs
@@ -487,8 +487,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.copied()
.context("Failed to parse postgres version from the argument string")?;

-let timeline_info =
-    pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
+let new_timeline_id_opt = parse_timeline_id(create_match)?;
+
+let timeline_info = pageserver.timeline_create(
+    tenant_id,
+    new_timeline_id_opt,
+    None,
+    None,
+    Some(pg_version),
+)?;
let new_timeline_id = timeline_info.timeline_id;

let last_record_lsn = timeline_info.last_record_lsn;
@@ -1308,6 +1315,7 @@ fn cli() -> Command {
.subcommand(Command::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(pg_version_arg.clone())
)
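With this, `timeline create` can be given an explicit timeline id instead of a generated one. A hypothetical invocation (flag names inferred from the test-fixture change at the bottom of this diff; the id values are placeholders):

    neon_local timeline create --tenant-id <tenant_id> --timeline-id <timeline_id> --branch-name main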
21 changes: 21 additions & 0 deletions libs/utils/scripts/restore_from_wal_initdb.sh
@@ -0,0 +1,21 @@
#!/bin/bash

# like restore_from_wal.sh, but takes an existing initdb.tar.zst

set -euxo pipefail

PG_BIN=$1
WAL_PATH=$2
DATA_DIR=$3
PORT=$4
echo "port=$PORT" >> "$DATA_DIR"/postgresql.conf
echo "shared_preload_libraries='\$libdir/neon_rmgr.so'" >> "$DATA_DIR"/postgresql.conf
REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"| cut -c 42-)
declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" start
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001
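The script takes four positional arguments: the Postgres bin directory, the directory containing the WAL to restore, the data directory (pre-populated from an extracted initdb.tar.zst), and the port to run on. A hypothetical invocation (all paths are placeholders):

    ./restore_from_wal_initdb.sh /usr/lib/postgresql/15/bin ./wal ./pgdata 55432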
118 changes: 116 additions & 2 deletions pageserver/src/import_datadir.rs
@@ -3,18 +3,25 @@
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{self, Poll};

use anyhow::{bail, ensure, Context, Result};
use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use nix::NixPath;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_tar::Archive;
use tokio_tar::Builder;
use tokio_tar::HeaderMode;
use tracing::*;
use walkdir::WalkDir;

use crate::context::RequestContext;
use crate::pgdatadir_mapping::*;
use crate::tenant::remote_timeline_client::INITDB_PATH;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
@@ -33,7 +40,9 @@ use utils::lsn::Lsn;
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
// Read control file to extract the LSN
let controlfile_path = path.join("global").join("pg_control");
-let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
+let controlfile_buf = std::fs::read(&controlfile_path)
+    .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
+let controlfile = ControlFileData::decode(&controlfile_buf)?;
let lsn = controlfile.checkPoint;

Ok(Lsn(lsn))
@@ -618,3 +627,108 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
reader.read_to_end(&mut buf).await?;
Ok(Bytes::from(buf))
}

/// An in-memory buffer implementing `AsyncWrite`, inserting yields every now and then
///
/// The number of yields is bounded from above by the number of times poll_write is called,
/// so calling it with 8 KB chunks and 8 MB chunks gives the same number of yields in total.
/// This is an explicit choice, as the `YieldingVec` is meant to give the async executor
/// breathing room between units of CPU-intensive preparation of buffers to be written.
/// Once a write call is issued, the whole buffer has been prepared already, so there is no
/// gain in splitting up the memcpy further.
struct YieldingVec {
yield_budget: usize,
// the buffer written into
buf: Vec<u8>,
}

impl YieldingVec {
fn new() -> Self {
Self {
yield_budget: 0,
buf: Vec::new(),
}
}
// Whether we should yield for a write operation of the given size
fn should_yield(&mut self, add_buf_len: usize) -> bool {
// Set this limit to a small value so that we are a
// good async citizen and yield repeatedly (but not
// so often that many small writes each cause a yield)
const YIELD_DIST: usize = 1024;

let target_buf_len = self.buf.len() + add_buf_len;
let ret = self.yield_budget / YIELD_DIST < target_buf_len / YIELD_DIST;
if self.yield_budget < target_buf_len {
self.yield_budget += add_buf_len;
}
ret
}
}

impl AsyncWrite for YieldingVec {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
if self.should_yield(buf.len()) {
cx.waker().wake_by_ref();
return Poll::Pending;
}
self.get_mut().buf.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
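To see the yield cadence this produces, here is a minimal sketch (not part of this PR; it assumes `YieldingVec` is in scope) that polls `poll_write` by hand and counts the `Pending` results. With `YIELD_DIST` at 1024, every 4 KiB write crosses a fresh 1 KiB budget boundary, so each write yields exactly once before the data is accepted:

use std::future::poll_fn;
use std::pin::Pin;
use std::task::Poll;
use tokio::io::AsyncWrite;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut vec = YieldingVec::new();
    let chunk = [0u8; 4096];
    let mut yields = 0;
    for _ in 0..4 {
        poll_fn(|cx| loop {
            match Pin::new(&mut vec).poll_write(cx, &chunk) {
                // The first poll of each write exhausts the budget and yields
                Poll::Pending => yields += 1,
                // The second poll accepts the whole chunk
                ready => return ready,
            }
        })
        .await?;
    }
    // One yield per 4 KiB write: 4 in total.
    assert_eq!(yields, 4);
    Ok(())
}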

pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result<Vec<u8>> {
let mut paths = Vec::new();
for entry in WalkDir::new(pgdata_path) {
let entry = entry?;
let metadata = entry.metadata().expect("error getting dir entry metadata");
// Also allow directories, so that empty directories make it into the archive
if !(metadata.is_file() || metadata.is_dir()) {
continue;
}
let path = entry.into_path();
paths.push(path);
}
// Do a sort to get a more consistent listing
paths.sort_unstable();
let zstd = ZstdEncoder::with_quality_and_params(
YieldingVec::new(),
Level::Default,
&[CParameter::enable_long_distance_matching(true)],
);
let mut builder = Builder::new(zstd);
// Use reproducible header mode
builder.mode(HeaderMode::Deterministic);
for path in paths {
let rel_path = path.strip_prefix(pgdata_path)?;
if rel_path.is_empty() {
// The top directory itself should not be added to the archive;
// the tar crate doesn't like that
continue;
}
builder.append_path_with_name(&path, rel_path).await?;
}
let mut zstd = builder.into_inner().await?;
zstd.shutdown().await?;
let compressed = zstd.into_inner();
let compressed_len = compressed.buf.len();
const INITDB_TAR_ZST_WARN_LIMIT: usize = 2_000_000;
if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
}
Ok(compressed.buf)
}
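The PR only adds the write side; for completeness, here is a sketch of the reverse direction (an assumption, not code from this PR) that uses the same crates to decompress and unpack an initdb.tar.zst:

use anyhow::Result;
use async_compression::tokio::bufread::ZstdDecoder;
use camino::Utf8Path;
use tokio::io::BufReader;
use tokio_tar::Archive;

/// Decompress and unpack an `initdb.tar.zst` produced by `create_tar_zst` (sketch only).
async fn extract_tar_zst(bytes: &[u8], dest: &Utf8Path) -> Result<()> {
    // zstd-decode the byte stream, then feed it to the async tar reader
    let decoder = ZstdDecoder::new(BufReader::new(bytes));
    let mut archive = Archive::new(decoder);
    archive.unpack(dest).await?;
    Ok(())
}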
28 changes: 27 additions & 1 deletion pageserver/src/tenant.rs
@@ -12,6 +12,7 @@
//!

use anyhow::{bail, Context};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use futures::FutureExt;
@@ -24,6 +25,7 @@ use tokio::sync::watch;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext;
@@ -2876,7 +2878,7 @@ impl Tenant {
}

/// - run initdb to init temporary instance and get bootstrap data
-/// - after initialization complete, remove the temp dir.
+/// - after initialization completes, tar up the temp dir and upload it to S3.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
@@ -2917,6 +2919,30 @@
let pgdata_path = &initdb_path;
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();

// Upload the created data dir to S3
if let Some(storage) = &self.remote_storage {
let pgdata_zstd = import_datadir::create_tar_zst(pgdata_path).await?;
let pgdata_zstd = Bytes::from(pgdata_zstd);
backoff::retry(
|| async {
self::remote_timeline_client::upload_initdb_dir(
storage,
&self.tenant_id,
&timeline_id,
pgdata_zstd.clone(),
)
.await
},
|_| false,
3,
u32::MAX,
"persist_initdb_tar_zst",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await?;
}

// Import the contents of the data directory at the initial checkpoint
// LSN, and any WAL after that.
// Initdb lsn will be equal to last_record_lsn which will be set after import.
10 changes: 10 additions & 0 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -190,6 +190,7 @@ use chrono::{NaiveDateTime, Utc};

use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
@@ -249,6 +250,8 @@ pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
// retries. Uploads and deletions are retried forever, though.
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;

pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";

pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted(IndexPart),
@@ -1537,6 +1540,13 @@ pub fn remote_layer_path(
RemotePath::from_string(&path).expect("Failed to construct path")
}

pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
))
.expect("Failed to construct path")
}
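Assuming TIMELINES_SEGMENT_NAME expands to "timelines", the archive for a given tenant and timeline therefore lands at tenants/<tenant_id>/timelines/<timeline_id>/initdb.tar.zst, under the same prefix as the timeline's other remote files (compare remote_index_path below).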

pub fn remote_index_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
24 changes: 23 additions & 1 deletion pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -1,6 +1,7 @@
//! Helper functions to upload files to remote storage with a RemoteStorage

use anyhow::{bail, Context};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use std::io::ErrorKind;
@@ -9,7 +10,9 @@ use tokio::fs;
use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
tenant::remote_timeline_client::{
index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
},
};
use remote_storage::GenericRemoteStorage;
use utils::id::{TenantId, TimelineId};
@@ -104,3 +107,22 @@ pub(super) async fn upload_timeline_layer<'a>(

Ok(())
}

/// Uploads the given `initdb` data to the remote storage.
pub(crate) async fn upload_initdb_dir(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
initdb_dir: Bytes,
) -> anyhow::Result<()> {
tracing::trace!("uploading initdb dir");

let size = initdb_dir.len();
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(initdb_dir));

let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
storage
.upload_storage_object(bytes, size, &remote_path)
.await
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
}
4 changes: 4 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
@@ -1224,6 +1224,7 @@ def create_timeline(
self,
new_branch_name: str,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
cmd = [
"timeline",
@@ -1236,6 +1237,9 @@
self.env.pg_version,
]

if timeline_id is not None:
cmd.extend(["--timeline-id", str(timeline_id)])

res = self.raw_cli(cmd)
res.check_returncode()
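Tests can now pin the timeline id when recreating a timeline, along the lines of (hypothetical usage; the id values are placeholders):

    env.neon_cli.create_timeline("restored", tenant_id=tenant_id, timeline_id=timeline_id)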
