Skip to content

Commit

Permalink
pageserver: move page_service's import basebackup / import wal
Browse files Browse the repository at this point in the history
…to mgmt API (#8292)

I want to fix bugs in `page_service`
([issue](#7427)) and the
`import basebackup` / `import wal` stand in the way / make the
refactoring more complicated.

We don't use these methods anyway in practice, but, there have been some
objections to removing the functionality completely.

So, this PR preserves the existing functionality but moves it into the
HTTP management API.

Note that I don't try to fix existing bugs in the code, specifically not
fixing
* it only ever worked correctly for unsharded tenants
* it doesn't clean up on error

All errors are mapped to `ApiError::InternalServerError`.
  • Loading branch information
problame committed Jul 9, 2024
1 parent 9bb16c8 commit 1a49f1c
Show file tree
Hide file tree
Showing 11 changed files with 301 additions and 408 deletions.
58 changes: 18 additions & 40 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::time::Duration;

use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{
self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
};
Expand Down Expand Up @@ -566,60 +565,39 @@ impl PageServerNode {
pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
) -> anyhow::Result<()> {
let (client, conn) = self.page_server_psql_client().await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {}", e);
}
});
let client = std::pin::pin!(client);

// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?;
let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile);
let base_tarfile =
mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(base_tarfile));

// Init wal reader if necessary
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let wal_tarfile = tokio::fs::File::open(wal_tarfile_path).await?;
let wal_reader = tokio_util::io::ReaderStream::new(wal_tarfile);
let wal_reader =
mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(wal_tarfile));
(end_lsn, Some(wal_reader))
} else {
(start_lsn, None)
};

let copy_in = |reader, cmd| {
let client = &client;
async move {
let writer = client.copy_in(&cmd).await?;
let writer = std::pin::pin!(writer);
let mut writer = writer.sink_map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))
});
let mut reader = std::pin::pin!(reader);
writer.send_all(&mut reader).await?;
writer.into_inner().finish().await?;
anyhow::Ok(())
}
};

// Import base
copy_in(
base_tarfile,
format!(
"import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
),
)
.await?;
// Import wal if necessary
if let Some(wal_reader) = wal_reader {
copy_in(
wal_reader,
format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"),
self.http_client
.import_basebackup(
tenant_id,
timeline_id,
start_lsn,
end_lsn,
pg_version,
base_tarfile,
)
.await?;

// Import wal if necessary
if let Some(wal_reader) = wal_reader {
self.http_client
.import_wal(tenant_id, timeline_id, start_lsn, end_lsn, wal_reader)
.await?;
}

Ok(())
Expand Down
9 changes: 9 additions & 0 deletions libs/utils/src/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
.transpose()
}

pub fn must_parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
parse_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}

pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),
Expand Down
2 changes: 1 addition & 1 deletion pageserver/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
pageserver_api.workspace = true
thiserror.workspace = true
async-trait.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = [ "stream" ] }
utils.workspace = true
serde.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
Expand Down
77 changes: 71 additions & 6 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use utils::{
lsn::Lsn,
};

pub use reqwest::Body as ReqwestBody;

pub mod util;

#[derive(Debug, Clone)]
Expand All @@ -20,6 +22,9 @@ pub struct Client {

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("send request: {0}")]
SendRequest(reqwest::Error),

#[error("receive body: {0}")]
ReceiveBody(reqwest::Error),

Expand Down Expand Up @@ -173,19 +178,30 @@ impl Client {
self.request(Method::GET, uri, ()).await
}

async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
let req = if let Some(value) = &self.authorization_header {
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
};
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
}

async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}

async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
Expand Down Expand Up @@ -609,4 +625,53 @@ impl Client {
}),
}
}

pub async fn import_basebackup(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
end_lsn: Lsn,
pg_version: u32,
basebackup_tarball: ReqwestBody,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
self.mgmt_api_endpoint,
);
self.start_request(Method::PUT, uri)
.body(basebackup_tarball)
.send()
.await
.map_err(Error::SendRequest)?
.error_from_body()
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}

pub async fn import_wal(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
wal_tarball: ReqwestBody,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
self.mgmt_api_endpoint,
);
self.start_request(Method::PUT, uri)
.body(wal_tarball)
.send()
.await
.map_err(Error::SendRequest)?
.error_from_body()
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}
1 change: 0 additions & 1 deletion pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,6 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
tenant_manager,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,
Expand Down
Loading

1 comment on commit 1a49f1c

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3129 tests run: 3001 passed, 1 failed, 127 skipped (full report)


Failures on Postgres 14

  • test_sharding_autosplit[github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_sharding_autosplit[release-pg14-github-actions-selfhosted]"
Flaky tests (3)

Postgres 15

  • test_isolation[None]: debug
  • test_tenant_creation_fails: debug

Postgres 14

  • test_isolation[None]: debug

Code coverage* (full report)

  • functions: 32.6% (6941 of 21294 functions)
  • lines: 50.0% (54551 of 109142 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
1a49f1c at 2024-07-09T22:39:15.006Z :recycle:

Please sign in to comment.