Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy subgraph data in batches when grafting #2293

Merged
merged 11 commits
Apr 21, 2021
Merged
15 changes: 15 additions & 0 deletions graph/src/components/ethereum/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,12 @@ impl Display for BlockHash {
}
}

impl From<Vec<u8>> for BlockHash {
fn from(bytes: Vec<u8>) -> Self {
BlockHash(bytes.as_slice().into())
}
}

/// A block hash and block number from a specific Ethereum block.
///
/// Block numbers are signed 32 bit integers
Expand Down Expand Up @@ -550,6 +556,15 @@ impl<'a> From<&'a EthereumBlock> for EthereumBlockPointer {
}
}

impl From<(Vec<u8>, i32)> for EthereumBlockPointer {
fn from((bytes, number): (Vec<u8>, i32)) -> Self {
EthereumBlockPointer {
hash: BlockHash::from(bytes),
number,
}
}
}

impl From<(H256, i32)> for EthereumBlockPointer {
fn from((hash, number): (H256, i32)) -> EthereumBlockPointer {
EthereumBlockPointer {
Expand Down
2 changes: 2 additions & 0 deletions graph/src/components/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,8 @@ pub enum StoreError {
UnknownShard(String),
#[error("Fulltext search not yet deterministic")]
FulltextSearchNonDeterministic,
#[error("operation was canceled")]
Canceled,
}

// Convenience to report a constraint violation
Expand Down
10 changes: 10 additions & 0 deletions graph/src/ext/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,13 @@ impl From<anyhow::Error> for CancelableError<anyhow::Error> {
Self::Error(e)
}
}

impl From<CancelableError<StoreError>> for StoreError {
fn from(err: CancelableError<StoreError>) -> StoreError {
use CancelableError::*;
match err {
Cancel => StoreError::Canceled,
Error(e) => e,
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Revert the copy-state migration. Drop in dependency order:
-- copy_table_state references copy_state(dst), so it goes first.
-- `if exists` keeps this down migration usable even if the matching
-- up migration was only partially applied.
drop table if exists subgraphs.copy_table_state;
drop table if exists subgraphs.copy_state;
drop table if exists active_copies;
37 changes: 37 additions & 0 deletions store/postgres/migrations/2021-03-12-070453_add_copy_state/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- This is populated in the primary
-- One row per pending/running copy. `dst` is the primary key, so a
-- deployment can be the target of at most one copy at a time.
-- NOTE(review): `cancelled_at` presumably marks a cancellation request
-- -- confirm against the code that reads it.
create table active_copies(
    src int not null references deployment_schemas(id),
    dst int primary key
            references deployment_schemas(id) on delete cascade,
    queued_at timestamptz not null,
    cancelled_at timestamptz,
    unique(src, dst)
);

-- These two tables live in each shard; entries are in the same
-- shard as the subgraph_deployment we are copying to ('dst')

-- Overall state of one copy operation: which deployment is copied where,
-- and the target block (hash/number) the copy is aimed at.
create table subgraphs.copy_state(
    src int not null,
    dst int primary key
            references subgraphs.subgraph_deployment(id)
            on delete cascade,
    target_block_hash bytea not null,
    target_block_number int not null,
    started_at timestamptz not null default now(),
    finished_at timestamptz,
    cancelled_at timestamptz
);

-- Per-entity-table progress of a copy into `dst`. NOTE(review):
-- `next_vid`/`target_vid` presumably delimit the remaining rows to copy
-- in batches of `batch_size`, with `duration_ms` accumulating time spent
-- -- confirm against the code driving the batches.
create table subgraphs.copy_table_state(
    id serial primary key,
    entity_type text not null,
    dst int not null
            references subgraphs.copy_state(dst) on delete cascade,
    next_vid int8 not null,
    target_vid int8 not null,
    batch_size int8 not null,
    started_at timestamptz not null default now(),
    finished_at timestamptz,
    duration_ms int8 not null default 0,
    unique(dst, entity_type)
);
55 changes: 52 additions & 3 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
use diesel::prelude::RunQueryDsl;
use diesel::sql_types::Text;
use diesel::{connection::SimpleConnection, prelude::RunQueryDsl, select};
use diesel::{pg::PgConnection, sql_query};
use diesel::{sql_types::Text, ExpressionMethods, QueryDsl};
use std::collections::{HashMap, HashSet};

use graph::{data::subgraph::schema::POI_TABLE, prelude::StoreError};

use crate::{primary::Namespace, relational::SqlName};
use crate::{
primary::{Namespace, Site},
relational::SqlName,
};

// This is a view not a table. We only read from it
// Diesel mapping for `information_schema.foreign_tables`; the second
// argument declares (schema, table name) as the composite key Diesel
// should use. Only the columns listed here are accessible.
table! {
    information_schema.foreign_tables(foreign_table_schema, foreign_table_name) {
        foreign_table_catalog -> Text,
        foreign_table_schema -> Text,
        foreign_table_name -> Text,
        foreign_server_catalog -> Text,
        foreign_server_name -> Text,
    }
}

// Readonly; we only access the name
// Diesel mapping for the Postgres `pg_namespace` catalog (the list of
// schemas); `nspname` serves as the key column for Diesel's purposes.
table! {
    pg_namespace(nspname) {
        nspname -> Text,
    }
}

/// Information about what tables and columns we have in the database
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -103,3 +124,31 @@ pub fn current_servers(conn: &PgConnection) -> Result<Vec<String>, StoreError> {
.map(|srv| srv.srvname)
.collect())
}

pub fn has_namespace(conn: &PgConnection, namespace: &Namespace) -> Result<bool, StoreError> {
use pg_namespace as nsp;

Ok(select(diesel::dsl::exists(
nsp::table.filter(nsp::nspname.eq(namespace.as_str())),
))
.get_result::<bool>(conn)?)
}

/// Drop the schema for `src` if it is a foreign schema imported from
/// another database. If the schema does not exist, or is not a foreign
/// schema, do nothing. This crucially depends on the fact that we never
/// mix foreign and local tables in the same schema.
pub fn drop_foreign_schema(conn: &PgConnection, src: &Site) -> Result<(), StoreError> {
    use foreign_tables as ft;

    // A schema counts as foreign if any foreign table lives in it
    let matching = ft::table.filter(ft::foreign_table_schema.eq(src.namespace.as_str()));
    let is_foreign: bool = select(diesel::dsl::exists(matching)).get_result(conn)?;

    if is_foreign {
        conn.batch_execute(&format!("drop schema if exists {} cascade", src.namespace))?;
    }
    Ok(())
}
43 changes: 39 additions & 4 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,18 @@ pub struct ForeignServer {
impl ForeignServer {
const PRIMARY_PUBLIC: &'static str = "primary_public";

fn name(shard: &Shard) -> String {
/// The name of the foreign server under which data for `shard` is
/// accessible
pub fn name(shard: &Shard) -> String {
    let id = shard.as_str();
    format!("shard_{}", id)
}

/// The name of the schema under which the `subgraphs` schema for `shard`
/// is accessible in shards that are not `shard`
pub fn metadata_schema(shard: &Shard) -> String {
    let srvname = Self::name(shard);
    format!("{}_subgraphs", srvname)
}

fn new(shard: Shard, postgres_url: &str) -> Result<Self, anyhow::Error> {
let config: Config = match postgres_url.parse() {
Ok(config) => config,
Expand Down Expand Up @@ -145,7 +153,7 @@ impl ForeignServer {
} else {
format!(
"import foreign schema public
limit to (deployment_schemas, chains) \
limit to (deployment_schemas, chains, active_copies) \
from server {shard} into {nsp};",
shard = Self::name(&*PRIMARY_SHARD),
nsp = Self::PRIMARY_PUBLIC
Expand All @@ -154,6 +162,19 @@ impl ForeignServer {
conn.batch_execute(&query)?;
Ok(())
}

/// Map the `subgraphs` schema from the foreign server `self` into the
/// database accessible through `conn`
fn map_metadata(&self, conn: &PgConnection) -> Result<(), StoreError> {
    // Recreate the mapping from scratch so stale definitions disappear
    let nsp = Self::metadata_schema(&self.shard);
    let query = format!(
        "drop schema if exists {nsp} cascade;\
create schema {nsp};
import foreign schema subgraphs from server {srvname} into {nsp};",
        nsp = nsp,
        srvname = self.name
    );
    conn.batch_execute(&query)?;
    Ok(())
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -463,9 +484,10 @@ impl ConnectionPool {
.get_result::<LockResult>(conn)
.expect("we can try to get advisory locks 1 and 2");
let result = if lock.migrate {
pool.configure_fdw(servers)
pool.configure_fdw(servers.as_ref())
.and_then(|_| migrate_schema(&pool.logger, conn))
.and_then(|_| pool.map_primary())
.and_then(|_| pool.map_metadata(servers.as_ref()))
} else {
Ok(())
};
Expand All @@ -479,7 +501,7 @@ impl ConnectionPool {
.expect("migrations are never canceled");
}

fn configure_fdw(&self, servers: Arc<Vec<ForeignServer>>) -> Result<(), StoreError> {
fn configure_fdw(&self, servers: &Vec<ForeignServer>) -> Result<(), StoreError> {
info!(&self.logger, "Setting up fdw");
let conn = self.get()?;
conn.transaction(|| {
Expand All @@ -505,6 +527,19 @@ impl ConnectionPool {
let conn = self.get()?;
conn.transaction(|| ForeignServer::map_primary(&conn, &self.shard))
}

/// Map the `subgraphs` metadata schema from all other shards' foreign
/// servers into this shard. The mapping is recreated on every server
/// start so that we pick up possible schema changes in the mappings.
///
/// All mappings happen in one transaction so the database is never left
/// with only some of them recreated.
// Takes `&[ForeignServer]` rather than `&Vec<ForeignServer>` (clippy
// `ptr_arg`); existing `&Vec` callers coerce to a slice automatically.
fn map_metadata(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
    let conn = self.get()?;
    conn.transaction(|| {
        // Our own shard's metadata is local; only map the remote shards
        for server in servers.iter().filter(|server| server.shard != self.shard) {
            server.map_metadata(&conn)?;
        }
        Ok(())
    })
}
}

embed_migrations!("./migrations");
Expand Down
Loading