Skip to content

Commit

Permalink
move import basebackup/wal to http mgmt api
Browse files Browse the repository at this point in the history
  • Loading branch information
problame committed Jul 5, 2024
1 parent c9fd8d7 commit 544dd1e
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 406 deletions.
58 changes: 18 additions & 40 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::time::Duration;

use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{
self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
};
Expand Down Expand Up @@ -566,60 +565,39 @@ impl PageServerNode {
pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
) -> anyhow::Result<()> {
let (client, conn) = self.page_server_psql_client().await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {}", e);
}
});
let client = std::pin::pin!(client);

// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?;
let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile);
let base_tarfile =
mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(base_tarfile));

// Init wal reader if necessary
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let wal_tarfile = tokio::fs::File::open(wal_tarfile_path).await?;
let wal_reader = tokio_util::io::ReaderStream::new(wal_tarfile);
let wal_reader =
mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(wal_tarfile));
(end_lsn, Some(wal_reader))
} else {
(start_lsn, None)
};

let copy_in = |reader, cmd| {
let client = &client;
async move {
let writer = client.copy_in(&cmd).await?;
let writer = std::pin::pin!(writer);
let mut writer = writer.sink_map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))
});
let mut reader = std::pin::pin!(reader);
writer.send_all(&mut reader).await?;
writer.into_inner().finish().await?;
anyhow::Ok(())
}
};

// Import base
copy_in(
base_tarfile,
format!(
"import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
),
)
.await?;
// Import wal if necessary
if let Some(wal_reader) = wal_reader {
copy_in(
wal_reader,
format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"),
self.http_client
.import_basebackup(
tenant_id,
timeline_id,
start_lsn,
end_lsn,
pg_version,
base_tarfile,
)
.await?;

// Import wal if necessary
if let Some(wal_reader) = wal_reader {
self.http_client
.import_wal(tenant_id, timeline_id, start_lsn, end_lsn, wal_reader)
.await?;
}

Ok(())
Expand Down
9 changes: 9 additions & 0 deletions libs/utils/src/http/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
.transpose()
}

pub fn must_parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
parse_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}

pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),
Expand Down
2 changes: 1 addition & 1 deletion pageserver/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
pageserver_api.workspace = true
thiserror.workspace = true
async-trait.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = [ "stream" ] }
utils.workspace = true
serde.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
Expand Down
70 changes: 64 additions & 6 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use utils::{
lsn::Lsn,
};

pub use reqwest::Body as ReqwestBody;

pub mod util;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -173,19 +175,30 @@ impl Client {
self.request(Method::GET, uri, ()).await
}

async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
let req = if let Some(value) = &self.authorization_header {
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
};
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
}

async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}

async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
Expand Down Expand Up @@ -609,4 +622,49 @@ impl Client {
}),
}
}

pub async fn import_basebackup(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
end_lsn: Lsn,
pg_version: u32,
basebackup_tarball: ReqwestBody,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
self.mgmt_api_endpoint,
);
self.start_request(Method::PUT, uri)
.body(basebackup_tarball)
.send()
.await
.map_err(Error::ReceiveBody)?
.json()
.await
.map_err(Error::ReceiveBody)
}

pub async fn import_wal(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
wal_tarball: ReqwestBody,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
self.mgmt_api_endpoint,
);
self.start_request(Method::PUT, uri)
.body(wal_tarball)
.send()
.await
.map_err(Error::ReceiveBody)?
.json()
.await
.map_err(Error::ReceiveBody)
}
}
1 change: 0 additions & 1 deletion pageserver/src/bin/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,6 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
tenant_manager,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,
Expand Down
Loading

0 comments on commit 544dd1e

Please sign in to comment.