Skip to content

Commit

Permalink
Allow changing the compute safekeeper list without a restart.
Browse files Browse the repository at this point in the history
- Add --safekeepers option to neon_local reconfigure
- Add it to python Endpoint reconfigure
- Implement config reload in walproposer by restarting the whole bgw when
  safekeeper list changes.

ref #6341
  • Loading branch information
arssher committed Jun 18, 2024
1 parent 4feb6ba commit 62c0c1d
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 55 deletions.
46 changes: 30 additions & 16 deletions control_plane/src/bin/neon_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,20 +862,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re

let allow_multiple = sub_args.get_flag("allow-multiple");

// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
})?);
safekeepers.push(sk_id);
}
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&sub_args)? {
safekeepers
} else {
env.safekeepers.iter().map(|sk| sk.id).collect()
};

let endpoint = cplane
.endpoints
Expand Down Expand Up @@ -979,7 +972,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
})
.collect::<Vec<_>>()
};
endpoint.reconfigure(pageservers, None).await?;
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&sub_args)?;
endpoint.reconfigure(pageservers, None, safekeepers).await?;
}
"stop" => {
let endpoint_id = sub_args
Expand All @@ -1001,6 +997,23 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
Ok(())
}

/// Parse --safekeepers as list of safekeeper ids.
fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
let mut safekeepers: Vec<NodeId> = Vec::new();
for sk_id in safekeepers_str.split(',').map(str::trim) {
let sk_id = NodeId(
u64::from_str(sk_id)
.map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
);
safekeepers.push(sk_id);
}
Ok(Some(safekeepers))
} else {
Ok(None)
}
}

fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
Expand Down Expand Up @@ -1573,14 +1586,15 @@ fn cli() -> Command {
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(endpoint_pageserver_id_arg.clone())
.arg(safekeepers_arg)
.arg(safekeepers_arg.clone())
.arg(remote_ext_config_args)
.arg(create_test_user)
.arg(allow_multiple.clone())
)
.subcommand(Command::new("reconfigure")
.about("Reconfigure the endpoint")
.arg(endpoint_pageserver_id_arg)
.arg(safekeepers_arg)
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
)
Expand Down
37 changes: 25 additions & 12 deletions control_plane/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,23 @@ impl Endpoint {
.join(",")
}

/// Map safekeeper ids to the actual connection strings ("127.0.0.1:<port>").
///
/// Only a primary compute talks to safekeepers; for any other mode the
/// result is an empty list. Errors if an id has no matching safekeeper in
/// the local environment.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
    // Non-primary computes don't connect to safekeepers at all.
    if self.mode != ComputeMode::Primary {
        return Ok(Vec::new());
    }
    sk_ids
        .into_iter()
        .map(|sk_id| {
            self.env
                .safekeepers
                .iter()
                .find(|node| node.id == sk_id)
                .map(|sk| format!("127.0.0.1:{}", sk.get_compute_port()))
                .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))
        })
        .collect()
}

pub async fn start(
&self,
auth_token: &Option<String>,
Expand All @@ -523,18 +540,7 @@ impl Endpoint {
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstring.is_empty());

let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
for sk_id in safekeepers {
let sk = self
.env
.safekeepers
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
}
}
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;

