Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utilize identify events to add peers to the routing table #472

Merged
merged 9 commits into from
Mar 4, 2020
18 changes: 12 additions & 6 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -84,6 +85,10 @@ type IpfsDHT struct {
// "forked" DHTs (e.g., DHTs with custom protocols and/or private
// networks).
enableProviders, enableValues bool

subscriptions struct {
evtPeerIdentification event.Subscription
}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -114,15 +119,16 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues

subnot := (*subscriberNotifee)(dht)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.host.Network().Notify(subnot)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.proc = goprocessctx.WithContext(ctx)

// register for network notifs.
dht.proc.Go((*subscriberNotifee)(dht).subscribe)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
// handle providers
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

Expand Down
28 changes: 17 additions & 11 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -24,25 +27,28 @@ func TestGetFailures(t *testing.T) {
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()

os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

d, err := New(ctx, host1, opts.DisableAutoRefresh())
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
time.Sleep(400 * time.Millisecond)
s.Close()
})

host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
_, err = host1.Network().DialPeer(ctx, host2.ID())
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)

// This one should time out
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
Expand All @@ -61,7 +67,7 @@ func TestGetFailures(t *testing.T) {
t.Log("Timeout test passed.")

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()

pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
Expand Down Expand Up @@ -113,7 +119,7 @@ func TestGetFailures(t *testing.T) {
Record: rec,
}

s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.5.2
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a
github.com/libp2p/go-libp2p-core v0.3.1
github.com/libp2p/go-libp2p-kbucket v0.2.3
github.com/libp2p/go-libp2p-peerstore v0.1.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p v0.5.2 h1:fjQUTyB7x/4XgO31OEWkJ5uFeHRgpoExlf0rXz5BO8k=
github.com/libp2p/go-libp2p v0.5.2/go.mod h1:o2r6AcpNl1eNGoiWhRtPji03NYOvZumeQ6u+X6gSxnM=
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a h1:cxYryrTPI23R5InZb9Kc86dj819f7yVMapQPuj1Ti1s=
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a/go.mod h1:8UlWMmxcKNxyY0ocYX8Ft4IZ0mMfr7b89v1qZdXxwrk=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
Expand Down
142 changes: 0 additions & 142 deletions notif.go

This file was deleted.

4 changes: 2 additions & 2 deletions notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func TestNotifieeMultipleConn(t *testing.T) {
d1 := setupDHT(ctx, t, false)
d2 := setupDHT(ctx, t, false)

nn1 := (*netNotifiee)(d1)
nn2 := (*netNotifiee)(d2)
nn1 := (*subscriberNotifee)(d1)
nn2 := (*subscriberNotifee)(d2)

connect(t, ctx, d1, d2)
c12 := d1.host.Network().ConnsToPeer(d2.self)[0]
Expand Down
Loading