Skip to content

Commit

Permalink
Merge pull request #576 from libp2p/feat/filter-rt
Browse files Browse the repository at this point in the history
fix: use the routing table filter
  • Loading branch information
willscott authored Apr 9, 2020
2 parents 6c82b40 + 6400d24 commit 796b95b
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 4 deletions.
38 changes: 37 additions & 1 deletion dht_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package dht
import (
"bytes"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/google/gopacket/routing"
netroute "github.com/libp2p/go-netroute"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -64,10 +68,42 @@ func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool {

var _ QueryFilterFunc = PrivateQueryFilter

// We call this very frequently but routes can technically change at runtime.
// Cache it for two minutes.
const routerCacheTime = 2 * time.Minute

var routerCache struct {
sync.RWMutex
router routing.Router
expires time.Time
}

func getCachedRouter() routing.Router {
routerCache.RLock()
router := routerCache.router
expires := routerCache.expires
routerCache.RUnlock()

if time.Now().Before(expires) {
return router
}

routerCache.Lock()
defer routerCache.Unlock()

now := time.Now()
if now.Before(routerCache.expires) {
return router
}
routerCache.router, _ = netroute.New()
routerCache.expires = now.Add(routerCacheTime)
return router
}

// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
router, _ := netroute.New()
router := getCachedRouter()
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range dht.Host().Addrs() {
if manet.IsPublicAddr(a) && !isRelayAddr(a) {
Expand Down
43 changes: 43 additions & 0 deletions dht_filters_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package dht

import (
"context"
"net"
"testing"

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

func TestIsRelay(t *testing.T) {
Expand All @@ -21,3 +28,39 @@ func TestIsRelay(t *testing.T) {
}

}

type mockConn struct {
local peer.AddrInfo
remote peer.AddrInfo
}

func (m *mockConn) Close() error { return nil }
func (m *mockConn) NewStream() (network.Stream, error) { return nil, nil }
func (m *mockConn) GetStreams() []network.Stream { return []network.Stream{} }
func (m *mockConn) Stat() network.Stat { return network.Stat{Direction: network.DirOutbound} }
func (m *mockConn) LocalMultiaddr() ma.Multiaddr { return m.local.Addrs[0] }
func (m *mockConn) RemoteMultiaddr() ma.Multiaddr { return m.remote.Addrs[0] }
func (m *mockConn) LocalPeer() peer.ID { return m.local.ID }
func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }

func TestFilterCaching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := setupDHT(ctx, t, true)

remote, _ := manet.FromIP(net.IPv4(8, 8, 8, 8))
if PrivateRoutingTableFilter(d, []network.Conn{&mockConn{
local: d.Host().Peerstore().PeerInfo(d.Host().ID()),
remote: peer.AddrInfo{ID: "", Addrs: []ma.Multiaddr{remote}},
}}) {
t.Fatal("filter should prevent public remote peers.")
}

r1 := getCachedRouter()
r2 := getCachedRouter()
if r1 != r2 {
t.Fatal("router should be returned multiple times.")
}
}
24 changes: 24 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1958,3 +1958,27 @@ func TestInvalidKeys(t *testing.T) {
t.Fatal("expected to have failed")
}
}

func TestRoutingFilter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nDHTs := 2
dhts := setupDHTS(t, ctx, nDHTs)
defer func() {
for i := 0; i < nDHTs; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
dhts[0].routingTablePeerFilter = PublicRoutingTableFilter

connectNoSync(t, ctx, dhts[0], dhts[1])
wait(t, ctx, dhts[1], dhts[0])

select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-time.After(time.Millisecond * 200):
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/gogo/protobuf v1.3.1
github.com/google/gopacket v1.1.17
github.com/google/uuid v1.1.1
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
Expand All @@ -24,7 +25,6 @@ require (
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-msgio v0.0.4
github.com/libp2p/go-netroute v0.1.2
github.com/mr-tron/base58 v1.1.3
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions subscriber_notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabil
// routing table
func (dht *IpfsDHT) validRTPeer(p peer.ID) (bool, error) {
protos, err := dht.peerstore.SupportsProtocols(p, protocol.ConvertToStrings(dht.protocols)...)
if err != nil {
if len(protos) == 0 || err != nil {
return false, err
}

return len(protos) > 0, nil
return dht.routingTablePeerFilter == nil || dht.routingTablePeerFilter(dht, dht.Host().Network().ConnsToPeer(p)), nil
}

func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
Expand Down

0 comments on commit 796b95b

Please sign in to comment.