Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utilize identify events to add peers to the routing table #472

Merged
merged 9 commits into from
Mar 4, 2020
12 changes: 4 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.proc = goprocessctx.WithContext(ctx)

// register for network notifs.
dht.proc.Go((*subscriberNotifee)(dht).subscribe)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
// handle providers
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

Expand Down
14 changes: 0 additions & 14 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

dht.updateFromMessage(ctx, mPeer, &req)

if resp == nil {
continue
}
Expand Down Expand Up @@ -187,9 +185,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
return nil, err
}

// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, rpmes)

stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
Expand Down Expand Up @@ -230,15 +225,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
dht.Update(ctx, p)
}
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
Expand Down
28 changes: 17 additions & 11 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -24,25 +27,28 @@ func TestGetFailures(t *testing.T) {
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()

os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

d, err := New(ctx, host1, opts.DisableAutoRefresh())
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
time.Sleep(400 * time.Millisecond)
s.Close()
})

host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
_, err = host1.Network().DialPeer(ctx, host2.ID())
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)

// This one should time out
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
Expand All @@ -61,7 +67,7 @@ func TestGetFailures(t *testing.T) {
t.Log("Timeout test passed.")

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()

pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
Expand Down Expand Up @@ -113,7 +119,7 @@ func TestGetFailures(t *testing.T) {
Record: rec,
}

s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.5.2
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a
github.com/libp2p/go-libp2p-core v0.3.1
github.com/libp2p/go-libp2p-kbucket v0.2.3
github.com/libp2p/go-libp2p-peerstore v0.1.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p v0.5.2 h1:fjQUTyB7x/4XgO31OEWkJ5uFeHRgpoExlf0rXz5BO8k=
github.com/libp2p/go-libp2p v0.5.2/go.mod h1:o2r6AcpNl1eNGoiWhRtPji03NYOvZumeQ6u+X6gSxnM=
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a h1:cxYryrTPI23R5InZb9Kc86dj819f7yVMapQPuj1Ti1s=
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a/go.mod h1:8UlWMmxcKNxyY0ocYX8Ft4IZ0mMfr7b89v1qZdXxwrk=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
Expand Down
142 changes: 0 additions & 142 deletions notif.go

This file was deleted.

4 changes: 2 additions & 2 deletions notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func TestNotifieeMultipleConn(t *testing.T) {
d1 := setupDHT(ctx, t, false)
d2 := setupDHT(ctx, t, false)

nn1 := (*netNotifiee)(d1)
nn2 := (*netNotifiee)(d2)
nn1 := (*subscriberNotifee)(d1)
nn2 := (*subscriberNotifee)(d2)

connect(t, ctx, d1, d2)
c12 := d1.host.Network().ConnsToPeer(d2.self)[0]
Expand Down
Loading