Skip to content

Commit

Permalink
store: Cancel an ongoing copy when a deployment becomes unassigned
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Mar 29, 2021
1 parent 890b621 commit ed09417
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 9 deletions.
4 changes: 3 additions & 1 deletion store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ impl ForeignServer {
"create view {nsp}.deployment_schemas as
select * from public.deployment_schemas;
create view {nsp}.chains as
select * from public.chains",
select * from public.chains;
create view {nsp}.active_copies as
select * from public.active_copies;",
nsp = Self::PRIMARY_PUBLIC
)
} else {
Expand Down
46 changes: 41 additions & 5 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ table! {
}
}

// This is the same as primary::active_copies, but mapped into each shard
table! {
primary_public.active_copies(dst) {
src -> Integer,
dst -> Integer,
cancelled_at -> Nullable<Date>,
}
}

#[derive(Copy, Clone, PartialEq)]
pub enum Status {
Finished,
Cancelled,
}

#[allow(dead_code)]
struct CopyState {
src: Arc<Layout>,
Expand Down Expand Up @@ -378,9 +393,23 @@ impl TableState {
Ok(())
}

fn copy_batch(&mut self, conn: &PgConnection) -> Result<(), StoreError> {
fn copy_batch(&mut self, conn: &PgConnection) -> Result<Status, StoreError> {
fn is_cancelled(dst: &Site, conn: &PgConnection) -> Result<bool, StoreError> {
use active_copies as ac;

ac::table
.filter(ac::dst.eq(dst.id))
.select(ac::cancelled_at.is_not_null())
.get_result::<bool>(conn)
.map_err(|e| e.into())
}

let start = Instant::now();

if is_cancelled(self.dst_site.as_ref(), conn)? {
return Ok(Status::Cancelled);
}

// Copy all versions with next_vid <= vid <= next_vid + batch_size - 1,
// but do not go over target_vid
let last_vid = (self.next_vid + self.batch_size - 1).min(self.target_vid);
Expand All @@ -405,7 +434,11 @@ impl TableState {
self.record_finished(conn)?;
}

Ok(())
if is_cancelled(self.dst_site.as_ref(), conn)? {
return Ok(Status::Cancelled);
}

Ok(Status::Finished)
}
}

Expand Down Expand Up @@ -518,7 +551,7 @@ impl Connection {
src: Arc<Layout>,
dst: Arc<Layout>,
target_block: EthereumBlockPointer,
) -> Result<(), StoreError> {
) -> Result<Status, StoreError> {
let mut state =
self.transaction(|conn| CopyState::new(conn, src, dst.clone(), target_block))?;

Expand All @@ -527,7 +560,10 @@ impl Connection {

for table in state.tables.iter_mut().filter(|table| !table.finished()) {
while !table.finished() {
self.transaction(|conn| table.copy_batch(conn))?;
let status = self.transaction(|conn| table.copy_batch(conn))?;
if status == Status::Cancelled {
return Ok(status);
}
progress.update(table);
}
progress.table_finished(table);
Expand All @@ -536,6 +572,6 @@ impl Connection {
self.transaction(|conn| state.finished(conn))?;
progress.finished();

Ok(())
Ok(Status::Finished)
}
}
5 changes: 4 additions & 1 deletion store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,10 @@ impl DeploymentStore {
// that actually need to be copied from the source are compatible
// with the corresponding tables in `self`
let copy_conn = crate::copy::Connection::new(self.conn.clone());
copy_conn.copy_data(logger, src.clone(), dst.clone(), block.clone())?;
let status = copy_conn.copy_data(logger, src.clone(), dst.clone(), block.clone())?;
if status == crate::copy::Status::Cancelled {
return Ok(());
}

let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
Expand Down
26 changes: 24 additions & 2 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,18 @@ impl<'a> Connection<'a> {
}
}

/// Signal any copy process that might be copying into one of these
/// deployments that it should stop. Copying is cancelled whenever we
/// remove the assignment for a deployment
fn cancel_copies(&self, ids: Vec<DeploymentId>) -> Result<(), StoreError> {
use active_copies as ac;

update(ac::table.filter(ac::dst.eq_any(ids)))
.set(ac::cancelled_at.eq(sql("now()")))
.execute(self.0.as_ref())?;
Ok(())
}

/// Delete all assignments for deployments that are neither the current nor the
/// pending version of a subgraph and return the deployment id's
fn remove_unused_assignments(&self) -> Result<Vec<EntityChange>, StoreError> {
Expand All @@ -432,10 +444,18 @@ impl<'a> Connection<'a> {
.returning(a::id)
.load::<i32>(self.0.as_ref())?;

let removed = ds::table
let removed: Vec<_> = ds::table
.filter(ds::id.eq_any(removed))
.select((ds::id, ds::subgraph))
.load::<(DeploymentId, String)>(self.0.as_ref())?
.into_iter()
.collect();

// Stop ongoing copies
let removed_ids: Vec<_> = removed.iter().map(|(id, _)| id.clone()).collect();
self.cancel_copies(removed_ids)?;

let events = removed
.into_iter()
.map(|(id, hash)| {
SubgraphDeploymentId::new(hash)
Expand All @@ -453,7 +473,7 @@ impl<'a> Connection<'a> {
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(removed)
Ok(events)
}

/// Promote the deployment `id` to the current version everywhere where it was
Expand Down Expand Up @@ -729,6 +749,8 @@ impl<'a> Connection<'a> {
let conn = self.0.as_ref();
let delete_count = delete(a::table.filter(a::id.eq(site.id))).execute(conn)?;

self.cancel_copies(vec![site.id])?;

match delete_count {
0 => Ok(vec![]),
1 => {
Expand Down

0 comments on commit ed09417

Please sign in to comment.