Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add graphman rewind #2373

Merged
merged 3 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ pub enum Command {
/// The id of the deployment to unassign
id: String,
},
Rewind {
id: String,
block_hash: String,
block_number: i32,
},
/// Check and interrogate the configuration
///
/// Print information about a configuration file without
Expand Down Expand Up @@ -321,6 +326,14 @@ async fn main() {
let store = make_store();
commands::assign::reassign(store, id, node)
}
Rewind {
id,
block_hash,
block_number,
} => {
let store = make_store();
commands::rewind::run(store, id, block_hash, block_number)
}
Listen(cmd) => {
use ListenCommand::*;
match cmd {
Expand Down
1 change: 1 addition & 0 deletions node/src/manager/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod config;
pub mod info;
pub mod listen;
pub mod remove;
pub mod rewind;
pub mod txn_speed;
pub mod unused_deployments;
20 changes: 20 additions & 0 deletions node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::convert::TryFrom;
use std::sync::Arc;

use graph::prelude::{anyhow, BlockNumber, EthereumBlockPointer, SubgraphDeploymentId};
use graph_store_postgres::SubgraphStore;

pub fn run(
store: Arc<SubgraphStore>,
id: String,
block_hash: String,
block_number: BlockNumber,
) -> Result<(), anyhow::Error> {
let id =
SubgraphDeploymentId::new(id).map_err(|id| anyhow!("illegal deployment id `{}`", id))?;
let block_ptr_to = EthereumBlockPointer::try_from((block_hash.as_str(), block_number as i64))
.map_err(|e| anyhow!("error converting to block pointer: {}", e))?;

store.rewind(id, block_ptr_to)?;
Ok(())
}
46 changes: 33 additions & 13 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,22 +901,13 @@ impl DeploymentStore {
Ok(event)
}

pub(crate) fn revert_block_operations(
fn rewind_with_conn(
&self,
conn: &PgConnection,
site: Arc<Site>,
block_ptr_to: EthereumBlockPointer,
) -> Result<StoreEvent, StoreError> {
let conn = self.get_conn()?;

let event = conn.transaction(|| -> Result<_, StoreError> {
// Unwrap: If we are reverting then the block ptr is not `None`.
let block_ptr_from = Self::block_ptr_with_conn(&site.deployment, &conn)?.unwrap();

// Sanity check on block numbers
if block_ptr_from.number != block_ptr_to.number + 1 {
panic!("revert_block_operations must revert a single block only");
}

// Don't revert past a graft point
let info = self.subgraph_info_with_conn(&conn, &site.deployment)?;
if let Some(graft_block) = info.graft_block {
Expand All @@ -933,17 +924,19 @@ impl DeploymentStore {
}
}

deployment::revert_block_ptr(&conn, &site.deployment, block_ptr_to)?;
deployment::revert_block_ptr(&conn, &site.deployment, block_ptr_to.clone())?;

// Revert the data
let layout = self.layout(&conn, site.clone())?;

// At 1 block per 15 seconds, the maximum i32
// value affords just over 1020 years of blocks.
let block = block_ptr_from
let block: BlockNumber = block_ptr_to
.number
.try_into()
.expect("block numbers fit into an i32");
// The revert functions want the number of the first block that we need to get rid of
let block = block + 1;

let (event, count) = layout.revert_block(&conn, &site.deployment, block)?;

Expand All @@ -966,6 +959,33 @@ impl DeploymentStore {
Ok(event)
}

pub(crate) fn rewind(
&self,
site: Arc<Site>,
block_ptr_to: EthereumBlockPointer,
) -> Result<StoreEvent, StoreError> {
let conn = self.get_conn()?;

self.rewind_with_conn(&conn, site, block_ptr_to)
}

pub(crate) fn revert_block_operations(
&self,
site: Arc<Site>,
block_ptr_to: EthereumBlockPointer,
) -> Result<StoreEvent, StoreError> {
let conn = self.get_conn()?;
// Unwrap: If we are reverting then the block ptr is not `None`.
let block_ptr_from = Self::block_ptr_with_conn(&site.deployment, &conn)?.unwrap();

// Sanity check on block numbers
if block_ptr_from.number != block_ptr_to.number + 1 {
panic!("revert_block_operations must revert a single block only");
}
Comment on lines +981 to +984
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just here because we revert block by block usually? It's not here because reverting many blocks at once breaks things?

Copy link
Collaborator Author

@lutter lutter Apr 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably used to be that we had to go block-by-block; but for quite some time now, it's been safe to jump the block pointer back (assuming you know that you are jumping to the correct block pointer, which this PR slyly puts on the user) All the reversion queries just look at things that happened at/after the given block.

I left that code in since I didn't want to muck with the 'normal' reversion code path.


self.rewind_with_conn(&conn, site, block_ptr_to)
}

pub(crate) async fn deployment_state_from_id(
&self,
id: SubgraphDeploymentId,
Expand Down
10 changes: 10 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,16 @@ impl SubgraphStore {
pub(crate) async fn vacuum(&self) -> Vec<Result<(), StoreError>> {
join_all(self.stores.values().map(|store| store.vacuum())).await
}

pub fn rewind(
&self,
id: SubgraphDeploymentId,
block_ptr_to: EthereumBlockPointer,
) -> Result<(), StoreError> {
let (store, site) = self.store(&id)?;
let event = store.rewind(site, block_ptr_to)?;
self.send_store_event(&event)
}
}

#[async_trait::async_trait]
Expand Down