From 499b4d8467f7ed673a80e14b3b18ab88f110e2c2 Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Wed, 20 May 2020 14:34:11 +0530
Subject: [PATCH] Rank Dial addresses (#212)

* Rank dial addresses.
---
 p2p/net/swarm/limiter.go      |  32 ++++++----
 p2p/net/swarm/limiter_test.go |  45 ++++++++++++--
 p2p/net/swarm/swarm.go        |   2 +-
 p2p/net/swarm/swarm_dial.go   | 107 ++++++++++++++++++++++++++++------
 p2p/net/swarm/swarm_test.go   |  54 +++++++++++++++++
 5 files changed, 205 insertions(+), 35 deletions(-)

diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go
index 6808dd71a6..3e20976b32 100644
--- a/p2p/net/swarm/limiter.go
+++ b/p2p/net/swarm/limiter.go
@@ -10,7 +10,6 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/transport"
 
-	addrutil "github.com/libp2p/go-addr-util"
 	ma "github.com/multiformats/go-multiaddr"
 )
 
@@ -43,9 +42,10 @@ func (dj *dialJob) dialTimeout() time.Duration {
 type dialLimiter struct {
 	lk sync.Mutex
 
-	fdConsuming int
-	fdLimit     int
-	waitingOnFd []*dialJob
+	isFdConsumingFnc isFdConsumingFnc
+	fdConsuming      int
+	fdLimit          int
+	waitingOnFd      []*dialJob
 
 	dialFunc dialfunc
 
@@ -55,19 +55,21 @@ type dialLimiter struct {
 }
 
 type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
+type isFdConsumingFnc func(ma.Multiaddr) bool
 
-func newDialLimiter(df dialfunc) *dialLimiter {
+func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
 	fd := ConcurrentFdDials
 	if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
 		if n, err := strconv.ParseInt(env, 10, 32); err == nil {
 			fd = int(n)
 		}
 	}
-	return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit)
+	return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
 }
 
-func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
+func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
 	return &dialLimiter{
+		isFdConsumingFnc:   isFdConsumingFnc,
 		fdLimit:            fdLimit,
 		perPeerLimit:       perPeerLimit,
 		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
@@ -140,16 +142,26 @@ func (dl *dialLimiter) freePeerToken(dj *dialJob) {
 func (dl *dialLimiter) finishedDial(dj *dialJob) {
 	dl.lk.Lock()
 	defer dl.lk.Unlock()
-
-	if addrutil.IsFDCostlyTransport(dj.addr) {
+	if dl.shouldConsumeFd(dj.addr) {
 		dl.freeFDToken()
 	}
 
 	dl.freePeerToken(dj)
 }
 
+func (dl *dialLimiter) shouldConsumeFd(addr ma.Multiaddr) bool {
+	// we don't consume FDs for relay addresses for now, as they will be consumed when the Relay Transport
+	// actually dials the Relay server. That dial call will also pass through this limiter, with
+	// the address of the relay server, i.e. a non-relay address.
+	_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
+
+	isRelay := err == nil
+
+	return !isRelay && dl.isFdConsumingFnc(addr)
+}
+
 func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
-	if addrutil.IsFDCostlyTransport(dj.addr) {
+	if dl.shouldConsumeFd(dj.addr) {
 		if dl.fdConsuming >= dl.fdLimit {
 			log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
 				"limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go
index 7373ddbb3e..367e099fd9 100644
--- a/p2p/net/swarm/limiter_test.go
+++ b/p2p/net/swarm/limiter_test.go
@@ -10,11 +10,26 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/test"
 	"github.com/libp2p/go-libp2p-core/transport"
+
 	ma "github.com/multiformats/go-multiaddr"
 	mafmt "github.com/multiformats/go-multiaddr-fmt"
 )
 
+var isFdConsuming = func(addr ma.Multiaddr) bool {
+	res := false
+
+	ma.ForEach(addr, func(c ma.Component) bool {
+		if c.Protocol().Code == ma.P_TCP {
+			res = true
+			return false
+		}
+		return true
+	})
+	return res
+}
+
 func mustAddr(t *testing.T, s string) ma.Multiaddr {
 	a, err := ma.NewMultiaddr(s)
 	if err != nil {
@@ -61,6 +76,11 @@ func hangDialFunc(hang chan struct{}) dialfunc {
 			return transport.CapableConn(nil), nil
 		}
 
+		_, err := a.ValueForProtocol(ma.P_CIRCUIT)
+		if err == nil {
+			return transport.CapableConn(nil), nil
+		}
+
 		if tcpPortOver(a, 10) {
 			return transport.CapableConn(nil), nil
 		}
@@ -74,7 +94,7 @@ func TestLimiterBasicDials(t *testing.T) {
 	hang := make(chan struct{})
 	defer close(hang)
 
-	l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4)
+	l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), ConcurrentFdDials, 4)
 
 	bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
 	good := addrWithPort(t, 20)
@@ -123,7 +143,7 @@ func TestLimiterBasicDials(t *testing.T) {
 func TestFDLimiting(t *testing.T) {
 	hang := make(chan struct{})
 	defer close(hang)
-	l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
+	l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), 16, 5)
 
 	bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
 	pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
@@ -168,6 +188,21 @@ func TestFDLimiting(t *testing.T) {
 	case <-time.After(time.Second * 5):
 		t.Fatal("timeout waiting for utp addr success")
 	}
+
+	// A relay address with a TCP transport will complete because we do not consume FDs for dials
+	// to relay addresses; the FD is consumed only when we actually dial the relay server.
+	pid6 := test.RandPeerIDFatal(t)
+	relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
+	l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch})
+
+	select {
+	case res := <-resch:
+		if res.Err != nil {
+			t.Fatal("should have gotten successful response")
+		}
+	case <-time.After(time.Second * 5):
+		t.Fatal("timeout waiting for relay addr success")
+	}
 }
 
 func TestTokenRedistribution(t *testing.T) {
@@ -184,7 +219,7 @@ func TestTokenRedistribution(t *testing.T) {
 		<-ch
 		return nil, fmt.Errorf("test bad dial")
 	}
-	l := newDialLimiterWithParams(df, 8, 4)
+	l := newDialLimiterWithParams(isFdConsuming, df, 8, 4)
 
 	bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
 	pids := []peer.ID{"testpeer1", "testpeer2"}
@@ -277,7 +312,7 @@ func TestStressLimiter(t *testing.T) {
 		return nil, fmt.Errorf("test bad dial")
 	}
 
-	l := newDialLimiterWithParams(df, 20, 5)
+	l := newDialLimiterWithParams(isFdConsuming, df, 20, 5)
 
 	var bads []ma.Multiaddr
 	for i := 0; i < 100; i++ {
@@ -337,7 +372,7 @@ func TestFDLimitUnderflow(t *testing.T) {
 		return nil, fmt.Errorf("df timed out")
 	}
 
-	l := newDialLimiterWithParams(df, 20, 3)
+	l := newDialLimiterWithParams(isFdConsuming, df, 20, 3)
 
 	var addrs []ma.Multiaddr
 	for i := 0; i <= 1000; i++ {
diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go
index f5c0209c80..85f8b2f81c 100644
--- a/p2p/net/swarm/swarm.go
+++ b/p2p/net/swarm/swarm.go
@@ -119,7 +119,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
 	}
 
 	s.dsync = NewDialSync(s.doDial)
-	s.limiter = newDialLimiter(s.dialAddr)
+	s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr)
 	s.proc = goprocessctx.WithContext(ctx)
 	s.ctx = goprocessctx.OnClosingContext(s.proc)
 	s.backf.init(s.ctx)
diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go
index f35f2b6372..68b1045c78 100644
--- a/p2p/net/swarm/swarm_dial.go
+++ b/p2p/net/swarm/swarm_dial.go
@@ -10,11 +10,13 @@ import (
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/transport"
+
+	addrutil "github.com/libp2p/go-addr-util"
 	lgbl "github.com/libp2p/go-libp2p-loggables"
 
 	logging "github.com/ipfs/go-log"
-	addrutil "github.com/libp2p/go-addr-util"
 	ma "github.com/multiformats/go-multiaddr"
+	manet "github.com/multiformats/go-multiaddr-net"
 )
 
 // Diagram of dial sync:
@@ -337,13 +339,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
 	}
 
 	//////
-	/*
-		This slice-to-chan code is temporary, the peerstore can currently provide
-		a channel as an interface for receiving addresses, but more thought
-		needs to be put into the execution. For now, this allows us to use
-		the improved rate limiter, while maintaining the outward behaviour
-		that we previously had (halting a dial when we run out of addrs)
-	*/
 	peerAddrs := s.peers.Addrs(p)
 	if len(peerAddrs) == 0 {
 		return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
@@ -352,23 +347,60 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
 	if len(goodAddrs) == 0 {
 		return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
 	}
-	goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
-	nonBackoff := false
+
+	/////// Check backoff and rank addresses
+	var nonBackoff bool
 	for _, a := range goodAddrs {
 		// skip addresses in back-off
 		if !s.backf.Backoff(p, a) {
 			nonBackoff = true
-			goodAddrsChan <- a
 		}
 	}
-	close(goodAddrsChan)
 	if !nonBackoff {
 		return nil, ErrDialBackoff
 	}
 
-	/////////
-	// try to get a connection to any addr
-	connC, dialErr := s.dialAddrs(ctx, p, goodAddrsChan)
+	// ranks addresses in descending order of preference for dialing:
+	// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
+	rankAddrsFnc := func(addrs []ma.Multiaddr) []ma.Multiaddr {
+		var localUdpAddrs []ma.Multiaddr // private udp
+		var relayUdpAddrs []ma.Multiaddr // relay udp
+		var othersUdp []ma.Multiaddr     // public udp
+
+		var localFdAddrs []ma.Multiaddr // private fd consuming
+		var relayFdAddrs []ma.Multiaddr // relay fd consuming
+		var othersFd []ma.Multiaddr     // public fd consuming
+
+		for _, a := range addrs {
+			if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
+				if s.IsFdConsumingAddr(a) {
+					relayFdAddrs = append(relayFdAddrs, a)
+					continue
+				}
+				relayUdpAddrs = append(relayUdpAddrs, a)
+			} else if manet.IsPrivateAddr(a) {
+				if s.IsFdConsumingAddr(a) {
+					localFdAddrs = append(localFdAddrs, a)
+					continue
+				}
+				localUdpAddrs = append(localUdpAddrs, a)
+			} else {
+				if s.IsFdConsumingAddr(a) {
+					othersFd = append(othersFd, a)
+					continue
+				}
+				othersUdp = append(othersUdp, a)
+			}
+		}
+
+		relays := append(relayUdpAddrs, relayFdAddrs...)
+		fds := append(localFdAddrs, othersFd...)
+
+		return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...)
+	}
+
+	connC, dialErr := s.dialAddrs(ctx, p, rankAddrsFnc(goodAddrs))
+
 	if dialErr != nil {
 		logdial["error"] = dialErr.Cause.Error()
 		switch dialErr.Cause {
@@ -424,7 +456,23 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul
 	)
 }
 
-func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.CapableConn, *DialError) {
+func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) {
+	/*
+		This slice-to-chan code is temporary, the peerstore can currently provide
+		a channel as an interface for receiving addresses, but more thought
+		needs to be put into the execution. For now, this allows us to use
+		the improved rate limiter, while maintaining the outward behaviour
+		that we previously had (halting a dial when we run out of addrs)
+	*/
+	var remoteAddrChan chan ma.Multiaddr
+	if len(remoteAddrs) > 0 {
+		remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs))
+		for i := range remoteAddrs {
+			remoteAddrChan <- remoteAddrs[i]
+		}
+		close(remoteAddrChan)
+	}
+
 	log.Debugf("%s swarm dialing %s", s.local, p)
 
 	ctx, cancel := context.WithCancel(ctx)
@@ -438,7 +486,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
 	var active int
 dialLoop:
-	for remoteAddrs != nil || active > 0 {
+	for remoteAddrChan != nil || active > 0 {
 		// Check for context cancellations and/or responses first.
 		select {
 		case <-ctx.Done():
@@ -464,9 +512,9 @@ dialLoop:
 
 		// Now, attempt to dial.
 		select {
-		case addr, ok := <-remoteAddrs:
+		case addr, ok := <-remoteAddrChan:
 			if !ok {
-				remoteAddrs = nil
+				remoteAddrChan = nil
 				continue
 			}
 
@@ -540,3 +588,24 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra
 	// success! we got one!
 	return connC, nil
 }
+
+// IsFdConsumingAddr checks whether any of the transport protocols in the address requires a file descriptor.
+// TODO: We should have an `IsFdConsuming() bool` method on the `Transport` interface in go-libp2p-core/transport.
+// For now:
+// A non-circuit address that has the TCP/UNIX protocol is deemed FD-consuming.
+// For a circuit-relay address, we look at the address of the relay server/proxy
+// and use the same logic as above to decide.
+func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool {
+	first, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
+		return c.Protocol().Code == ma.P_CIRCUIT
+	})
+
+	// for safety
+	if first == nil {
+		return true
+	}
+
+	_, err1 := first.ValueForProtocol(ma.P_TCP)
+	_, err2 := first.ValueForProtocol(ma.P_UNIX)
+	return err1 == nil || err2 == nil
+}
diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go
index 4750b6a64d..aa18904acc 100644
--- a/p2p/net/swarm/swarm_test.go
+++ b/p2p/net/swarm/swarm_test.go
@@ -13,7 +13,10 @@ import (
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peerstore"
+	"github.com/libp2p/go-libp2p-core/test"
 
+	circuit "github.com/libp2p/go-libp2p-circuit"
+	qc "github.com/libp2p/go-libp2p-quic-transport"
 	. "github.com/libp2p/go-libp2p-swarm"
 	. "github.com/libp2p/go-libp2p-swarm/testing"
 
@@ -383,6 +386,57 @@ func TestConnectionGating(t *testing.T) {
 	}
 }
 
+func TestIsFdConsuming(t *testing.T) {
+	tcs := map[string]struct {
+		addr          string
+		isFdConsuming bool
+	}{
+		"tcp": {
+			addr:          "/ip4/127.0.0.1/tcp/20",
+			isFdConsuming: true,
+		},
+		"quic": {
+			addr:          "/ip4/127.0.0.1/udp/0/quic",
+			isFdConsuming: false,
+		},
+		"addr-without-registered-transport": {
+			addr:          "/ip4/127.0.0.1/tcp/20/ws",
+			isFdConsuming: true,
+		},
+		"relay-tcp": {
+			addr:          fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
+			isFdConsuming: true,
+		},
+		"relay-quic": {
+			addr:          fmt.Sprintf("/ip4/127.0.0.1/udp/20/quic/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
+			isFdConsuming: false,
+		},
+		"relay-without-serveraddr": {
+			addr:          fmt.Sprintf("/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
+			isFdConsuming: true,
+		},
+		"relay-without-registered-transport-server": {
+			addr:          fmt.Sprintf("/ip4/127.0.0.1/tcp/20/ws/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
+			isFdConsuming: true,
+		},
+	}
+
+	ctx := context.Background()
+	sw := GenSwarm(t, ctx)
+	sk := sw.Peerstore().PrivKey(sw.LocalPeer())
+	require.NotNil(t, sk)
+	qtpt, err := qc.NewTransport(sk, nil, nil)
+	require.NoError(t, err)
+	require.NoError(t, sw.AddTransport(qtpt))
+	require.NoError(t, sw.AddTransport(&circuit.RelayTransport{}))
+
+	for name := range tcs {
+		maddr, err := ma.NewMultiaddr(tcs[name].addr)
+		require.NoError(t, err, name)
+		require.Equal(t, tcs[name].isFdConsuming, sw.IsFdConsumingAddr(maddr), name)
+	}
+}
+
 func TestNoDial(t *testing.T) {
 	ctx := context.Background()
 	swarms := makeSwarms(ctx, t, 2)
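
Notes on the ranking policy introduced in swarm_dial.go above. The sketch below is a minimal, standalone approximation, not the swarm's implementation: isFdConsuming here is a simplified stand-in for Swarm.IsFdConsumingAddr (it only looks for TCP, like the test helper in limiter_test.go), and the bucket order mirrors rankAddrsFnc's Private UDP > Public UDP > Private TCP > Public TCP > UDP relay > TCP relay preference. The addresses in main are example values.

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
)

// isRelay reports whether the address contains a p2p-circuit component.
func isRelay(a ma.Multiaddr) bool {
	_, err := a.ValueForProtocol(ma.P_CIRCUIT)
	return err == nil
}

// isFdConsuming is a simplified stand-in for Swarm.IsFdConsumingAddr:
// it treats any address that mentions TCP as FD-consuming.
func isFdConsuming(a ma.Multiaddr) bool {
	_, err := a.ValueForProtocol(ma.P_TCP)
	return err == nil
}

// rank mirrors rankAddrsFnc: six buckets, concatenated in descending
// order of dial preference.
func rank(addrs []ma.Multiaddr) []ma.Multiaddr {
	// 0: private UDP, 1: public UDP, 2: private FD, 3: public FD,
	// 4: relay UDP, 5: relay FD
	var buckets [6][]ma.Multiaddr
	for _, a := range addrs {
		switch fd := isFdConsuming(a); {
		case isRelay(a) && !fd:
			buckets[4] = append(buckets[4], a)
		case isRelay(a):
			buckets[5] = append(buckets[5], a)
		case manet.IsPrivateAddr(a) && !fd:
			buckets[0] = append(buckets[0], a)
		case manet.IsPrivateAddr(a):
			buckets[2] = append(buckets[2], a)
		case !fd:
			buckets[1] = append(buckets[1], a)
		default:
			buckets[3] = append(buckets[3], a)
		}
	}
	var out []ma.Multiaddr
	for _, b := range buckets {
		out = append(out, b...)
	}
	return out
}

func main() {
	var addrs []ma.Multiaddr
	for _, s := range []string{
		"/ip4/1.2.3.4/tcp/4001/p2p-circuit",
		"/ip4/1.2.3.4/tcp/4001",
		"/ip4/192.168.1.5/tcp/4001",
		"/ip4/1.2.3.4/udp/4001/quic",
		"/ip4/192.168.1.5/udp/4001/quic",
	} {
		addrs = append(addrs, ma.StringCast(s))
	}
	// Expected order: private quic, public quic, private tcp, public tcp, relay.
	for i, a := range rank(addrs) {
		fmt.Printf("%d. %s\n", i+1, a)
	}
}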
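
The circuit-address handling in IsFdConsumingAddr hinges on ma.SplitFunc, which cuts a multiaddr at the first component matching a predicate. A small sketch of that behaviour (the peer ID below is just an example value):

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// An example relay address: the relay server listens on TCP, the
	// target peer sits behind the /p2p-circuit hop.
	addr := ma.StringCast("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC")

	// Cut at the first p2p-circuit component: `first` is the relay
	// server's own address, `rest` is the circuit part.
	first, rest := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})
	fmt.Println("relay server:", first) // /ip4/127.0.0.1/tcp/20
	fmt.Println("circuit part:", rest)  // /p2p-circuit/p2p/Qmcgps...
}

When the address starts with /p2p-circuit (no server address), first comes back nil, which is why IsFdConsumingAddr conservatively returns true in that case, matching the "relay-without-serveraddr" test above.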
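
Unchanged by this patch but visible in the limiter.go hunk: newDialLimiter lets the LIBP2P_SWARM_FD_LIMIT environment variable override the FD limit. A minimal sketch of that parse logic, assuming 160 as the ConcurrentFdDials default:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// fdLimit mimics newDialLimiter's env override: LIBP2P_SWARM_FD_LIMIT,
// when set to a valid 32-bit integer, replaces the default limit.
func fdLimit() int {
	limit := 160 // stands in for ConcurrentFdDials (assumed default)
	if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
		if n, err := strconv.ParseInt(env, 10, 32); err == nil {
			limit = int(n)
		}
	}
	return limit
}

func main() {
	os.Setenv("LIBP2P_SWARM_FD_LIMIT", "64")
	fmt.Println(fdLimit()) // 64
}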