This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

implement decaying tags. #61

Merged: 8 commits, May 14, 2020
Changes shown below are from 5 of the PR's 8 commits; the final merged result may differ.
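For readers new to the feature: a decaying tag is a peer tag whose value is periodically reduced by a decay function until it reaches zero and is removed, so a peer's score fades unless activity keeps bumping it. The standalone sketch below simulates that idea in isolation; none of the names in it come from this PR.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Toy decay function: halve the value each tick and remove the
	// tag once it drops below 1. Purely illustrative.
	halve := func(v int) (next int, remove bool) {
		next = v / 2
		return next, next < 1
	}

	value := 40 // e.g. bumped by 10 for each useful exchange with the peer
	for tick := 1; ; tick++ {
		next, remove := halve(value)
		fmt.Printf("tick %d: %d -> %d\n", tick, value, next)
		if remove {
			fmt.Println("tag removed")
			break
		}
		value = next
		time.Sleep(10 * time.Millisecond) // stands in for the decay interval
	}
}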
85 changes: 59 additions & 26 deletions connmgr.go
@@ -27,27 +27,30 @@ var log = logging.Logger("connmgr")
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int
connCount int32
gracePeriod time.Duration
segments segments
*decayer

cfg *BasicConnManagerConfig
segments segments

plk sync.RWMutex
protected map[peer.ID]map[string]struct{}

trimTrigger chan chan<- struct{}
// channel-based semaphore that ensures only a single trim is in progress
trimRunningCh chan struct{}
trimTrigger chan chan<- struct{}
connCount int32

lastTrimMu sync.RWMutex
lastTrim time.Time

silencePeriod time.Duration

ctx context.Context
cancel func()
}
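The new trimRunningCh field is the channel-based semaphore mentioned in the comment above: a buffered channel of capacity 1 where a non-blocking send acquires the lock and a receive releases it. A minimal self-contained sketch of the idiom (the function names here are hypothetical):

package main

import "fmt"

var trimSem = make(chan struct{}, 1)

// tryTrim runs fn only if no other trim is in progress; concurrent
// callers hit the default case and skip.
func tryTrim(fn func()) {
	select {
	case trimSem <- struct{}{}:
		defer func() { <-trimSem }()
		fn()
	default:
		// another trim holds the semaphore; skip this attempt.
	}
}

func main() {
	tryTrim(func() { fmt.Println("trimming") })
}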

var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
var (
_ connmgr.ConnManager = (*BasicConnMgr)(nil)
_ connmgr.Decayer = (*BasicConnMgr)(nil)
)

type segment struct {
sync.Mutex
@@ -80,6 +83,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[p] = pi
@@ -92,15 +96,32 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
// their connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
ctx, cancel := context.WithCancel(context.Background())
cm := &BasicConnMgr{

cfg := &BasicConnManagerConfig{
highWater: hi,
lowWater: low,
gracePeriod: grace,
silencePeriod: SilencePeriod,
}

for _, o := range opts {
// TODO we're ignoring errors from options because we have no way to
// return them, or otherwise act on them.
_ = o(cfg)
}

if cfg.decayer == nil {
// Set the default decayer config.
cfg.decayer = (&DecayerCfg{}).WithDefaults()
}

cm := &BasicConnMgr{
cfg: cfg,
trimRunningCh: make(chan struct{}, 1),
trimTrigger: make(chan chan<- struct{}),
protected: make(map[peer.ID]map[string]struct{}, 16),
silencePeriod: SilencePeriod,
ctx: ctx,
cancel: cancel,
segments: func() (ret segments) {
@@ -113,11 +134,17 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
}(),
}

decay, _ := NewDecayer(cfg.decayer, cm)
cm.decayer = decay

go cm.background()
return cm
}
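NewConnManager now applies variadic Options to a BasicConnManagerConfig before constructing the manager. The Option type itself is not visible in this diff; given that the loop above discards option errors, a plausible shape is the following sketch (the DecayerConfig name is an assumption):

// Option mutates the config and may return an error, which
// NewConnManager currently ignores (see the TODO above).
type Option func(*BasicConnManagerConfig) error

// DecayerConfig sketches an option that installs a custom decayer
// configuration in place of (&DecayerCfg{}).WithDefaults().
func DecayerConfig(cfg *DecayerCfg) Option {
	return func(c *BasicConnManagerConfig) error {
		c.decayer = cfg
		return nil
	}
}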

func (cm *BasicConnMgr) Close() error {
if err := cm.decayer.Close(); err != nil {
return err
}
Comment on lines +145 to +147

Contributor: I think we should still cancel the background goroutine, even if this fails.

Member Author: Makes sense.

cm.cancel()
return nil
}
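Per the review exchange above, one way to make sure the background goroutine stops even when the decayer fails to close is to cancel first and return the decayer error afterwards. This is only a sketch of the suggestion; the commit that actually landed may differ.

func (cm *BasicConnMgr) Close() error {
	// Stop the background goroutine unconditionally, then surface any
	// error from shutting down the decayer.
	cm.cancel()
	return cm.decayer.Close()
}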
@@ -151,10 +178,12 @@ func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {

// peerInfo stores metadata for a given peer.
type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections
id peer.ID
tags map[string]int // value for each tag
decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections

conns map[network.Conn]time.Time // start time of each connection

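peerInfo now tracks decaying tags separately from static ones, while value caches the combined sum. A hypothetical helper showing how that sum could be recomputed (field names follow the diff; the method itself is not part of this PR):

func (pi *peerInfo) recomputeValue() {
	total := 0
	for _, v := range pi.tags {
		total += v
	}
	for _, dv := range pi.decaying {
		// connmgr.DecayingValue exposes the current value, as used by
		// GetTagInfo further down.
		total += dv.Value
	}
	pi.value = total
}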
@@ -199,7 +228,7 @@ func (cm *BasicConnMgr) background() {
var waiting chan<- struct{}
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
// Below high water, skip.
continue
}
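The trimTrigger field, of type chan chan<- struct{}, lets callers request an immediate trim and wait for it to finish: the caller sends a completion channel, and the background loop closes that channel when the trim is done. A minimal sketch of the pattern, with hypothetical names:

package main

import "fmt"

func main() {
	trigger := make(chan chan<- struct{})

	// Background loop: receive a completion channel, do the work,
	// then close the channel to signal the requester.
	go func() {
		for done := range trigger {
			fmt.Println("trimming on demand")
			close(done)
		}
	}()

	// Requester: ask for a trim and block until it completes.
	done := make(chan struct{})
	trigger <- done
	<-done
}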
@@ -235,7 +264,7 @@ func (cm *BasicConnMgr) trim() {
cm.lastTrimMu.RUnlock()

// skip this attempt to trim if the last one just took place.
if time.Since(lastTrim) < cm.silencePeriod {
if time.Since(lastTrim) < cm.cfg.silencePeriod {
return
}

@@ -256,21 +285,21 @@
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
if cm.lowWater == 0 || cm.highWater == 0 {
if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
// disabled
return nil
}

nconns := int(atomic.LoadInt32(&cm.connCount))
if nconns <= cm.lowWater {
if nconns <= cm.cfg.lowWater {
log.Info("open connection count below limit")
return nil
}

npeers := cm.segments.countPeers()
candidates := make([]*peerInfo, 0, npeers)
ncandidates := 0
gracePeriodStart := time.Now().Add(-cm.gracePeriod)
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

cm.plk.RLock()
for _, s := range cm.segments {
@@ -291,7 +320,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}
cm.plk.RUnlock()

if ncandidates < cm.lowWater {
if ncandidates < cm.cfg.lowWater {
log.Info("open connection count above limit but too many are in the grace period")
// We have too many connections but fewer than lowWater
// connections out of the grace period.
@@ -311,7 +340,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return left.value < right.value
})

target := ncandidates - cm.lowWater
target := ncandidates - cm.cfg.lowWater

// slightly overallocate because we may have more than one conn per peer
selected := make([]network.Conn, 0, target+10)
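The hunk above truncates the comparator, showing only its final line. The surrounding call is presumably a sort.Slice ordering candidates by ascending cached value, so the least valuable peers are pruned first; a sketch (the real comparator may apply additional criteria, such as preferring temporary entries):

sort.Slice(candidates, func(i, j int) bool {
	left, right := candidates[i], candidates[j]
	return left.value < right.value
})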
@@ -363,6 +392,9 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
for t, v := range pi.tags {
out.Tags[t] = v
}
for t, v := range pi.decaying {
out.Tags[t.name] = v.Value
}
for c, t := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = t
}
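With this change, a peer's decaying tags are reported under their registered names alongside static tags. Reading them back might look like this sketch, where cm is assumed to be a *BasicConnMgr and p a peer.ID:

info := cm.GetTagInfo(p)
if info != nil {
	for name, value := range info.Tags {
		// Decaying tags appear here with their current, decayed value.
		fmt.Printf("%s = %d\n", name, value)
	}
}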
@@ -439,10 +471,10 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
cm.lastTrimMu.RUnlock()

return CMInfo{
HighWater: cm.highWater,
LowWater: cm.lowWater,
HighWater: cm.cfg.highWater,
LowWater: cm.cfg.lowWater,
LastTrim: lastTrim,
GracePeriod: cm.gracePeriod,
GracePeriod: cm.cfg.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
}
}
@@ -478,6 +510,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[id] = pinfo