Skip to content

Commit

Permalink
chain head listener: Ensure periodic updates
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Aug 20, 2021
1 parent 83f65bc commit d843c60
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ impl Blockchain for Chain {
.subgraph_store
.writable(&deployment)
.expect(&format!("no store for deployment `{}`", deployment.hash));
let chain_head_update_stream = self.chain_head_update_listener.subscribe(self.name.clone());
let chain_head_update_stream = self
.chain_head_update_listener
.subscribe(self.name.clone(), logger.clone());

let requirements = filter.node_capabilities();

Expand Down
2 changes: 1 addition & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ slog-envlogger = "2.1.0"
slog-term = "2.7.0"
petgraph = "0.6.0"
tiny-keccak = "1.5.0"
tokio = { version = "1.10.0", features = ["time", "sync", "macros", "test-util", "rt-multi-thread"] }
tokio = { version = "1.10.0", features = ["time", "sync", "macros", "test-util", "rt-multi-thread", "parking_lot"] }
tokio-stream = { version = "0.1.7", features = ["sync"] }
tokio-retry = "0.3.0"
url = "2.2.1"
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,5 @@ pub type ChainHeadUpdateStream = Box<dyn Stream<Item = ()> + Send + Unpin>;

pub trait ChainHeadUpdateListener: Send + Sync + 'static {
/// Subscribe to chain head updates for the given network.
fn subscribe(&self, network: String) -> ChainHeadUpdateStream;
fn subscribe(&self, network: String, logger: Logger) -> ChainHeadUpdateStream;
}
6 changes: 1 addition & 5 deletions graph/src/blockchain/polling_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,7 @@ impl<C: Blockchain> Stream for PollingBlockStream<C> {
))));
}

Poll::Pending => {
// Stay idle
self.state = BlockStreamState::Idle;
break Poll::Pending;
}
Poll::Pending => break Poll::Pending,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions store/postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ diesel_derives = "1.4.1"
anyhow = "1.0.43"
git-testament = "0.2.0"
itertools = "0.10.1"
pin-utils = "0.1"

[dev-dependencies]
clap = "2.33.3"
Expand Down
54 changes: 48 additions & 6 deletions store/postgres/src/chain_head_listener.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use graph::{
blockchain::ChainHeadUpdateStream, parking_lot::Mutex, prelude::StoreError,
blockchain::ChainHeadUpdateStream,
parking_lot::Mutex,
prelude::{
futures03::{self, FutureExt},
tokio, StoreError,
},
prometheus::GaugeVec,
};
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
use std::{collections::BTreeMap, time::Duration};

use diesel::RunQueryDsl;
use lazy_static::lazy_static;
Expand All @@ -16,10 +22,17 @@ use graph::blockchain::ChainHeadUpdateListener as ChainHeadUpdateListenerTrait;
use graph::prelude::serde::{Deserialize, Serialize};
use graph::prelude::serde_json::{self, json};
use graph::prelude::tokio::sync::{mpsc::Receiver, watch};
use graph::prelude::{crit, o, CheapClone, Logger, MetricsRegistry};
use graph::tokio_stream::wrappers::WatchStream;
use graph::prelude::{crit, debug, o, CheapClone, Logger, MetricsRegistry};

lazy_static! {
pub static ref CHAIN_HEAD_WATCHER_TIMEOUT: Duration = Duration::from_secs(
std::env::var("GRAPH_CHAIN_HEAD_WATCHER_TIMEOUT")
.ok()
.map(|s| u64::from_str(&s).unwrap_or_else(|_| panic!(
"failed to parse env var GRAPH_CHAIN_HEAD_WATCHER_TIMEOUT"
)))
.unwrap_or(30)
);
pub static ref CHANNEL_NAME: SafeChannelName =
SafeChannelName::i_promise_this_is_safe("chain_head_updates");
}
Expand Down Expand Up @@ -155,7 +168,7 @@ impl ChainHeadUpdateListener {
}

impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
fn subscribe(&self, network_name: String) -> ChainHeadUpdateStream {
fn subscribe(&self, network_name: String, logger: Logger) -> ChainHeadUpdateStream {
let update_receiver = self
.watchers
.lock()
Expand All @@ -164,7 +177,36 @@ impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
.receiver
.clone();

Box::new(WatchStream::new(update_receiver))
Box::new(futures03::stream::unfold(
update_receiver,
move |mut update_receiver| {
let logger = logger.clone();
async move {
// To be robust against any problems with the listener for the DB channel, a
// timeout is set so that subscribers are guaranteed to get periodic updates.
match tokio::time::timeout(
*CHAIN_HEAD_WATCHER_TIMEOUT,
update_receiver.changed(),
)
.await
{
// Received an update.
Ok(Ok(())) => (),

// The sender was dropped, this should never happen.
Ok(Err(_)) => crit!(logger, "chain head watcher terminated"),

Err(_) => debug!(
logger,
"no chain head update for {} seconds, polling for update",
CHAIN_HEAD_WATCHER_TIMEOUT.as_secs()
),
};
Some(((), update_receiver))
}
.boxed()
},
))
}
}

Expand Down

0 comments on commit d843c60

Please sign in to comment.