Skip to content
This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

Commit

Permalink
be more lenient with tag intervals.
Browse files Browse the repository at this point in the history
We now allow the user to request tags at an interval lower than
the decayer's resolution, and we override the effective interval
to be the resolution.

We also allow tags to have any interval, not necessarily a
multiple of the resolution any longer.
  • Loading branch information
raulk committed May 14, 2020
1 parent 489f0dd commit b02e214
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 39 deletions.
84 changes: 47 additions & 37 deletions decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type decayer struct {
tagsMu sync.Mutex
knownTags map[string]*decayingTag

// currRound is the current round we're in; guarded by atomic.
currRound uint64
// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value

// bumpCh queues bump commands to be processed by the loop.
bumpCh chan bumpCmd
Expand Down Expand Up @@ -79,23 +79,14 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
doneCh: make(chan struct{}),
}

d.lastTick.Store(d.clock.Now())

// kick things off.
go d.process()

return d, nil
}

// decayingTag represents a decaying tag, with an associated decay interval, a
// decay function, and a bump function.
type decayingTag struct {
trkr *decayer
name string
interval time.Duration
nextRound uint64
decayFn connmgr.DecayFn
bumpFn connmgr.BumpFn
}

func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
d.tagsMu.Lock()
defer d.tagsMu.Unlock()
Expand All @@ -106,30 +97,24 @@ func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decay
}

if interval < d.cfg.Resolution {
err := fmt.Errorf("decay interval for %s (%s) is lower than tracker's resolution (%s)",
name,
interval,
d.cfg.Resolution)
return nil, err
log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
name, interval, d.cfg.Resolution)
interval = d.cfg.Resolution
}

if interval%d.cfg.Resolution != 0 {
err := fmt.Errorf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
"preventing undesired loss of precision",
name,
interval,
d.cfg.Resolution)
return nil, err
log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
"some precision may be lost", name, interval, d.cfg.Resolution)
}

initRound := atomic.LoadUint64(&d.currRound) + uint64(interval/d.cfg.Resolution)
lastTick := d.lastTick.Load().(time.Time)
tag = &decayingTag{
trkr: d,
name: name,
interval: interval,
nextRound: initRound,
decayFn: decayFn,
bumpFn: bumpFn,
trkr: d,
name: name,
interval: interval,
nextTick: lastTick.Add(interval),
decayFn: decayFn,
bumpFn: bumpFn,
}

d.knownTags[name] = tag
Expand Down Expand Up @@ -169,14 +154,18 @@ func (d *decayer) process() {
for {
select {
case now = <-ticker.C:
round := atomic.AddUint64(&d.currRound, 1)
d.lastTick.Store(now)

d.tagsMu.Lock()
for _, tag := range d.knownTags {
if tag.nextRound <= round {
// Mark the tag to be updated in this round.
visit[tag] = struct{}{}
if tag.nextTick.After(now) {
// skip the tag.
continue
}
// Mark the tag to be updated in this round.
visit[tag] = struct{}{}
}
d.tagsMu.Unlock()

// Visit each peer, and decay tags that need to be decayed.
for _, s := range d.mgr.segments {
Expand All @@ -198,7 +187,7 @@ func (d *decayer) process() {
delete(p.decaying, tag)
} else {
// accumulate the delta, and apply the changes.
delta += (after - v.Value)
delta += after - v.Value
v.Value, v.LastVisit = after, now
}
p.value += delta
Expand All @@ -210,7 +199,7 @@ func (d *decayer) process() {

// Reset each tag's next visit round, and clear the visited set.
for tag := range visit {
tag.nextRound = round + uint64(tag.interval/d.cfg.Resolution)
tag.nextTick = tag.nextTick.Add(tag.interval)
delete(visit, tag)
}

Expand Down Expand Up @@ -249,6 +238,27 @@ func (d *decayer) process() {
}
}

// decayingTag represents a decaying tag, with an associated decay interval, a
// decay function, and a bump function.
type decayingTag struct {
trkr *decayer
name string
interval time.Duration
nextTick time.Time
decayFn connmgr.DecayFn
bumpFn connmgr.BumpFn
}

var _ connmgr.DecayingTag = (*decayingTag)(nil)

func (t *decayingTag) Name() string {
return t.name
}

func (t *decayingTag) Interval() time.Duration {
return t.interval
}

// Bump bumps a tag for this peer.
func (t *decayingTag) Bump(p peer.ID, delta int) error {
bmp := bumpCmd{peer: p, tag: t, delta: delta}
Expand Down
41 changes: 40 additions & 1 deletion decay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/peer"
tu "github.com/libp2p/go-libp2p-core/test"
"github.com/stretchr/testify/require"

"github.com/benbjohnson/clock"
)

const TestResolution = 50 * time.Millisecond

func TestDecayExpire(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
Expand Down Expand Up @@ -262,10 +265,46 @@ func TestLinearDecayOverwrite(t *testing.T) {
}
}

func TestResolutionMisaligned(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)

tag1, err := decay.RegisterDecayingTag("beep", time.Duration(float64(TestResolution)*1.4), connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

tag2, err := decay.RegisterDecayingTag("bop", time.Duration(float64(TestResolution)*2.4), connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

_ = tag1.Bump(id, 1000)
_ = tag2.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// nothing has happened.
mockClock.Add(TestResolution)
require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])

// next tick; tag1 would've ticked.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])

// next tick; tag1 would've ticked twice, tag2 once.
mockClock.Add(TestResolution)
require.Equal(998, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id).Tags["bop"])

require.Equal(1997, mgr.GetTagInfo(id).Value)
}

func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) {
mockClock := clock.NewMock()
cfg := &DecayerCfg{
Resolution: 50 * time.Millisecond,
Resolution: TestResolution,
Clock: mockClock,
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/benbjohnson/clock v1.0.1
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-log v1.0.4
github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca
github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174
github.com/multiformats/go-multiaddr v0.2.1
github.com/stretchr/testify v1.4.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoR
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca h1:9xF2NgTB7L0GFdRBnEE8sa7sZbeshEznH1pEuqH5A8o=
github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174 h1:OID2AvF6Ax15EvUjAsJVbVhhY9CWPA3ub6hSRXk9vxU=
github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg=
github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
Expand Down

0 comments on commit b02e214

Please sign in to comment.