Skip to content

Commit

Permalink
proxy: refactor how and when connections are returned to the pool (#5095
Browse files Browse the repository at this point in the history
)

## Problem

Transactions break connections in the pool

fixes #4698 

## Summary of changes

* Pool `Client`s are smart object that return themselves to the pool
* Pool `Client`s can be 'discard'ed
* Pool `Client`s are discarded when certain errors are encountered.
* Pool `Client`s are discarded when ReadyForQuery returns a non-idle
state.
  • Loading branch information
conradludgate committed Oct 17, 2023
1 parent ea648cf commit f775928
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 137 deletions.
13 changes: 6 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ env_logger = "0.10"
log = "0.4"

## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="9011f7110db12b5e15afaf98f8ac834501d50ddc" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="9011f7110db12b5e15afaf98f8ac834501d50ddc" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="9011f7110db12b5e15afaf98f8ac834501d50ddc" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="9011f7110db12b5e15afaf98f8ac834501d50ddc" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="9011f7110db12b5e15afaf98f8ac834501d50ddc" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="a2d0652ec3f8f710ff8cfc2e7c68f096fb852d9d" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="a2d0652ec3f8f710ff8cfc2e7c68f096fb852d9d" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="a2d0652ec3f8f710ff8cfc2e7c68f096fb852d9d" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="a2d0652ec3f8f710ff8cfc2e7c68f096fb852d9d" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="a2d0652ec3f8f710ff8cfc2e7c68f096fb852d9d" }

## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
Expand Down Expand Up @@ -200,7 +200,7 @@ tonic-build = "0.9"

# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="9011f7110db12b5e15afaf98f8ac834501d50ddc" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="a2d0652ec3f8f710ff8cfc2e7c68f096fb852d9d" }

################# Binary contents sections

Expand Down
129 changes: 112 additions & 17 deletions proxy/src/http/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ use pbkdf2::{
Params, Pbkdf2,
};
use pq_proto::StartupMessageParams;
use std::sync::atomic::{self, AtomicUsize};
use std::{collections::HashMap, sync::Arc};
use std::{
fmt,
task::{ready, Poll},
};
use std::{
ops::Deref,
sync::atomic::{self, AtomicUsize},
};
use tokio::time;
use tokio_postgres::AsyncMessage;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};

use crate::{
auth, console,
Expand All @@ -26,13 +29,13 @@ use crate::{compute, config};

use crate::proxy::ConnectMechanism;

use tracing::{error, warn};
use tracing::{error, warn, Span};
use tracing::{info, info_span, Instrument};

pub const APP_NAME: &str = "sql_over_http";
const MAX_CONNS_PER_ENDPOINT: usize = 20;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConnInfo {
pub username: String,
pub dbname: String,
Expand All @@ -55,7 +58,7 @@ impl fmt::Display for ConnInfo {
}

struct ConnPoolEntry {
conn: Client,
conn: ClientInner,
_last_access: std::time::Instant,
}

Expand Down Expand Up @@ -133,14 +136,20 @@ impl GlobalConnPool {
}

pub async fn get(
&self,
self: &Arc<Self>,
conn_info: &ConnInfo,
force_new: bool,
session_id: uuid::Uuid,
) -> anyhow::Result<Client> {
let mut client: Option<Client> = None;
let mut client: Option<ClientInner> = None;
let mut latency_timer = LatencyTimer::new("http");

let pool = if force_new {
None
} else {
Some((conn_info.clone(), self.clone()))
};

let mut hash_valid = false;
if !force_new {
let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
Expand Down Expand Up @@ -188,7 +197,11 @@ impl GlobalConnPool {
latency_timer.pool_hit();
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
return Ok(client);
return Ok(Client {
inner: Some(client),
span: Span::current(),
pool,
});
}
} else {
info!("pool: opening a new connection '{conn_info}'");
Expand Down Expand Up @@ -228,10 +241,14 @@ impl GlobalConnPool {
_ => {}
}

new_client
new_client.map(|inner| Client {
inner: Some(inner),
span: Span::current(),
pool,
})
}

pub fn put(&self, conn_info: &ConnInfo, client: Client) -> anyhow::Result<()> {
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
// We want to hold this open while we return. This ensures that the pool can't close
// while we are in the middle of returning the connection.
let closed = self.closed.read();
Expand Down Expand Up @@ -326,7 +343,7 @@ struct TokioMechanism<'a> {

#[async_trait]
impl ConnectMechanism for TokioMechanism<'_> {
type Connection = Client;
type Connection = ClientInner;
type ConnectError = tokio_postgres::Error;
type Error = anyhow::Error;

Expand All @@ -350,7 +367,7 @@ async fn connect_to_compute(
conn_info: &ConnInfo,
session_id: uuid::Uuid,
latency_timer: LatencyTimer,
) -> anyhow::Result<Client> {
) -> anyhow::Result<ClientInner> {
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());

Expand Down Expand Up @@ -399,7 +416,7 @@ async fn connect_to_compute_once(
conn_info: &ConnInfo,
timeout: time::Duration,
mut session: uuid::Uuid,
) -> Result<Client, tokio_postgres::Error> {
) -> Result<ClientInner, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();

let (client, mut connection) = config
Expand Down Expand Up @@ -462,21 +479,99 @@ async fn connect_to_compute_once(
.instrument(span)
);

Ok(Client {
Ok(ClientInner {
inner: client,
session: tx,
ids,
})
}

pub struct Client {
pub inner: tokio_postgres::Client,
struct ClientInner {
inner: tokio_postgres::Client,
session: tokio::sync::watch::Sender<uuid::Uuid>,
ids: Ids,
}

impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.ids.clone())
USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
}
}

pub struct Client {
span: Span,
inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
}

pub struct Discard<'a> {
pool: &'a mut Option<(ConnInfo, Arc<GlobalConnPool>)>,
}

impl Client {
pub fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
let Self {
inner,
pool,
span: _,
} = self;
(
&mut inner
.as_mut()
.expect("client inner should not be removed")
.inner,
Discard { pool },
)
}

pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
self.inner().1.check_idle(status)
}
pub fn discard(&mut self) {
self.inner().1.discard()
}
}

impl Discard<'_> {
pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
if status != ReadyForQueryStatus::Idle {
if let Some((conn_info, _)) = self.pool.take() {
info!("pool: throwing away connection '{conn_info}' because connection is not idle")
}
}
}
pub fn discard(&mut self) {
if let Some((conn_info, _)) = self.pool.take() {
info!("pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
}
}
}

impl Deref for Client {
type Target = tokio_postgres::Client;

fn deref(&self) -> &Self::Target {
&self
.inner
.as_ref()
.expect("client inner should not be removed")
.inner
}
}

impl Drop for Client {
fn drop(&mut self) {
let client = self
.inner
.take()
.expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() {
let current_span = self.span.clone();
// return connection to the pool
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client);
});
}
}
}
Loading

1 comment on commit f775928

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2372 tests run: 2256 passed, 0 failed, 116 skipped (full report)


Code coverage (full report)

  • functions: 52.7% (8287 of 15732 functions)
  • lines: 80.5% (48323 of 60023 lines)

The comment gets automatically updated with the latest test results
f775928 at 2023-10-17T15:00:24.449Z :recycle:

Please sign in to comment.