Skip to content

Commit

Permalink
chain head listener: Reduce locking
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Aug 31, 2021
1 parent 7f1bcbf commit e9ed1d7
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions store/postgres/src/chain_head_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use graph::{
blockchain::ChainHeadUpdateStream,
parking_lot::Mutex,
parking_lot::RwLock,
prelude::{
futures03::{self, FutureExt},
tokio, StoreError,
Expand Down Expand Up @@ -87,7 +87,7 @@ struct ChainHeadUpdate {

pub struct ChainHeadUpdateListener {
/// Update watchers keyed by network.
watchers: Arc<Mutex<BTreeMap<String, Watcher>>>,
watchers: Arc<RwLock<BTreeMap<String, Watcher>>>,
_listener: NotificationListener,
}

Expand All @@ -106,7 +106,7 @@ impl ChainHeadUpdateListener {
// Create a Postgres notification listener for chain head updates
let (mut listener, receiver) =
NotificationListener::new(&logger, postgres_url, CHANNEL_NAME.clone());
let watchers = Arc::new(Mutex::new(BTreeMap::new()));
let watchers = Arc::new(RwLock::new(BTreeMap::new()));

Self::listen(
logger,
Expand All @@ -131,7 +131,7 @@ impl ChainHeadUpdateListener {
metrics: Arc<BlockIngestorMetrics>,
listener: &mut NotificationListener,
mut receiver: Receiver<JsonNotification>,
watchers: Arc<Mutex<BTreeMap<String, Watcher>>>,
watchers: Arc<RwLock<BTreeMap<String, Watcher>>>,
) {
// Process chain head updates in a dedicated task
graph::spawn(async move {
Expand All @@ -156,7 +156,7 @@ impl ChainHeadUpdateListener {
.set_chain_head_number(&update.network_name, *&update.head_block_number as i64);

// If there are subscriptions for this network, notify them.
if let Some(watcher) = watchers.lock().get(&update.network_name) {
if let Some(watcher) = watchers.read().get(&update.network_name) {
watcher.send()
}
}
Expand All @@ -169,13 +169,28 @@ impl ChainHeadUpdateListener {

impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
fn subscribe(&self, network_name: String, logger: Logger) -> ChainHeadUpdateStream {
let update_receiver = self
.watchers
.lock()
.entry(network_name)
.or_insert_with(|| Watcher::new())
.receiver
.clone();
let update_receiver = {
let existing = {
let watchers = self.watchers.read();
watchers.get(&network_name).map(|w| w.receiver.clone())
};

if let Some(watcher) = existing {
// Common case, this is not the first subscription for this network.
watcher
} else {
// This is the first subscription for this network, a lock is required.
//
// Race condition: Another task could have simoultaneously entered this branch and
// inserted a writer, so we should check the entry again after acquiring the lock.
self.watchers
.write()
.entry(network_name)
.or_insert_with(|| Watcher::new())
.receiver
.clone()
}
};

Box::new(futures03::stream::unfold(
update_receiver,
Expand Down

0 comments on commit e9ed1d7

Please sign in to comment.