Skip to content

Commit

Permalink
Merge pull request #597 from libp2p/feat/address-push
Browse files Browse the repository at this point in the history
Periodically schedule identify push if the address set has changed
  • Loading branch information
vyzo authored Apr 19, 2019
2 parents f1888d9 + f17a4a8 commit b0f4a6f
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 6 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
}
}

// start the host background tasks
h.Start()

if router != nil {
return routed.Wrap(h, router), nil
}
Expand Down
75 changes: 74 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"net"
"sync"
"time"

logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -70,6 +71,11 @@ type BasicHost struct {
negtimeout time.Duration

proc goprocess.Process

ctx context.Context
cancel func()
mx sync.Mutex
lastAddrs []ma.Multiaddr
}

// HostOpts holds options that can be passed to NewHost in order to
Expand Down Expand Up @@ -164,6 +170,11 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,

net.SetConnHandler(h.newConnHandler)
net.SetStreamHandler(h.newStreamHandler)

bgctx, cancel := context.WithCancel(ctx)
h.ctx = bgctx
h.cancel = cancel

return h, nil
}

Expand Down Expand Up @@ -204,6 +215,11 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
return h
}

// Start starts background tasks in the host
func (h *BasicHost) Start() {
go h.background()
}

// newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) {
// Clear protocols on connecting to new peer to avoid issues caused
Expand Down Expand Up @@ -263,7 +279,63 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
// PushIdentify pushes an identify update through the identify push protocol
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
h.ids.Push()
push := false

h.mx.Lock()
addrs := h.Addrs()
if !sameAddrs(addrs, h.lastAddrs) {
push = true
h.lastAddrs = addrs
}
h.mx.Unlock()

if push {
h.ids.Push()
}
}

func (h *BasicHost) background() {
// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

// initialize lastAddrs
h.mx.Lock()
if h.lastAddrs == nil {
h.lastAddrs = h.Addrs()
}
h.mx.Unlock()

for {
select {
case <-ticker.C:
h.PushIdentify()

case <-h.ctx.Done():
return
}
}
}

func sameAddrs(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}

bmap := make(map[string]struct{}, len(b))
for _, addr := range b {
bmap[string(addr.Bytes())] = struct{}{}
}

for _, addr := range a {
_, ok := bmap[string(addr.Bytes())]
if !ok {
return false
}
}

return true
}

// ID returns the (local) peer.ID associated with this Host
Expand Down Expand Up @@ -646,6 +718,7 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {

// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
h.cancel()
return h.proc.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func TestLimitedStreams(t *testing.T) {
func TestFuzzManyPeers(t *testing.T) {
peerCount := 50000
if detectrace.WithRace() {
peerCount = 1000
peerCount = 100
}
for i := 0; i < peerCount; i++ {
_, err := FullMeshConnected(context.Background(), 2)
Expand Down
34 changes: 30 additions & 4 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const transientTTL = 10 * time.Second
type IDService struct {
Host host.Host

ctx context.Context

// connections undergoing identification
// for wait purposes
currid map[inet.Conn]chan struct{}
Expand All @@ -64,6 +66,7 @@ type IDService struct {
func NewIDService(ctx context.Context, h host.Host) *IDService {
s := &IDService{
Host: h,
ctx: ctx,
currid: make(map[inet.Conn]chan struct{}),
observedAddrs: NewObservedAddrSet(ctx),
}
Expand Down Expand Up @@ -156,19 +159,42 @@ func (ids *IDService) pushHandler(s inet.Stream) {
}

func (ids *IDService) Push() {
var wg sync.WaitGroup

ctx, cancel := context.WithTimeout(ids.ctx, 30*time.Second)
ctx = inet.WithNoDial(ctx, "identify push")

for _, p := range ids.Host.Network().Peers() {
wg.Add(1)
go func(p peer.ID) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
defer wg.Done()

s, err := ids.Host.NewStream(ctx, p, IDPush)
if err != nil {
log.Debugf("error opening push stream: %s", err.Error())
log.Debugf("error opening push stream to %s: %s", p, err.Error())
return
}

ids.requestHandler(s)
rch := make(chan struct{}, 1)
go func() {
ids.requestHandler(s)
rch <- struct{}{}
}()

select {
case <-rch:
case <-ctx.Done():
// this is taking too long, abort!
s.Reset()
}
}(p)
}

// this supervisory goroutine is necessary to cancel the context
go func() {
wg.Wait()
cancel()
}()
}

func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
Expand Down

0 comments on commit b0f4a6f

Please sign in to comment.