Skip to content

Commit

Permalink
Buffer early tags in temporary entries (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored May 23, 2019
1 parent 2dcf306 commit 90574a9
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 29 deletions.
81 changes: 55 additions & 26 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,36 @@ type segment struct {

type segments [256]*segment

func (s *segments) get(p peer.ID) *segment {
return s[byte(p[len(p)-1])]
func (ss *segments) get(p peer.ID) *segment {
return ss[byte(p[len(p)-1])]
}

func (s *segments) countPeers() (count int) {
for _, seg := range s {
func (ss *segments) countPeers() (count int) {
for _, seg := range ss {
seg.Lock()
count += len(seg.peers)
seg.Unlock()
}
return count
}

func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
pi, ok := s.peers[p]
if ok {
return pi
}
// create a temporary peer to buffer early tags before the Connected notification arrives.
pi = &peerInfo{
id: p,
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
conns: make(map[inet.Conn]time.Time),
}
s.peers[p] = pi
return pi
}

// NewConnManager creates a new BasicConnMgr with the provided params:
// * lo and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
Expand Down Expand Up @@ -134,6 +151,7 @@ type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections

conns map[inet.Conn]time.Time // start time of each connection

Expand Down Expand Up @@ -218,7 +236,13 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {

// Sort peers according to their value.
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].value < candidates[j].value
left, right := candidates[i], candidates[j]
// temporary peers are preferred for pruning.
if left.temp != right.temp {
return left.temp
}
// otherwise, compare by value.
return left.value < right.value
})

target := nconns - cm.lowWater
Expand All @@ -227,6 +251,9 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
selected := make([]inet.Conn, 0, target+10)

for _, inf := range candidates {
if target <= 0 {
break
}
// TODO: should we be using firstSeen or the time associated with the connection itself?
if inf.firstSeen.Add(cm.gracePeriod).After(now) {
continue
Expand All @@ -235,15 +262,18 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
// lock this to protect from concurrent modifications from connect/disconnect events
s := cm.segments.get(inf.id)
s.Lock()
for c := range inf.conns {
selected = append(selected, c)

if len(inf.conns) == 0 && inf.temp {
// handle temporary entries for early tags -- this entry has gone past the grace period
// and still holds no connections, so prune it.
delete(s.peers, inf.id)
} else {
for c := range inf.conns {
selected = append(selected, c)
}
}
target -= len(inf.conns)
s.Unlock()

if target <= 0 {
break
}
}

return selected
Expand Down Expand Up @@ -284,14 +314,10 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
s.Lock()
defer s.Unlock()

pi, ok := s.peers[p]
if !ok {
log.Info("tried to tag conn from untracked peer: ", p)
return
}
pi := s.tagInfoFor(p)

// Update the total value of the peer.
pi.value += (val - pi.tags[tag])
pi.value += val - pi.tags[tag]
pi.tags[tag] = val
}

Expand All @@ -318,15 +344,11 @@ func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
s.Lock()
defer s.Unlock()

pi, ok := s.peers[p]
if !ok {
log.Info("tried to upsert tag from untracked peer: ", p)
return
}
pi := s.tagInfoFor(p)

oldval := pi.tags[tag]
newval := upsert(oldval)
pi.value += (newval - oldval)
pi.value += newval - oldval
pi.tags[tag] = newval
}

Expand Down Expand Up @@ -383,15 +405,22 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) {
s.Lock()
defer s.Unlock()

pinfo, ok := s.peers[p]
id := c.RemotePeer()
pinfo, ok := s.peers[id]
if !ok {
pinfo = &peerInfo{
id: p,
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
conns: make(map[inet.Conn]time.Time),
}
s.peers[p] = pinfo
s.peers[id] = pinfo
} else if pinfo.temp {
// we had created a temporary entry for this peer to buffer early tags before the
// Connected notification arrived: flip the temporary flag, and update the firstSeen
// timestamp to the real one.
pinfo.temp = false
pinfo.firstSeen = time.Now()
}

_, ok = pinfo.conns[c]
Expand Down
50 changes: 47 additions & 3 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func TestTagPeerNonExistant(t *testing.T) {
id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 1)

if cm.segments.countPeers() != 0 {
t.Fatal("expected zero peers")
if !cm.segments.get(id).peers[id].temp {
t.Fatal("expected 1 temporary entry")
}
}

Expand Down Expand Up @@ -525,9 +525,9 @@ func TestUpsertTag(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t, nil)
not.Connected(nil, conn)
rp := conn.RemotePeer()

// this is an early tag, before the Connected notification arrived.
cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
Expand All @@ -536,6 +536,9 @@ func TestUpsertTag(t *testing.T) {
t.Fatal("expected a tag value of 1")
}

// now let's notify the connection.
not.Connected(nil, conn)

cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 })
if len(cm.segments.get(rp).peers[rp].tags) != 1 {
t.Fatal("expected a tag")
Expand All @@ -552,3 +555,44 @@ func TestUpsertTag(t *testing.T) {
t.Fatal("expected a tag value of 1")
}
}

func TestTemporaryEntriesClearedFirst(t *testing.T) {
cm := NewConnManager(1, 1, 0)

id := tu.RandPeerIDFatal(t)
cm.TagPeer(id, "test", 20)

if cm.GetTagInfo(id).Value != 20 {
t.Fatal("expected an early tag with value 20")
}

not := cm.Notifee()
conn1, conn2 := randConn(t, nil), randConn(t, nil)
not.Connected(nil, conn1)
not.Connected(nil, conn2)

cm.TrimOpenConns(context.Background())
if cm.GetTagInfo(id) != nil {
t.Fatal("expected no temporary tags after trimming")
}
}

func TestTemporaryEntryConvertedOnConnection(t *testing.T) {
cm := NewConnManager(1, 1, 0)

conn := randConn(t, nil)
cm.TagPeer(conn.RemotePeer(), "test", 20)

ti := cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()]

if ti.value != 20 || !ti.temp {
t.Fatal("expected a temporary tag with value 20")
}

not := cm.Notifee()
not.Connected(nil, conn)

if ti.value != 20 || ti.temp {
t.Fatal("expected a non-temporary tag with value 20")
}
}

0 comments on commit 90574a9

Please sign in to comment.