Skip to content

Commit

Permalink
store: Use the fdw pool for copy connections
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Mar 31, 2021
1 parent d87191f commit 0e33a0d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
28 changes: 27 additions & 1 deletion store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use graph::{
anyhow::{self, anyhow, bail},
debug, error, info, o,
tokio::sync::Semaphore,
CancelGuard, CancelHandle, CancelToken as _, CancelableError, Counter, Gauge, Logger,
warn, CancelGuard, CancelHandle, CancelToken as _, CancelableError, Counter, Gauge, Logger,
MetricsRegistry, MovingStats, PoolWaitStats, StoreError,
},
util::security::SafeDisplay,
Expand Down Expand Up @@ -485,6 +485,32 @@ impl ConnectionPool {
}
}

/// Get a connection from the pool for foreign data wrapper access;
/// since that pool can be very contended, periodically log that we are
/// still waiting for a connection
pub fn get_fdw(
&self,
logger: &Logger,
) -> Result<PooledConnection<ConnectionManager<PgConnection>>, graph::prelude::Error> {
let pool = match &self.fdw_pool {
Some(pool) => pool,
None => {
const MSG: &str =
"internal error: trying to get fdw connection on a pool that doesn't have any";
error!(logger, "{}", MSG);
bail!(MSG)
}
};
loop {
match pool.get() {
Ok(conn) => return Ok(conn),
Err(e) => warn!(logger, "still trying to get fdw connection";
"detail" => e.to_string(),
),
}
}
}

pub fn connection_detail(&self) -> Result<ForeignServer, StoreError> {
ForeignServer::new(self.shard.clone(), &self.postgres_url).map_err(|e| e.into())
}
Expand Down
13 changes: 7 additions & 6 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,11 @@ impl Connection {
Self { pool }
}

fn transaction<T, F>(&self, f: F) -> Result<T, StoreError>
fn transaction<T, F>(&self, logger: &Logger, f: F) -> Result<T, StoreError>
where
F: FnOnce(&PgConnection) -> Result<T, StoreError>,
{
let conn = self.pool.get()?;
let conn = self.pool.get_fdw(logger)?;
conn.transaction(|| f(&conn)).map_err(|e| e.into())
}

Expand All @@ -552,15 +552,16 @@ impl Connection {
dst: Arc<Layout>,
target_block: EthereumBlockPointer,
) -> Result<Status, StoreError> {
let mut state =
self.transaction(|conn| CopyState::new(conn, src, dst.clone(), target_block))?;
let mut state = self.transaction(logger, |conn| {
CopyState::new(conn, src, dst.clone(), target_block)
})?;

let mut progress = CopyProgress::new(logger, &state);
progress.start();

for table in state.tables.iter_mut().filter(|table| !table.finished()) {
while !table.finished() {
let status = self.transaction(|conn| table.copy_batch(conn))?;
let status = self.transaction(logger, |conn| table.copy_batch(conn))?;
if status == Status::Cancelled {
return Ok(status);
}
Expand All @@ -569,7 +570,7 @@ impl Connection {
progress.table_finished(table);
}

self.transaction(|conn| state.finished(conn))?;
self.transaction(logger, |conn| state.finished(conn))?;
progress.finished();

Ok(Status::Finished)
Expand Down

0 comments on commit 0e33a0d

Please sign in to comment.