Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix connection tracking race #111

Merged
merged 3 commits into from
Jan 20, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ type IpfsDHT struct {
strmap map[peer.ID]*messageSender
smlk sync.Mutex

plk sync.Mutex
peers map[peer.ID]*peerTracker
plk sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
Expand Down Expand Up @@ -119,7 +118,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,
peers: make(map[peer.ID]*peerTracker),

Validator: make(record.Validator),
Selector: make(record.Selector),
Expand Down
105 changes: 40 additions & 65 deletions notif.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package dht

import (
"context"
"io"

inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
mstream "github.com/multiformats/go-multistream"
Expand All @@ -12,15 +9,12 @@ import (
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT

var dhtProtocols = []string{string(ProtocolDHT), string(ProtocolDHTOld)}

// DHT converts the notifiee back into the *IpfsDHT it was derived from.
// netNotifiee is declared as a distinct type over IpfsDHT, so this is a
// plain pointer conversion and the result refers to the same object.
func (nn *netNotifiee) DHT() *IpfsDHT {
	underlying := (*IpfsDHT)(nn)
	return underlying
}

// peerTracker holds the per-peer connection bookkeeping stored in the
// peers map, keyed by peer ID.
type peerTracker struct {
// refcount is set to 1 when the tracker is created in Connected,
// incremented for each further connection to the same peer, and
// decremented in Disconnected.
refcount int
// cancel is the cancel function of the context handed to testConnection;
// Disconnected calls it once refcount drops to zero.
cancel func()
}

func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
Expand All @@ -29,61 +23,51 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default:
}

dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[v.RemotePeer()]
if ok {
conn.refcount++
p := v.RemotePeer()
protos, err := dht.peerstore.SupportsProtocols(p, dhtProtocols...)
if err == nil && len(protos) != 0 {
dht.plk.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what exactly does this lock protect here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to protect Update -- but that's a public interface function.
It has no business being both public and requiring the lock to be held.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular lock is probably unnecessary because Connect and Disconnect notifications are synchronous (although I still want to leave it as it doesn't hurt and I like being consistent). However, we do need to take it below in the Disconnect handler and in testConnection. Otherwise, we could end up with the following interleaving:

1. testConnection: observe that we are connected.
2. Disconnect: observe that we are disconnected (disconnect event happens).
3. Disconnect: remove peer from routing table.
4. testConnection: add peer to routing table.

This is an alternative to reference counting open connections (what we did before) that doesn't require keeping a bunch of additional state.

Copy link
Contributor

@vyzo vyzo Jan 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, fair enough. Can we add some comments to that effect -- it looks totally out of place otherwise.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the effect of calling Update without this lock? I am concerned about the public interface uses of it.

defer dht.plk.Unlock()
if dht.host.Network().Connectedness(p) == inet.Connected {
dht.Update(dht.Context(), p)
}
return
}

ctx, cancel := context.WithCancel(dht.Context())

nn.peers[v.RemotePeer()] = &peerTracker{
refcount: 1,
cancel: cancel,
}

// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but it's not clear that this information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
go nn.testConnection(ctx, v)

// Note: Unfortunately, the peerstore may not yet know that this peer is
// a DHT server. So, if it didn't return a positive response above, test
// manually.
go nn.testConnection(v)
}

func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) {
func (nn *netNotifiee) testConnection(v inet.Conn) {
dht := nn.DHT()
for {
s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)

switch err {
case nil:
s.Close()
dht.plk.Lock()

// Check if canceled under the lock.
if ctx.Err() == nil {
dht.Update(dht.Context(), v.RemotePeer())
}

dht.plk.Unlock()
case io.EOF:
if ctx.Err() == nil {
// Connection died but we may still have *an* open connection (context not canceled) so try again.
continue
}
case context.Canceled:
// Context canceled while connecting.
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
default:
// real error? that's odd
log.Warningf("checking dht client type: %s", err)
}
p := v.RemotePeer()

// Forcibly use *this* connection. Otherwise, if we have two connections, we could:
// 1. Test it twice.
// 2. Have it closed from under us leaving the second (open) connection untested.
s, err := v.NewStream()
if err != nil {
// Connection error
return
}
defer s.Close()

selected, err := mstream.SelectOneOf(dhtProtocols, s)
if err != nil {
// Doesn't support the protocol
return
}
// Remember this choice (makes subsequent negotiations faster)
dht.peerstore.AddProtocols(p, selected)

dht.plk.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here again with the lock!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

defer dht.plk.Unlock()
// Make sure we're still connected under the lock (race with disconnect)
if dht.host.Network().Connectedness(p) == inet.Connected {
dht.Update(dht.Context(), p)
}
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
Expand All @@ -100,16 +84,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[p]
if !ok {
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
conn.refcount -= 1
if conn.refcount == 0 {
delete(nn.peers, p)
conn.cancel()
if dht.host.Network().Connectedness(p) != inet.Connected {
dht.routingTable.Remove(p)
}
}()
Expand Down