Skip to content

Commit

Permalink
node, store: Add configuration and a separate pool for fdw connections
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Apr 1, 2021
1 parent 37bbdfb commit 92a9713
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
7 changes: 7 additions & 0 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ pub struct Shard {
pub weight: usize,
#[serde(default)]
pub pool_size: PoolSize,
#[serde(default = "PoolSize::five")]
pub fdw_pool_size: PoolSize,
#[serde(default)]
pub replicas: BTreeMap<String, Replica>,
}
Expand Down Expand Up @@ -248,6 +250,7 @@ impl Shard {
connection: postgres_url.clone(),
weight: opt.postgres_host_weights.get(0).cloned().unwrap_or(1),
pool_size,
fdw_pool_size: PoolSize::five(),
replicas,
})
}
Expand All @@ -268,6 +271,10 @@ impl Default for PoolSize {
}

impl PoolSize {
fn five() -> Self {
Self::Fixed(5)
}

fn validate(&self, connection: &str) -> Result<()> {
use PoolSize::*;

Expand Down
6 changes: 6 additions & 0 deletions node/src/store_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl StoreBuilder {
"we can determine the pool size for store {}",
name
));
let fdw_pool_size = shard.fdw_pool_size.size_for(node, name).expect(&format!(
"we can determine the fdw pool size for store {}",
name
));
info!(
logger,
"Connecting to Postgres";
Expand All @@ -180,6 +184,7 @@ impl StoreBuilder {
"main",
shard.connection.to_owned(),
pool_size,
Some(fdw_pool_size),
&logger,
registry.cheap_clone(),
)
Expand Down Expand Up @@ -218,6 +223,7 @@ impl StoreBuilder {
pool,
replica.connection.clone(),
pool_size,
None,
&logger,
registry.cheap_clone(),
)
Expand Down
35 changes: 33 additions & 2 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,32 @@ impl ForeignServer {
}
}

/// How long to keep connections in the `fdw_pool` around before closing
/// them on idle. This is much shorter than the default of 10 minutes.
const FDW_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Clone)]
pub struct ConnectionPool {
logger: Logger,
shard: Shard,
pool: Pool<ConnectionManager<PgConnection>>,
// A separate pool for connections that will use foreign data wrappers.
// Once such a connection accesses a foreign table, Postgres keeps a
// connection to the foreign server until the connection is closed.
// Normal pooled connections live quite long (up to 10 minutes) and can
// therefore keep a lot of connections into foreign databases open. We
// mitigate this by using a separate small pool with a much shorter
// connection lifetime. Starting with postgres_fdw 1.1 in Postgres 14,
// this will no longer be needed since it will then be possible to
// explicitly close connections to foreign servers when a connection is
// returned to the pool.
fdw_pool: Option<Pool<ConnectionManager<PgConnection>>>,
limiter: Arc<Semaphore>,
postgres_url: String,
pub(crate) wait_stats: PoolWaitStats,
}

#[derive(Clone)]
struct ErrorHandler(Logger, Counter);

impl std::fmt::Debug for ErrorHandler {
Expand All @@ -204,6 +220,7 @@ impl r2d2::HandleError<r2d2::Error> for ErrorHandler {
}
}

#[derive(Clone)]
struct EventHandler {
logger: Logger,
count_gauge: Gauge,
Expand Down Expand Up @@ -290,6 +307,7 @@ impl ConnectionPool {
pool_name: &str,
postgres_url: String,
pool_size: u32,
fdw_pool_size: Option<u32>,
logger: &Logger,
registry: Arc<dyn MetricsRegistry>,
) -> ConnectionPool {
Expand Down Expand Up @@ -333,12 +351,24 @@ impl ConnectionPool {
// available.
let timeout_seconds = if cfg!(test) { 30 } else { 6 * 60 * 60 };
let pool = Pool::builder()
.error_handler(error_handler)
.event_handler(event_handler)
.error_handler(error_handler.clone())
.event_handler(event_handler.clone())
.connection_timeout(Duration::from_secs(timeout_seconds))
.max_size(pool_size)
.build(conn_manager)
.unwrap();
let fdw_pool = fdw_pool_size.map(|pool_size| {
let conn_manager = ConnectionManager::new(postgres_url.clone());
Pool::builder()
.error_handler(error_handler)
.event_handler(event_handler)
.max_size(pool_size)
.min_idle(Some(1))
.idle_timeout(Some(FDW_IDLE_TIMEOUT))
.build(conn_manager)
.unwrap()
});

let limiter = Arc::new(Semaphore::new(pool_size as usize));
info!(logger_store, "Pool successfully connected to Postgres");
ConnectionPool {
Expand All @@ -347,6 +377,7 @@ impl ConnectionPool {
.expect("shard_name is a valid name for a shard"),
postgres_url: postgres_url.clone(),
pool,
fdw_pool,
limiter,
wait_stats,
}
Expand Down

0 comments on commit 92a9713

Please sign in to comment.