From 544dd1ecc8d48532722d85f7ad681c16f4ebf3f6 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 5 Jul 2024 20:34:51 +0200
Subject: [PATCH 1/3] move import basebackup/wal to http mgmt api

---
 control_plane/src/pageserver.rs   |  58 ++---
 libs/utils/src/http/request.rs    |   9 +
 pageserver/client/Cargo.toml      |   2 +-
 pageserver/client/src/mgmt_api.rs |  70 +++++-
 pageserver/src/bin/pageserver.rs  |   1 -
 pageserver/src/http/routes.rs     | 194 ++++++++++++++++
 pageserver/src/metrics.rs         |   2 -
 pageserver/src/page_service.rs    | 357 +-----------------------------
 8 files changed, 287 insertions(+), 406 deletions(-)

diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 983f78577ce4..f0403b179622 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -15,7 +15,6 @@ use std::time::Duration;
 
 use anyhow::{bail, Context};
 use camino::Utf8PathBuf;
-use futures::SinkExt;
 use pageserver_api::models::{
     self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
 };
@@ -566,60 +565,39 @@ impl PageServerNode {
         pg_wal: Option<(Lsn, PathBuf)>,
         pg_version: u32,
     ) -> anyhow::Result<()> {
-        let (client, conn) = self.page_server_psql_client().await?;
-        // The connection object performs the actual communication with the database,
-        // so spawn it off to run on its own.
-        tokio::spawn(async move {
-            if let Err(e) = conn.await {
-                eprintln!("connection error: {}", e);
-            }
-        });
-        let client = std::pin::pin!(client);
-
         // Init base reader
         let (start_lsn, base_tarfile_path) = base;
         let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?;
-        let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile);
+        let base_tarfile =
+            mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(base_tarfile));
 
         // Init wal reader if necessary
         let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
             let wal_tarfile = tokio::fs::File::open(wal_tarfile_path).await?;
-            let wal_reader = tokio_util::io::ReaderStream::new(wal_tarfile);
+            let wal_reader =
+                mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(wal_tarfile));
             (end_lsn, Some(wal_reader))
         } else {
             (start_lsn, None)
         };
 
-        let copy_in = |reader, cmd| {
-            let client = &client;
-            async move {
-                let writer = client.copy_in(&cmd).await?;
-                let writer = std::pin::pin!(writer);
-                let mut writer = writer.sink_map_err(|e| {
-                    std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))
-                });
-                let mut reader = std::pin::pin!(reader);
-                writer.send_all(&mut reader).await?;
-                writer.into_inner().finish().await?;
-                anyhow::Ok(())
-            }
-        };
-
         // Import base
-        copy_in(
-            base_tarfile,
-            format!(
-                "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
-            ),
-        )
-        .await?;
-        // Import wal if necessary
-        if let Some(wal_reader) = wal_reader {
-            copy_in(
-                wal_reader,
-                format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"),
+        self.http_client
+            .import_basebackup(
+                tenant_id,
+                timeline_id,
+                start_lsn,
+                end_lsn,
+                pg_version,
+                base_tarfile,
             )
             .await?;
+
+        // Import wal if necessary
+        if let Some(wal_reader) = wal_reader {
+            self.http_client
+                .import_wal(tenant_id, timeline_id, start_lsn, end_lsn, wal_reader)
+                .await?;
         }
 
         Ok(())
diff --git a/libs/utils/src/http/request.rs b/libs/utils/src/http/request.rs
index 766bbfc9dfae..8b8ed5a67f39 100644
--- a/libs/utils/src/http/request.rs
+++ b/libs/utils/src/http/request.rs
@@ -74,6 +74,15 @@ pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
         .transpose()
 }
 
+pub fn must_parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
+    request: &Request<Body>,
+    param_name: &str,
+) -> Result<T, ApiError> {
+    parse_query_param(request, param_name)?.ok_or_else(|| {
+        ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
+    })
+}
+
 pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
     match request.body_mut().data().await {
         Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),
diff --git a/pageserver/client/Cargo.toml b/pageserver/client/Cargo.toml
index 0ed27602cd3c..a938367334fa 100644
--- a/pageserver/client/Cargo.toml
+++ b/pageserver/client/Cargo.toml
@@ -8,7 +8,7 @@ license.workspace = true
 pageserver_api.workspace = true
 thiserror.workspace = true
 async-trait.workspace = true
-reqwest.workspace = true
+reqwest = { workspace = true, features = [ "stream" ] }
 utils.workspace = true
 serde.workspace = true
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 48b27775cb91..e635fa4d62e1 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -9,6 +9,8 @@ use utils::{
     lsn::Lsn,
 };
 
+pub use reqwest::Body as ReqwestBody;
+
 pub mod util;
 
 #[derive(Debug, Clone)]
@@ -173,19 +175,30 @@ impl Client {
         self.request(Method::GET, uri, ()).await
     }
 
-    async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
+    fn start_request<U: reqwest::IntoUrl>(
         &self,
         method: Method,
         uri: U,
-        body: B,
-    ) -> Result<reqwest::Response> {
+    ) -> reqwest::RequestBuilder {
         let req = self.client.request(method, uri);
-        let req = if let Some(value) = &self.authorization_header {
+        if let Some(value) = &self.authorization_header {
             req.header(reqwest::header::AUTHORIZATION, value)
         } else {
             req
-        };
-        req.json(&body).send().await.map_err(Error::ReceiveBody)
+        }
+    }
+
+    async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
+        &self,
+        method: Method,
+        uri: U,
+        body: B,
+    ) -> Result<reqwest::Response> {
+        self.start_request(method, uri)
+            .json(&body)
+            .send()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
 
     async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
@@ -609,4 +622,49 @@ impl Client {
             }),
         }
     }
+
+    pub async fn import_basebackup(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        base_lsn: Lsn,
+        end_lsn: Lsn,
+        pg_version: u32,
+        basebackup_tarball: ReqwestBody,
+    ) -> Result<()> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
+            self.mgmt_api_endpoint,
+        );
+        self.start_request(Method::PUT, uri)
+            .body(basebackup_tarball)
+            .send()
+            .await
+            .map_err(Error::ReceiveBody)?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
+
+    pub async fn import_wal(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        start_lsn: Lsn,
+        end_lsn: Lsn,
+        wal_tarball: ReqwestBody,
+    ) -> Result<()> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
+            self.mgmt_api_endpoint,
+        );
+        self.start_request(Method::PUT, uri)
+            .body(wal_tarball)
+            .send()
+            .await
+            .map_err(Error::ReceiveBody)?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
 }
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 39d4e46c9663..4426fd86a4c9 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -657,7 +657,6 @@ fn start_pageserver(
             async move {
                 page_service::libpq_listener_main(
                     tenant_manager,
-                    broker_client,
                     pg_auth,
                     pageserver_listener,
                     conf.pg_auth_type,
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 893302b7d6d9..5b9fa590f7d1 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -10,6 +10,7 @@ use std::time::Duration;
 
 use anyhow::{anyhow, Context, Result};
 use enumset::EnumSet;
+use futures::StreamExt;
 use futures::TryFutureExt;
 use humantime::format_rfc3339;
 use hyper::header;
@@ -44,12 +45,14 @@ use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
 use remote_storage::TimeTravelError;
 use tenant_size_model::{svg::SvgBranchKind, SizeResult, StorageModel};
+use tokio_util::io::StreamReader;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::auth::JwtAuth;
 use utils::failpoint_support::failpoints_handler;
 use utils::http::endpoint::prometheus_metrics_handler;
 use utils::http::endpoint::request_span;
+use utils::http::request::must_parse_query_param;
 use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
 
 use crate::context::{DownloadBehavior, RequestContext};
@@ -2404,6 +2407,189 @@ async fn post_top_tenants(
     )
 }
 
+async fn put_tenant_timeline_import_basebackup(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    let base_lsn: Lsn = must_parse_query_param(&request, "base_lsn")?;
+    let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
+    let pg_version: u32 = must_parse_query_param(&request, "pg_version")?;
+
+    check_permission(&request, Some(tenant_id))?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+
+    let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
+    async move {
+        let state = get_state(&request);
+        let tenant = state
+            .tenant_manager
+            .get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
+
+        let broker_client = state.broker_client.clone();
+
+        let mut body = StreamReader::new(request.into_body().map(|res| {
+            res.map_err(|error| {
+                std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
+            })
+        }));
+
+        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
+
+        let timeline = tenant
+            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
+            .map_err(ApiError::InternalServerError)
+            .await?;
+
+        // TODO mark timeline as not ready until it reaches end_lsn.
+        // We might have some wal to import as well, and we should prevent compute
+        // from connecting before that and writing conflicting wal.
+        //
+        // This is not relevant for pageserver->pageserver migrations, since there's
+        // no wal to import. But should be fixed if we want to import from postgres.
+
+        // TODO leave clean state on error. For now you can use detach to clean
+        // up broken state from a failed import.
+
+        // Import basebackup provided via the request body
+        info!("importing basebackup");
+
+        timeline
+            .import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &ctx)
+            .await
+            .map_err(ApiError::InternalServerError)?;
+
+        // Read the end of the tar archive.
+        read_tar_eof(body)
+            .await
+            .map_err(ApiError::InternalServerError)?;
+
+        // TODO check checksum
+        // Meanwhile you can verify client-side by taking fullbackup
+        // and checking that it matches in size with what was imported.
+        // It wouldn't work if base came from vanilla postgres though,
+        // since we discard some log files.
+
+        info!("done");
+        json_response(StatusCode::OK, ())
+    }
+    .instrument(span)
+    .await
+}
+
+async fn put_tenant_timeline_import_wal(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    let start_lsn: Lsn = must_parse_query_param(&request, "start_lsn")?;
+    let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
+
+    check_permission(&request, Some(tenant_id))?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+
+    let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
+    async move {
+        let state = get_state(&request);
+
+        let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
+
+        let mut body = StreamReader::new(request.into_body().map(|res| {
+            res.map_err(|error| {
+                std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
+            })
+        }));
+
+        let last_record_lsn = timeline.get_last_record_lsn();
+        if last_record_lsn != start_lsn {
+            return Err(ApiError::InternalServerError(anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}")));
+        }
+
+        // TODO leave clean state on error. For now you can use detach to clean
+        // up broken state from a failed import.
+
+        // Import wal provided via the request body
+        info!("importing wal");
+        crate::import_datadir::import_wal_from_tar(&timeline, &mut body, start_lsn, end_lsn, &ctx).await.map_err(ApiError::InternalServerError)?;
+        info!("wal import complete");
+
+        // Read the end of the tar archive.
+        read_tar_eof(body).await.map_err(ApiError::InternalServerError)?;
+
+        // TODO Does it make sense to overshoot?
+        if timeline.get_last_record_lsn() < end_lsn {
+            return Err(ApiError::InternalServerError(anyhow::anyhow!("WAL import from {start_lsn} did not reach end LSN {end_lsn}; timeline only advanced to {}", timeline.get_last_record_lsn())));
+        }
+
+        // Flush data to disk, then upload to s3. No need for a forced checkpoint.
+        // We only want to persist the data, and it doesn't matter if it's in the
+        // shape of deltas or images.
+        info!("flushing layers");
+        timeline.freeze_and_flush().await.map_err(|e| match e {
+            tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
+            other => ApiError::InternalServerError(anyhow::anyhow!(other)),
+        })?;
+
+        info!("done");
+
+        json_response(StatusCode::OK, ())
+    }.instrument(span).await
+}
+
+/// Read the end of a tar archive.
+///
+/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
+/// `tokio_tar` already read the first such block. Read the second all-zeros block,
+/// and check that there is no more data after the EOF marker.
+/// +/// 'tar' command can also write extra blocks of zeros, up to a record +/// size, controlled by the --record-size argument. Ignore them too. +async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> { + use tokio::io::AsyncReadExt; + let mut buf = [0u8; 512]; + + // Read the all-zeros block, and verify it + let mut total_bytes = 0; + while total_bytes < 512 { + let nbytes = reader.read(&mut buf[total_bytes..]).await?; + total_bytes += nbytes; + if nbytes == 0 { + break; + } + } + if total_bytes < 512 { + anyhow::bail!("incomplete or invalid tar EOF marker"); + } + if !buf.iter().all(|&x| x == 0) { + anyhow::bail!("invalid tar EOF marker"); + } + + // Drain any extra zero-blocks after the EOF marker + let mut trailing_bytes = 0; + let mut seen_nonzero_bytes = false; + loop { + let nbytes = reader.read(&mut buf).await?; + trailing_bytes += nbytes; + if !buf.iter().all(|&x| x == 0) { + seen_nonzero_bytes = true; + } + if nbytes == 0 { + break; + } + } + if seen_nonzero_bytes { + anyhow::bail!("unexpected non-zero bytes after the tar archive"); + } + if trailing_bytes % 512 != 0 { + anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive"); + } + Ok(()) +} + /// Common functionality of all the HTTP API handlers. /// /// - Adds a tracing span to each request (by `request_span`) @@ -2698,5 +2884,13 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/perf_info", |r| testing_api_handler("perf_info", r, perf_info), ) + .put( + "/v1/tenant/:tenant_id/timeline/:timeline_id/import_basebackup", + |r| api_handler(r, put_tenant_timeline_import_basebackup), + ) + .put( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/import_wal", + |r| api_handler(r, put_tenant_timeline_import_wal), + ) .any(handler_404)) } diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9e9fe7fbb834..b18dfef4aa85 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1471,8 +1471,6 @@ pub(crate) enum ComputeCommandKind { PageStream, Basebackup, Fullbackup, - ImportBasebackup, - ImportWal, LeaseLsn, Show, } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index a440ad63785b..f9ae1cad2f92 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -4,9 +4,7 @@ use anyhow::Context; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; -use bytes::Bytes; use futures::stream::FuturesUnordered; -use futures::Stream; use futures::StreamExt; use pageserver_api::key::Key; use pageserver_api::models::TenantState; @@ -28,7 +26,6 @@ use std::borrow::Cow; use std::collections::HashMap; use std::io; use std::net::TcpListener; -use std::pin::pin; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -37,7 +34,6 @@ use std::time::Instant; use std::time::SystemTime; use tokio::io::AsyncWriteExt; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::ConnectionId; @@ -53,7 +49,6 @@ use crate::auth::check_permission; use crate::basebackup; use crate::basebackup::BasebackupError; use crate::context::{DownloadBehavior, RequestContext}; -use crate::import_datadir::import_wal_from_tar; use crate::metrics; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT}; use crate::pgdatadir_mapping::Version; @@ -66,7 +61,6 @@ use crate::tenant::mgr::GetTenantError; use 
crate::tenant::mgr::ShardResolveResult;
 use crate::tenant::mgr::ShardSelector;
 use crate::tenant::mgr::TenantManager;
-use crate::tenant::timeline::FlushLayerError;
 use crate::tenant::timeline::WaitLsnError;
 use crate::tenant::GetTimelineError;
 use crate::tenant::PageReconstructError;
@@ -82,56 +76,6 @@ use postgres_ffi::BLCKSZ;
 // is not yet in state [`TenantState::Active`].
 const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
 
-/// Read the end of a tar archive.
-///
-/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
-/// `tokio_tar` already read the first such block. Read the second all-zeros block,
-/// and check that there is no more data after the EOF marker.
-///
-/// 'tar' command can also write extra blocks of zeros, up to a record
-/// size, controlled by the --record-size argument. Ignore them too.
-async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
-    use tokio::io::AsyncReadExt;
-    let mut buf = [0u8; 512];
-
-    // Read the all-zeros block, and verify it
-    let mut total_bytes = 0;
-    while total_bytes < 512 {
-        let nbytes = reader.read(&mut buf[total_bytes..]).await?;
-        total_bytes += nbytes;
-        if nbytes == 0 {
-            break;
-        }
-    }
-    if total_bytes < 512 {
-        anyhow::bail!("incomplete or invalid tar EOF marker");
-    }
-    if !buf.iter().all(|&x| x == 0) {
-        anyhow::bail!("invalid tar EOF marker");
-    }
-
-    // Drain any extra zero-blocks after the EOF marker
-    let mut trailing_bytes = 0;
-    let mut seen_nonzero_bytes = false;
-    loop {
-        let nbytes = reader.read(&mut buf).await?;
-        trailing_bytes += nbytes;
-        if !buf.iter().all(|&x| x == 0) {
-            seen_nonzero_bytes = true;
-        }
-        if nbytes == 0 {
-            break;
-        }
-    }
-    if seen_nonzero_bytes {
-        anyhow::bail!("unexpected non-zero bytes after the tar archive");
-    }
-    if trailing_bytes % 512 != 0 {
-        anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
-    }
-    Ok(())
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 
 ///
@@ -141,7 +85,6 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
 ///
 pub async fn libpq_listener_main(
     tenant_manager: Arc<TenantManager>,
-    broker_client: storage_broker::BrokerClientChannel,
     auth: Option<Arc<SwappableJwtAuth>>,
     listener: TcpListener,
     auth_type: AuthType,
@@ -186,7 +129,6 @@ pub async fn libpq_listener_main(
                 false,
                 page_service_conn_main(
                     tenant_manager.clone(),
-                    broker_client.clone(),
                     local_auth,
                     socket,
                     auth_type,
@@ -209,7 +151,6 @@ pub async fn libpq_listener_main(
 #[instrument(skip_all, fields(peer_addr))]
 async fn page_service_conn_main(
     tenant_manager: Arc<TenantManager>,
-    broker_client: storage_broker::BrokerClientChannel,
     auth: Option<Arc<SwappableJwtAuth>>,
     socket: tokio::net::TcpStream,
     auth_type: AuthType,
@@ -267,8 +208,7 @@ async fn page_service_conn_main(
     // and create a child per-query context when it invokes process_query.
     // But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
     // and create the per-query context in process_query ourselves.
-    let mut conn_handler =
-        PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
+    let mut conn_handler = PageServerHandler::new(tenant_manager, auth, connection_ctx);
     let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
 
     match pgbackend
@@ -299,7 +239,6 @@ struct HandlerTimeline {
 }
 
 struct PageServerHandler {
-    broker_client: storage_broker::BrokerClientChannel,
     auth: Option<Arc<SwappableJwtAuth>>,
     claims: Option<Claims>,
@@ -391,13 +330,11 @@ impl From for QueryError {
 impl PageServerHandler {
     pub fn new(
         tenant_manager: Arc<TenantManager>,
-        broker_client: storage_broker::BrokerClientChannel,
         auth: Option<Arc<SwappableJwtAuth>>,
         connection_ctx: RequestContext,
     ) -> Self {
         PageServerHandler {
             tenant_manager,
-            broker_client,
             auth,
             claims: None,
             connection_ctx,
@@ -480,73 +417,6 @@ impl PageServerHandler {
         )
     }
 
-    fn copyin_stream<'a, IO>(
-        &'a self,
-        pgb: &'a mut PostgresBackend<IO>,
-        cancel: &'a CancellationToken,
-    ) -> impl Stream<Item = io::Result<Bytes>> + 'a
-    where
-        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
-    {
-        async_stream::try_stream! {
-            loop {
-                let msg = tokio::select! {
-                    biased;
-
-                    _ = cancel.cancelled() => {
-                        // We were requested to shut down.
-                        let msg = "pageserver is shutting down";
-                        let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
-                        Err(QueryError::Shutdown)
-                    }
-
-                    msg = pgb.read_message() => { msg.map_err(QueryError::from)}
-                };
-
-                match msg {
-                    Ok(Some(message)) => {
-                        let copy_data_bytes = match message {
-                            FeMessage::CopyData(bytes) => bytes,
-                            FeMessage::CopyDone => { break },
-                            FeMessage::Sync => continue,
-                            FeMessage::Terminate => {
-                                let msg = "client terminated connection with Terminate message during COPY";
-                                let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
-                                // error can't happen here, ErrorResponse serialization should be always ok
-                                pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
-                                Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
-                                break;
-                            }
-                            m => {
-                                let msg = format!("unexpected message {m:?}");
-                                // error can't happen here, ErrorResponse serialization should be always ok
-                                pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
-                                Err(io::Error::new(io::ErrorKind::Other, msg))?;
-                                break;
-                            }
-                        };
-
-                        yield copy_data_bytes;
-                    }
-                    Ok(None) => {
-                        let msg = "client closed connection during COPY";
-                        let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
-                        // error can't happen here, ErrorResponse serialization should be always ok
-                        pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
-                        self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-                        Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
-                    }
-                    Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
-                        Err(io_error)?;
-                    }
-                    Err(other) => {
-                        Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
-                    }
-                };
-            }
-        }
-    }
-
     #[instrument(skip_all)]
     async fn handle_pagerequests<IO>(
         &mut self,
@@ -718,128 +588,6 @@ impl PageServerHandler {
         Ok(())
     }
 
-    #[allow(clippy::too_many_arguments)]
-    #[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
-    async fn handle_import_basebackup<IO>(
-        &self,
-        pgb: &mut PostgresBackend<IO>,
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
-        base_lsn: Lsn,
-        _end_lsn: Lsn,
-        pg_version: u32,
-        ctx: RequestContext,
-    ) -> Result<(), QueryError>
-    where
-        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
-    {
-        debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
-
-        // Create empty timeline
-        info!("creating new timeline");
-        let tenant = self
-            .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
-            .await?;
-        let timeline = tenant
-            .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
-            .await?;
-
-        // TODO mark timeline as not ready until it reaches end_lsn.
-        // We might have some wal to import as well, and we should prevent compute
-        // from connecting before that and writing conflicting wal.
-        //
-        // This is not relevant for pageserver->pageserver migrations, since there's
-        // no wal to import. But should be fixed if we want to import from postgres.
-
-        // TODO leave clean state on error. For now you can use detach to clean
-        // up broken state from a failed import.
-
-        // Import basebackup provided via CopyData
-        info!("importing basebackup");
-        pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
-        self.flush_cancellable(pgb, &tenant.cancel).await?;
-
-        let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
-        timeline
-            .import_basebackup_from_tar(
-                tenant.clone(),
-                &mut copyin_reader,
-                base_lsn,
-                self.broker_client.clone(),
-                &ctx,
-            )
-            .await?;
-
-        // Read the end of the tar archive.
-        read_tar_eof(copyin_reader).await?;
-
-        // TODO check checksum
-        // Meanwhile you can verify client-side by taking fullbackup
-        // and checking that it matches in size with what was imported.
-        // It wouldn't work if base came from vanilla postgres though,
-        // since we discard some log files.
-
-        info!("done");
-        Ok(())
-    }
-
-    #[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
-    async fn handle_import_wal<IO>(
-        &self,
-        pgb: &mut PostgresBackend<IO>,
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
-        start_lsn: Lsn,
-        end_lsn: Lsn,
-        ctx: RequestContext,
-    ) -> Result<(), QueryError>
-    where
-        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
-    {
-        let timeline = self
-            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
-            .await?;
-        let last_record_lsn = timeline.get_last_record_lsn();
-        if last_record_lsn != start_lsn {
-            return Err(QueryError::Other(
-                anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
-            );
-        }
-
-        // TODO leave clean state on error. For now you can use detach to clean
-        // up broken state from a failed import.
-
-        // Import wal provided via CopyData
-        info!("importing wal");
-        pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
-        self.flush_cancellable(pgb, &timeline.cancel).await?;
-        let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
-        import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
-        info!("wal import complete");
-
-        // Read the end of the tar archive.
-        read_tar_eof(copyin_reader).await?;
-
-        // TODO Does it make sense to overshoot?
-        if timeline.get_last_record_lsn() < end_lsn {
-            return Err(QueryError::Other(
-                anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
-            );
-        }
-
-        // Flush data to disk, then upload to s3. No need for a forced checkpoint.
-        // We only want to persist the data, and it doesn't matter if it's in the
-        // shape of deltas or images.
- info!("flushing layers"); - timeline.freeze_and_flush().await.map_err(|e| match e { - FlushLayerError::Cancelled => QueryError::Shutdown, - other => QueryError::Other(other.into()), - })?; - - info!("done"); - Ok(()) - } - /// Helper function to handle the LSN from client request. /// /// Each GetPage (and Exists and Nblocks) request includes information about @@ -1710,109 +1458,6 @@ where ) .await?; pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - } else if query_string.starts_with("import basebackup ") { - // Import the `base` section (everything but the wal) of a basebackup. - // Assumes the tenant already exists on this pageserver. - // - // Files are scheduled to be persisted to remote storage, and the - // caller should poll the http api to check when that is done. - // - // Example import command: - // 1. Get start/end LSN from backup_manifest file - // 2. Run: - // cat my_backup/base.tar | psql -h $PAGESERVER \ - // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION" - let params = &parts[2..]; - if params.len() != 5 { - return Err(QueryError::Other(anyhow::anyhow!( - "invalid param number for import basebackup command" - ))); - } - let tenant_id = TenantId::from_str(params[0]) - .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; - let timeline_id = TimelineId::from_str(params[1]) - .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; - let base_lsn = Lsn::from_str(params[2]) - .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?; - let end_lsn = Lsn::from_str(params[3]) - .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?; - let pg_version = u32::from_str(params[4]) - .with_context(|| format!("Failed to parse pg_version from {}", params[4]))?; - - tracing::Span::current() - .record("tenant_id", field::display(tenant_id)) - .record("timeline_id", field::display(timeline_id)); - - self.check_permission(Some(tenant_id))?; - - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::ImportBasebackup) - .inc(); - - match self - .handle_import_basebackup( - pgb, - tenant_id, - timeline_id, - base_lsn, - end_lsn, - pg_version, - ctx, - ) - .await - { - Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, - Err(e) => { - error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}"); - pgb.write_message_noflush(&BeMessage::ErrorResponse( - &e.to_string(), - Some(e.pg_error_code()), - ))? - } - }; - } else if query_string.starts_with("import wal ") { - // Import the `pg_wal` section of a basebackup. - // - // Files are scheduled to be persisted to remote storage, and the - // caller should poll the http api to check when that is done. 
- let params = &parts[2..]; - if params.len() != 4 { - return Err(QueryError::Other(anyhow::anyhow!( - "invalid param number for import wal command" - ))); - } - let tenant_id = TenantId::from_str(params[0]) - .with_context(|| format!("Failed to parse tenant id from {}", params[0]))?; - let timeline_id = TimelineId::from_str(params[1]) - .with_context(|| format!("Failed to parse timeline id from {}", params[1]))?; - let start_lsn = Lsn::from_str(params[2]) - .with_context(|| format!("Failed to parse Lsn from {}", params[2]))?; - let end_lsn = Lsn::from_str(params[3]) - .with_context(|| format!("Failed to parse Lsn from {}", params[3]))?; - - tracing::Span::current() - .record("tenant_id", field::display(tenant_id)) - .record("timeline_id", field::display(timeline_id)); - - self.check_permission(Some(tenant_id))?; - - COMPUTE_COMMANDS_COUNTERS - .for_command(ComputeCommandKind::ImportWal) - .inc(); - - match self - .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx) - .await - { - Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, - Err(e) => { - error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}"); - pgb.write_message_noflush(&BeMessage::ErrorResponse( - &e.to_string(), - Some(e.pg_error_code()), - ))? - } - }; } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect From 0e42c79feb9dfea1860dd41c1ab5526383f1adfc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Jul 2024 21:55:39 +0200 Subject: [PATCH 2/3] fixup: move import basebackup/wal to http mgmt api --- pageserver/client/src/mgmt_api.rs | 11 +++++++++-- pageserver/src/http/routes.rs | 2 +- storage_controller/src/node.rs | 2 +- storage_controller/src/service.rs | 4 ++++ 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index e635fa4d62e1..e3ddb446fa2c 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -22,6 +22,9 @@ pub struct Client { #[derive(thiserror::Error, Debug)] pub enum Error { + #[error("send request: {0}")] + SendRequest(reqwest::Error), + #[error("receive body: {0}")] ReceiveBody(reqwest::Error), @@ -640,7 +643,9 @@ impl Client { .body(basebackup_tarball) .send() .await - .map_err(Error::ReceiveBody)? + .map_err(Error::SendRequest)? + .error_from_body() + .await? .json() .await .map_err(Error::ReceiveBody) @@ -662,7 +667,9 @@ impl Client { .body(wal_tarball) .send() .await - .map_err(Error::ReceiveBody)? + .map_err(Error::SendRequest)? + .error_from_body() + .await? 
.json() .await .map_err(Error::ReceiveBody) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 5b9fa590f7d1..6f8f3e6389d5 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2889,7 +2889,7 @@ pub fn make_router( |r| api_handler(r, put_tenant_timeline_import_basebackup), ) .put( - "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/import_wal", + "/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal", |r| api_handler(r, put_tenant_timeline_import_wal), ) .any(handler_404)) diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index 4d17dff9feaf..fff44aaf2670 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -226,7 +226,7 @@ impl Node { fn is_fatal(e: &mgmt_api::Error) -> bool { use mgmt_api::Error::*; match e { - ReceiveBody(_) | ReceiveErrorBody(_) => false, + SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false, ApiError(StatusCode::SERVICE_UNAVAILABLE, _) | ApiError(StatusCode::GATEWAY_TIMEOUT, _) | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 3965d7453d49..a8f459509873 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -151,6 +151,10 @@ struct ServiceState { /// controller API. fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError { match e { + mgmt_api::Error::SendRequest(e) => { + // Presume errors sending requests are connectivity/availability issues + ApiError::ResourceUnavailable(format!("{node} error sending request: {e}").into()) + } mgmt_api::Error::ReceiveErrorBody(str) => { // Presume errors receiving body are connectivity/availability issues ApiError::ResourceUnavailable( From 21ebb69541fa8b5587829bc8266d0146a1c631ca Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Jul 2024 22:01:19 +0200 Subject: [PATCH 3/3] allowed_errors --- test_runner/regress/test_import.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index d97e882a7093..4dae9176b83f 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -88,7 +88,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build env.pageserver.allowed_errors.extend( [ - ".*error importing base backup .*", + ".*Failed to import basebackup.*", + ".*unexpected non-zero bytes after the tar archive.*", ".*Timeline got dropped without initializing, cleaning its files.*", ".*InternalServerError.*timeline not found.*", ".*InternalServerError.*Tenant .* not found.*",
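
Usage sketch for the new endpoints (not part of the patches above), mirroring the reworked PageServerNode::timeline_import in patch 1. The file names and the way the Client is obtained are illustrative assumptions; import_basebackup, import_wal, and ReqwestBody::wrap_stream are the items the series adds.

    use pageserver_client::mgmt_api;
    use utils::id::{TenantId, TimelineId};
    use utils::lsn::Lsn;

    // Sketch: stream base.tar and pg_wal.tar (hypothetical paths) into a
    // timeline over the mgmt API, the way neon_local now does it.
    async fn import_timeline(
        client: &mgmt_api::Client, // constructed elsewhere against the mgmt endpoint
        tenant_id: TenantId,
        timeline_id: TimelineId,
        base_lsn: Lsn,
        end_lsn: Lsn,
        pg_version: u32,
    ) -> anyhow::Result<()> {
        // The "stream" feature added to reqwest in patch 1 lets us send the
        // tarballs without buffering them in memory.
        let base = tokio::fs::File::open("base.tar").await?;
        let base = mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(base));
        client
            .import_basebackup(tenant_id, timeline_id, base_lsn, end_lsn, pg_version, base)
            .await?;

        // Only needed when there is WAL to apply between base_lsn and end_lsn.
        let wal = tokio::fs::File::open("pg_wal.tar").await?;
        let wal = mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(wal));
        client
            .import_wal(tenant_id, timeline_id, base_lsn, end_lsn, wal)
            .await?;
        Ok(())
    }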
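On the helper added in libs/utils: must_parse_query_param is the required-parameter twin of parse_query_param, turning an absent parameter into a 400 instead of a None. A hypothetical handler sketch showing the two modes side by side (the handler and its "limit" parameter are invented for illustration):

    use hyper::{Body, Request, Response};
    use utils::http::error::ApiError;
    use utils::http::request::{must_parse_query_param, parse_query_param};
    use utils::lsn::Lsn;

    async fn example_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
        // Required: a missing or unparsable "start_lsn" becomes ApiError::BadRequest.
        let start_lsn: Lsn = must_parse_query_param(&request, "start_lsn")?;
        // Optional: a missing "limit" yields None; only an unparsable value errors.
        let limit: Option<usize> = parse_query_param(&request, "limit")?;
        tracing::info!("start_lsn={start_lsn} limit={limit:?}");
        Ok(Response::new(Body::empty()))
    }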
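read_tar_eof moves verbatim from page_service.rs into routes.rs. Its contract — exactly one more all-zero 512-byte block, then optional zero padding in whole blocks — is easy to pin down with tests; a sketch of a #[cfg(test)] module that could sit next to it (the module is an illustration, not part of the series). A plain byte slice works as the reader because tokio implements AsyncRead for &[u8]:

    #[cfg(test)]
    mod read_tar_eof_tests {
        use super::read_tar_eof;

        #[tokio::test]
        async fn accepts_zero_padding_in_whole_blocks() {
            // tokio_tar has already consumed the first zero block, so the reader
            // starts at the second one; `tar --record-size` may pad with more.
            let body = vec![0u8; 512 * 3];
            assert!(read_tar_eof(&body[..]).await.is_ok());
        }

        #[tokio::test]
        async fn rejects_nonzero_trailing_bytes() {
            let mut body = vec![0u8; 512];
            body.extend_from_slice(b"unexpected trailing data");
            assert!(read_tar_eof(&body[..]).await.is_err());
        }

        #[tokio::test]
        async fn rejects_truncated_eof_marker() {
            // Fewer than 512 bytes left: the EOF marker is incomplete.
            let body = vec![0u8; 100];
            assert!(read_tar_eof(&body[..]).await.is_err());
        }
    }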