Skip to content

Commit

Permalink
Passing neon options to the console (#5781)
Browse files Browse the repository at this point in the history
The idea is to pass neon_* prefixed options to control plane. It can be
used by cplane to dynamically create timelines and computes. Such
options also should be excluded from passing to compute. Another issue
is how connection caching is working now, because compute's instance now
depends not only on hostname but probably on such options too I included
them to cache key.
  • Loading branch information
prepor committed Nov 7, 2023
1 parent e310533 commit fc47af1
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 14 deletions.
39 changes: 37 additions & 2 deletions proxy/src/auth/credentials.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! User credentials used in authentication.

use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError};
use crate::{
auth::password_hack::parse_endpoint_param, error::UserFacingError, proxy::neon_options,
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::collections::HashSet;
Expand Down Expand Up @@ -38,6 +40,8 @@ pub struct ClientCredentials<'a> {
pub user: &'a str,
// TODO: this is a severe misnomer! We should think of a new name ASAP.
pub project: Option<String>,

pub cache_key: String,
}

impl ClientCredentials<'_> {
Expand All @@ -53,6 +57,7 @@ impl<'a> ClientCredentials<'a> {
ClientCredentials {
user: "",
project: None,
cache_key: "".to_string(),
}
}

Expand Down Expand Up @@ -120,7 +125,17 @@ impl<'a> ClientCredentials<'a> {

info!(user, project = project.as_deref(), "credentials");

Ok(Self { user, project })
let cache_key = format!(
"{}{}",
project.as_deref().unwrap_or(""),
neon_options(params).unwrap_or("".to_string())
);

Ok(Self {
user,
project,
cache_key,
})
}
}

Expand Down Expand Up @@ -176,6 +191,7 @@ mod tests {
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("foo"));
assert_eq!(creds.cache_key, "foo");

Ok(())
}
Expand Down Expand Up @@ -303,4 +319,23 @@ mod tests {
_ => panic!("bad error: {err:?}"),
}
}

#[test]
fn parse_neon_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "neon_lsn:0/2 neon_endpoint_type:read_write"),
]);

let sni = Some("project.localhost");
let common_names = Some(["localhost".into()].into());
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("project"));
assert_eq!(
creds.cache_key,
"projectneon_endpoint_type:read_write neon_lsn:0/2"
);

Ok(())
}
}
9 changes: 8 additions & 1 deletion proxy/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
cancellation::CancelClosure,
console::errors::WakeComputeError,
error::{io_error, UserFacingError},
proxy::is_neon_param,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
Expand Down Expand Up @@ -278,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none())
.filter(|opt| parse_endpoint_param(opt).is_none() && !is_neon_param(opt))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

Expand Down Expand Up @@ -313,5 +314,11 @@ mod tests {

let params = StartupMessageParams::new([("options", "project = foo")]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));

let params = StartupMessageParams::new([(
"options",
"project = foo neon_endpoint_type:read_write neon_lsn:0/2",
)]);
assert_eq!(filtered_options(&params).as_deref(), Some("project = foo"));
}
}
1 change: 1 addition & 0 deletions proxy/src/console/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub struct ConsoleReqExtra<'a> {
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
pub options: Option<&'a str>,
}

/// Auth secret which is managed by the cloud.
Expand Down
3 changes: 2 additions & 1 deletion proxy/src/console/provider/neon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl Api {
.query(&[
("application_name", extra.application_name),
("project", Some(project)),
("options", extra.options),
])
.build()?;

Expand Down Expand Up @@ -151,7 +152,7 @@ impl super::Api for Api {
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = creds.project().expect("impossible");
let key: &str = &creds.cache_key;

// Every time we do a wakeup http request, the compute node will stay up
// for some time (highly depends on the console's scale-to-zero policy);
Expand Down
31 changes: 30 additions & 1 deletion proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ use crate::{
use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::TryFutureExt;
use itertools::Itertools;
use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use prometheus::{register_histogram_vec, HistogramVec};
use regex::Regex;
use std::{error::Error, io, ops::ControlFlow, sync::Arc, time::Instant};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
Expand Down Expand Up @@ -881,9 +883,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
allow_self_signed_compute,
} = self;

let console_options = neon_options(params);

let extra = console::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
options: console_options.as_deref(),
};

let mut latency_timer = LatencyTimer::new(mode.protocol_label());
Expand Down Expand Up @@ -945,3 +950,27 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
proxy_pass(stream, node.stream, &aux).await
}
}

pub fn neon_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| is_neon_param(opt))
.sorted() // we sort it to use as cache key
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

// Don't even bother with empty options.
if options.is_empty() {
return None;
}

Some(options)
}

pub fn is_neon_param(bytes: &str) -> bool {
static RE: OnceCell<Regex> = OnceCell::new();
RE.get_or_init(|| Regex::new(r"^neon_\w+:").unwrap());

RE.get().unwrap().is_match(bytes)
}
1 change: 1 addition & 0 deletions proxy/src/proxy/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ fn helper_create_connect_info(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
options: None,
};
let creds = auth::BackendType::Test(mechanism);
(cache, extra, creds)
Expand Down
21 changes: 12 additions & 9 deletions proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};

use crate::{
auth, console,
proxy::{LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
proxy::{
neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER,
NUM_DB_CONNECTIONS_OPENED_COUNTER,
},
usage_metrics::{Ids, MetricCounter, USAGE_METRICS},
};
use crate::{compute, config};
Expand All @@ -41,6 +44,7 @@ pub struct ConnInfo {
pub dbname: String,
pub hostname: String,
pub password: String,
pub options: Option<String>,
}

impl ConnInfo {
Expand Down Expand Up @@ -401,26 +405,25 @@ async fn connect_to_compute(
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());

let credential_params = StartupMessageParams::new([
let params = StartupMessageParams::new([
("user", &conn_info.username),
("database", &conn_info.dbname),
("application_name", APP_NAME),
("options", conn_info.options.as_deref().unwrap_or("")),
]);

let creds = config
.auth_backend
.as_ref()
.map(|_| {
auth::ClientCredentials::parse(
&credential_params,
Some(&conn_info.hostname),
common_names,
)
})
.map(|_| auth::ClientCredentials::parse(&params, Some(&conn_info.hostname), common_names))
.transpose()?;

let console_options = neon_options(&params);

let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
options: console_options.as_deref(),
};

let node_info = creds
Expand Down
12 changes: 12 additions & 0 deletions proxy/src/serverless/sql_over_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,23 @@ fn get_conn_info(
}
}

let pairs = connection_url.query_pairs();

let mut options = Option::None;

for (key, value) in pairs {
if key == "options" {
options = Some(value.to_string());
break;
}
}

Ok(ConnInfo {
username: username.to_owned(),
dbname: dbname.to_owned(),
hostname: hostname.to_owned(),
password: password.to_owned(),
options,
})
}

Expand Down

1 comment on commit fc47af1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2438 tests run: 2317 passed, 0 failed, 121 skipped (full report)


Flaky tests (1)

Postgres 16

  • test_restarts_frequent_checkpoints: release

Code coverage (full report)

  • functions: 54.7% (8893 of 16257 functions)
  • lines: 81.6% (51234 of 62762 lines)

The comment gets automatically updated with the latest test results
fc47af1 at 2023-11-07T16:38:39.585Z :recycle:

Please sign in to comment.