Skip to content

Commit

Permalink
Merge pull request #22 from RTradeLtd/upstream-v0-2-4
Browse files Browse the repository at this point in the history
merge Upstream v0.2.4
  • Loading branch information
RT-nilPointer authored Jul 30, 2020
2 parents adfee96 + 1c5b2c5 commit 6418f67
Show file tree
Hide file tree
Showing 6 changed files with 896 additions and 35 deletions.
109 changes: 82 additions & 27 deletions connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,31 @@ var SilencePeriod = 10 * time.Second
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int
connCount int32
gracePeriod time.Duration
segments segments
*decayer

cfg *BasicConnManagerConfig
segments segments

plk sync.RWMutex
protected map[peer.ID]map[string]struct{}

trimTrigger chan chan<- struct{}
// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
trimTrigger chan chan<- struct{}
connCount int32

lastTrimMu sync.RWMutex
lastTrim time.Time

silencePeriod time.Duration

logger *zap.Logger
ctx context.Context
cancel context.CancelFunc
}

var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
var (
_ connmgr.ConnManager = (*BasicConnMgr)(nil)
_ connmgr.Decayer = (*BasicConnMgr)(nil)
)

type segment struct {
sync.Mutex
Expand Down Expand Up @@ -80,6 +84,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[p] = pi
Expand All @@ -92,15 +97,34 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
// their connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
func NewConnManager(ctx context.Context, logger *zap.Logger, low, hi int, grace time.Duration) *BasicConnMgr {
cm := &BasicConnMgr{

func NewConnManager(ctx context.Context, logger *zap.Logger, low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
ctx, cancel := context.WithCancel(ctx)
cfg := &BasicConnManagerConfig{
highWater: hi,
lowWater: low,
gracePeriod: grace,
silencePeriod: SilencePeriod,
}

for _, o := range opts {
// TODO we're ignoring errors from options because we have no way to
// return them, or otherwise act on them.
_ = o(cfg)
}

if cfg.decayer == nil {
// Set the default decayer config.
cfg.decayer = (&DecayerCfg{}).WithDefaults()
}

cm := &BasicConnMgr{
cfg: cfg,
trimRunningCh: make(chan struct{}, 1),
trimTrigger: make(chan chan<- struct{}),
protected: make(map[peer.ID]map[string]struct{}, 16),
silencePeriod: SilencePeriod,
ctx: ctx,
cancel: cancel,
logger: logger.Named("connmgr"),
segments: func() (ret segments) {
for i := range ret {
Expand All @@ -111,13 +135,21 @@ func NewConnManager(ctx context.Context, logger *zap.Logger, low, hi int, grace
return ret
}(),
}

decay, _ := NewDecayer(cfg.decayer, cm)
cm.decayer = decay

go cm.background()
return cm
}

// Close is here to satisfy the interface of ConnectionManager
// previously in the libp2p version this called a cancel func
func (cm *BasicConnMgr) Close() error {
if err := cm.decayer.Close(); err != nil {
return err
}
cm.cancel()
return nil
}

Expand Down Expand Up @@ -151,12 +183,31 @@ func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
return true
}

func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
cm.plk.Lock()
defer cm.plk.Unlock()

tags, ok := cm.protected[id]
if !ok {
return false
}

if tag == "" {
return true
}

_, protected = tags[tag]
return protected
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections
id peer.ID
tags map[string]int // value for each tag
decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections

conns map[network.Conn]time.Time // start time of each connection

Expand Down Expand Up @@ -200,7 +251,7 @@ func (cm *BasicConnMgr) background() {
var waiting chan<- struct{}
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
// Below high water, skip.
continue
}
Expand Down Expand Up @@ -236,7 +287,7 @@ func (cm *BasicConnMgr) trim() {
cm.lastTrimMu.RUnlock()

// skip this attempt to trim if the last one just took place.
if time.Since(lastTrim) < cm.silencePeriod {
if time.Since(lastTrim) < cm.cfg.silencePeriod {
return
}

Expand All @@ -256,21 +307,21 @@ func (cm *BasicConnMgr) trim() {
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
if cm.lowWater == 0 || cm.highWater == 0 {
if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
// disabled
return nil
}

nconns := int(atomic.LoadInt32(&cm.connCount))
if nconns <= cm.lowWater {
if nconns <= cm.cfg.lowWater {
cm.logger.Debug("open connection count below limit")
return nil
}

npeers := cm.segments.countPeers()
candidates := make([]*peerInfo, 0, npeers)
ncandidates := 0
gracePeriodStart := time.Now().Add(-cm.gracePeriod)
gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

cm.plk.RLock()
for _, s := range cm.segments {
Expand All @@ -291,7 +342,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}
cm.plk.RUnlock()

if ncandidates < cm.lowWater {
if ncandidates < cm.cfg.lowWater {
cm.logger.Info("open connection count above limit but too many are in the grace period")
// We have too many connections but fewer than lowWater
// connections out of the grace period.
Expand All @@ -311,9 +362,9 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return left.value < right.value
})

target := ncandidates - cm.lowWater
target := ncandidates - cm.cfg.lowWater

// slightly overallocate because we may have more than one conns per peer
// slightly over allocate because we may have more than one conns per peer
selected := make([]network.Conn, 0, target+10)

for _, inf := range candidates {
Expand Down Expand Up @@ -363,6 +414,9 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
for t, v := range pi.tags {
out.Tags[t] = v
}
for t, v := range pi.decaying {
out.Tags[t.name] = v.Value
}
for c, t := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = t
}
Expand Down Expand Up @@ -439,10 +493,10 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
cm.lastTrimMu.RUnlock()

return CMInfo{
HighWater: cm.highWater,
LowWater: cm.lowWater,
HighWater: cm.cfg.highWater,
LowWater: cm.cfg.lowWater,
LastTrim: lastTrim,
GracePeriod: cm.gracePeriod,
GracePeriod: cm.cfg.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
}
}
Expand Down Expand Up @@ -478,6 +532,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[id] = pinfo
Expand Down
Loading

0 comments on commit 6418f67

Please sign in to comment.