// check for file remote_extensions_spec.json
// if it is present, read it and pass to compute_ctl
Expand Down Expand Up @@ -741,6 +747,7 @@ impl Endpoint {
&self,
mut pageservers: Vec<(Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
let mut spec: ComputeSpec = {
let spec_path = self.endpoint_path().join("spec.json");
Expand Down Expand Up @@ -775,6 +782,12 @@ impl Endpoint {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}

// If safekeepers are not specified, don't change them.
if let Some(safekeepers) = safekeepers {
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
spec.safekeeper_connstrings = safekeeper_connstrings;
}

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
Expand Down
52 changes: 48 additions & 4 deletions pgxn/neon/walproposer_pg.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;

/* Set to true in the walproposer bgw. */
static bool am_walproposer;
static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
Expand All @@ -76,6 +78,7 @@ static HotStandbyFeedback agg_hs_feedback;

static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static bool backpressure_throttling_impl(void);
Expand Down Expand Up @@ -116,7 +119,8 @@ init_walprop_config(bool syncSafekeepers)
{
walprop_config.neon_tenant = neon_tenant;
walprop_config.neon_timeline = neon_timeline;
walprop_config.safekeepers_list = wal_acceptors_list;
/* WalProposerCreate scribbles directly on it, so pstrdup */
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
walprop_config.wal_segment_size = wal_segment_size;
Expand Down Expand Up @@ -156,6 +160,7 @@ WalProposerMain(Datum main_arg)

init_walprop_config(false);
walprop_pg_init_bgworker();
am_walproposer = true;
walprop_pg_load_libpqwalreceiver();

wp = WalProposerCreate(&walprop_config, walprop_pg);
Expand Down Expand Up @@ -194,10 +199,10 @@ nwp_register_gucs(void)
NULL, /* long_desc */
&wal_acceptors_list, /* valueAddr */
"", /* bootValue */
PGC_POSTMASTER,
PGC_SIGHUP,
GUC_LIST_INPUT, /* extensions can't use*
* GUC_LIST_QUOTE */
NULL, NULL, NULL);
NULL, assign_neon_safekeepers, NULL);

DefineCustomIntVariable(
"neon.safekeeper_reconnect_timeout",
Expand All @@ -220,6 +225,33 @@ nwp_register_gucs(void)
NULL, NULL, NULL);
}

/*
 * GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
 * the list changed.
 */
static void
assign_neon_safekeepers(const char *newval, void *extra)
{
/*
 * The hook fires in every backend that processes a config reload; only the
 * walproposer background worker should react, everyone else just accepts
 * the new value.
 */
if (!am_walproposer)
return;

if (!newval) {
/* should never happen */
wpg_log(FATAL, "neon.safekeepers is empty");
}

/*
 * TODO: restarting through FATAL is stupid and introduces 1s delay before
 * next bgw start. We should refactor walproposer to allow graceful exit and
 * thus remove this delay.
 */
if (strcmp(wal_acceptors_list, newval) != 0)
{
/*
 * NOTE(review): this compares the incoming value against
 * wal_acceptors_list, i.e. assumes the GUC machinery invokes the assign
 * hook before updating the variable itself -- confirm that ordering.
 * FATAL exits the bgw; the postmaster restarts it (bgw_restart_time) and
 * the fresh walproposer picks up the new safekeeper list.
 */
wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
wal_acceptors_list, newval);
}
}

/* Check if we need to suspend inserts because of lagging replication. */
static uint64
backpressure_lag_impl(void)
Expand Down Expand Up @@ -368,7 +400,7 @@ walprop_register_bgworker(void)
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer");
bgw.bgw_restart_time = 5;
bgw.bgw_restart_time = 1;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;

Expand Down Expand Up @@ -1775,6 +1807,18 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
late_cv_trigger = ConditionVariableCancelSleep();
#endif

/*
* Process config if requested. This restarts walproposer if safekeepers
* list changed. Don't do that for sync-safekeepers because quite probably
* it (re-reading config) won't work without some effort, and
* sync-safekeepers should be quick to finish anyway.
*/
if (!wp->config->syncSafekeepers && ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}

