Skip to content

Commit

Permalink
Utilize identify events to add peers to the routing table (#472)
Browse files Browse the repository at this point in the history
* feat: consume identify events to evaluate routing table addition
* fix: routing table no longer gets an update just because new messages have arrived or been sent
* fix: add already connected peers into the routing table before listening to events

Co-authored-by: Raúl Kripalani <raul.kripalani@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
  • Loading branch information
3 people authored Mar 4, 2020
1 parent bf5f87a commit 2533335
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 180 deletions.
12 changes: 4 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.proc = goprocessctx.WithContext(ctx)

// register for network notifs.
dht.proc.Go((*subscriberNotifee)(dht).subscribe)
// handle providers
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

Expand Down
14 changes: 0 additions & 14 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

dht.updateFromMessage(ctx, mPeer, &req)

if resp == nil {
continue
}
Expand Down Expand Up @@ -187,9 +185,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
return nil, err
}

// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, rpmes)

stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
Expand Down Expand Up @@ -230,15 +225,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
dht.Update(ctx, p)
}
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
Expand Down
28 changes: 17 additions & 11 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -24,25 +27,28 @@ func TestGetFailures(t *testing.T) {
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()

os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

d, err := New(ctx, host1, opts.DisableAutoRefresh())
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
time.Sleep(400 * time.Millisecond)
s.Close()
})

host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
_, err = host1.Network().DialPeer(ctx, host2.ID())
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)

// This one should time out
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
Expand All @@ -61,7 +67,7 @@ func TestGetFailures(t *testing.T) {
t.Log("Timeout test passed.")

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()

pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
Expand Down Expand Up @@ -113,7 +119,7 @@ func TestGetFailures(t *testing.T) {
Record: rec,
}

s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.5.2
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a
github.com/libp2p/go-libp2p-core v0.3.1
github.com/libp2p/go-libp2p-kbucket v0.2.3
github.com/libp2p/go-libp2p-peerstore v0.1.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p v0.5.2 h1:fjQUTyB7x/4XgO31OEWkJ5uFeHRgpoExlf0rXz5BO8k=
github.com/libp2p/go-libp2p v0.5.2/go.mod h1:o2r6AcpNl1eNGoiWhRtPji03NYOvZumeQ6u+X6gSxnM=
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a h1:cxYryrTPI23R5InZb9Kc86dj819f7yVMapQPuj1Ti1s=
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a/go.mod h1:8UlWMmxcKNxyY0ocYX8Ft4IZ0mMfr7b89v1qZdXxwrk=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
Expand Down
142 changes: 0 additions & 142 deletions notif.go

This file was deleted.

4 changes: 2 additions & 2 deletions notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func TestNotifieeMultipleConn(t *testing.T) {
d1 := setupDHT(ctx, t, false)
d2 := setupDHT(ctx, t, false)

nn1 := (*netNotifiee)(d1)
nn2 := (*netNotifiee)(d2)
nn1 := (*subscriberNotifee)(d1)
nn2 := (*subscriberNotifee)(d2)

connect(t, ctx, d1, d2)
c12 := d1.host.Network().ConnsToPeer(d2.self)[0]
Expand Down
Loading

0 comments on commit 2533335

Please sign in to comment.