storage controller: include stripe size in compute notifications #6974

Merged · 5 commits · Mar 6, 2024
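This change threads the tenant's shard stripe size through the storage controller's compute notifications and through the location_config API response: once a tenant has more than one shard (for example after a shard split), notifications and responses advertise the stripe size, while for single-shard tenants the field remains null.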
28 changes: 24 additions & 4 deletions control_plane/attachment_service/src/compute_hook.rs
@@ -3,7 +3,7 @@ use std::{collections::HashMap, time::Duration};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use hyper::{Method, StatusCode};
use pageserver_api::shard::{ShardIndex, ShardNumber, TenantShardId};
use pageserver_api::shard::{ShardIndex, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
@@ -21,6 +21,9 @@ pub(crate) const API_CONCURRENCY: usize = 32;

pub(super) struct ComputeHookTenant {
shards: Vec<(ShardIndex, NodeId)>,
// A tenant is not obliged to advertise a ShardStripeSize until it has multiple shards. Once
// at least one shard with ShardCount >1 is present, stripe_size must be set.
stripe_size: Option<ShardStripeSize>,
}

#[derive(Serialize, Deserialize, Debug)]
@@ -33,6 +36,7 @@ struct ComputeHookNotifyRequestShard
#[derive(Serialize, Deserialize, Debug)]
struct ComputeHookNotifyRequest {
tenant_id: TenantId,
stripe_size: Option<ShardStripeSize>,
shards: Vec<ComputeHookNotifyRequestShard>,
}
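For illustration, a serialized notification body for a multi-sharded tenant would look roughly like this — the shape follows the struct above and the expectations in test_sharding_service.py below, with made-up IDs:

```json
{
  "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
  "stripe_size": 32768,
  "shards": [
    {"node_id": 1, "shard_number": 0},
    {"node_id": 1, "shard_number": 1}
  ]
}
```

For an unsharded tenant, `stripe_size` serializes as `null`.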

@@ -89,6 +93,7 @@ impl ComputeHookTenant {
node_id: *node_id,
})
.collect(),
stripe_size: self.stripe_size,
});
} else {
tracing::info!(
@@ -139,7 +144,11 @@ impl ComputeHook {
};
let cplane =
ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
let ComputeHookNotifyRequest { tenant_id, shards } = reconfigure_request;
let ComputeHookNotifyRequest {
tenant_id,
shards,
stripe_size,
} = reconfigure_request;

let compute_pageservers = shards
.into_iter()
@@ -156,7 +165,9 @@ impl ComputeHook {
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
endpoint.reconfigure(compute_pageservers.clone()).await?;
endpoint
.reconfigure(compute_pageservers.clone(), stripe_size)
.await?;
}
}

@@ -271,18 +282,27 @@ impl ComputeHook {
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let mut locked = self.state.lock().await;
let entry = locked
.entry(tenant_shard_id.tenant_id)
.or_insert_with(|| ComputeHookTenant { shards: Vec::new() });
.or_insert_with(|| ComputeHookTenant {
shards: Vec::new(),
stripe_size: None,
});

let shard_index = ShardIndex {
shard_count: tenant_shard_id.shard_count,
shard_number: tenant_shard_id.shard_number,
};

// We will advertise stripe size in notifications as soon as the tenant is multi-sharded.
if tenant_shard_id.shard_count.count() > 1 {
entry.stripe_size = Some(stripe_size);
}

let mut set = false;
for (existing_shard, existing_node) in &mut entry.shards {
if *existing_shard == shard_index {
7 changes: 6 additions & 1 deletion control_plane/attachment_service/src/reconciler.rs
@@ -514,7 +514,12 @@ impl Reconciler {
if let Some(node_id) = self.intent.attached {
let result = self
.compute_hook
.notify(self.tenant_shard_id, node_id, &self.cancel)
.notify(
self.tenant_shard_id,
node_id,
self.shard.stripe_size,
&self.cancel,
)
.await;
if let Err(e) = &result {
// It is up to the caller whether they want to drop out on this error, but they don't have to:
37 changes: 29 additions & 8 deletions control_plane/attachment_service/src/service.rs
@@ -260,7 +260,11 @@ impl Service {
// emit a compute notification for this. In the case where our observed state does not
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
if let Some(attached_at) = tenant_state.stably_attached() {
compute_notifications.push((*tenant_shard_id, attached_at));
compute_notifications.push((
*tenant_shard_id,
attached_at,
tenant_state.shard.stripe_size,
));
}
}
}
@@ -469,7 +473,7 @@ impl Service {
/// Returns a set of any shards for which notifications were not acked within the deadline.
async fn compute_notify_many(
&self,
notifications: Vec<(TenantShardId, NodeId)>,
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
deadline: Instant,
) -> HashSet<TenantShardId> {
let compute_hook = self.inner.read().unwrap().compute_hook.clone();
@@ -480,11 +484,14 @@
// Construct an async stream of futures to invoke the compute notify function: we do this
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
let mut stream = futures::stream::iter(notifications.into_iter())
.map(|(tenant_shard_id, node_id)| {
.map(|(tenant_shard_id, node_id, stripe_size)| {
let compute_hook = compute_hook.clone();
let cancel = self.cancel.clone();
async move {
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
if let Err(e) = compute_hook
.notify(tenant_shard_id, node_id, stripe_size, &cancel)
.await
{
tracing::error!(
%tenant_shard_id,
%node_id,
@@ -1232,7 +1239,10 @@ impl Service {
}

let mut waiters = Vec::new();
let mut result = TenantLocationConfigResponse { shards: Vec::new() };
let mut result = TenantLocationConfigResponse {
shards: Vec::new(),
stripe_size: None,
};
let maybe_create = {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
@@ -1245,6 +1255,11 @@ impl Service {
// Saw an existing shard: this is not a creation
create = false;

// Update stripe size
if result.stripe_size.is_none() && shard.shard.count.count() > 1 {
result.stripe_size = Some(shard.shard.stripe_size);
}

// Note that for existing tenants we do _not_ respect the generation in the request: this is likely
// to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
// callers' generations may be ignored. This represents a one-way migration of tenants from the outer
@@ -1344,6 +1359,9 @@ impl Service {
};

let waiters = if let Some(create_req) = maybe_create {
if create_req.shard_parameters.count.count() > 1 {
result.stripe_size = Some(create_req.shard_parameters.stripe_size);
}
let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
result.shards = create_resp
.shards
@@ -2189,7 +2207,7 @@ impl Service {
// as at this point in the split process we have succeeded and this part is infallible:
// we will never need to do any special recovery from this state.

child_locations.push((child, pageserver));
child_locations.push((child, pageserver, child_shard.stripe_size));

tenants.insert(child, child_state);
response.new_shards.push(child);
@@ -2199,8 +2217,11 @@ impl Service {

// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps) in child_locations {
if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = compute_hook
.notify(child_id, child_ps, stripe_size, &self.cancel)
.await
{
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
child_id, child_ps);
failed_notifications.push(child_id);
2 changes: 1 addition & 1 deletion control_plane/src/bin/neon_local.rs
@@ -1024,7 +1024,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
})
.collect::<Vec<_>>()
};
endpoint.reconfigure(pageservers).await?;
endpoint.reconfigure(pageservers, None).await?;
}
"stop" => {
let endpoint_id = sub_args
10 changes: 9 additions & 1 deletion control_plane/src/endpoint.rs
@@ -52,6 +52,7 @@ use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use serde::{Deserialize, Serialize};
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -735,7 +736,11 @@ impl Endpoint {
}
}

pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> {
pub async fn reconfigure(
&self,
mut pageservers: Vec<(Host, u16)>,
stripe_size: Option<ShardStripeSize>,
) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
let file = std::fs::File::open(spec_path)?;
@@ -765,6 +770,9 @@
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}

let client = reqwest::Client::new();
let response = client
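A note on units: the `s.0 as usize` conversion above indicates that ShardStripeSize is a newtype over an integer page count, while the compute spec stores a plain usize. A minimal sketch of the relationship, assuming 8KiB pages and a public u32 inner field:

```rust
use pageserver_api::shard::ShardStripeSize;

fn main() {
    // Illustrative value: 32768 pages * 8KiB/page = 256MiB per stripe,
    // matching the value asserted in the regression test below.
    let stripe_size = ShardStripeSize(32768);
    // What reconfigure() would write into spec.shard_stripe_size.
    let in_spec: usize = stripe_size.0 as usize;
    assert_eq!(in_spec, 32768);
}
```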
2 changes: 2 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -436,6 +436,8 @@ pub struct TenantShardLocation {
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigResponse {
pub shards: Vec<TenantShardLocation>,
// If the tenant is sharded (ShardCount > 1), stripe_size will be set.
pub stripe_size: Option<ShardStripeSize>,
}

#[derive(Serialize, Deserialize, Debug)]
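As a sketch, a TenantLocationConfigResponse for a two-shard tenant would serialize roughly like this (the IDs and the shard_id encoding are illustrative):

```json
{
  "shards": [
    {"shard_id": "1f359dd625e519a1a4e8d7509690f6fc-0002", "node_id": 1},
    {"shard_id": "1f359dd625e519a1a4e8d7509690f6fc-0102", "node_id": 1}
  ],
  "stripe_size": 32768
}
```

For a single-shard tenant, `stripe_size` is null, matching the new comment on the struct.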
4 changes: 4 additions & 0 deletions pageserver/src/http/openapi_spec.yml
@@ -1339,6 +1339,10 @@ components:
type: array
items:
$ref: "#/components/schemas/TenantShardLocation"
stripe_size:
description: If multiple shards are present, this field contains the sharding stripe size, else it is null.
type: integer
nullable: true
TenantShardLocation:
type: object
required:
19 changes: 14 additions & 5 deletions pageserver/src/http/routes.rs
@@ -1451,11 +1451,12 @@ async fn put_tenant_location_config_handler(
tenant::SpawnMode::Eager
};

let attached = state
let tenant = state
.tenant_manager
.upsert_location(tenant_shard_id, location_conf, flush, spawn_mode, &ctx)
.await?
.is_some();
.await?;
let stripe_size = tenant.as_ref().map(|t| t.get_shard_stripe_size());
let attached = tenant.is_some();

if let Some(_flush_ms) = flush {
match state
@@ -1477,12 +1478,20 @@
// This API returns a vector of pageservers where the tenant is attached: this is
// primarily for use in the sharding service. For compatibility, we also return this
// when called directly on a pageserver, but the payload is always zero or one shards.
let mut response = TenantLocationConfigResponse { shards: Vec::new() };
let mut response = TenantLocationConfigResponse {
shards: Vec::new(),
stripe_size: None,
};
if attached {
response.shards.push(TenantShardLocation {
shard_id: tenant_shard_id,
node_id: state.conf.id,
})
});
if tenant_shard_id.shard_count.count() > 1 {
// Stripe size should be set if we are attached
debug_assert!(stripe_size.is_some());
response.stripe_size = stripe_size;
}
}

json_response(StatusCode::OK, response)
5 changes: 5 additions & 0 deletions pageserver/src/tenant.rs
@@ -22,6 +22,7 @@ use pageserver_api::models;
use pageserver_api::models::TimelineState;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
@@ -2088,6 +2089,10 @@ impl Tenant {
&self.tenant_shard_id
}

pub(crate) fn get_shard_stripe_size(&self) -> ShardStripeSize {
self.shard_identity.stripe_size
}

pub(crate) fn get_generation(&self) -> Generation {
self.generation
}
26 changes: 23 additions & 3 deletions test_runner/regress/test_sharding_service.py
@@ -1,7 +1,7 @@
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List
from typing import Any, Dict, List, Union

import pytest
from fixtures.log_helper import log
@@ -398,10 +398,12 @@ def handler(request: Request):

# Initial notification from tenant creation
assert len(notifications) == 1
expect = {
expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
}
assert notifications[0] == expect

env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})

@@ -415,6 +417,7 @@ def node_evacuated(node_id: int):
log.info(f"notifications: {notifications}")
expect = {
"tenant_id": str(env.initial_tenant),
"stripe_size": None,
"shards": [{"node_id": int(env.pageservers[1].id), "shard_number": 0}],
}

@@ -430,10 +433,27 @@ def received_migration_notification():

def received_restart_notification():
assert len(notifications) == 3
assert notifications[1] == expect
assert notifications[2] == expect

wait_until(10, 1, received_restart_notification)

# Splitting a tenant should cause its stripe size to become visible in the compute notification
env.attachment_service.tenant_shard_split(env.initial_tenant, shard_count=2)
expect = {
"tenant_id": str(env.initial_tenant),
"stripe_size": 32768,
"shards": [
{"node_id": int(env.pageservers[1].id), "shard_number": 0},
{"node_id": int(env.pageservers[1].id), "shard_number": 1},
],
}

def received_split_notification():
assert len(notifications) == 4
assert notifications[3] == expect

wait_until(10, 1, received_split_notification)

env.attachment_service.consistency_check()
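The 32768 asserted above is presumably the default stripe size for newly split shards, expressed in 8KiB pages (i.e. 256MiB), since the split request does not set a custom value.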

