diff --git a/discovery.go b/discovery.go new file mode 100644 index 00000000..b69f2092 --- /dev/null +++ b/discovery.go @@ -0,0 +1,336 @@ +package pubsub + +import ( + "context" + "math/rand" + "time" + + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + discimpl "github.com/libp2p/go-libp2p-discovery" +) + +var ( + // poll interval + + // DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling + DiscoveryPollInitialDelay = 0 * time.Millisecond + // DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the + // more peers are needed for any topic + DiscoveryPollInterval = 1 * time.Second +) + +type DiscoverOpt func(*discoverOptions) error + +type discoverOptions struct { + connFactory BackoffConnectorFactory + opts []discovery.Option +} + +func defaultDiscoverOptions() *discoverOptions { + rng := rand.New(rand.NewSource(rand.Int63())) + minBackoff, maxBackoff := time.Second*10, time.Hour + cacheSize := 100 + dialTimeout := time.Minute * 2 + discoverOpts := &discoverOptions{ + connFactory: func(host host.Host) (*discimpl.BackoffConnector, error) { + backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rng) + return discimpl.NewBackoffConnector(host, cacheSize, dialTimeout, backoff) + }, + } + + return discoverOpts +} + +// discover represents the discovery pipeline. +// The discovery pipeline handles advertising and discovery of peers +type discover struct { + p *PubSub + + // discovery assists in discovering and advertising peers for a topic + discovery discovery.Discovery + + // advertising tracks which topics are being advertised + advertising map[string]context.CancelFunc + + // discoverQ handles continuing peer discovery + discoverQ chan *discoverReq + + // ongoing tracks ongoing discovery requests + ongoing map[string]struct{} + + // done handles completion of a discovery request + done chan string + + // connector handles connecting to new peers found via discovery + connector *discimpl.BackoffConnector + + // options are the set of options to be used to complete struct construction in Start + options *discoverOptions +} + +// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size. +// The router ultimately decides the whether it is ready or not, the given size is just a suggestion. +func MinTopicSize(size int) RouterReady { + return func(rt PubSubRouter, topic string) (bool, error) { + return rt.EnoughPeers(topic, size), nil + } +} + +// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop +func (d *discover) Start(p *PubSub, opts ...DiscoverOpt) error { + if d.discovery == nil || p == nil { + return nil + } + + d.p = p + d.advertising = make(map[string]context.CancelFunc) + d.discoverQ = make(chan *discoverReq, 32) + d.ongoing = make(map[string]struct{}) + d.done = make(chan string) + + conn, err := d.options.connFactory(p.host) + if err != nil { + return err + } + d.connector = conn + + go d.discoverLoop() + go d.pollTimer() + + return nil +} + +func (d *discover) pollTimer() { + select { + case <-time.After(DiscoveryPollInitialDelay): + case <-d.p.ctx.Done(): + return + } + + select { + case d.p.eval <- d.requestDiscovery: + case <-d.p.ctx.Done(): + return + } + + ticker := time.NewTicker(DiscoveryPollInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + select { + case d.p.eval <- d.requestDiscovery: + case <-d.p.ctx.Done(): + return + } + case <-d.p.ctx.Done(): + return + } + } +} + +func (d *discover) requestDiscovery() { + for t := range d.p.myTopics { + if !d.p.rt.EnoughPeers(t, 0) { + d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)} + } + } +} + +func (d *discover) discoverLoop() { + for { + select { + case discover := <-d.discoverQ: + topic := discover.topic + + if _, ok := d.ongoing[topic]; ok { + discover.done <- struct{}{} + continue + } + + d.ongoing[topic] = struct{}{} + + go func() { + d.handleDiscovery(d.p.ctx, topic, discover.opts) + select { + case d.done <- topic: + case <-d.p.ctx.Done(): + } + discover.done <- struct{}{} + }() + case topic := <-d.done: + delete(d.ongoing, topic) + case <-d.p.ctx.Done(): + return + } + } +} + +// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe. +func (d *discover) Advertise(topic string) { + if d.discovery == nil { + return + } + + advertisingCtx, cancel := context.WithCancel(d.p.ctx) + + if _, ok := d.advertising[topic]; ok { + cancel() + return + } + d.advertising[topic] = cancel + + go func() { + next, err := d.discovery.Advertise(advertisingCtx, topic) + if err != nil { + log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error()) + } + + t := time.NewTimer(next) + for { + select { + case <-t.C: + next, err = d.discovery.Advertise(advertisingCtx, topic) + if err != nil { + log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error()) + } + t.Reset(next) + case <-advertisingCtx.Done(): + t.Stop() + return + } + } + }() +} + +// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe. +func (d *discover) StopAdvertise(topic string) { + if d.discovery == nil { + return + } + + if advertiseCancel, ok := d.advertising[topic]; ok { + advertiseCancel() + delete(d.advertising, topic) + } +} + +// Discover searches for additional peers interested in a given topic +func (d *discover) Discover(topic string, opts ...discovery.Option) { + if d.discovery == nil { + return + } + + d.discoverQ <- &discoverReq{topic, opts, make(chan struct{}, 1)} +} + +// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise. +func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool { + if d.discovery == nil { + return true + } + + t := time.NewTimer(time.Hour) + if !t.Stop() { + <-t.C + } + + for { + // Check if ready for publishing + bootstrapped := make(chan bool, 1) + select { + case d.p.eval <- func() { + done, _ := ready(d.p.rt, topic) + bootstrapped <- done + }: + if <-bootstrapped { + return true + } + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + + // If not ready discover more peers + disc := &discoverReq{topic, opts, make(chan struct{}, 1)} + select { + case d.discoverQ <- disc: + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + + select { + case <-disc.done: + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + + t.Reset(time.Millisecond * 100) + select { + case <-t.C: + case <-d.p.ctx.Done(): + return false + case <-ctx.Done(): + return false + } + } +} + +func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) { + discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + peerCh, err := d.discovery.FindPeers(discoverCtx, topic, opts...) + if err != nil { + log.Debugf("error finding peers for topic %s: %v", topic, err) + return + } + + d.connector.Connect(ctx, peerCh) +} + +type discoverReq struct { + topic string + opts []discovery.Option + done chan struct{} +} + +type pubSubDiscovery struct { + discovery.Discovery + opts []discovery.Option +} + +func (d *pubSubDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + return d.Discovery.Advertise(ctx, "floodsub:"+ns, append(opts, d.opts...)...) +} + +func (d *pubSubDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + return d.Discovery.FindPeers(ctx, "floodsub:"+ns, append(opts, d.opts...)...) +} + +// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem +func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt { + return func(d *discoverOptions) error { + d.opts = opts + return nil + } +} + +// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host +type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error) + +// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers +func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt { + return func(d *discoverOptions) error { + d.connFactory = connFactory + return nil + } +} diff --git a/discovery_test.go b/discovery_test.go new file mode 100644 index 00000000..59e51812 --- /dev/null +++ b/discovery_test.go @@ -0,0 +1,292 @@ +package pubsub + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" +) + +type mockDiscoveryServer struct { + mx sync.Mutex + db map[string]map[peer.ID]*discoveryRegistration +} + +type discoveryRegistration struct { + info peer.AddrInfo + ttl time.Duration +} + +func newDiscoveryServer() *mockDiscoveryServer { + return &mockDiscoveryServer{ + db: make(map[string]map[peer.ID]*discoveryRegistration), + } +} + +func (s *mockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.Duration) (time.Duration, error) { + s.mx.Lock() + defer s.mx.Unlock() + + peers, ok := s.db[ns] + if !ok { + peers = make(map[peer.ID]*discoveryRegistration) + s.db[ns] = peers + } + peers[info.ID] = &discoveryRegistration{info, ttl} + return ttl, nil +} + +func (s *mockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrInfo, error) { + s.mx.Lock() + defer s.mx.Unlock() + + peers, ok := s.db[ns] + if !ok || len(peers) == 0 { + emptyCh := make(chan peer.AddrInfo) + close(emptyCh) + return emptyCh, nil + } + + count := len(peers) + if count > limit { + count = limit + } + ch := make(chan peer.AddrInfo, count) + numSent := 0 + for _, reg := range peers { + if numSent == count { + break + } + numSent++ + ch <- reg.info + } + close(ch) + + return ch, nil +} + +func (s *mockDiscoveryServer) hasPeerRecord(ns string, pid peer.ID) bool { + s.mx.Lock() + defer s.mx.Unlock() + + if peers, ok := s.db[ns]; ok { + _, ok := peers[pid] + return ok + } + return false +} + +type mockDiscoveryClient struct { + host host.Host + server *mockDiscoveryServer +} + +func (d *mockDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return 0, err + } + + return d.server.Advertise(ns, *host.InfoFromHost(d.host), options.Ttl) +} + +func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return nil, err + } + + return d.server.FindPeers(ns, options.Limit) +} + +func TestSimpleDiscovery(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup Discovery server and pubsub clients + const numHosts = 20 + const topic = "foobar" + + server := newDiscoveryServer() + discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)} + + hosts := getNetHosts(t, ctx, numHosts) + psubs := make([]*PubSub, numHosts) + topicHandlers := make([]*Topic, numHosts) + + for i, h := range hosts { + disc := &mockDiscoveryClient{h, server} + ps := getPubsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...))) + psubs[i] = ps + topicHandlers[i], _ = ps.Join(topic) + } + + // Subscribe with all but one pubsub instance + msgs := make([]*Subscription, numHosts) + for i, th := range topicHandlers[1:] { + subch, err := th.Subscribe() + if err != nil { + t.Fatal(err) + } + + msgs[i+1] = subch + } + + // Wait for the advertisements to go through then check that they did + for { + server.mx.Lock() + numPeers := len(server.db["floodsub:foobar"]) + server.mx.Unlock() + if numPeers == numHosts-1 { + break + } else { + time.Sleep(time.Millisecond * 100) + } + } + + for i, h := range hosts[1:] { + if !server.hasPeerRecord("floodsub:"+topic, h.ID()) { + t.Fatalf("Server did not register host %d with ID: %s", i+1, h.ID().Pretty()) + } + } + + // Try subscribing followed by publishing a single message + subch, err := topicHandlers[0].Subscribe() + if err != nil { + t.Fatal(err) + } + msgs[0] = subch + + msg := []byte("first message") + if err := topicHandlers[0].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil { + t.Fatal(err) + } + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + + // Try random peers sending messages and make sure they are received + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i)) + + owner := rand.Intn(len(psubs)) + + if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(1))); err != nil { + t.Fatal(err) + } + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup Discovery server and pubsub clients + partitionSize := GossipSubDlo - 1 + numHosts := partitionSize * 2 + const ttl = 1 * time.Minute + + const topic = "foobar" + + server1, server2 := newDiscoveryServer(), newDiscoveryServer() + discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)} + + // Put the pubsub clients into two partitions + hosts := getNetHosts(t, ctx, numHosts) + psubs := make([]*PubSub, numHosts) + topicHandlers := make([]*Topic, numHosts) + + for i, h := range hosts { + s := server1 + if i >= partitionSize { + s = server2 + } + disc := &mockDiscoveryClient{h, s} + ps := getGossipsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...))) + psubs[i] = ps + topicHandlers[i], _ = ps.Join(topic) + } + + msgs := make([]*Subscription, numHosts) + for i, th := range topicHandlers { + subch, err := th.Subscribe() + if err != nil { + t.Fatal(err) + } + + msgs[i] = subch + } + + // Wait for network to finish forming then join the partitions via discovery + for _, ps := range psubs { + waitUntilGossipsubMeshCount(ps, topic, partitionSize-1) + } + + for i := 0; i < partitionSize; i++ { + if _, err := server1.Advertise("floodsub:"+topic, *host.InfoFromHost(hosts[i+partitionSize]), ttl); err != nil { + t.Fatal(err) + } + } + + // test the mesh + for i := 0; i < 100; i++ { + msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i)) + + owner := rand.Intn(numHosts) + + if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil { + t.Fatal(err) + } + + for _, sub := range msgs { + got, err := sub.Next(ctx) + if err != nil { + t.Fatal(sub.err) + } + if !bytes.Equal(msg, got.Data) { + t.Fatal("got wrong message!") + } + } + } +} + +func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) { + done := false + doneCh := make(chan bool, 1) + rt := ps.rt.(*GossipSubRouter) + for !done { + ps.eval <- func() { + doneCh <- len(rt.mesh[topic]) == count + } + done = <-doneCh + if !done { + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/floodsub.go b/floodsub.go index 0d57c991..f70a0a7a 100644 --- a/floodsub.go +++ b/floodsub.go @@ -11,7 +11,8 @@ import ( ) const ( - FloodSubID = protocol.ID("/floodsub/1.0.0") + FloodSubID = protocol.ID("/floodsub/1.0.0") + FloodSubTopicSearchSize = 5 ) // NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps. @@ -44,6 +45,24 @@ func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {} func (fs *FloodSubRouter) RemovePeer(peer.ID) {} +func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { + // check all peers in the topic + tmap, ok := fs.p.topics[topic] + if !ok { + return false + } + + if suggested == 0 { + suggested = FloodSubTopicSearchSize + } + + if len(tmap) >= suggested { + return true + } + + return false +} + func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) { diff --git a/go.mod b/go.mod index 932f14bf..91711923 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ipfs/go-log v0.0.1 github.com/libp2p/go-libp2p-blankhost v0.1.4 github.com/libp2p/go-libp2p-core v0.2.4 + github.com/libp2p/go-libp2p-discovery v0.2.0 github.com/libp2p/go-libp2p-swarm v0.2.2 github.com/multiformats/go-multiaddr v0.1.1 github.com/multiformats/go-multistream v0.1.0 diff --git a/go.sum b/go.sum index 11ae112a..626e2193 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= @@ -28,7 +30,9 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= +github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -44,6 +48,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -65,8 +70,10 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= +github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= +github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= @@ -104,6 +111,7 @@ github.com/libp2p/go-eventbus v0.1.0 h1:mlawomSAjjkk97QnYiEmHsLu7E136+2oCWSHRUvM github.com/libp2p/go-eventbus v0.1.0/go.mod h1:vROgu5cs5T7cv7POWlWxBaVLxfSegC5UGQf8A2eEmx4= github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s= github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= +github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro= github.com/libp2p/go-libp2p-blankhost v0.1.4 h1:I96SWjR4rK9irDHcHq3XHN6hawCRTPUADzkJacgZLvk= github.com/libp2p/go-libp2p-blankhost v0.1.4/go.mod h1:oJF0saYsAXQCSfDq254GMNmLNz6ZTHTOvtF4ZydUvwU= github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= @@ -113,15 +121,22 @@ github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv github.com/libp2p/go-libp2p-core v0.2.2/go.mod h1:8fcwTbsG2B+lTgRJ1ICZtiM5GWCWZVoVrLaDRvIRng0= github.com/libp2p/go-libp2p-core v0.2.4 h1:Et6ykkTwI6PU44tr8qUF9k43vP0aduMNniShAbUJJw8= github.com/libp2p/go-libp2p-core v0.2.4/go.mod h1:STh4fdfa5vDYr0/SzYYeqnt+E6KfEV5VxfIrm0bcI0g= +github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= +github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY= +github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= github.com/libp2p/go-libp2p-mplex v0.2.1 h1:E1xaJBQnbSiTHGI1gaBKmKhu1TUKkErKJnE8iGvirYI= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= +github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY= +github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY= github.com/libp2p/go-libp2p-peerstore v0.1.3 h1:wMgajt1uM2tMiqf4M+4qWKVyyFc8SfA+84VV9glZq1M= github.com/libp2p/go-libp2p-peerstore v0.1.3/go.mod h1:BJ9sHlm59/80oSkpWgr1MyY1ciXAXV397W6h1GH/uKI= +github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8= github.com/libp2p/go-libp2p-secio v0.2.0 h1:ywzZBsWEEz2KNTn5RtzauEDq5RFEefPsttXYwAWqHng= github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g= +github.com/libp2p/go-libp2p-swarm v0.1.0/go.mod h1:wQVsCdjsuZoc730CgOvh5ox6K8evllckjebkdiY5ta4= github.com/libp2p/go-libp2p-swarm v0.2.2 h1:T4hUpgEs2r371PweU3DuH7EOmBIdTBCwWs+FLcgx3bQ= github.com/libp2p/go-libp2p-swarm v0.2.2/go.mod h1:fvmtQ0T1nErXym1/aa1uJEyN7JzaTNyBcHImCxRpPKU= github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= @@ -141,6 +156,7 @@ github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDR github.com/libp2p/go-mplex v0.0.3/go.mod h1:pK5yMLmOoBR1pNCqDlA2GQrdAVTMkqFalaTWe7l4Yd0= github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= +github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA= github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-openssl v0.0.2/go.mod h1:v8Zw2ijCSWBQi8Pq5GAixw6DbFfa9u6VIYDXnvOXkc0= @@ -153,6 +169,7 @@ github.com/libp2p/go-reuseport-transport v0.0.2/go.mod h1:YkbSDrvjUVDL6b8XqriyA2 github.com/libp2p/go-stream-muxer v0.0.1/go.mod h1:bAo8x7YkSpadMTbtTaxGVHWUQsR/l5MEaHbKaliuT14= github.com/libp2p/go-stream-muxer-multistream v0.2.0 h1:714bRJ4Zy9mdhyTLJ+ZKiROmAFwUHpeRidG+q7LTQOg= github.com/libp2p/go-stream-muxer-multistream v0.2.0/go.mod h1:j9eyPol/LLRqT+GPLSxvimPhNph4sfYfMoDPd7HkzIc= +github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc= github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw= github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY= github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg= @@ -183,6 +200,7 @@ github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= @@ -192,12 +210,15 @@ github.com/multiformats/go-multiaddr v0.1.0/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lg github.com/multiformats/go-multiaddr v0.1.1 h1:rVAztJYMhCQ7vEFr8FvxW3mS+HF2eY/oPbOMeS0ZDnE= github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= +github.com/multiformats/go-multiaddr-dns v0.0.2/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q= +github.com/multiformats/go-multiaddr-fmt v0.0.1/go.mod h1:aBYjqL4T/7j4Qx+R73XSv/8JsgnRFlf0w2KGLCmXl3Q= github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= github.com/multiformats/go-multiaddr-net v0.0.1 h1:76O59E3FavvHqNg7jvzWzsPSW5JSi/ek0E4eiDVbg9g= github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d1tnvM7ihcFwHMSstNAVUU= github.com/multiformats/go-multiaddr-net v0.1.0 h1:ZepO8Ezwovd+7b5XPPDhQhayk1yt0AJpzQBpq9fejx4= github.com/multiformats/go-multiaddr-net v0.1.0/go.mod h1:5JNbcfBOP4dnhoZOv10JJVkJO0pCCEf8mTnipAo2UZQ= +github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= @@ -266,6 +287,7 @@ golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= @@ -318,6 +340,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/gossipsub.go b/gossipsub.go index 9b8c3adf..1b2026c7 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -94,6 +94,35 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) { delete(gs.control, p) } +func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool { + // check all peers in the topic + tmap, ok := gs.p.topics[topic] + if !ok { + return false + } + + fsPeers, gsPeers := 0, 0 + // floodsub peers + for p := range tmap { + if gs.peers[p] == FloodSubID { + fsPeers++ + } + } + + // gossipsub peers + gsPeers = len(gs.mesh[topic]) + + if suggested == 0 { + suggested = GossipSubDlo + } + + if fsPeers+gsPeers >= suggested || gsPeers >= GossipSubDhi { + return true + } + + return false +} + func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { ctl := rpc.GetControl() if ctl == nil { diff --git a/gossipsub_test.go b/gossipsub_test.go index 67af2b76..90041cc7 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -11,14 +11,18 @@ import ( "github.com/libp2p/go-libp2p-core/host" ) +func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub { + ps, err := NewGossipSub(ctx, h, opts...) + if err != nil { + panic(err) + } + return ps +} + func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSub { var psubs []*PubSub for _, h := range hs { - ps, err := NewGossipSub(ctx, h, opts...) - if err != nil { - panic(err) - } - psubs = append(psubs, ps) + psubs = append(psubs, getGossipsub(ctx, h, opts...)) } return psubs } diff --git a/pubsub.go b/pubsub.go index 7f647982..cffb83be 100644 --- a/pubsub.go +++ b/pubsub.go @@ -12,6 +12,7 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -42,6 +43,8 @@ type PubSub struct { val *validation + disc *discover + // incoming messages from other peers incoming chan *RPC @@ -129,6 +132,9 @@ type PubSubRouter interface { AddPeer(peer.ID, protocol.ID) // RemovePeer notifies the router that a peer has been disconnected. RemovePeer(peer.ID) + // EnoughPeers returns whether the router needs more peers before it's ready to publish new records. + // Suggested (if greater than 0) is a suggested number of peers that the router should need. + EnoughPeers(topic string, suggested int) bool // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) @@ -167,6 +173,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option ctx: ctx, rt: rt, val: newValidation(), + disc: &discover{}, signID: h.ID(), signKey: h.Peerstore().PrivKey(h.ID()), signStrict: true, @@ -207,6 +214,10 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option return nil, fmt.Errorf("strict signature verification enabled but message signing is disabled") } + if err := ps.disc.Start(ps); err != nil { + return nil, err + } + rt.Attach(ps) for _, id := range rt.Protocols() { @@ -275,6 +286,23 @@ func WithBlacklist(b Blacklist) Option { } } +// WithDiscovery provides a discovery mechanism used to bootstrap and provide peers into PubSub +func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option { + return func(p *PubSub) error { + discoverOpts := defaultDiscoverOptions() + for _, opt := range opts { + err := opt(discoverOpts) + if err != nil { + return err + } + } + + p.disc.discovery = &pubSubDiscovery{Discovery: d, opts: discoverOpts.opts} + p.disc.options = discoverOpts + return nil + } +} + // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { defer func() { @@ -480,6 +508,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { if len(subs) == 0 { delete(p.mySubs, sub.topic) + p.disc.StopAdvertise(sub.topic) p.announce(sub.topic, false) p.rt.Leave(sub.topic) } @@ -495,6 +524,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { // announce we want this topic if len(subs) == 0 { + p.disc.Advertise(sub.topic) p.announce(sub.topic, true) p.rt.Join(sub.topic) } diff --git a/randomsub.go b/randomsub.go index e4304d23..9435c719 100644 --- a/randomsub.go +++ b/randomsub.go @@ -49,6 +49,41 @@ func (rs *RandomSubRouter) RemovePeer(p peer.ID) { delete(rs.peers, p) } +func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool { + // check all peers in the topic + tmap, ok := rs.p.topics[topic] + if !ok { + return false + } + + fsPeers := 0 + rsPeers := 0 + + // count floodsub and randomsub peers + for p := range tmap { + switch rs.peers[p] { + case FloodSubID: + fsPeers++ + case RandomSubID: + rsPeers++ + } + } + + if suggested == 0 { + suggested = RandomSubD + } + + if fsPeers+rsPeers >= suggested { + return true + } + + if rsPeers >= RandomSubD { + return true + } + + return false +} + func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) { diff --git a/topic.go b/topic.go index 145f1201..2ccc4a44 100644 --- a/topic.go +++ b/topic.go @@ -82,6 +82,8 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { out := make(chan *Subscription, 1) + t.p.disc.Discover(sub.topic) + t.p.addSub <- &addSubReq{ sub: sub, resp: out, @@ -90,7 +92,12 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { return <-out, nil } -type PublishOptions struct{} +// RouterReady is a function that decides if a router is ready to publish +type RouterReady func(rt PubSubRouter, topic string) (bool, error) + +type PublishOptions struct { + ready RouterReady +} type PubOpt func(pub *PublishOptions) error @@ -120,11 +127,24 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error } } + if pub.ready != nil { + t.p.disc.Bootstrap(ctx, t.topic, pub.ready) + } + t.p.publish <- &Message{m, id} return nil } +// WithReadiness returns a publishing option for only publishing when the router is ready. +// This option is not useful unless PubSub is also using WithDiscovery +func WithReadiness(ready RouterReady) PubOpt { + return func(pub *PublishOptions) error { + pub.ready = ready + return nil + } +} + // Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. // Does not error if the topic is already closed. func (t *Topic) Close() error {