Skip to content

Commit

Permalink
store: Change how the subgraph_deployment table is organized
Browse files Browse the repository at this point in the history
This change gets rid of the block_range column on subgraph_deployment,
renames id, the column for the deployment's IPFS hash to 'subgraph', and
adds an integer id column whose value will be the same as the id in the
deployment_schemas table in the primary.
  • Loading branch information
lutter committed Mar 29, 2021
1 parent 7406a84 commit af76792
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 46 deletions.
6 changes: 5 additions & 1 deletion node/src/manager/commands/unused_deployments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ pub fn record(store: Arc<SubgraphStore>) -> Result<(), Error> {
let recorded = store.record_unused_deployments()?;

for deployment in store.list_unused_deployments(unused::Filter::New)? {
if recorded.iter().find(|r| r.id == deployment.id).is_some() {
if recorded
.iter()
.find(|r| r.deployment == deployment.id)
.is_some()
{
add_row(&mut list, deployment);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
do $$
begin
raise 'This migration is irreversible';
end
$$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
alter table subgraphs.subgraph_deployment
drop column block_range;
alter table subgraphs.subgraph_deployment
rename column id to deployment;
alter table subgraphs.subgraph_deployment
add column id int;
4 changes: 2 additions & 2 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1312,8 +1312,8 @@ impl ChainStoreTrait for ChainStore {
from subgraphs.subgraph_deployment d,
subgraphs.subgraph_deployment_assignment a,
deployment_schemas ds
where ds.subgraph = d.id
and a.id = d.id
where ds.subgraph = d.deployment
and a.id = d.deployment
and not d.failed
and ds.network = $2) a;";
let ancestor_count = i32::try_from(ancestor_count)
Expand Down
55 changes: 29 additions & 26 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use stable_hash::crypto::SetHasher;
use std::str::FromStr;
use std::{collections::BTreeSet, convert::TryFrom, ops::Bound};

use crate::block_range::{BLOCK_RANGE_COLUMN, UNVERSIONED_RANGE};
use crate::{
block_range::{BLOCK_RANGE_COLUMN, UNVERSIONED_RANGE},
primary::Site,
};
use graph::constraint_violation;

#[derive(DbEnum, Debug, Clone, Copy)]
Expand All @@ -47,7 +50,8 @@ impl From<SubgraphHealth> for graph::data::subgraph::schema::SubgraphHealth {
table! {
subgraphs.subgraph_deployment (vid) {
vid -> BigInt,
id -> Text,
id -> Integer,
deployment -> Text,
manifest -> Text,
failed -> Bool,
health -> crate::deployment::SubgraphHealthMapping,
Expand All @@ -67,7 +71,6 @@ table! {
reorg_count -> Integer,
current_reorg_depth -> Integer,
max_reorg_depth -> Integer,
block_range -> Range<Integer>,
}
}

Expand Down Expand Up @@ -112,7 +115,7 @@ fn graft(

let graft_query = sd::table
.select((sd::graft_base, sd::graft_block_hash, sd::graft_block_number))
.filter(sd::id.eq(id.as_str()));
.filter(sd::deployment.eq(id.as_str()));
// The name of the base subgraph, the hash, and block number
let graft: (Option<String>, Option<Vec<u8>>, Option<BigDecimal>) = if pending_only {
graft_query
Expand Down Expand Up @@ -219,7 +222,7 @@ pub fn forward_block_ptr(
// Work around a Diesel issue with serializing BigDecimals to numeric
let number = format!("{}::numeric", ptr.number);

update(d::table.filter(d::id.eq(id.as_str())))
update(d::table.filter(d::deployment.eq(id.as_str())))
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
Expand All @@ -240,7 +243,7 @@ pub fn revert_block_ptr(
// Work around a Diesel issue with serializing BigDecimals to numeric
let number = format!("{}::numeric", ptr.number);

update(d::table.filter(d::id.eq(id.as_str())))
update(d::table.filter(d::deployment.eq(id.as_str())))
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
Expand All @@ -260,7 +263,7 @@ pub fn block_ptr(
use subgraph_deployment as d;

let (number, hash) = d::table
.filter(d::id.eq(id.as_str()))
.filter(d::deployment.eq(id.as_str()))
.select((
d::latest_ethereum_block_number,
d::latest_ethereum_block_hash,
Expand Down Expand Up @@ -315,9 +318,9 @@ pub fn state(conn: &PgConnection, id: SubgraphDeploymentId) -> Result<Deployment
use subgraph_deployment as d;

match d::table
.filter(d::id.eq(id.as_str()))
.filter(d::deployment.eq(id.as_str()))
.select((
d::id,
d::deployment,
d::reorg_count,
d::max_reorg_depth,
d::latest_ethereum_block_number,
Expand Down Expand Up @@ -352,7 +355,7 @@ pub fn set_synced(conn: &PgConnection, id: &SubgraphDeploymentId) -> Result<(),

update(
d::table
.filter(d::id.eq(id.as_str()))
.filter(d::deployment.eq(id.as_str()))
.filter(d::synced.eq(false)),
)
.set(d::synced.eq(true))
Expand All @@ -365,7 +368,7 @@ pub fn exists(conn: &PgConnection, id: &str) -> Result<bool, StoreError> {
use subgraph_deployment as d;

let exists = d::table
.filter(d::id.eq(id))
.filter(d::deployment.eq(id))
.count()
.get_result::<i64>(conn)?
> 0;
Expand All @@ -377,7 +380,7 @@ pub fn exists_and_synced(conn: &PgConnection, id: &str) -> Result<bool, StoreErr
use subgraph_deployment as d;

let synced = d::table
.filter(d::id.eq(id))
.filter(d::deployment.eq(id))
.select(d::synced)
.first(conn)
.optional()?
Expand Down Expand Up @@ -430,7 +433,7 @@ pub fn fail(
use subgraph_deployment as d;

let error_id = insert_subgraph_error(conn, error)?;
update(d::table.filter(d::id.eq(id.as_str())))
update(d::table.filter(d::deployment.eq(id.as_str())))
.set((
d::failed.eq(true),
d::health.eq(SubgraphHealth::Failed),
Expand All @@ -452,7 +455,7 @@ pub(crate) fn has_non_fatal_errors(
let block = match block {
Some(block) => d::table.select(sql(&block.to_string())).into_boxed(),
None => d::table
.filter(d::id.eq(id.as_str()))
.filter(d::deployment.eq(id.as_str()))
.select(d::latest_ethereum_block_number)
.into_boxed(),
};
Expand Down Expand Up @@ -485,7 +488,7 @@ pub fn unfail(conn: &PgConnection, id: &SubgraphDeploymentId) -> Result<(), Stor
};

let fatal_error_id = match d::table
.filter(d::id.eq(id.as_str()))
.filter(d::deployment.eq(id.as_str()))
.select(d::fatal_error)
.get_result::<Option<String>>(conn)?
{
Expand All @@ -496,7 +499,7 @@ pub fn unfail(conn: &PgConnection, id: &SubgraphDeploymentId) -> Result<(), Stor
};

// Unfail the deployment.
update(d::table.filter(d::id.eq(id.as_str())))
update(d::table.filter(d::deployment.eq(id.as_str())))
.set((
d::failed.eq(false),
d::health.eq(prev_health),
Expand Down Expand Up @@ -555,7 +558,7 @@ fn check_health(

update(
d::table
.filter(d::id.eq(id.as_str()))
.filter(d::deployment.eq(id.as_str()))
.filter(d::health.eq(old)),
)
.set(d::health.eq(new))
Expand Down Expand Up @@ -603,7 +606,7 @@ pub fn drop_metadata(conn: &PgConnection, id: &SubgraphDeploymentId) -> Result<(

// We don't need to delete from subgraph_error since that cascades from
// deleting the subgraph_deployment
let manifest: String = delete(d::table.filter(d::id.eq(id.as_str())))
let manifest: String = delete(d::table.filter(d::deployment.eq(id.as_str())))
.returning(d::manifest)
.get_result(conn)?;
delete(m::table.filter(m::id.eq(manifest))).execute(conn)?;
Expand All @@ -612,7 +615,7 @@ pub fn drop_metadata(conn: &PgConnection, id: &SubgraphDeploymentId) -> Result<(

pub fn create_deployment(
conn: &PgConnection,
id: &SubgraphDeploymentId,
site: &Site,
deployment: SubgraphDeploymentEntity,
exists: bool,
replace: bool,
Expand Down Expand Up @@ -654,10 +657,11 @@ pub fn create_deployment(
max_reorg_depth: _,
} = deployment;

let manifest_id = SubgraphManifestEntity::id(id);
let manifest_id = SubgraphManifestEntity::id(&site.deployment);

let deployment_values = (
d::id.eq(id.as_str()),
d::id.eq(site.id),
d::deployment.eq(site.deployment.as_str()),
d::manifest.eq(&manifest_id),
d::failed.eq(failed),
d::synced.eq(synced),
Expand All @@ -672,7 +676,6 @@ pub fn create_deployment(
d::graft_base.eq(graft_base.as_ref().map(|s| s.as_str())),
d::graft_block_hash.eq(b(&graft_block)),
d::graft_block_number.eq(n(&graft_block)),
d::block_range.eq(UNVERSIONED_RANGE),
);

let manifest_values = (
Expand All @@ -686,7 +689,7 @@ pub fn create_deployment(
);

if exists && replace {
update(d::table.filter(d::id.eq(id.as_str())))
update(d::table.filter(d::deployment.eq(site.deployment.as_str())))
.set(deployment_values)
.execute(conn)?;

Expand All @@ -707,7 +710,7 @@ pub fn create_deployment(

pub fn update_entity_count(
conn: &PgConnection,
id: &SubgraphDeploymentId,
site: &Site,
full_count_query: &str,
count: i32,
) -> Result<(), StoreError> {
Expand Down Expand Up @@ -735,13 +738,13 @@ pub fn update_entity_count(
set entity_count =
coalesce((nullif(entity_count, -1)) + $1,
({full_count_query}))
where id = $2
where deployment = $2
",
full_count_query = full_count_query
);
Ok(diesel::sql_query(query)
.bind::<Integer, _>(count)
.bind::<Text, _>(id.as_str())
.bind::<Text, _>(site.deployment.as_str())
.execute(conn)
.map(|_| ())?)
}
6 changes: 3 additions & 3 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl DeploymentStore {
if replace || !exists {
deployment::create_deployment(
&conn,
&site.deployment,
&site,
deployment,
exists,
replace,
Expand Down Expand Up @@ -865,7 +865,7 @@ impl DeploymentStore {
)?;
deployment::update_entity_count(
&conn,
&site.deployment,
site.as_ref(),
layout.count_query.as_str(),
count,
)?;
Expand Down Expand Up @@ -944,7 +944,7 @@ impl DeploymentStore {

deployment::update_entity_count(
&conn,
&site.deployment,
site.as_ref(),
layout.count_query.as_str(),
count,
)?;
Expand Down
26 changes: 14 additions & 12 deletions store/postgres/src/detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type Bytes = Vec<u8>;
#[allow(dead_code)]
pub struct DeploymentDetail {
vid: i64,
pub id: String,
id: i32,
pub deployment: String,
manifest: String,
pub failed: bool,
health: HealthType,
Expand All @@ -46,7 +47,6 @@ pub struct DeploymentDetail {
reorg_count: i32,
current_reorg_depth: i32,
max_reorg_depth: i32,
block_range: (Bound<i32>, Bound<i32>),
}

#[derive(Queryable, QueryableByName)]
Expand Down Expand Up @@ -143,8 +143,7 @@ impl<'a> TryFrom<DetailAndError<'a>> for status::Info {
let DetailAndError(detail, error, sites) = detail_and_error;

let DeploymentDetail {
vid: _,
id,
deployment,
manifest: _,
failed: _,
health,
Expand All @@ -164,20 +163,20 @@ impl<'a> TryFrom<DetailAndError<'a>> for status::Info {

let site = sites
.iter()
.find(|site| site.deployment.as_str() == &id)
.ok_or_else(|| constraint_violation!("missing site for subgraph `{}`", id))?;
.find(|site| site.deployment.as_str() == &deployment)
.ok_or_else(|| constraint_violation!("missing site for subgraph `{}`", deployment))?;

// This needs to be filled in later since it lives in a
// different shard
let chain_head_block = None;
let earliest_block = block(
&id,
&deployment,
"earliest_ethereum_block",
earliest_ethereum_block_hash,
earliest_ethereum_block_number,
)?;
let latest_block = block(
&id,
&deployment,
"latest_ethereum_block",
latest_ethereum_block_hash,
latest_ethereum_block_number,
Expand All @@ -190,12 +189,15 @@ impl<'a> TryFrom<DetailAndError<'a>> for status::Info {
latest_block,
};
let entity_count = entity_count.to_u64().ok_or_else(|| {
constraint_violation!("the entityCount for {} is not representable as a u64", id)
constraint_violation!(
"the entityCount for {} is not representable as a u64",
deployment
)
})?;
let fatal_error = error.map(|e| SubgraphError::try_from(e)).transpose()?;
// 'node' needs to be filled in later from a different shard
Ok(status::Info {
subgraph: id,
subgraph: deployment,
synced,
health,
fatal_error,
Expand All @@ -219,7 +221,7 @@ pub(crate) fn deployment_details(
d::table.load::<DeploymentDetail>(conn)?
} else {
d::table
.filter(d::id.eq_any(&deployments))
.filter(d::deployment.eq_any(&deployments))
.load::<DeploymentDetail>(conn)?
};
Ok(details)
Expand Down Expand Up @@ -248,7 +250,7 @@ pub(crate) fn deployment_statuses(

d::table
.left_outer_join(e::table.on(d::fatal_error.eq(e::id.nullable())))
.filter(d::id.eq_any(&ids))
.filter(d::deployment.eq_any(&ids))
.load::<(DeploymentDetail, Option<ErrorDetail>)>(conn)?
.into_iter()
.map(|(detail, error)| status::Info::try_from(DetailAndError(detail, error, sites)))
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ impl<'a> Connection<'a> {

for detail in details {
let (latest_hash, latest_number) = block(
&detail.id,
&detail.deployment,
"latest_ethereum_block",
detail.latest_ethereum_block_hash.clone(),
detail.latest_ethereum_block_number.clone(),
Expand All @@ -988,7 +988,7 @@ impl<'a> Connection<'a> {
.unwrap_or((None, None));
let entity_count = detail.entity_count.to_u64().unwrap_or(0) as i32;

update(u::table.filter(u::id.eq(&detail.id)))
update(u::table.filter(u::id.eq(&detail.deployment)))
.set((
u::entity_count.eq(entity_count),
u::latest_ethereum_block_hash.eq(latest_hash),
Expand Down

0 comments on commit af76792

Please sign in to comment.