From b829e737567db218efc4d28b20ddeb855914f867 Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 18 Dec 2017 14:14:32 +0100 Subject: [PATCH] subscriptions: Add SubscriptionListener functionality This allows external components to install subscription listeners, which are notified whenever a subscription is added to or removed from the subscription manager. --- subscriptions.go | 61 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/subscriptions.go b/subscriptions.go index fc3f7c9..e19f209 100644 --- a/subscriptions.go +++ b/subscriptions.go @@ -62,6 +62,16 @@ type ConnectionSubscriptions map[string]*Subscription // subscription IDs to subscriptions. type Subscriptions map[Connection]ConnectionSubscriptions +// SubscriptionListener defines an interface for components that +// are to be notified of added/removed subscriptions. +type SubscriptionListener interface { + // SubscriptionAdded is called whenever a new subscription is added. + SubscriptionAdded(Connection, *Subscription) + + // SubscriptionRemoved is called whenever a subscription is removed. + SubscriptionRemoved(Connection, *Subscription) +} + // SubscriptionManager provides a high-level interface to managing // and accessing the subscriptions made by GraphQL WS clients. type SubscriptionManager interface { @@ -77,6 +87,13 @@ type SubscriptionManager interface { // RemoveSubscriptions removes all subscriptions of a client connection. RemoveSubscriptions(Connection) + + // AddSubscriptionListener adds a listener to be notified when subscriptions + // are added or removed. + AddSubscriptionListener(SubscriptionListener) + + // RemoveSubscriptionListener removes a previously added listener. + RemoveSubscriptionListener(SubscriptionListener) } /** @@ -87,15 +104,17 @@ type subscriptionManager struct { subscriptions Subscriptions schema *graphql.Schema logger *log.Entry + listeners map[SubscriptionListener]bool } // NewSubscriptionManager creates a new subscription manager. func NewSubscriptionManager(schema *graphql.Schema) SubscriptionManager { - manager := new(subscriptionManager) - manager.subscriptions = make(Subscriptions) - manager.logger = NewLogger("subscriptions") - manager.schema = schema - return manager + return &subscriptionManager{ + subscriptions: make(Subscriptions), + logger: NewLogger("subscriptions"), + schema: schema, + listeners: map[SubscriptionListener]bool{}, + } } func (m *subscriptionManager) Subscriptions() Subscriptions { @@ -157,6 +176,9 @@ func (m *subscriptionManager) AddSubscription( m.subscriptions[conn][subscription.ID] = subscription + // Notify listeners + m.notifySubscriptionAdded(conn, subscription) + return nil } @@ -169,6 +191,9 @@ func (m *subscriptionManager) RemoveSubscription( "subscription": subscription.ID, }).Info("Remove subscription") + // Notify listeners + m.notifySubscriptionRemoved(conn, subscription) + // Remove the subscription from its connections' subscription map delete(m.subscriptions[conn], subscription.ID) @@ -195,6 +220,32 @@ func (m *subscriptionManager) RemoveSubscriptions(conn Connection) { } } +func (m *subscriptionManager) AddSubscriptionListener(listener SubscriptionListener) { + m.listeners[listener] = true +} + +func (m *subscriptionManager) RemoveSubscriptionListener(listener SubscriptionListener) { + delete(m.listeners, listener) +} + +func (m *subscriptionManager) notifySubscriptionAdded( + conn Connection, + subscription *Subscription, +) { + for listener := range m.listeners { + listener.SubscriptionAdded(conn, subscription) + } +} + +func (m *subscriptionManager) notifySubscriptionRemoved( + conn Connection, + subscription *Subscription, +) { + for listener := range m.listeners { + listener.SubscriptionRemoved(conn, subscription) + } +} + func validateSubscription(s *Subscription) []error { errs := []error{}