Skip to content

Commit

Permalink
connmgr: prefer peers with no streams when closing connections (#1675)
Browse files Browse the repository at this point in the history
* Prefer peers with no streams when closing connections in connmgr

* Implement Stat for test connection
  • Loading branch information
MarcoPolo authored Aug 19, 2022
1 parent 0f4a969 commit 6472f8c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 26 deletions.
37 changes: 19 additions & 18 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,11 @@ type peerInfo struct {

type peerInfos []peerInfo

func (p peerInfos) SortByValue() {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})
}

func (p peerInfos) SortByValueAndStreams() {
// SortByValueAndStreams sorts peerInfos by their value and stream count. It
// will sort peers with no streams before those with streams (all else being
// equal). If `sortByMoreStreams` is true it will sort peers with more streams
// before those with fewer streams. This is useful to prioritize freeing memory.
func (p peerInfos) SortByValueAndStreams(sortByMoreStreams bool) {
sort.Slice(p, func(i, j int) bool {
left, right := p[i], p[j]
// temporary peers are preferred for pruning.
Expand All @@ -278,12 +270,21 @@ func (p peerInfos) SortByValueAndStreams() {
}
leftIncoming, leftStreams := incomingAndStreams(left.conns)
rightIncoming, rightStreams := incomingAndStreams(right.conns)
// prefer closing inactive connections (no streams open)
if rightStreams != leftStreams && (leftStreams == 0 || rightStreams == 0) {
return leftStreams < rightStreams
}
// incoming connections are preferred for pruning
if leftIncoming != rightIncoming {
return leftIncoming
}
// prune connections with a higher number of streams first
return rightStreams < leftStreams

if sortByMoreStreams {
// prune connections with a higher number of streams first
return rightStreams < leftStreams
} else {
return leftStreams < rightStreams
}
})
}

Expand Down Expand Up @@ -368,7 +369,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
cm.plk.RUnlock()

// Sort peers according to their value.
candidates.SortByValueAndStreams()
candidates.SortByValueAndStreams(true)

selected := make([]network.Conn, 0, target+10)
for _, inf := range candidates {
Expand Down Expand Up @@ -398,7 +399,7 @@ func (cm *BasicConnMgr) getConnsToCloseEmergency(target int) []network.Conn {
}
cm.plk.RUnlock()

candidates.SortByValueAndStreams()
candidates.SortByValueAndStreams(true)
for _, inf := range candidates {
if target <= 0 {
break
Expand Down Expand Up @@ -459,7 +460,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}

// Sort peers according to their value.
candidates.SortByValue()
candidates.SortByValueAndStreams(false)

target := ncandidates - cm.cfg.lowWater

Expand Down
61 changes: 53 additions & 8 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (c *tconn) RemotePeer() peer.ID {
return c.peer
}

func (c *tconn) Stat() network.ConnStats {
return network.ConnStats{
Stats: network.Stats{
Direction: network.DirOutbound,
},
NumStreams: 1,
}
}

func (c *tconn) RemoteMultiaddr() ma.Multiaddr {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
if err != nil {
Expand Down Expand Up @@ -800,39 +809,75 @@ func TestPeerInfoSorting(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1")}
p2 := peerInfo{id: peer.ID("peer2"), temp: true}
pis := peerInfos{p1, p2}
pis.SortByValue()
pis.SortByValueAndStreams(false)
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("starts with low-value connections", func(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1"), value: 40}
p2 := peerInfo{id: peer.ID("peer2"), value: 20}
pis := peerInfos{p1, p2}
pis.SortByValue()
pis.SortByValueAndStreams(false)
require.Equal(t, pis, peerInfos{p2, p1})
})

t.Run("in a memory emergency, starts with incoming connections", func(t *testing.T) {
t.Run("prefer peers with no streams", func(t *testing.T) {
p1 := peerInfo{id: peer.ID("peer1"),
conns: map[network.Conn]time.Time{
&mockConn{stats: network.ConnStats{NumStreams: 0}}: time.Now(),
},
}
p2 := peerInfo{id: peer.ID("peer2"),
conns: map[network.Conn]time.Time{
&mockConn{stats: network.ConnStats{NumStreams: 1}}: time.Now(),
},
}
pis := peerInfos{p2, p1}
pis.SortByValueAndStreams(false)
require.Equal(t, pis, peerInfos{p1, p2})
})

t.Run("in a memory emergency, starts with incoming connections and higher streams", func(t *testing.T) {
incoming := network.ConnStats{}
incoming.Direction = network.DirInbound
outgoing := network.ConnStats{}
outgoing.Direction = network.DirOutbound

outgoingSomeStreams := network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}, NumStreams: 1}
outgoingMoreStreams := network.ConnStats{Stats: network.Stats{Direction: network.DirOutbound}, NumStreams: 2}
p1 := peerInfo{
id: peer.ID("peer1"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoing}: time.Now(),
&mockConn{stats: outgoingSomeStreams}: time.Now(),
},
}
p2 := peerInfo{
id: peer.ID("peer2"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoingSomeStreams}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
p3 := peerInfo{
id: peer.ID("peer3"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoing}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
pis := peerInfos{p1, p2}
pis.SortByValueAndStreams()
require.Equal(t, pis, peerInfos{p2, p1})
p4 := peerInfo{
id: peer.ID("peer4"),
conns: map[network.Conn]time.Time{
&mockConn{stats: outgoingMoreStreams}: time.Now(),
&mockConn{stats: incoming}: time.Now(),
},
}
pis := peerInfos{p1, p2, p3, p4}
pis.SortByValueAndStreams(true)
// p3 is first because it is inactive (no streams).
// p4 is second because it has the most streams and we priortize killing
// connections with the higher number of streams.
require.Equal(t, pis, peerInfos{p3, p4, p2, p1})
})

t.Run("in a memory emergency, starts with connections that have many streams", func(t *testing.T) {
Expand All @@ -850,7 +895,7 @@ func TestPeerInfoSorting(t *testing.T) {
},
}
pis := peerInfos{p1, p2}
pis.SortByValueAndStreams()
pis.SortByValueAndStreams(true)
require.Equal(t, pis, peerInfos{p2, p1})
})
}

0 comments on commit 6472f8c

Please sign in to comment.