/*
* If wait is terminated by latch set (walsenders' latch is set on each
* wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
Expand Down
2 changes: 1 addition & 1 deletion storage_controller/src/compute_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl ComputeHook {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
endpoint
.reconfigure(compute_pageservers.clone(), *stripe_size)
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
.await?;
}
}
Expand Down
22 changes: 20 additions & 2 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1914,13 +1914,16 @@ def endpoint_reconfigure(
endpoint_id: str,
tenant_id: Optional[TenantId] = None,
pageserver_id: Optional[int] = None,
safekeepers: Optional[List[int]] = None,
check_return_code=True,
) -> "subprocess.CompletedProcess[str]":
args = ["endpoint", "reconfigure", endpoint_id]
if tenant_id is not None:
args.extend(["--tenant-id", str(tenant_id)])
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
if safekeepers is not None:
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
return self.raw_cli(args, check_return_code=check_return_code)

def endpoint_stop(
Expand Down Expand Up @@ -3407,6 +3410,7 @@ def __init__(
self.pg_port = pg_port
self.http_port = http_port
self.check_stop_result = check_stop_result
# passed to endpoint create and endpoint reconfigure
self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
# path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf

Expand Down Expand Up @@ -3469,6 +3473,7 @@ def start(
self,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
safekeepers: Optional[List[int]] = None,
allow_multiple: bool = False,
) -> "Endpoint":
"""
Expand All @@ -3478,6 +3483,11 @@ def start(

assert self.endpoint_id is not None

# If `safekeepers` is not None, remember them as active and use them
# in the following commands.
if safekeepers is not None:
self.active_safekeepers = safekeepers

log.info(f"Starting postgres endpoint {self.endpoint_id}")

self.env.neon_cli.endpoint_start(
Expand Down Expand Up @@ -3538,9 +3548,17 @@ def edit_hba(self, hba: List[str]):
if self.running:
self.safe_psql("SELECT pg_reload_conf()")

def reconfigure(self, pageserver_id: Optional[int] = None):
def reconfigure(
    self, pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None
):
    """Reconfigure the running endpoint via `neon_local endpoint reconfigure`.

    If `safekeepers` is given, it replaces the remembered active safekeeper
    set, which is then passed to this and subsequent CLI commands.
    """
    assert self.endpoint_id is not None
    # If `safekeepers` is not None, remember them as active and use them
    # in this and the following commands.
    if safekeepers is not None:
        self.active_safekeepers = safekeepers
    self.env.neon_cli.endpoint_reconfigure(
        self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
    )

def respec(self, **kwargs):
"""Update the endpoint.json file used by control_plane."""
Expand Down
35 changes: 15 additions & 20 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,10 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):


# Basic pull_timeline test.
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
# When live_sk_change is False, the compute is restarted to change the set of
# safekeepers; otherwise the change is applied via a live config reload.
@pytest.mark.parametrize("live_sk_change", [False, True])
def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
neon_env_builder.auth_enabled = True

def execute_payload(endpoint: Endpoint):
Expand Down Expand Up @@ -1757,8 +1760,7 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2, 3]
endpoint.start()
endpoint.start(safekeepers=[1, 2, 3])

execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
Expand All @@ -1770,29 +1772,22 @@ def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_i
log.info("Initialize new safekeeper 4, pull data from 1 & 3")
env.safekeepers[3].start()

res = (
env.safekeepers[3]
.http_client(auth_token=env.auth_keys.generate_safekeeper_token())
.pull_timeline(
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"http_hosts": [
f"http://localhost:{env.safekeepers[0].port.http}",
f"http://localhost:{env.safekeepers[2].port.http}",
],
}
)
res = env.safekeepers[3].pull_timeline(
[env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
)
log.info("Finished pulling timeline")
log.info(res)

show_statuses(env.safekeepers, tenant_id, timeline_id)

log.info("Restarting compute with new config to verify that it works")
endpoint.stop_and_destroy().create("main")
endpoint.active_safekeepers = [1, 3, 4]
endpoint.start()
action = "reconfiguing" if live_sk_change else "restarting"
log.info(f"{action} compute with new config to verify that it works")
new_sks = [1, 3, 4]
if not live_sk_change:
endpoint.stop_and_destroy().create("main")
endpoint.start(safekeepers=new_sks)
else:
endpoint.reconfigure(safekeepers=new_sks)

execute_payload(endpoint)
show_statuses(env.safekeepers, tenant_id, timeline_id)
Expand Down

0 comments on commit 62c0c1d

Please sign in to comment.