Skip to content

Commit

Permalink
control_plane/attachment_service: pass through timeline GETs
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Jan 19, 2024
1 parent dd4c83c commit de91592
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 7 deletions.
59 changes: 52 additions & 7 deletions control_plane/attachment_service/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use std::sync::Arc;
use utils::auth::SwappableJwtAuth;
use utils::http::endpoint::{auth_middleware, request_span};
Expand Down Expand Up @@ -134,6 +135,41 @@ async fn handle_tenant_timeline_create(mut req: Request<Body>) -> Result<Respons
)
}

async fn handle_tenant_timeline_passthrough(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;

let Some(path) = req.uri().path_and_query() else {
// This should never happen, our request router only calls us if there is a path
return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
};

// Find the node that holds shard zero
let state = get_state(&req);
let base_url = state.service.tenant_shard0_baseurl(tenant_id)?;

let client = mgmt_api::Client::new(base_url, state.service.get_config().jwt_token.as_deref());
let resp = client.proxy_get(format!("v1/{}", path)).await.map_err(|_e|
// FIXME: give APiError a proper Unavailable variant. We return 503 here because
// if we can't successfully send a request to the pageserver, we aren't available.
ApiError::ShuttingDown)?;

// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder()
.status(resp.status())
.version(resp.version());
for (k, v) in resp.headers() {
builder = builder.header(k, v);
}

let response = builder
.body(Body::wrap_stream(resp.bytes_stream()))
.map_err(|e| ApiError::InternalServerError(e.into()))?;

Ok(response)
}

async fn handle_tenant_locate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let state = get_state(&req);
Expand Down Expand Up @@ -209,26 +245,35 @@ pub fn make_router(
router
.data(Arc::new(HttpState::new(service, auth)))
.get("/status", |r| request_span(r, handle_status))
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
// Testing endpoints
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
.post("/inspect", |r| request_span(r, handle_inspect))
.get("/tenant/:tenant_id/locate", |r| {
request_span(r, handle_tenant_locate)
})
// Upcalls for the pageserver
.post("/re-attach", |r| request_span(r, handle_re_attach))
.post("/validate", |r| request_span(r, handle_validate))
// Node operations
.post("/node", |r| request_span(r, handle_node_register))
.get("/node", |r| request_span(r, handle_node_list))
.put("/node/:node_id/config", |r| {
request_span(r, handle_node_configure)
})
// Tenant operations
.post("/tenant", |r| request_span(r, handle_tenant_create))
.delete("/tenant/:tenant_id", |r| {
request_span(r, handle_tenant_delete)
})
.put("/tenant/:tenant_shard_id/migrate", |r| {
request_span(r, handle_tenant_shard_migrate)
})
// Timeline operations
.post("/tenant/:tenant_id/timeline", |r| {
request_span(r, handle_tenant_timeline_create)
})
.get("/tenant/:tenant_id/locate", |r| {
request_span(r, handle_tenant_locate)
})
.put("/tenant/:tenant_shard_id/migrate", |r| {
request_span(r, handle_tenant_shard_migrate)
// Timeline GET passthrough to shard zero
.get("/tenant/:tenant_id/timeline/*", |r| {
request_span(r, handle_tenant_timeline_passthrough)
})
}
36 changes: 36 additions & 0 deletions control_plane/attachment_service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ impl From<ReconcileWaitError> for ApiError {
}

impl Service {
pub fn get_config(&self) -> &Config {
&self.config
}

pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();

Expand Down Expand Up @@ -864,6 +868,38 @@ impl Service {
Ok(timeline_info.expect("targets cannot be empty"))
}

/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
/// function looks it up and returns the url. If the tenant isn't found, returns Err(ApiError::NotFound)
pub(crate) fn tenant_shard0_baseurl(&self, tenant_id: TenantId) -> Result<String, ApiError> {
let locked = self.inner.read().unwrap();
let Some((_shard_id, shard)) = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
));
};

// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
// point to somewhere we haven't attached yet.
let Some(node_id) = shard.intent.attached else {
return Err(ApiError::Conflict(
"Cannot call timeline API on non-attached tenant".to_string(),
));
};

let Some(node) = locked.nodes.get(&node_id) else {
// This should never happen
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard refers to nonexistent node"
)));
};

Ok(node.base_url())
}

pub(crate) fn tenant_locate(
&self,
tenant_id: TenantId,
Expand Down
12 changes: 12 additions & 0 deletions pageserver/client/src/mgmt_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}

pub async fn proxy_get(&self, path: String) -> Result<reqwest::Response> {
let uri = format!("{}/{}", self.mgmt_api_endpoint, path);

let req = self.client.request(Method::GET, uri);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
};
req.send().await.map_err(Error::ReceiveBody)
}

pub async fn tenant_details(
&self,
tenant_shard_id: TenantShardId,
Expand Down

0 comments on commit de91592

Please sign in to comment.