From c8ce5b6776eb1724557705c47b46dae25f0a0d40 Mon Sep 17 00:00:00 2001 From: Alex Trottier Date: Wed, 15 Jan 2020 19:13:22 -0800 Subject: [PATCH] add minimalsub interface --- pubsub.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pubsub.go b/pubsub.go index 6535f9a..c1b79d5 100644 --- a/pubsub.go +++ b/pubsub.go @@ -24,6 +24,15 @@ import ( var log = logging.Logger("pubsub-valuestore") +// MinimalPubsub allows us to provide the bare minimum pubsub functionality +// to the router, while still using the same pubsub instance in other goroutines. +// This is primarily done to allow callers to not have to worry about topic management +// due to topics only allowing a single "joiner". +type MinimalPubsub interface { + RegisterTopicValidator(topic string, val pubsub.Validator, opts ...pubsub.ValidatorOpt) error + Join(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) +} + type watchGroup struct { // Note: this chan must be buffered, see notifyWatchers listeners map[chan []byte]struct{} @@ -32,7 +41,7 @@ type watchGroup struct { type PubsubValueStore struct { ctx context.Context ds ds.Datastore - ps *pubsub.PubSub + ps MinimalPubsub host host.Host fetch *fetchProtocol @@ -72,7 +81,7 @@ func KeyToTopic(key string) string { type Option func(*PubsubValueStore) error // NewPubsubValueStore constructs a new ValueStore that gets and receives records through pubsub. -func NewPubsubValueStore(ctx context.Context, host host.Host, ps *pubsub.PubSub, validator record.Validator, opts ...Option) (*PubsubValueStore, error) { +func NewPubsubValueStore(ctx context.Context, host host.Host, ps MinimalPubsub, validator record.Validator, opts ...Option) (*PubsubValueStore, error) { psValueStore := &PubsubValueStore{ ctx: ctx,