Skip to content

Commit

Permalink
store: Copy only necessary DDS so we don't need to revert them
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Apr 20, 2021
1 parent 1ac003d commit 674efa6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
15 changes: 8 additions & 7 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ impl DeploymentStore {
dst.catalog.namespace
);

// 1. Copy subgraph data
// Copy subgraph data
// We allow both not copying tables at all from the source, as well
// as adding new tables in `self`; we only need to check that tables
// that actually need to be copied from the source are compatible
Expand All @@ -1095,20 +1095,21 @@ impl DeploymentStore {

let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
// 2. Copy dynamic data sources and adjust their ID
let count = dynds::copy(&conn, &src.site, &dst.site)?;
// Copy dynamic data sources and adjust their ID
let count = dynds::copy(&conn, &src.site, &dst.site, &block)?;
info!(logger, "Copied {} dynamic data sources", count;
"time_ms" => start.elapsed().as_millis());

// 3. Rewind the subgraph. `revert_block` gets rid of everything
// including the block passed to it. We want to preserve `block`
// and therefore revert `block+1`
// Rewind the subgraph so that entity versions that are
// clamped in the future (beyond `block`) become valid for
// all blocks after `block`. `revert_block` gets rid of
// everything including the block passed to it. We want to
// preserve `block` and therefore revert `block+1`
let start = Instant::now();
let block_to_revert: BlockNumber = (block.number + 1)
.try_into()
.expect("block numbers fit into an i32");
dst.revert_block(&conn, &dst.site.deployment, block_to_revert)?;
Layout::revert_metadata(&conn, &dst.site.deployment, block_to_revert)?;
info!(logger, "Rewound subgraph to block {}", block.number;
"time_ms" => start.elapsed().as_millis());

Expand Down
15 changes: 12 additions & 3 deletions store/postgres/src/dynds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use diesel::{
dsl::sql,
prelude::{ExpressionMethods, QueryDsl, RunQueryDsl},
sql_query,
sql_types::Text,
sql_types::{Integer, Text},
};
use diesel::{insert_into, pg::PgConnection};

Expand Down Expand Up @@ -160,7 +160,14 @@ pub(crate) fn insert(
.map_err(|e| e.into())
}

pub(crate) fn copy(conn: &PgConnection, src: &Site, dst: &Site) -> Result<usize, StoreError> {
/// Copy the dynamic data sources for `src` to `dst`. All data sources that
/// were created up to and including `target_block` will be copied.
pub(crate) fn copy(
conn: &PgConnection,
src: &Site,
dst: &Site,
target_block: &EthereumBlockPointer,
) -> Result<usize, StoreError> {
let src_nsp = if src.shard == dst.shard {
"subgraphs".to_string()
} else {
Expand All @@ -176,13 +183,15 @@ pub(crate) fn copy(conn: &PgConnection, src: &Site, dst: &Site) -> Result<usize,
e.ethereum_block_hash, e.ethereum_block_number, $2 as deployment,
e.context
from {src_nsp}.dynamic_ethereum_contract_data_source e
where e.deployment = $1",
where e.deployment = $1
and e.ethereum_block_number <= $3",
src_nsp = src_nsp
);

Ok(sql_query(&query)
.bind::<Text, _>(src.deployment.as_str())
.bind::<Text, _>(dst.deployment.as_str())
.bind::<Integer, _>(target_block.number)
.execute(conn)?)
}

Expand Down

0 comments on commit 674efa6

Please sign in to comment.