Skip to content

Commit

Permalink
simple refactor & tests for Dht mode changes & protocol changes
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 authored and aschmahmann committed Mar 2, 2020
1 parent 2c8a2a6 commit d8f3bd2
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 33 deletions.
22 changes: 4 additions & 18 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -125,17 +124,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
// register for network notifs.
dht.host.Network().Notify(subnot)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*subscriberNotifee)(dht))
dht.proc = goprocessctx.WithContext(ctx)

if dht.subscriptions.evtPeerIdentification != nil {
_ = dht.subscriptions.evtPeerIdentification.Close()
}
return nil
})

dht.proc.AddChild(subnot.Process(ctx))
// register for network notifs.
dht.proc.Go((*subscriberNotifee)(dht).subscribe)
// handle providers
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

Expand Down Expand Up @@ -203,13 +196,6 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT {
triggerRtRefresh: make(chan chan<- error),
}

var err error
evts := []interface{}{&event.EvtPeerIdentificationCompleted{}, &event.EvtPeerIdentificationFailed{}}
dht.subscriptions.evtPeerIdentification, err = h.EventBus().Subscribe(evts, eventbus.BufSize(256))
if err != nil {
logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err)
}

dht.ctx = dht.newContextWithLocalTags(ctx)

return dht
Expand Down
66 changes: 51 additions & 15 deletions subscriber_notifee.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package dht

import (
"context"

"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"

"github.com/libp2p/go-eventbus"

ma "github.com/multiformats/go-multiaddr"
)

Expand All @@ -19,27 +19,52 @@ func (nn *subscriberNotifee) DHT() *IpfsDHT {
return (*IpfsDHT)(nn)
}

func (nn *subscriberNotifee) Process(ctx context.Context) goprocess.Process {
proc := goprocessctx.WithContext(ctx)
proc.Go(nn.subscribe)
return proc
}

func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
dht := nn.DHT()

dht.host.Network().Notify(nn)
defer dht.host.Network().StopNotify(nn)

var err error
evts := []interface{}{
&event.EvtPeerIdentificationCompleted{},
}

// subscribe to the EvtPeerIdentificationCompleted event which notifies us every time a peer successfully completes identification
sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256))
if err != nil {
logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err)
}
defer sub.Close()

for {
select {
case evt, more := <-dht.subscriptions.evtPeerIdentification.Out():
case evt, more := <-sub.Out():
// we will not be getting any more events
if !more {
return
}
switch ev := evt.(type) {
case event.EvtPeerIdentificationCompleted:
protos, err := dht.peerstore.SupportsProtocols(ev.Peer, dht.protocolStrs()...)
if err == nil && len(protos) != 0 {
dht.Update(dht.ctx, ev.Peer)

// something has gone really wrong if we get an event for another type
ev, ok := evt.(event.EvtPeerIdentificationCompleted)
if !ok {
logger.Errorf("got wrong type from subscription: %T", ev)
return
}

// if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed
protos, err := dht.peerstore.SupportsProtocols(ev.Peer, dht.protocolStrs()...)
if err == nil && len(protos) != 0 {
refresh := dht.routingTable.Size() <= minRTRefreshThreshold
dht.Update(dht.ctx, ev.Peer)
if refresh && dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
default:
}
}
}

case <-proc.Closing():
return
}
Expand Down Expand Up @@ -67,6 +92,17 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {

dht.routingTable.Remove(p)

if dht.routingTable.Size() < minRTRefreshThreshold {
// TODO: Actively bootstrap. For now, just try to add the currently connected peers.
for _, p := range dht.host.Network().Peers() {
// Don't bother probing, we do that on connect.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) != 0 {
dht.Update(dht.Context(), p)
}
}
}

dht.smlk.Lock()
defer dht.smlk.Unlock()
ms, ok := dht.strmap[p]
Expand Down

0 comments on commit d8f3bd2

Please sign in to comment.