Skip to content

Commit

Permalink
Properly track connections to peers in the DHT.
Browse files Browse the repository at this point in the history
fixes ipfs#70
  • Loading branch information
Stebalien committed Jul 27, 2017
1 parent 9b87e92 commit a1b973c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 31 deletions.
4 changes: 4 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type IpfsDHT struct {

strmap map[peer.ID]*messageSender
smlk sync.Mutex

plk sync.Mutex
peers map[peer.ID]*peerTracker
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
Expand Down Expand Up @@ -106,6 +109,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()),
peers: make(map[peer.ID]*peerTracker),

Validator: make(record.Validator),
Selector: make(record.Selector),
Expand Down
96 changes: 65 additions & 31 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,24 @@ package dht
import (
"context"
"io"
"time"

inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
mstream "github.com/multiformats/go-multistream"
)

// TODO: There is a race condition here where we could process notifications
// out-of-order and incorrectly mark some peers as DHT nodes (or not DHT nodes).
// The correct fix for this is nasty so I'm not really sure it's worth it.
// Incorrectly recording or failing to record a DHT node in the routing table
// isn't a big issue.

const dhtCheckTimeout = 10 * time.Second

// netNotifiee is IpfsDHT under a distinct named type. The network
// notification callbacks in this file (Connected, Disconnected,
// OpenedStream, ...) are declared on *netNotifiee; call DHT() to convert a
// *netNotifiee back to the *IpfsDHT it aliases.
type netNotifiee IpfsDHT

// DHT reinterprets the notifiee as the *IpfsDHT it was converted from.
// No copy is made: the returned pointer refers to the same value as nn.
func (nn *netNotifiee) DHT() *IpfsDHT {
	dht := (*IpfsDHT)(nn)
	return dht
}

// peerTracker holds the per-peer bookkeeping stored in IpfsDHT.peers.
type peerTracker struct {
// refcount is set to 1 when Connected first sees the peer, incremented on
// each further Connected notification for it, and decremented by
// Disconnected. When it reaches zero, Disconnected deletes the entry,
// calls cancel and removes the peer from the routing table.
refcount int
// cancel cancels the context that Connected created for this peer's
// protocol-check goroutine (the context passed to host.NewStream).
cancel func()
}

func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
Expand All @@ -33,34 +29,57 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default:
}

dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[v.RemotePeer()]
if ok {
conn.refcount += 1
return
}

ctx, cancel := context.WithCancel(dht.Context())

nn.peers[v.RemotePeer()] = &peerTracker{
refcount: 1,
cancel: cancel,
}

go func() {

// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but it's not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream

// TODO: There's a race condition here where the connection may
// not be open (and we may sit here trying to connect). I've
// added a timeout but that's not really the correct fix.

ctx, cancel := context.WithTimeout(dht.Context(), dhtCheckTimeout)
defer cancel()
s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)

switch err {
case nil:
s.Close()
// connected fine? full dht node
dht.Update(dht.Context(), v.RemotePeer())
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
case io.EOF:
// This is kind of an error, but it happens somewhat often, so make it a warning
log.Warningf("checking dht client type: %s", err)
default:
// real error? that's odd
log.Errorf("checking dht client type: %s", err)
for {
s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)

// Canceled.
if ctx.Err() != nil {
return
}

switch err {
case nil:
s.Close()
dht.plk.Lock()
defer dht.plk.Unlock()

// Check if canceled again under the lock.
if ctx.Err() == nil {
dht.Update(dht.Context(), v.RemotePeer())
}
case io.EOF:
// Connection died but we may still have *an* open connection so try again.
continue
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
default:
// real error? that's odd
log.Errorf("checking dht client type: %s", err)
}
return
}
}()
}
Expand All @@ -72,7 +91,22 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
return
default:
}
go dht.routingTable.Remove(v.RemotePeer())

dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[v.RemotePeer()]
if !ok {
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
conn.refcount -= 1
if conn.refcount == 0 {
delete(nn.peers, v.RemotePeer())
conn.cancel()
dht.routingTable.Remove(v.RemotePeer())
}
}

// OpenedStream is a no-op: the notifiee takes no action when a stream is
// opened.
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {
	// Intentionally empty.
}
Expand Down

0 comments on commit a1b973c

Please sign in to comment.