Skip to content

Commit

Permalink
core: Run all of starting a subgraph on the blocking pool
Browse files Browse the repository at this point in the history
Starting a subgraph can be very slow if we have to copy the subgraph as
part of starting it. Also, call `Writable.start_subgraph_deployment` early
on so that dynamic data sources are in place when we look for them.
  • Loading branch information
lutter committed Apr 7, 2021
1 parent 855afae commit 74cb23c
Showing 1 changed file with 49 additions and 57 deletions.
106 changes: 49 additions & 57 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,29 +211,34 @@ where
) {
let logger = self.logger_factory.subgraph_logger(&loc.hash);

match Self::start_subgraph_inner(
logger.clone(),
self.instances.clone(),
self.host_builder.clone(),
self.block_stream_builder.clone(),
self.subgraph_store.clone(),
self.block_store.cheap_clone(),
self.eth_networks.clone(),
loc,
manifest,
self.metrics_registry.cheap_clone(),
self.link_resolver.cheap_clone(),
)
.await
{
Ok(()) => self.manager_metrics.subgraph_count.inc(),
Err(err) => error!(
logger,
"Failed to start subgraph";
"error" => format!("{}", err),
"code" => LogCode::SubgraphStartFailure
),
}
// This task has many calls to the store, so mark it as `blocking`.
// This call is the reason why the blocking thread pool must always be
// sized well above the number of deployed subgraphs.
graph::spawn_blocking(async move {
match Self::start_subgraph_inner(
logger.clone(),
self.instances.clone(),
self.host_builder.clone(),
self.block_stream_builder.clone(),
self.subgraph_store.clone(),
self.block_store.cheap_clone(),
self.eth_networks.clone(),
loc,
manifest,
self.metrics_registry.cheap_clone(),
self.link_resolver.cheap_clone(),
)
.await
{
Ok(()) => self.manager_metrics.subgraph_count.inc(),
Err(err) => error!(
logger,
"Failed to start subgraph";
"error" => format!("{}", err),
"code" => LogCode::SubgraphStartFailure
),
}
});
}

fn stop_subgraph(&self, loc: DeploymentLocator) {
Expand Down Expand Up @@ -307,6 +312,14 @@ where
) -> Result<(), Error> {
let store = store.writable(&deployment)?;

// Start the subgraph deployment before reading dynamic data
// sources: if the subgraph is a copy, starting it will do the
// copying, and dynamic data sources won't show up until after
// that copying is done.
store
.start_subgraph_deployment(&logger)
.map_err(Error::from)?;

let manifest = {
info!(logger, "Resolve subgraph files using IPFS");

Expand Down Expand Up @@ -359,21 +372,6 @@ where
&network,
&required_capabilities, e))?.clone();

{
let store = store.clone();
let logger = logger.clone();

// `start_subgraph_deployment` is blocking.
tokio::task::spawn_blocking(move || {
store
.start_subgraph_deployment(&logger)
.map_err(Error::from)
})
.await
.map_err(Error::from)
.and_then(|x| x)?;
}

let network_name = manifest.network_name();

// Obtain filters from the manifest
Expand Down Expand Up @@ -446,25 +444,14 @@ where
block_stream_metrics,
};

// Keep restarting the subgraph until it terminates. The subgraph
// will usually only run once, but is restarted whenever a block
// creates dynamic data sources. This allows us to recreate the
// block stream and include events for the new data sources going
// forward; this is easier than updating the existing block stream.
//
// This task has many calls to the store, so mark it as `blocking`.
// This call is the reason why the blocking thread pool must always be
// sized well above the number of deployed subgraphs.
graph::spawn_blocking(async move {
if let Err(e) = run_subgraph(ctx).await {
error!(
&logger,
"Subgraph instance failed to run: {}",
format!("{:#}", e)
);
}
subgraph_metrics_unregister.unregister(registry);
});
if let Err(e) = run_subgraph(ctx).await {
error!(
&logger,
"Subgraph instance failed to run: {}",
format!("{:#}", e)
);
}
subgraph_metrics_unregister.unregister(registry);

Ok(())
}
Expand All @@ -483,6 +470,11 @@ where
let id_for_err = ctx.inputs.deployment.hash.clone();
let mut first_run = true;

// Keep restarting the subgraph until it terminates. The subgraph
// will usually only run once, but is restarted whenever a block
// creates dynamic data sources. This allows us to recreate the
// block stream and include events for the new data sources going
// forward; this is easier than updating the existing block stream.
loop {
debug!(logger, "Starting or restarting subgraph");

Expand Down

0 comments on commit 74cb23c

Please sign in to comment.