Skip to content

Commit

Permalink
store: Tighten up how WritableStore uses SubgraphStore
Browse files Browse the repository at this point in the history
The SubgraphStore gives access to all instances of a deployment, but for
the WritableStore we want to be very careful that we do not accidentally
query or modify another deployment instance than the one in the site. These
code changes will hopefully make it more obvious when we would rely on
SubgraphStore functionality that is dependent on the precise deployment
instance.
  • Loading branch information
lutter committed Apr 6, 2021
1 parent a872523 commit e0a57a7
Showing 1 changed file with 55 additions and 51 deletions.
106 changes: 55 additions & 51 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -939,21 +939,15 @@ impl SubgraphStoreTrait for SubgraphStore {
deployment: &DeploymentLocator,
) -> Result<Arc<dyn store::WritableStore>, StoreError> {
let site = self.find_site(deployment.id.into())?;
Ok(Arc::new(WritableStore {
store: self.clone(),
site,
}))
Ok(Arc::new(WritableStore::new(self.clone(), site)?))
}

fn writable_for_network_indexer(
&self,
id: &SubgraphDeploymentId,
) -> Result<Arc<dyn WritableStoreTrait>, StoreError> {
let site = self.site(id)?;
Ok(Arc::new(WritableStore {
store: self.clone(),
site,
}))
Ok(Arc::new(WritableStore::new(self.clone(), site)?))
}

fn is_deployed(&self, id: &SubgraphDeploymentId) -> Result<bool, Error> {
Expand Down Expand Up @@ -983,78 +977,93 @@ impl SubgraphStoreTrait for SubgraphStore {
}
}

struct WritableStore {
store: SubgraphStore,
site: Arc<Site>,
}
/// A wrapper around `SubgraphStore` that only exposes functions that are
/// safe to call from `WritableStore`, i.e., functions that either do not
/// deal with anything that depends on a specific deployment
/// location/instance, or where the result is independent of the deployment
/// instance
struct WritableSubgraphStore(SubgraphStore);

