Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain head listener: Reduce locking #2763

Merged
merged 1 commit into from
Aug 31, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions store/postgres/src/chain_head_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use graph::{
blockchain::ChainHeadUpdateStream,
parking_lot::Mutex,
parking_lot::RwLock,
prelude::{
futures03::{self, FutureExt},
tokio, StoreError,
Expand Down Expand Up @@ -87,7 +87,7 @@ struct ChainHeadUpdate {

pub struct ChainHeadUpdateListener {
/// Update watchers keyed by network.
watchers: Arc<Mutex<BTreeMap<String, Watcher>>>,
watchers: Arc<RwLock<BTreeMap<String, Watcher>>>,
_listener: NotificationListener,
}

Expand All @@ -106,7 +106,7 @@ impl ChainHeadUpdateListener {
// Create a Postgres notification listener for chain head updates
let (mut listener, receiver) =
NotificationListener::new(&logger, postgres_url, CHANNEL_NAME.clone());
let watchers = Arc::new(Mutex::new(BTreeMap::new()));
let watchers = Arc::new(RwLock::new(BTreeMap::new()));

Self::listen(
logger,
Expand All @@ -131,7 +131,7 @@ impl ChainHeadUpdateListener {
metrics: Arc<BlockIngestorMetrics>,
listener: &mut NotificationListener,
mut receiver: Receiver<JsonNotification>,
watchers: Arc<Mutex<BTreeMap<String, Watcher>>>,
watchers: Arc<RwLock<BTreeMap<String, Watcher>>>,
) {
// Process chain head updates in a dedicated task
graph::spawn(async move {
Expand All @@ -156,7 +156,7 @@ impl ChainHeadUpdateListener {
.set_chain_head_number(&update.network_name, *&update.head_block_number as i64);

// If there are subscriptions for this network, notify them.
if let Some(watcher) = watchers.lock().get(&update.network_name) {
if let Some(watcher) = watchers.read().get(&update.network_name) {
watcher.send()
}
}
Expand All @@ -169,13 +169,28 @@ impl ChainHeadUpdateListener {

impl ChainHeadUpdateListenerTrait for ChainHeadUpdateListener {
fn subscribe(&self, network_name: String, logger: Logger) -> ChainHeadUpdateStream {
let update_receiver = self
.watchers
.lock()
.entry(network_name)
.or_insert_with(|| Watcher::new())
.receiver
.clone();
let update_receiver = {
let existing = {
let watchers = self.watchers.read();
watchers.get(&network_name).map(|w| w.receiver.clone())
};

if let Some(watcher) = existing {
// Common case, this is not the first subscription for this network.
watcher
} else {
// This is the first subscription for this network, a lock is required.
//
// Race condition: Another task could have simoultaneously entered this branch and
// inserted a writer, so we should check the entry again after acquiring the lock.
self.watchers
.write()
.entry(network_name)
.or_insert_with(|| Watcher::new())
.receiver
.clone()
}
};

Box::new(futures03::stream::unfold(
update_receiver,
Expand Down