Skip to content

Commit

Permalink
store: Make sure we only copy the data that we set out to copy
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Apr 7, 2021
1 parent 7c55817 commit 8479239
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
25 changes: 12 additions & 13 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,14 +263,14 @@ impl TableState {
}

let target_vid = sql_query(&format!(
"select coalesce(max(vid), 0) as max_vid from {} where lower(block_range) <= $1",
"select coalesce(max(vid), -1) as max_vid from {} where lower(block_range) <= $1",
src.qualified_name.as_str()
))
.bind::<Integer, _>(&target_block.number)
.load::<MaxVid>(conn)?
.first()
.map(|v| v.max_vid)
.unwrap_or(0);
.unwrap_or(-1);

Ok(Self {
dst_site,
Expand All @@ -284,7 +284,7 @@ impl TableState {
}

fn finished(&self) -> bool {
self.next_vid >= self.target_vid
self.next_vid > self.target_vid
}

fn load(
Expand Down Expand Up @@ -396,18 +396,16 @@ impl TableState {
fn copy_batch(&mut self, conn: &PgConnection) -> Result<(), StoreError> {
let start = Instant::now();

rq::CopyEntityBatchQuery::new(
self.dst.as_ref(),
&self.src,
self.next_vid,
self.next_vid + self.batch_size,
)?
.execute(conn)?;
// Copy all versions with next_vid <= vid <= next_vid + batch_size - 1,
// but do not go over target_vid
let last_vid = (self.next_vid + self.batch_size - 1).min(self.target_vid);
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, self.next_vid, last_vid)?
.execute(conn)?;

let duration = start.elapsed();

// remember how far we got
self.next_vid += self.batch_size;
self.next_vid = last_vid + 1;

// adjust batch size by trying to extrapolate in such a way that we
// get close to TARGET_DURATION for the time it takes to copy one
Expand Down Expand Up @@ -517,8 +515,9 @@ impl Connection {

/// Copy the data for the subgraph `src` to the subgraph `dst`. The
/// schema for both subgraphs must have already been set up. The
/// `target_block` must by far enough behind the chain head so that the
/// block is guaranteed to not be subject to chain reorgs.
/// `target_block` must be far enough behind the chain head so that the
/// block is guaranteed to not be subject to chain reorgs. All data up
/// to and including `target_block` will be copied.
///
/// The copy logic makes heavy use of the fact that the `vid` and
/// `block_range` of entity versions are related since for two entity
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2755,7 +2755,7 @@ impl<'a> LoadQuery<PgConnection, ReturnedEntityData> for DeleteByPrefixQuery<'a>
impl<'a, Conn> RunQueryDsl<Conn> for DeleteByPrefixQuery<'a> {}

/// Copy the data of one table to another table. All rows whose `vid` is in
/// the range `[first_vid, last_vid)` will be copied
/// the range `[first_vid, last_vid]` will be copied
#[derive(Debug, Clone)]
pub struct CopyEntityBatchQuery<'a> {
src: &'a Table,
Expand Down Expand Up @@ -2834,7 +2834,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(self.src.qualified_name.as_str());
out.push_sql(" where vid >= ");
out.push_bind_param::<BigInt, _>(&self.first_vid)?;
out.push_sql(" and vid < ");
out.push_sql(" and vid <= ");
out.push_bind_param::<BigInt, _>(&self.last_vid)?;
Ok(())
}
Expand Down

0 comments on commit 8479239

Please sign in to comment.