impl std::ops::Deref for WritableStore {
type Target = SubgraphStore;
impl WritableSubgraphStore {
fn primary_conn(&self) -> Result<primary::Connection, StoreError> {
self.0.primary_conn()
}

fn deref(&self) -> &Self::Target {
&self.store
pub(crate) fn send_store_event(&self, event: &StoreEvent) -> Result<(), StoreError> {
self.0.send_store_event(event)
}

fn layout(&self, id: &SubgraphDeploymentId) -> Result<Arc<Layout>, StoreError> {
self.0.layout(id)
}
}

struct WritableStore {
store: WritableSubgraphStore,
writable: Arc<DeploymentStore>,
site: Arc<Site>,
}

impl WritableStore {
fn writable(&self) -> Result<&Arc<DeploymentStore>, StoreError> {
self.store.for_site(self.site.as_ref())
fn new(subgraph_store: SubgraphStore, site: Arc<Site>) -> Result<Self, StoreError> {
let store = WritableSubgraphStore(subgraph_store.clone());
let writable = subgraph_store.for_site(site.as_ref())?.clone();
Ok(Self {
store,
writable,
site,
})
}
}

#[async_trait::async_trait]
impl WritableStoreTrait for WritableStore {
fn block_ptr(&self) -> Result<Option<EthereumBlockPointer>, Error> {
let store = self.writable()?;
store.block_ptr(self.site.as_ref())
self.writable.block_ptr(self.site.as_ref())
}

fn start_subgraph_deployment(&self, logger: &Logger) -> Result<(), StoreError> {
let store = self.writable()?;
let store = &self.writable;

let graft_base = match store.graft_pending(&self.site.deployment)? {
Some((base_id, base_ptr)) => {
let src = self.layout(&base_id)?;
let src = self.store.layout(&base_id)?;
Some((src, base_ptr))
}
None => None,
};
store.start_subgraph(logger, self.site.clone(), graft_base)?;
self.primary_conn()?.copy_finished(self.site.as_ref())
self.store.primary_conn()?.copy_finished(self.site.as_ref())
}

fn revert_block_operations(
&self,
block_ptr_to: EthereumBlockPointer,
) -> Result<(), StoreError> {
let store = self.writable()?;
let event = store.revert_block_operations(self.site.clone(), block_ptr_to)?;
self.send_store_event(&event)
let event = self
.writable
.revert_block_operations(self.site.clone(), block_ptr_to)?;
self.store.send_store_event(&event)
}

fn unfail(&self) -> Result<(), StoreError> {
let store = self.writable()?;
store.unfail(self.site.clone())
self.writable.unfail(self.site.clone())
}

async fn fail_subgraph(&self, error: SubgraphError) -> Result<(), StoreError> {
let store = self.writable()?;
store
self.writable
.fail_subgraph(self.site.deployment.clone(), error)
.await
}

fn supports_proof_of_indexing<'a>(self: Arc<Self>) -> DynTryFuture<'a, bool> {
let store = match self.writable() {
Ok(store) => store,
Err(e) => return Box::pin(std::future::ready(Err(e.into()))),
};
store.clone().supports_proof_of_indexing(self.site.clone())
self.writable
.clone()
.supports_proof_of_indexing(self.site.clone())
}

fn get(&self, key: EntityKey) -> Result<Option<Entity>, QueryExecutionError> {
let (store, site) = self.store(&key.subgraph_id)?;
store.get(site, key)
self.writable.get(self.site.clone(), key)
}

fn transact_block_operations(
Expand All @@ -1069,42 +1078,38 @@ impl WritableStoreTrait for WritableStore {
same_subgraph(&mods, &self.site.deployment),
"can only transact operations within one shard"
);
let store = self.writable()?;
let event = store.transact_block_operations(
let event = self.writable.transact_block_operations(
self.site.clone(),
block_ptr_to,
mods,
stopwatch,
data_sources,
deterministic_errors,
)?;
self.send_store_event(&event)
self.store.send_store_event(&event)
}

fn get_many(
&self,
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
let store = self.writable()?;
store.get_many(self.site.clone(), ids_for_type)
self.writable.get_many(self.site.clone(), ids_for_type)
}

fn is_deployment_synced(&self) -> Result<bool, Error> {
let store = self.writable()?;
Ok(store.exists_and_synced(&self.site.deployment)?)
Ok(self.writable.exists_and_synced(&self.site.deployment)?)
}

fn unassign_subgraph(&self) -> Result<(), StoreError> {
let pconn = self.primary_conn()?;
let pconn = self.store.primary_conn()?;
pconn.transaction(|| -> Result<_, StoreError> {
let changes = pconn.unassign_subgraph(self.site.as_ref())?;
pconn.send_store_event(&StoreEvent::new(changes))
})
}

async fn load_dynamic_data_sources(&self) -> Result<Vec<StoredDynamicDataSource>, StoreError> {
let store = self.writable()?;
store
self.writable
.load_dynamic_data_sources(self.site.deployment.clone())
.await
}
Expand All @@ -1114,17 +1119,16 @@ impl WritableStoreTrait for WritableStore {
// Make sure we drop `pconn` before we call into the deployment
// store so that we do not hold two database connections which
// might come from the same pool and could therefore deadlock
let pconn = self.primary_conn()?;
let pconn = self.store.primary_conn()?;
pconn.transaction(|| -> Result<_, Error> {
let changes = pconn.promote_deployment(&self.site.deployment)?;
Ok(StoreEvent::new(changes))
})?
};

let dstore = self.writable()?;
dstore.deployment_synced(&self.site.deployment)?;
self.writable.deployment_synced(&self.site.deployment)?;

Ok(self.primary_conn()?.send_store_event(&event)?)
Ok(self.store.send_store_event(&event)?)
}
}

Expand Down

0 comments on commit e0a57a7

Please sign in to comment.