Rank Dial addresses (#212)
* Rank dial addresses.
aarshkshah1992 authored May 20, 2020
1 parent dc499b7 commit 499b4d8
Showing 5 changed files with 205 additions and 35 deletions.
32 changes: 22 additions & 10 deletions p2p/net/swarm/limiter.go
@@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"

addrutil "github.com/libp2p/go-addr-util"
ma "github.com/multiformats/go-multiaddr"
)

@@ -43,9 +42,10 @@ func (dj *dialJob) dialTimeout() time.Duration {
type dialLimiter struct {
lk sync.Mutex

fdConsuming int
fdLimit int
waitingOnFd []*dialJob
isFdConsumingFnc isFdConsumingFnc
fdConsuming int
fdLimit int
waitingOnFd []*dialJob

dialFunc dialfunc

@@ -55,19 +55,21 @@ type dialLimiter struct {
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
type isFdConsumingFnc func(ma.Multiaddr) bool

func newDialLimiter(df dialfunc) *dialLimiter {
func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
fd := ConcurrentFdDials
if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
if n, err := strconv.ParseInt(env, 10, 32); err == nil {
fd = int(n)
}
}
return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit)
return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
isFdConsumingFnc: isFdConsumingFnc,
fdLimit: fdLimit,
perPeerLimit: perPeerLimit,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
@@ -140,16 +142,26 @@ func (dl *dialLimiter) freePeerToken(dj *dialJob) {
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()

if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.shouldConsumeFd(dj.addr) {
dl.freeFDToken()
}

dl.freePeerToken(dj)
}

func (dl *dialLimiter) shouldConsumeFd(addr ma.Multiaddr) bool {
// we don't consume FDs for relay addresses for now, as they will be consumed when the Relay Transport
// actually dials the Relay server. That dial call will also pass through this limiter with
// the address of the relay server, i.e. a non-relay address.
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)

isRelay := err == nil

return !isRelay && dl.isFdConsumingFnc(addr)
}

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.shouldConsumeFd(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
"limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
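
For illustration only (not part of the diff): a minimal sketch of the new relay check in the limiter, using only public go-multiaddr calls. The isFdConsuming stand-in below is hypothetical and mirrors the TCP-only helper used in limiter_test.go; in the swarm itself the injected function is Swarm.IsFdConsumingAddr.

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

// isFdConsuming is a hypothetical stand-in for the injected isFdConsumingFnc
// (TCP-only, like the test helper in limiter_test.go).
func isFdConsuming(addr ma.Multiaddr) bool {
	_, err := addr.ValueForProtocol(ma.P_TCP)
	return err == nil
}

// shouldConsumeFd mirrors the limiter logic above: relay addresses never take
// an FD token here, because the relay transport's dial to the relay server
// re-enters the limiter with the server's direct (non-relay) address.
func shouldConsumeFd(addr ma.Multiaddr) bool {
	_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
	isRelay := err == nil
	return !isRelay && isFdConsuming(addr)
}

func main() {
	direct := ma.StringCast("/ip4/1.2.3.4/tcp/4001")
	relayed := ma.StringCast("/ip4/1.2.3.4/tcp/4001/p2p-circuit")

	fmt.Println(shouldConsumeFd(direct))  // true: a direct TCP dial costs an FD token
	fmt.Println(shouldConsumeFd(relayed)) // false: the FD is charged when the relay server itself is dialed
}
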
45 changes: 40 additions & 5 deletions p2p/net/swarm/limiter_test.go
@@ -10,11 +10,26 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
"github.com/libp2p/go-libp2p-core/transport"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
)

var isFdConsuming = func(addr ma.Multiaddr) bool {
res := false

ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_TCP {
res = true
return false
}
return true
})
return res
}

func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
@@ -61,6 +76,11 @@ func hangDialFunc(hang chan struct{}) dialfunc {
return transport.CapableConn(nil), nil
}

_, err := a.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return transport.CapableConn(nil), nil
}

if tcpPortOver(a, 10) {
return transport.CapableConn(nil), nil
}
@@ -74,7 +94,7 @@ func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)

l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), ConcurrentFdDials, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
@@ -123,7 +143,7 @@ func TestLimiterBasicDials(t *testing.T) {
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), 16, 5)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
@@ -168,6 +188,21 @@ func TestFDLimiting(t *testing.T) {
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for utp addr success")
}

// A relay address with a TCP transport will complete because we do not consume FDs for dials
// with relay addresses, as the FD will be consumed when we actually dial the relay server.
pid6 := test.RandPeerIDFatal(t)
relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch})

select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for relay addr success")
}
}

func TestTokenRedistribution(t *testing.T) {
@@ -184,7 +219,7 @@ func TestTokenRedistribution(t *testing.T) {
<-ch
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(df, 8, 4)
l := newDialLimiterWithParams(isFdConsuming, df, 8, 4)

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
@@ -277,7 +312,7 @@ func TestStressLimiter(t *testing.T) {
return nil, fmt.Errorf("test bad dial")
}

l := newDialLimiterWithParams(df, 20, 5)
l := newDialLimiterWithParams(isFdConsuming, df, 20, 5)

var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
@@ -337,7 +372,7 @@ func TestFDLimitUnderflow(t *testing.T) {
return nil, fmt.Errorf("df timed out")
}

l := newDialLimiterWithParams(df, 20, 3)
l := newDialLimiterWithParams(isFdConsuming, df, 20, 3)

var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm.go
@@ -119,7 +119,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
}

s.dsync = NewDialSync(s.doDial)
s.limiter = newDialLimiter(s.dialAddr)
s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr)
s.proc = goprocessctx.WithContext(ctx)
s.ctx = goprocessctx.OnClosingContext(s.proc)
s.backf.init(s.ctx)
107 changes: 88 additions & 19 deletions p2p/net/swarm/swarm_dial.go
@@ -10,11 +10,13 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"

addrutil "github.com/libp2p/go-addr-util"
lgbl "github.com/libp2p/go-libp2p-loggables"

logging "github.com/ipfs/go-log"
addrutil "github.com/libp2p/go-addr-util"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// Diagram of dial sync:
@@ -337,13 +339,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
}

//////
/*
This slice-to-chan code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
peerAddrs := s.peers.Addrs(p)
if len(peerAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
@@ -352,23 +347,60 @@ if len(goodAddrs) == 0 {
if len(goodAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
}
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
nonBackoff := false

/////// Check backoff and rank addresses
var nonBackoff bool
for _, a := range goodAddrs {
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
goodAddrsChan <- a
}
}
close(goodAddrsChan)
if !nonBackoff {
return nil, ErrDialBackoff
}
/////////

// try to get a connection to any addr
connC, dialErr := s.dialAddrs(ctx, p, goodAddrsChan)
// ranks addresses in descending order of preference for dialing
// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
rankAddrsFnc := func(addrs []ma.Multiaddr) []ma.Multiaddr {
var localUdpAddrs []ma.Multiaddr // private udp
var relayUdpAddrs []ma.Multiaddr // relay udp
var othersUdp []ma.Multiaddr // public udp

var localFdAddrs []ma.Multiaddr // private fd consuming
var relayFdAddrs []ma.Multiaddr // relay fd consuming
var othersFd []ma.Multiaddr // public fd consuming

for _, a := range addrs {
if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
if s.IsFdConsumingAddr(a) {
relayFdAddrs = append(relayFdAddrs, a)
continue
}
relayUdpAddrs = append(relayUdpAddrs, a)
} else if manet.IsPrivateAddr(a) {
if s.IsFdConsumingAddr(a) {
localFdAddrs = append(localFdAddrs, a)
continue
}
localUdpAddrs = append(localUdpAddrs, a)
} else {
if s.IsFdConsumingAddr(a) {
othersFd = append(othersFd, a)
continue
}
othersUdp = append(othersUdp, a)
}
}

relays := append(relayUdpAddrs, relayFdAddrs...)
fds := append(localFdAddrs, othersFd...)

return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...)
}

connC, dialErr := s.dialAddrs(ctx, p, rankAddrsFnc(goodAddrs))

if dialErr != nil {
logdial["error"] = dialErr.Cause.Error()
switch dialErr.Cause {
@@ -424,7 +456,23 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul
)
}

func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.CapableConn, *DialError) {
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) {
/*
This slice-to-chan code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
var remoteAddrChan chan ma.Multiaddr
if len(remoteAddrs) > 0 {
remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs))
for i := range remoteAddrs {
remoteAddrChan <- remoteAddrs[i]
}
close(remoteAddrChan)
}

log.Debugf("%s swarm dialing %s", s.local, p)

ctx, cancel := context.WithCancel(ctx)
@@ -438,7 +486,7 @@

var active int
dialLoop:
for remoteAddrs != nil || active > 0 {
for remoteAddrChan != nil || active > 0 {
// Check for context cancellations and/or responses first.
select {
case <-ctx.Done():
@@ -464,9 +512,9 @@ dialLoop:

// Now, attempt to dial.
select {
case addr, ok := <-remoteAddrs:
case addr, ok := <-remoteAddrChan:
if !ok {
remoteAddrs = nil
remoteAddrChan = nil
continue
}

@@ -540,3 +588,24 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra
// success! we got one!
return connC, nil
}

// TODO: We should have an `IsFdConsuming() bool` method on the `Transport` interface in go-libp2p-core/transport.
// This function checks if any of the transport protocols in the address requires a file descriptor.
// For now:
// A non-circuit address that uses the TCP or UNIX protocol is deemed FD-consuming.
// For a circuit-relay address, we look at the address of the relay server/proxy
// and use the same logic as above to decide.
func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool {
first, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
return c.Protocol().Code == ma.P_CIRCUIT
})

// for safety
if first == nil {
return true
}

_, err1 := first.ValueForProtocol(ma.P_TCP)
_, err2 := first.ValueForProtocol(ma.P_UNIX)
return err1 == nil || err2 == nil
}
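
For illustration only (not part of the diff): a small sketch of how IsFdConsumingAddr and the ranking order described above (Private UDP > Public UDP > Private TCP > Public TCP > UDP relay server > TCP relay server) fit together, under the stated assumption that only TCP and UNIX transports cost a file descriptor. The bucket helper below is hypothetical; the actual ordering is produced by the rankAddrsFnc closure inside Swarm.dial.

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
)

// isFdConsumingAddr mirrors Swarm.IsFdConsumingAddr: only the part of the
// address before /p2p-circuit (i.e. the relay server's address) is inspected.
func isFdConsumingAddr(addr ma.Multiaddr) bool {
	first, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})
	if first == nil {
		return true // for safety
	}
	_, err1 := first.ValueForProtocol(ma.P_TCP)
	_, err2 := first.ValueForProtocol(ma.P_UNIX)
	return err1 == nil || err2 == nil
}

// bucket is a hypothetical helper that labels an address with the dial-ranking
// bucket it would land in.
func bucket(a ma.Multiaddr) string {
	_, circErr := a.ValueForProtocol(ma.P_CIRCUIT)
	isRelay := circErr == nil
	fd := isFdConsumingAddr(a)
	switch {
	case isRelay && fd:
		return "TCP relay server (dialed last)"
	case isRelay:
		return "UDP relay server"
	case manet.IsPrivateAddr(a) && !fd:
		return "private UDP (dialed first)"
	case !fd:
		return "public UDP"
	case manet.IsPrivateAddr(a):
		return "private TCP"
	default:
		return "public TCP"
	}
}

func main() {
	for _, s := range []string{
		"/ip4/192.168.1.10/udp/4001/quic",
		"/ip4/1.2.3.4/udp/4001/quic",
		"/ip4/192.168.1.10/tcp/4001",
		"/ip4/1.2.3.4/tcp/4001",
		"/ip4/5.6.7.8/udp/4001/quic/p2p-circuit",
		"/ip4/5.6.7.8/tcp/4001/p2p-circuit",
	} {
		fmt.Printf("%-42s -> %s\n", s, bucket(ma.StringCast(s)))
	}
}
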