Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable customization of metrics calculation #26

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Empty file added .gitignore
Empty file.
6 changes: 3 additions & 3 deletions flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestBasic(t *testing.T) {
m.Mark(1000)
mockClock.Add(40 * time.Millisecond)
}
if rate := m.Snapshot().Rate; rate != 25000 {
if rate := m.Snapshot().Rate; !approxEq(rate, 25000, 1) {
t.Errorf("expected rate 25000, got %f", rate)
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestUnregister(t *testing.T) {
}

actual := m.Snapshot()
if actual.Rate != 10 {
if !approxEq(actual.Rate, 10, 1) {
t.Errorf("expected rate 10, got %f", actual.Rate)
}

Expand All @@ -120,7 +120,7 @@ func TestUnregister(t *testing.T) {
}

actual = m.Snapshot()
if actual.Rate != 20 {
if !approxEq(actual.Rate, 20, 1) {
t.Errorf("expected rate 20, got %f", actual.Rate)
}

Expand Down
109 changes: 108 additions & 1 deletion meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,46 @@ package flow

import (
"fmt"
"math"
"sync/atomic"
"time"
)

// IdleRate the rate at which we declare a meter idle (and stop tracking it
// until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13

// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)

// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
LastUpdate time.Time
}

type MeterInterface interface {
String() string
Mark(count uint64)
Snapshot() Snapshot
Reset()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd drop all of these from this interface and focus on what the sweeper requires. Different meter implementations will need different interfaces (e.g., different data in their snapshots).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't drop Snapshot() because it's used in MeterRegistry.walkIdle()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran some tests and discovered that my changes made the library incompatible with the current version of libp2p. Turns out that BandwidthCounter from libp2p uses method MeterRegistry.ForEach(). In my version this method accepts func(string, MeterInterface) but originally it accepts func(string, *Meter). I plan to explicitly specify the right version of MeterRegistry.ForEach() for compatibility with libp2p. Do you have better suggestions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien Gentle reminder, can you, please, check my last commit and say if it's fine with you?

Update(tdiff time.Duration)
IsIdle() bool
SetIdle()
SetActive()
}

// NewMeter returns a new Meter with the correct idle time.
//
// While zero-value Meters can be used, their "last update" time will start at
// the program start instead of when the meter was created.
func NewMeter() *Meter {
func NewMeter() MeterInterface {
return &Meter{
fresh: true,
snapshot: Snapshot{
LastUpdate: cl.Now(),
},
Expand All @@ -38,6 +61,8 @@ type Meter struct {

// Take lock.
snapshot Snapshot

fresh bool
}

// Mark updates the total.
Expand Down Expand Up @@ -69,3 +94,85 @@ func (m *Meter) Reset() {
func (m *Meter) String() string {
return m.Snapshot().String()
}

func (m *Meter) Update(tdiff time.Duration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pass now here as well. Getting the current time isn't always free.

if !m.fresh {
timeMultiplier := float64(time.Second) / float64(tdiff)
total := m.accumulator.Load()
diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff)

if diff > 0 {
m.snapshot.LastUpdate = cl.Now()
}

if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
} else {
m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
}
m.snapshot.Total = total

// This is equivalent to one zeros, then one, then 30 zeros.
// We'll consider that to be "idle".
if m.snapshot.Rate > IdleRate {
return
}

// Ok, so we are idle...
// Mark this as idle by zeroing the accumulator.
swappedTotal := m.accumulator.Swap(0)
// So..., are we really idle?
if swappedTotal > total {
// Not so idle...
// Now we need to make sure this gets re-registered.

// First, add back what we removed. If we can do this
// fast enough, we can put it back before anyone
// notices.
currentTotal := m.accumulator.Add(swappedTotal)

// Did we make it?
if currentTotal == swappedTotal {
// Yes! Nobody noticed, move along.
return
}
// No. Someone noticed and will (or has) put back into
// the registration channel.
//
// Remove the snapshot total, it'll get added back on
// registration.
//
// `^uint64(total - 1)` is the two's complement of
// `total`. It's the "correct" way to subtract
// atomically in go.
m.accumulator.Add(^uint64(m.snapshot.Total - 1))
} else {
m.SetIdle()
}
} else {
// Re-add the total to all the newly active accumulators and set the snapshot to the total.
// 1. We don't do this on register to avoid having to take the snapshot lock.
// 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation.
total := m.accumulator.Add(m.snapshot.Total)
if total > m.snapshot.Total {
m.snapshot.LastUpdate = cl.Now()
}
m.snapshot.Total = total
m.fresh = false
}
}

func (m *Meter) IsIdle() bool {
return !m.registered
}

func (m *Meter) SetIdle() {
m.snapshot.Rate = 0
m.registered = false
m.fresh = true
}

func (m *Meter) SetActive() {
m.registered = true
}
48 changes: 48 additions & 0 deletions mockclocktest/mock_clock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package mockclocktest

import (
"math"
"testing"
"time"

flow "github.com/libp2p/go-flow-metrics"

"github.com/benbjohnson/clock"
)

var cl = clock.NewMock()

func init() {
flow.SetClock(cl)
}

func TestBasic(t *testing.T) {
m := new(flow.Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
cl.Add(40 * time.Millisecond)
}
if rate := m.Snapshot().Rate; approxEq(rate, 25000, 1) {
t.Errorf("expected rate 25000, got %f", rate)
}

for i := 0; i < 200; i++ {
m.Mark(200)
cl.Add(40 * time.Millisecond)
}

// Adjusts
if rate := m.Snapshot().Rate; approxEq(rate, 5017.776503840969, 0.0001) {
t.Errorf("expected rate 5017.776503840969, got %f", rate)
}

// Let it settle.
cl.Add(2 * time.Second)
if total := m.Snapshot().Total; total != 340000 {
t.Errorf("expected total 3400000, got %d", total)
}
}

func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}
12 changes: 6 additions & 6 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ type MeterRegistry struct {
}

// Get gets (or creates) a meter by name.
func (r *MeterRegistry) Get(name string) *Meter {
func (r *MeterRegistry) Get(name string) MeterInterface {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
return m.(MeterInterface)
}
m, _ := r.meters.LoadOrStore(name, NewMeter())
return m.(*Meter)
return m.(MeterInterface)
}

// FindIdle finds all meters that haven't been used since the given time.
Expand Down Expand Up @@ -50,7 +50,7 @@ func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {

r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
if v.(*Meter).snapshot.LastUpdate.Before(since) {
if v.(MeterInterface).Snapshot().LastUpdate.Before(since) {
cb(k)
}
return true
Expand All @@ -66,9 +66,9 @@ func (r *MeterRegistry) Remove(name string) {
}

// ForEach calls the passed function for each registered meter.
func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) {
func (r *MeterRegistry) ForEach(iterFunc func(string, MeterInterface)) {
r.meters.Range(func(k, v interface{}) bool {
iterFunc(k.(string), v.(*Meter))
iterFunc(k.(string), v.(MeterInterface))
return true
})
}
Expand Down
12 changes: 6 additions & 6 deletions registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func TestRegistry(t *testing.T) {
t.Errorf("expected the last update (%s) to have after (%s)", lu, m1Update)
}

expectedMeters := map[string]*Meter{
expectedMeters := map[string]MeterInterface{
"first": m1,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
r.ForEach(func(n string, m MeterInterface) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
Expand All @@ -48,7 +48,7 @@ func TestRegistry(t *testing.T) {
r.Remove("first")

found := false
r.ForEach(func(n string, m *Meter) {
r.ForEach(func(n string, m MeterInterface) {
if n != "second" {
t.Errorf("found unexpected meter: %s", n)
return
Expand All @@ -71,11 +71,11 @@ func TestRegistry(t *testing.T) {
t.Errorf("expected first total to now be 0, got %d", total)
}

expectedMeters = map[string]*Meter{
expectedMeters = map[string]MeterInterface{
"first": m3,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
r.ForEach(func(n string, m MeterInterface) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestClearRegistry(t *testing.T) {

r.Clear()

r.ForEach(func(n string, _m *Meter) {
r.ForEach(func(n string, _m MeterInterface) {
t.Errorf("expected no meters at all, found a meter %s", n)
})

Expand Down
Loading