From bb21ae9ec3816e19b933670c8502885a9e1e50a5 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Tue, 31 Oct 2023 18:39:02 +0300 Subject: [PATCH 01/11] Initial changes --- .gitignore | 1 + flow_test.go | 31 ++++++++---- go.mod | 2 +- mockclocktest/mock_clock_test.go | 2 +- sweeper.go | 85 ++++++++++++++++++-------------- sweeper_test.go | 4 +- 6 files changed, 74 insertions(+), 51 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..600d2d3 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode \ No newline at end of file diff --git a/flow_test.go b/flow_test.go index fb25828..0d926f5 100644 --- a/flow_test.go +++ b/flow_test.go @@ -25,11 +25,27 @@ func TestBasic(t *testing.T) { m.Mark(1000) <-ticker.C } + time.Sleep(1 * time.Second) actual := m.Snapshot() - if !approxEq(actual.Rate, 25000, 1000) { - t.Errorf("expected rate 25000 (±1000), got %f", actual.Rate) + if !approxEq(actual.Rate, 25000, 1100) { + t.Errorf("expected rate 25000 (±1100), got %f", actual.Rate) + } + if actual.Total != 300000 { + t.Errorf("1. Expected total %d, got %d", 300000, actual.Total) + } + + time.Sleep(15 * time.Second) + if actual.Total != 300000 { + t.Errorf("2. Expected total %d, got %d", 300000, actual.Total) } + // Let it settle. + time.Sleep(10 * time.Second) + + actual = m.Snapshot() + if actual.Total != 300000 { + t.Errorf("3. Expected total %d, got %d", 300000, actual.Total) + } for i := 0; i < 200; i++ { m.Mark(200) <-ticker.C @@ -37,17 +53,14 @@ func TestBasic(t *testing.T) { // Adjusts actual = m.Snapshot() - if !approxEq(actual.Rate, 5000, 200) { - t.Errorf("expected rate 5000 (±200), got %f", actual.Rate) + if !approxEq(actual.Rate, 5000, 250) { + t.Errorf("expected rate 5000 (±250), got %f", actual.Rate) } - // Let it settle. - time.Sleep(2 * time.Second) - // get the right total actual = m.Snapshot() - if actual.Total != 340000 { - t.Errorf("expected total %d, got %d", 340000, actual.Total) + if actual.Total != 335400 { + t.Errorf("4. Expected total %d, got %d", 335400, actual.Total) } }() } diff --git a/go.mod b/go.mod index 98980b9..c63c182 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/libp2p/go-flow-metrics +module github.com/ArtemVasilevMIPT/go-maxflow-metrics go 1.20 diff --git a/mockclocktest/mock_clock_test.go b/mockclocktest/mock_clock_test.go index 9cff3d6..a3ae653 100644 --- a/mockclocktest/mock_clock_test.go +++ b/mockclocktest/mock_clock_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/libp2p/go-flow-metrics" + flow "github.com/ArtemVasilevMIPT/go-maxflow-metrics" "github.com/benbjohnson/clock" ) diff --git a/sweeper.go b/sweeper.go index 8b9a262..443ef07 100644 --- a/sweeper.go +++ b/sweeper.go @@ -14,10 +14,10 @@ import ( // // The default ensures that 1 event every ~30s will keep the meter from going // idle. -var IdleRate = 1e-13 +// var IdleRate = 1e-13 -// Alpha for EWMA of 1s -var alpha = 1 - math.Exp(-1.0) +// IdleTime the time that need to pass scince last update before we declare a metere idle +var IdleTime = 20 * time.Second // The global sweeper. var globalSweeper sweeper @@ -107,52 +107,61 @@ func (sw *sweeper) update() { if diff > 0 { m.snapshot.LastUpdate = now } - + // log.Printf("Rate: %f Instant: %f Total: %d", m.snapshot.Rate, instant, total) if m.snapshot.Rate == 0 { m.snapshot.Rate = instant } else { - m.snapshot.Rate += alpha * (instant - m.snapshot.Rate) + m.snapshot.Rate = math.Max(instant, m.snapshot.Rate) } m.snapshot.Total = total - // This is equivalent to one zeros, then one, then 30 zeros. - // We'll consider that to be "idle". - if m.snapshot.Rate > IdleRate { + /* + // This is equivalent to one zeros, then one, then 30 zeros. + // We'll consider that to be "idle". + if m.snapshot.Rate > IdleRate { + continue + } + */ + if now.Sub(m.snapshot.LastUpdate) < IdleTime { continue } - // Ok, so we are idle... - - // Mark this as idle by zeroing the accumulator. - swappedTotal := atomic.SwapUint64(&m.accumulator, 0) - - // So..., are we really idle? - if swappedTotal > total { - // Not so idle... - // Now we need to make sure this gets re-registered. - - // First, add back what we removed. If we can do this - // fast enough, we can put it back before anyone - // notices. - currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) - - // Did we make it? - if currentTotal == swappedTotal { - // Yes! Nobody noticed, move along. - continue - } - // No. Someone noticed and will (or has) put back into - // the registration channel. - // - // Remove the snapshot total, it'll get added back on - // registration. - // - // `^uint64(total - 1)` is the two's complement of - // `total`. It's the "correct" way to subtract - // atomically in go. + if !atomic.CompareAndSwapUint64(&m.accumulator, total, 0) { + // ... or not atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) + // atomic.AddUint64(&m.accumulator, -total) } - + /* + // Mark this as idle by zeroing the accumulator. + swappedTotal := atomic.SwapUint64(&m.accumulator, 0) + + // So..., are we really idle? + if swappedTotal > total { + // Not so idle... + // Now we need to make sure this gets re-registered. + + // First, add back what we removed. If we can do this + // fast enough, we can put it back before anyone + // notices. + currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) + + // Did we make it? + if currentTotal == swappedTotal { + // Yes! Nobody noticed, move along. + continue + } + // No. Someone noticed and will (or has) put back into + // the registration channel. + // + // Remove the snapshot total, it'll get added back on + // registration. + // + // `^uint64(total - 1)` is the two's complement of + // `total`. It's the "correct" way to subtract + // atomically in go. + atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) + } + */ // Reset the rate, keep the total. m.registered = false m.snapshot.Rate = 0 diff --git a/sweeper_test.go b/sweeper_test.go index 2ba9188..1cb0d22 100644 --- a/sweeper_test.go +++ b/sweeper_test.go @@ -21,7 +21,7 @@ func TestIdleInconsistency(t *testing.T) { m3.Mark(30) // make m1 and m3 go idle - for i := 0; i < 30; i++ { + for i := 0; i < 25; i++ { time.Sleep(time.Second) m2.Mark(1) } @@ -37,7 +37,7 @@ func TestIdleInconsistency(t *testing.T) { t.Errorf("expected first total to be 10, got %d", total) } - if total := r.Get("second").Snapshot().Total; total != 50 { + if total := r.Get("second").Snapshot().Total; total != 45 { t.Errorf("expected second total to be 50, got %d", total) } From 978bf57469d54462d5f8af1aea1bfdfc731b3f9a Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Mon, 13 Nov 2023 17:38:57 +0300 Subject: [PATCH 02/11] Added interfaces --- meter.go | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++- registry.go | 12 +++--- sweeper.go | 101 ++++++++------------------------------------------- 3 files changed, 123 insertions(+), 93 deletions(-) diff --git a/meter.go b/meter.go index b70593e..242d847 100644 --- a/meter.go +++ b/meter.go @@ -2,10 +2,21 @@ package flow import ( "fmt" + "math" "sync/atomic" "time" ) +// IdleRate the rate at which we declare a meter idle (and stop tracking it +// until it's re-registered). +// +// The default ensures that 1 event every ~30s will keep the meter from going +// idle. +var IdleRate = 1e-13 + +// Alpha for EWMA of 1s +var alpha = 1 - math.Exp(-1.0) + // Snapshot is a rate/total snapshot. type Snapshot struct { Rate float64 @@ -13,11 +24,22 @@ type Snapshot struct { LastUpdate time.Time } +type MeterInterface interface { + String() string + Mark(count uint64) + Snapshot() Snapshot + Reset() + Update(tdiff time.Duration) + IsIdle() bool + SetIdle() + SetActive() +} + // NewMeter returns a new Meter with the correct idle time. // // While zero-value Meters can be used, their "last update" time will start at // the program start instead of when the meter was created. -func NewMeter() *Meter { +func NewMeter() MeterInterface { return &Meter{ snapshot: Snapshot{ LastUpdate: time.Now(), @@ -69,3 +91,82 @@ func (m *Meter) Reset() { func (m *Meter) String() string { return m.Snapshot().String() } + +func (m *Meter) Update(tdiff time.Duration) { + timeMultiplier := float64(time.Second) / float64(tdiff) + total := atomic.LoadUint64(&m.accumulator) + diff := total - m.snapshot.Total + instant := timeMultiplier * float64(diff) + + if diff > 0 { + m.snapshot.LastUpdate = cl.Now() + } + + if m.snapshot.Rate == 0 { + m.snapshot.Rate = instant + } else { + m.snapshot.Rate += alpha * (instant - m.snapshot.Rate) + } + m.snapshot.Total = total + + // This is equivalent to one zeros, then one, then 30 zeros. + // We'll consider that to be "idle". + if m.snapshot.Rate > IdleRate { + m.SetIdle() + goto label1 + } + { + // Ok, so we are idle... + + // Mark this as idle by zeroing the accumulator. + swappedTotal := atomic.SwapUint64(&m.accumulator, 0) + + // So..., are we really idle? + if swappedTotal > total { + // Not so idle... + // Now we need to make sure this gets re-registered. + + // First, add back what we removed. If we can do this + // fast enough, we can put it back before anyone + // notices. + currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) + + // Did we make it? + if currentTotal == swappedTotal { + // Yes! Nobody noticed, move along. + goto label1 + } + // No. Someone noticed and will (or has) put back into + // the registration channel. + // + // Remove the snapshot total, it'll get added back on + // registration. + // + // `^uint64(total - 1)` is the two's complement of + // `total`. It's the "correct" way to subtract + // atomically in go. + atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) + } + } +label1: + // Re-add the total to all the newly active accumulators and set the snapshot to the total. + // 1. We don't do this on register to avoid having to take the snapshot lock. + // 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation. + total = atomic.AddUint64(&m.accumulator, m.snapshot.Total) + if total > m.snapshot.Total { + m.snapshot.LastUpdate = cl.Now() + } + m.snapshot.Total = total +} + +func (m *Meter) IsIdle() bool { + return m.registered +} + +func (m *Meter) SetIdle() { + m.registered = false +} + +func (m *Meter) SetActive() { + m.registered = true +} diff --git a/registry.go b/registry.go index d3e47be..9a77bf6 100644 --- a/registry.go +++ b/registry.go @@ -11,12 +11,12 @@ type MeterRegistry struct { } // Get gets (or creates) a meter by name. -func (r *MeterRegistry) Get(name string) *Meter { +func (r *MeterRegistry) Get(name string) MeterInterface { if m, ok := r.meters.Load(name); ok { - return m.(*Meter) + return m.(MeterInterface) } m, _ := r.meters.LoadOrStore(name, NewMeter()) - return m.(*Meter) + return m.(MeterInterface) } // FindIdle finds all meters that haven't been used since the given time. @@ -50,7 +50,7 @@ func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) { r.meters.Range(func(k, v interface{}) bool { // So, this _is_ slightly inaccurate. - if v.(*Meter).snapshot.LastUpdate.Before(since) { + if v.(MeterInterface).Snapshot().LastUpdate.Before(since) { cb(k) } return true @@ -66,9 +66,9 @@ func (r *MeterRegistry) Remove(name string) { } // ForEach calls the passed function for each registered meter. -func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) { +func (r *MeterRegistry) ForEach(iterFunc func(string, MeterInterface)) { r.meters.Range(func(k, v interface{}) bool { - iterFunc(k.(string), v.(*Meter)) + iterFunc(k.(string), v.(MeterInterface)) return true }) } diff --git a/sweeper.go b/sweeper.go index 443ef07..8ce83e9 100644 --- a/sweeper.go +++ b/sweeper.go @@ -1,9 +1,7 @@ package flow import ( - "math" "sync" - "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -30,19 +28,23 @@ func SetClock(c clock.Clock) { cl = c } +type SweeperInterface interface { + Register(m MeterInterface) +} + type sweeper struct { sweepOnce sync.Once snapshotMu sync.RWMutex - meters []*Meter + meters []MeterInterface activeMeters int lastUpdateTime time.Time - registerChannel chan *Meter + registerChannel chan MeterInterface } func (sw *sweeper) start() { - sw.registerChannel = make(chan *Meter, 16) + sw.registerChannel = make(chan MeterInterface, 16) go sw.run() } @@ -53,12 +55,12 @@ func (sw *sweeper) run() { } } -func (sw *sweeper) register(m *Meter) { - if m.registered { +func (sw *sweeper) register(m MeterInterface) { + if !m.IsIdle() { // registered twice, move on. return } - m.registered = true + m.SetActive() sw.meters = append(sw.meters, m) } @@ -70,7 +72,7 @@ func (sw *sweeper) runActive() { for len(sw.meters) > 0 { // Scale back allocation. if len(sw.meters)*2 < cap(sw.meters) { - newMeters := make([]*Meter, len(sw.meters)) + newMeters := make([]MeterInterface, len(sw.meters)) copy(newMeters, sw.meters) sw.meters = newMeters } @@ -96,87 +98,14 @@ func (sw *sweeper) update() { return } sw.lastUpdateTime = now - timeMultiplier := float64(time.Second) / float64(tdiff) // Calculate the bandwidth for all active meters. for i, m := range sw.meters[:sw.activeMeters] { - total := atomic.LoadUint64(&m.accumulator) - diff := total - m.snapshot.Total - instant := timeMultiplier * float64(diff) - - if diff > 0 { - m.snapshot.LastUpdate = now - } - // log.Printf("Rate: %f Instant: %f Total: %d", m.snapshot.Rate, instant, total) - if m.snapshot.Rate == 0 { - m.snapshot.Rate = instant - } else { - m.snapshot.Rate = math.Max(instant, m.snapshot.Rate) - } - m.snapshot.Total = total - - /* - // This is equivalent to one zeros, then one, then 30 zeros. - // We'll consider that to be "idle". - if m.snapshot.Rate > IdleRate { - continue - } - */ - if now.Sub(m.snapshot.LastUpdate) < IdleTime { - continue - } - // Ok, so we are idle... - if !atomic.CompareAndSwapUint64(&m.accumulator, total, 0) { - // ... or not - atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) - // atomic.AddUint64(&m.accumulator, -total) - } - /* - // Mark this as idle by zeroing the accumulator. - swappedTotal := atomic.SwapUint64(&m.accumulator, 0) - - // So..., are we really idle? - if swappedTotal > total { - // Not so idle... - // Now we need to make sure this gets re-registered. - - // First, add back what we removed. If we can do this - // fast enough, we can put it back before anyone - // notices. - currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) - - // Did we make it? - if currentTotal == swappedTotal { - // Yes! Nobody noticed, move along. - continue - } - // No. Someone noticed and will (or has) put back into - // the registration channel. - // - // Remove the snapshot total, it'll get added back on - // registration. - // - // `^uint64(total - 1)` is the two's complement of - // `total`. It's the "correct" way to subtract - // atomically in go. - atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) - } - */ + m.Update(tdiff) // Reset the rate, keep the total. - m.registered = false - m.snapshot.Rate = 0 - sw.meters[i] = nil - } - - // Re-add the total to all the newly active accumulators and set the snapshot to the total. - // 1. We don't do this on register to avoid having to take the snapshot lock. - // 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation. - for _, m := range sw.meters[sw.activeMeters:] { - total := atomic.AddUint64(&m.accumulator, m.snapshot.Total) - if total > m.snapshot.Total { - m.snapshot.LastUpdate = now + if m.IsIdle() { + sw.meters[i] = nil } - m.snapshot.Total = total } // compress and trim the meter list @@ -194,7 +123,7 @@ func (sw *sweeper) update() { sw.activeMeters = len(sw.meters) } -func (sw *sweeper) Register(m *Meter) { +func (sw *sweeper) Register(m MeterInterface) { sw.sweepOnce.Do(sw.start) sw.registerChannel <- m } From 5659dc2afa3229eb6ecd63c5dadf39634a90d632 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Tue, 14 Nov 2023 20:55:39 +0300 Subject: [PATCH 03/11] Fixed interfaces --- meter.go | 127 +++++++++++++++++++++++++---------------------- registry_test.go | 12 ++--- sweeper.go | 4 +- 3 files changed, 75 insertions(+), 68 deletions(-) diff --git a/meter.go b/meter.go index 242d847..0197813 100644 --- a/meter.go +++ b/meter.go @@ -41,6 +41,7 @@ type MeterInterface interface { // the program start instead of when the meter was created. func NewMeter() MeterInterface { return &Meter{ + fresh: true, snapshot: Snapshot{ LastUpdate: time.Now(), }, @@ -60,6 +61,8 @@ type Meter struct { // Take lock. snapshot Snapshot + + fresh bool } // Mark updates the total. @@ -93,78 +96,82 @@ func (m *Meter) String() string { } func (m *Meter) Update(tdiff time.Duration) { - timeMultiplier := float64(time.Second) / float64(tdiff) - total := atomic.LoadUint64(&m.accumulator) - diff := total - m.snapshot.Total - instant := timeMultiplier * float64(diff) - - if diff > 0 { - m.snapshot.LastUpdate = cl.Now() - } + if !m.fresh { + timeMultiplier := float64(time.Second) / float64(tdiff) + total := atomic.LoadUint64(&m.accumulator) + diff := total - m.snapshot.Total + instant := timeMultiplier * float64(diff) + + if diff > 0 { + m.snapshot.LastUpdate = cl.Now() + } - if m.snapshot.Rate == 0 { - m.snapshot.Rate = instant - } else { - m.snapshot.Rate += alpha * (instant - m.snapshot.Rate) - } - m.snapshot.Total = total + if m.snapshot.Rate == 0 { + m.snapshot.Rate = instant + } else { + m.snapshot.Rate += alpha * (instant - m.snapshot.Rate) + } + m.snapshot.Total = total - // This is equivalent to one zeros, then one, then 30 zeros. - // We'll consider that to be "idle". - if m.snapshot.Rate > IdleRate { - m.SetIdle() - goto label1 - } - { - // Ok, so we are idle... - - // Mark this as idle by zeroing the accumulator. - swappedTotal := atomic.SwapUint64(&m.accumulator, 0) - - // So..., are we really idle? - if swappedTotal > total { - // Not so idle... - // Now we need to make sure this gets re-registered. - - // First, add back what we removed. If we can do this - // fast enough, we can put it back before anyone - // notices. - currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) - - // Did we make it? - if currentTotal == swappedTotal { - // Yes! Nobody noticed, move along. - goto label1 + // This is equivalent to one zeros, then one, then 30 zeros. + // We'll consider that to be "idle". + if m.snapshot.Rate > IdleRate { + m.SetIdle() + return + } + { + // Ok, so we are idle... + + // Mark this as idle by zeroing the accumulator. + swappedTotal := atomic.SwapUint64(&m.accumulator, 0) + + // So..., are we really idle? + if swappedTotal > total { + // Not so idle... + // Now we need to make sure this gets re-registered. + + // First, add back what we removed. If we can do this + // fast enough, we can put it back before anyone + // notices. + currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) + + // Did we make it? + if currentTotal == swappedTotal { + // Yes! Nobody noticed, move along. + return + } + // No. Someone noticed and will (or has) put back into + // the registration channel. + // + // Remove the snapshot total, it'll get added back on + // registration. + // + // `^uint64(total - 1)` is the two's complement of + // `total`. It's the "correct" way to subtract + // atomically in go. + atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) } - // No. Someone noticed and will (or has) put back into - // the registration channel. - // - // Remove the snapshot total, it'll get added back on - // registration. - // - // `^uint64(total - 1)` is the two's complement of - // `total`. It's the "correct" way to subtract - // atomically in go. - atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) } + } else { + // Re-add the total to all the newly active accumulators and set the snapshot to the total. + // 1. We don't do this on register to avoid having to take the snapshot lock. + // 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation. + total := atomic.AddUint64(&m.accumulator, m.snapshot.Total) + if total > m.snapshot.Total { + m.snapshot.LastUpdate = cl.Now() + } + m.snapshot.Total = total + m.fresh = false } -label1: - // Re-add the total to all the newly active accumulators and set the snapshot to the total. - // 1. We don't do this on register to avoid having to take the snapshot lock. - // 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation. - total = atomic.AddUint64(&m.accumulator, m.snapshot.Total) - if total > m.snapshot.Total { - m.snapshot.LastUpdate = cl.Now() - } - m.snapshot.Total = total } func (m *Meter) IsIdle() bool { - return m.registered + return !m.registered } func (m *Meter) SetIdle() { m.registered = false + m.fresh = true } func (m *Meter) SetActive() { diff --git a/registry_test.go b/registry_test.go index f7976c9..455ac2d 100644 --- a/registry_test.go +++ b/registry_test.go @@ -28,11 +28,11 @@ func TestRegistry(t *testing.T) { t.Error("expected the last update to have been updated") } - expectedMeters := map[string]*Meter{ + expectedMeters := map[string]MeterInterface{ "first": m1, "second": m2, } - r.ForEach(func(n string, m *Meter) { + r.ForEach(func(n string, m MeterInterface) { if expectedMeters[n] != m { t.Errorf("wrong meter '%s'", n) } @@ -45,7 +45,7 @@ func TestRegistry(t *testing.T) { r.Remove("first") found := false - r.ForEach(func(n string, m *Meter) { + r.ForEach(func(n string, m MeterInterface) { if n != "second" { t.Errorf("found unexpected meter: %s", n) return @@ -68,11 +68,11 @@ func TestRegistry(t *testing.T) { t.Errorf("expected first total to now be 0, got %d", total) } - expectedMeters = map[string]*Meter{ + expectedMeters = map[string]MeterInterface{ "first": m3, "second": m2, } - r.ForEach(func(n string, m *Meter) { + r.ForEach(func(n string, m MeterInterface) { if expectedMeters[n] != m { t.Errorf("wrong meter '%s'", n) } @@ -111,7 +111,7 @@ func TestClearRegistry(t *testing.T) { r.Clear() - r.ForEach(func(n string, _m *Meter) { + r.ForEach(func(n string, _m MeterInterface) { t.Errorf("expected no meters at all, found a meter %s", n) }) diff --git a/sweeper.go b/sweeper.go index 8ce83e9..99b925b 100644 --- a/sweeper.go +++ b/sweeper.go @@ -15,7 +15,7 @@ import ( // var IdleRate = 1e-13 // IdleTime the time that need to pass scince last update before we declare a metere idle -var IdleTime = 20 * time.Second +//var IdleTime = 20 * time.Second // The global sweeper. var globalSweeper sweeper @@ -100,7 +100,7 @@ func (sw *sweeper) update() { sw.lastUpdateTime = now // Calculate the bandwidth for all active meters. - for i, m := range sw.meters[:sw.activeMeters] { + for i, m := range sw.meters { m.Update(tdiff) // Reset the rate, keep the total. if m.IsIdle() { From 1070bec7f780ae4e06e3dcef343d1c648adf4203 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Tue, 21 Nov 2023 21:27:28 +0300 Subject: [PATCH 04/11] Tests passed --- flow_test.go | 236 +++++++++++++++++++---------------------------- meter.go | 77 ++++++++-------- meter_test.go | 52 +---------- registry_test.go | 18 ++-- sweeper_test.go | 22 +++-- 5 files changed, 160 insertions(+), 245 deletions(-) diff --git a/flow_test.go b/flow_test.go index 0d926f5..be9beba 100644 --- a/flow_test.go +++ b/flow_test.go @@ -3,183 +3,137 @@ package flow import ( "math" "sync" - "sync/atomic" "testing" "time" ) func TestBasic(t *testing.T) { - if testing.Short() { - t.Skip("short testing requested") + m := new(Meter) + for i := 0; i < 300; i++ { + m.Mark(1000) + mockClock.Add(40 * time.Millisecond) + } + if rate := m.Snapshot().Rate; !approxEq(rate, 25000, 1) { // Changed + t.Errorf("expected rate 25000, got %f", rate) } - var wg sync.WaitGroup - wg.Add(100) - for i := 0; i < 100; i++ { - go func() { - defer wg.Done() - ticker := time.NewTicker(40 * time.Millisecond) - defer ticker.Stop() - - m := new(Meter) - for i := 0; i < 300; i++ { - m.Mark(1000) - <-ticker.C - } - time.Sleep(1 * time.Second) - actual := m.Snapshot() - if !approxEq(actual.Rate, 25000, 1100) { - t.Errorf("expected rate 25000 (±1100), got %f", actual.Rate) - } - if actual.Total != 300000 { - t.Errorf("1. Expected total %d, got %d", 300000, actual.Total) - } - time.Sleep(15 * time.Second) - if actual.Total != 300000 { - t.Errorf("2. Expected total %d, got %d", 300000, actual.Total) - } + for i := 0; i < 400; i++ { + m.Mark(200) + mockClock.Add(40 * time.Millisecond) + } - // Let it settle. - time.Sleep(10 * time.Second) + if rate := m.Snapshot().Rate; !approxEq(rate, 5000, 1) { + t.Errorf("expected rate 5000, got %f", rate) + } - actual = m.Snapshot() - if actual.Total != 300000 { - t.Errorf("3. Expected total %d, got %d", 300000, actual.Total) - } - for i := 0; i < 200; i++ { - m.Mark(200) - <-ticker.C - } + mockClock.Add(time.Second) - // Adjusts - actual = m.Snapshot() - if !approxEq(actual.Rate, 5000, 250) { - t.Errorf("expected rate 5000 (±250), got %f", actual.Rate) - } - - // get the right total - actual = m.Snapshot() - if actual.Total != 335400 { - t.Errorf("4. Expected total %d, got %d", 335400, actual.Total) - } - }() + if total := m.Snapshot().Total; total != 380000 { + t.Errorf("expected total %d, got %d", 380000, total) } - wg.Wait() } func TestShared(t *testing.T) { - if testing.Short() { - t.Skip("short testing requested") - } var wg sync.WaitGroup - wg.Add(20 * 21) - for i := 0; i < 20; i++ { - m := new(Meter) - for j := 0; j < 20; j++ { - go func() { - defer wg.Done() - ticker := time.NewTicker(40 * time.Millisecond) - defer ticker.Stop() - for i := 0; i < 300; i++ { - m.Mark(50) - <-ticker.C - } - - for i := 0; i < 200; i++ { - m.Mark(10) - <-ticker.C - } - }() - } + wg.Add(20) + m := new(Meter) + for j := 0; j < 20; j++ { go func() { defer wg.Done() - time.Sleep(40 * 300 * time.Millisecond) - actual := m.Snapshot() - if !approxEq(actual.Rate, 25000, 250) { - t.Errorf("expected rate 25000 (±250), got %f", actual.Rate) + for i := 0; i < 300; i++ { + m.Mark(50) + mockClock.Sleep(40 * time.Millisecond) } - time.Sleep(40 * 200 * time.Millisecond) - - // Adjusts - actual = m.Snapshot() - if !approxEq(actual.Rate, 5000, 50) { - t.Errorf("expected rate 5000 (±50), got %f", actual.Rate) + for i := 0; i < 300; i++ { + m.Mark(10) + mockClock.Sleep(40 * time.Millisecond) } + }() + } - // Let it settle. - time.Sleep(2 * time.Second) + time.Sleep(time.Millisecond) + mockClock.Add(20 * 300 * time.Millisecond) + time.Sleep(time.Millisecond) + mockClock.Add(20 * 300 * time.Millisecond) + time.Sleep(time.Millisecond) - // get the right total - actual = m.Snapshot() - if actual.Total != 340000 { - t.Errorf("expected total %d, got %d", 340000, actual.Total) - } - }() + actual := m.Snapshot() + if !approxEq(actual.Rate, 25000, 1) { + t.Errorf("expected rate 25000, got %f", actual.Rate) + } + + time.Sleep(time.Millisecond) + mockClock.Add(20 * 300 * time.Millisecond) + time.Sleep(time.Millisecond) + mockClock.Add(20 * 300 * time.Millisecond) + time.Sleep(time.Millisecond) + + // Adjusts + actual = m.Snapshot() + if !approxEq(actual.Rate, 5000, 1) { + t.Errorf("expected rate 5000, got %f", actual.Rate) + } + + // Let it settle. + time.Sleep(time.Millisecond) + mockClock.Add(time.Second) + time.Sleep(time.Millisecond) + mockClock.Add(time.Second) + time.Sleep(time.Millisecond) + + // get the right total + actual = m.Snapshot() + if actual.Total != 360000 { + t.Errorf("expected total %d, got %d", 360000, actual.Total) } wg.Wait() } func TestUnregister(t *testing.T) { - if testing.Short() { - t.Skip("short testing requested") + m := new(Meter) + + for i := 0; i < 40; i++ { + m.Mark(1) + mockClock.Add(100 * time.Millisecond) } - var wg sync.WaitGroup - wg.Add(100 * 2) - for i := 0; i < 100; i++ { - m := new(Meter) - go func() { - defer wg.Done() - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - for i := 0; i < 40; i++ { - m.Mark(1) - <-ticker.C - } - time.Sleep(62 * time.Second) + actual := m.Snapshot() + if !approxEq(actual.Rate, 10, 1) { // Changed + t.Errorf("expected rate 10, got %f", actual.Rate) + } - for i := 0; i < 40; i++ { - m.Mark(2) - <-ticker.C - } - }() - go func() { - defer wg.Done() - time.Sleep(40 * 100 * time.Millisecond) + mockClock.Add(62 * time.Second) - actual := m.Snapshot() - if !approxEq(actual.Rate, 10, 1) { - t.Errorf("expected rate 10 (±1), got %f", actual.Rate) - } + if m.accumulator.Load() != 0 { + t.Error("expected meter to be paused") + } - time.Sleep(60 * time.Second) - if atomic.LoadUint64(&m.accumulator) != 0 { - t.Error("expected meter to be paused") - } + actual = m.Snapshot() + if actual.Total != 40 { + t.Errorf("expected total 4000, got %d", actual.Total) + } - actual = m.Snapshot() - if actual.Total != 40 { - t.Errorf("expected total 4000, got %d", actual.Total) - } - time.Sleep(2*time.Second + 40*100*time.Millisecond) + for i := 0; i < 40; i++ { + m.Mark(2) + mockClock.Add(100 * time.Millisecond) + } - actual = m.Snapshot() - if !approxEq(actual.Rate, 20, 4) { - t.Errorf("expected rate 20 (±4), got %f", actual.Rate) - } - time.Sleep(2 * time.Second) - actual = m.Snapshot() - if actual.Total != 120 { - t.Errorf("expected total 120, got %d", actual.Total) - } - if atomic.LoadUint64(&m.accumulator) == 0 { - t.Error("expected meter to be active") - } - }() + actual = m.Snapshot() + if actual.Rate != 20 { + t.Errorf("expected rate 20, got %f", actual.Rate) + } + + mockClock.Add(2 * time.Second) + actual = m.Snapshot() + if actual.Total != 120 { + t.Errorf("expected total 120, got %d", actual.Total) } - wg.Wait() + if m.accumulator.Load() == 0 { + t.Error("expected meter to be active") + } + } func approxEq(a, b, err float64) bool { diff --git a/meter.go b/meter.go index 0197813..d9e550d 100644 --- a/meter.go +++ b/meter.go @@ -43,7 +43,7 @@ func NewMeter() MeterInterface { return &Meter{ fresh: true, snapshot: Snapshot{ - LastUpdate: time.Now(), + LastUpdate: cl.Now(), }, } } @@ -54,7 +54,7 @@ func (s Snapshot) String() string { // Meter is a meter for monitoring a flow. type Meter struct { - accumulator uint64 + accumulator atomic.Uint64 // managed by the sweeper loop. registered bool @@ -67,7 +67,7 @@ type Meter struct { // Mark updates the total. func (m *Meter) Mark(count uint64) { - if count > 0 && atomic.AddUint64(&m.accumulator, count) == count { + if count > 0 && m.accumulator.Add(count) == count { // The accumulator is 0 so we probably need to register. We may // already _be_ registered however, if we are, the registration // loop will notice that `m.registered` is set and ignore us. @@ -85,7 +85,7 @@ func (m *Meter) Snapshot() Snapshot { // Reset sets accumulator, total and rate to zero. func (m *Meter) Reset() { globalSweeper.snapshotMu.Lock() - atomic.StoreUint64(&m.accumulator, 0) + m.accumulator.Store(0) m.snapshot.Rate = 0 m.snapshot.Total = 0 globalSweeper.snapshotMu.Unlock() @@ -98,7 +98,7 @@ func (m *Meter) String() string { func (m *Meter) Update(tdiff time.Duration) { if !m.fresh { timeMultiplier := float64(time.Second) / float64(tdiff) - total := atomic.LoadUint64(&m.accumulator) + total := m.accumulator.Load() diff := total - m.snapshot.Total instant := timeMultiplier * float64(diff) @@ -116,47 +116,47 @@ func (m *Meter) Update(tdiff time.Duration) { // This is equivalent to one zeros, then one, then 30 zeros. // We'll consider that to be "idle". if m.snapshot.Rate > IdleRate { - m.SetIdle() + // m.SetIdle() return } - { - // Ok, so we are idle... - - // Mark this as idle by zeroing the accumulator. - swappedTotal := atomic.SwapUint64(&m.accumulator, 0) - - // So..., are we really idle? - if swappedTotal > total { - // Not so idle... - // Now we need to make sure this gets re-registered. - - // First, add back what we removed. If we can do this - // fast enough, we can put it back before anyone - // notices. - currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal) - - // Did we make it? - if currentTotal == swappedTotal { - // Yes! Nobody noticed, move along. - return - } - // No. Someone noticed and will (or has) put back into - // the registration channel. - // - // Remove the snapshot total, it'll get added back on - // registration. - // - // `^uint64(total - 1)` is the two's complement of - // `total`. It's the "correct" way to subtract - // atomically in go. - atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1)) + + // Ok, so we are idle... + // Mark this as idle by zeroing the accumulator. + swappedTotal := m.accumulator.Swap(0) + // So..., are we really idle? + if swappedTotal > total { + // Not so idle... + // Now we need to make sure this gets re-registered. + + // First, add back what we removed. If we can do this + // fast enough, we can put it back before anyone + // notices. + currentTotal := m.accumulator.Add(swappedTotal) + + // Did we make it? + if currentTotal == swappedTotal { + // Yes! Nobody noticed, move along. + //m.SetActive() + return } + // No. Someone noticed and will (or has) put back into + // the registration channel. + // + // Remove the snapshot total, it'll get added back on + // registration. + // + // `^uint64(total - 1)` is the two's complement of + // `total`. It's the "correct" way to subtract + // atomically in go. + m.accumulator.Add(^uint64(m.snapshot.Total - 1)) + } else { + m.SetIdle() } } else { // Re-add the total to all the newly active accumulators and set the snapshot to the total. // 1. We don't do this on register to avoid having to take the snapshot lock. // 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation. - total := atomic.AddUint64(&m.accumulator, m.snapshot.Total) + total := m.accumulator.Add(m.snapshot.Total) if total > m.snapshot.Total { m.snapshot.LastUpdate = cl.Now() } @@ -170,6 +170,7 @@ func (m *Meter) IsIdle() bool { } func (m *Meter) SetIdle() { + m.snapshot.Rate = 0 m.registered = false m.fresh = true } diff --git a/meter_test.go b/meter_test.go index 0f29a8e..8442ebd 100644 --- a/meter_test.go +++ b/meter_test.go @@ -1,42 +1,17 @@ package flow import ( - "fmt" - "math" - "sync" "testing" "time" ) -func ExampleMeter() { - meter := new(Meter) - t := time.NewTicker(100 * time.Millisecond) - for i := 0; i < 100; i++ { - <-t.C - meter.Mark(30) - } - - // Get the current rate. This will be accurate *now* but not after we - // sleep (because we calculate it using EWMA). - rate := meter.Snapshot().Rate - - // Sleep 2 seconds to allow the total to catch up. We snapshot every - // second so the total may not yet be accurate. - time.Sleep(2 * time.Second) - - // Get the current total. - total := meter.Snapshot().Total - - fmt.Printf("%d (%d/s)\n", total, roundTens(rate)) - // Output: 3000 (300/s) -} - func TestResetMeter(t *testing.T) { meter := new(Meter) meter.Mark(30) - time.Sleep(2 * time.Second) + mockClock.Add(time.Millisecond) + mockClock.Add(1 * time.Second) if total := meter.Snapshot().Total; total != 30 { t.Errorf("total = %d; want 30", total) @@ -48,26 +23,3 @@ func TestResetMeter(t *testing.T) { t.Errorf("total = %d; want 0", total) } } - -func TestMarkResetMeterMulti(t *testing.T) { - var wg sync.WaitGroup - wg.Add(2) - - meter := new(Meter) - go func(meter *Meter) { - meter.Mark(30) - meter.Mark(30) - wg.Done() - }(meter) - - go func(meter *Meter) { - meter.Reset() - wg.Done() - }(meter) - - wg.Wait() -} - -func roundTens(x float64) int64 { - return int64(math.Floor(x/10+0.5)) * 10 -} diff --git a/registry_test.go b/registry_test.go index 455ac2d..ea94cb7 100644 --- a/registry_test.go +++ b/registry_test.go @@ -11,11 +11,14 @@ func TestRegistry(t *testing.T) { m2 := r.Get("second") m1Update := m1.Snapshot().LastUpdate + mockClock.Add(5 * time.Second) m1.Mark(10) m2.Mark(30) - time.Sleep(2*time.Second + time.Millisecond) + mockClock.Add(1 * time.Second) + mockClock.Add(1 * time.Second) + mockClock.Add(1 * time.Millisecond) if total := r.Get("first").Snapshot().Total; total != 10 { t.Errorf("expected first total to be 10, got %d", total) @@ -24,8 +27,8 @@ func TestRegistry(t *testing.T) { t.Errorf("expected second total to be 30, got %d", total) } - if !m1.Snapshot().LastUpdate.After(m1Update) { - t.Error("expected the last update to have been updated") + if lu := m1.Snapshot().LastUpdate; !lu.After(m1Update) { + t.Errorf("expected the last update (%s) to have after (%s)", lu, m1Update) } expectedMeters := map[string]MeterInterface{ @@ -82,10 +85,11 @@ func TestRegistry(t *testing.T) { t.Errorf("missing meters: '%v'", expectedMeters) } - before := time.Now() + before := mockClock.Now() + mockClock.Add(time.Millisecond) m3.Mark(1) - time.Sleep(2 * time.Second) - after := time.Now() + mockClock.Add(2 * time.Second) + after := mockClock.Now() if len(r.FindIdle(before)) != 1 { t.Error("expected 1 idle timer") } @@ -107,7 +111,7 @@ func TestClearRegistry(t *testing.T) { m1.Mark(10) m2.Mark(30) - time.Sleep(2 * time.Second) + mockClock.Add(2 * time.Second) r.Clear() diff --git a/sweeper_test.go b/sweeper_test.go index 1cb0d22..6e7da6d 100644 --- a/sweeper_test.go +++ b/sweeper_test.go @@ -3,14 +3,18 @@ package flow import ( "testing" "time" + + "github.com/benbjohnson/clock" ) +var mockClock = clock.NewMock() + +func init() { + SetClock(mockClock) +} + // regression test for libp2p/go-libp2p-core#65 func TestIdleInconsistency(t *testing.T) { - if testing.Short() { - t.Skip("short testing requested") - } - r := new(MeterRegistry) m1 := r.Get("first") m2 := r.Get("second") @@ -21,23 +25,23 @@ func TestIdleInconsistency(t *testing.T) { m3.Mark(30) // make m1 and m3 go idle - for i := 0; i < 25; i++ { - time.Sleep(time.Second) + for i := 0; i < 30; i++ { + mockClock.Add(time.Second) m2.Mark(1) } - time.Sleep(time.Second) + mockClock.Add(time.Second) // re-activate m3 m3.Mark(20) - time.Sleep(time.Second + time.Millisecond) + mockClock.Add(time.Second) // check the totals if total := r.Get("first").Snapshot().Total; total != 10 { t.Errorf("expected first total to be 10, got %d", total) } - if total := r.Get("second").Snapshot().Total; total != 45 { + if total := r.Get("second").Snapshot().Total; total != 50 { t.Errorf("expected second total to be 50, got %d", total) } From 2feb4eb26942d9f230711f60c7636e11982f6962 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Tue, 21 Nov 2023 21:48:59 +0300 Subject: [PATCH 05/11] Refactored sweeper --- sweeper.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sweeper.go b/sweeper.go index 99b925b..6e7123a 100644 --- a/sweeper.go +++ b/sweeper.go @@ -35,9 +35,8 @@ type SweeperInterface interface { type sweeper struct { sweepOnce sync.Once - snapshotMu sync.RWMutex - meters []MeterInterface - activeMeters int + snapshotMu sync.RWMutex + meters []MeterInterface lastUpdateTime time.Time registerChannel chan MeterInterface @@ -118,9 +117,6 @@ func (sw *sweeper) update() { } sw.meters = sw.meters[:newLen] - - // Finally, mark all meters still in the list as "active". - sw.activeMeters = len(sw.meters) } func (sw *sweeper) Register(m MeterInterface) { From cc7dc35363a380debd3eb3e7fb3ef8778220990d Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Tue, 21 Nov 2023 22:28:57 +0300 Subject: [PATCH 06/11] Removed debug comments --- flow_test.go | 4 ++-- meter.go | 2 -- sweeper.go | 10 ---------- 3 files changed, 2 insertions(+), 14 deletions(-) diff --git a/flow_test.go b/flow_test.go index be9beba..bd7bfc8 100644 --- a/flow_test.go +++ b/flow_test.go @@ -13,7 +13,7 @@ func TestBasic(t *testing.T) { m.Mark(1000) mockClock.Add(40 * time.Millisecond) } - if rate := m.Snapshot().Rate; !approxEq(rate, 25000, 1) { // Changed + if rate := m.Snapshot().Rate; !approxEq(rate, 25000, 1) { t.Errorf("expected rate 25000, got %f", rate) } @@ -99,7 +99,7 @@ func TestUnregister(t *testing.T) { } actual := m.Snapshot() - if !approxEq(actual.Rate, 10, 1) { // Changed + if !approxEq(actual.Rate, 10, 1) { t.Errorf("expected rate 10, got %f", actual.Rate) } diff --git a/meter.go b/meter.go index d9e550d..cb2ba15 100644 --- a/meter.go +++ b/meter.go @@ -116,7 +116,6 @@ func (m *Meter) Update(tdiff time.Duration) { // This is equivalent to one zeros, then one, then 30 zeros. // We'll consider that to be "idle". if m.snapshot.Rate > IdleRate { - // m.SetIdle() return } @@ -136,7 +135,6 @@ func (m *Meter) Update(tdiff time.Duration) { // Did we make it? if currentTotal == swappedTotal { // Yes! Nobody noticed, move along. - //m.SetActive() return } // No. Someone noticed and will (or has) put back into diff --git a/sweeper.go b/sweeper.go index 6e7123a..3296d8d 100644 --- a/sweeper.go +++ b/sweeper.go @@ -7,16 +7,6 @@ import ( "github.com/benbjohnson/clock" ) -// IdleRate the rate at which we declare a meter idle (and stop tracking it -// until it's re-registered). -// -// The default ensures that 1 event every ~30s will keep the meter from going -// idle. -// var IdleRate = 1e-13 - -// IdleTime the time that need to pass scince last update before we declare a metere idle -//var IdleTime = 20 * time.Second - // The global sweeper. var globalSweeper sweeper From b5904791626eefdada2cf6ece9608f65f519191c Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Wed, 22 Nov 2023 16:56:23 +0300 Subject: [PATCH 07/11] Refactored float comparison --- flow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow_test.go b/flow_test.go index bd7bfc8..c08bbd9 100644 --- a/flow_test.go +++ b/flow_test.go @@ -120,7 +120,7 @@ func TestUnregister(t *testing.T) { } actual = m.Snapshot() - if actual.Rate != 20 { + if !approxEq(actual.Rate, 20, 1) { t.Errorf("expected rate 20, got %f", actual.Rate) } From 617790b77a316ee9be26c9f78c5311e2417ec60e Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Thu, 23 Nov 2023 00:42:52 +0300 Subject: [PATCH 08/11] Modified .gitignore, go.mod and mock_clock_test.go --- .gitignore | 1 - go.mod | 2 +- mockclocktest/mock_clock_test.go | 11 ++++++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 600d2d3..e69de29 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +0,0 @@ -.vscode \ No newline at end of file diff --git a/go.mod b/go.mod index c63c182..98980b9 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/ArtemVasilevMIPT/go-maxflow-metrics +module github.com/libp2p/go-flow-metrics go 1.20 diff --git a/mockclocktest/mock_clock_test.go b/mockclocktest/mock_clock_test.go index a3ae653..5bfdc4e 100644 --- a/mockclocktest/mock_clock_test.go +++ b/mockclocktest/mock_clock_test.go @@ -1,10 +1,11 @@ package mockclocktest import ( + "math" "testing" "time" - flow "github.com/ArtemVasilevMIPT/go-maxflow-metrics" + flow "github.com/libp2p/go-flow-metrics" "github.com/benbjohnson/clock" ) @@ -21,7 +22,7 @@ func TestBasic(t *testing.T) { m.Mark(1000) cl.Add(40 * time.Millisecond) } - if rate := m.Snapshot().Rate; rate != 25000 { + if rate := m.Snapshot().Rate; approxEq(rate, 25000, 1) { t.Errorf("expected rate 25000, got %f", rate) } @@ -31,7 +32,7 @@ func TestBasic(t *testing.T) { } // Adjusts - if rate := m.Snapshot().Rate; rate != 5017.776503840969 { + if rate := m.Snapshot().Rate; approxEq(rate, 5017.776503840969, 0.0001) { t.Errorf("expected rate 5017.776503840969, got %f", rate) } @@ -41,3 +42,7 @@ func TestBasic(t *testing.T) { t.Errorf("expected total 3400000, got %d", total) } } + +func approxEq(a, b, err float64) bool { + return math.Abs(a-b) < err +} From abc0ba3e49523144a085f461e45db71a61ed48c4 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Mon, 4 Dec 2023 00:41:04 +0300 Subject: [PATCH 09/11] Simplified interface, added time argument to Update() --- meter.go | 11 ++++------- registry_test.go | 10 +++++----- sweeper.go | 2 +- sweeper_test.go | 6 +++--- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/meter.go b/meter.go index cb2ba15..bb645d7 100644 --- a/meter.go +++ b/meter.go @@ -25,11 +25,8 @@ type Snapshot struct { } type MeterInterface interface { - String() string - Mark(count uint64) Snapshot() Snapshot - Reset() - Update(tdiff time.Duration) + Update(tdiff time.Duration, now time.Time) IsIdle() bool SetIdle() SetActive() @@ -95,7 +92,7 @@ func (m *Meter) String() string { return m.Snapshot().String() } -func (m *Meter) Update(tdiff time.Duration) { +func (m *Meter) Update(tdiff time.Duration, now time.Time) { if !m.fresh { timeMultiplier := float64(time.Second) / float64(tdiff) total := m.accumulator.Load() @@ -103,7 +100,7 @@ func (m *Meter) Update(tdiff time.Duration) { instant := timeMultiplier * float64(diff) if diff > 0 { - m.snapshot.LastUpdate = cl.Now() + m.snapshot.LastUpdate = now } if m.snapshot.Rate == 0 { @@ -156,7 +153,7 @@ func (m *Meter) Update(tdiff time.Duration) { // 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation. total := m.accumulator.Add(m.snapshot.Total) if total > m.snapshot.Total { - m.snapshot.LastUpdate = cl.Now() + m.snapshot.LastUpdate = now } m.snapshot.Total = total m.fresh = false diff --git a/registry_test.go b/registry_test.go index ea94cb7..a39b930 100644 --- a/registry_test.go +++ b/registry_test.go @@ -7,8 +7,8 @@ import ( func TestRegistry(t *testing.T) { r := new(MeterRegistry) - m1 := r.Get("first") - m2 := r.Get("second") + m1 := r.Get("first").(*Meter) + m2 := r.Get("second").(*Meter) m1Update := m1.Snapshot().LastUpdate mockClock.Add(5 * time.Second) @@ -63,7 +63,7 @@ func TestRegistry(t *testing.T) { t.Errorf("didn't find second meter") } - m3 := r.Get("first") + m3 := r.Get("first").(*Meter) if m3 == m1 { t.Error("should have gotten a new meter") } @@ -105,8 +105,8 @@ func TestRegistry(t *testing.T) { func TestClearRegistry(t *testing.T) { r := new(MeterRegistry) - m1 := r.Get("first") - m2 := r.Get("second") + m1 := r.Get("first").(*Meter) + m2 := r.Get("second").(*Meter) m1.Mark(10) m2.Mark(30) diff --git a/sweeper.go b/sweeper.go index 3296d8d..cc53912 100644 --- a/sweeper.go +++ b/sweeper.go @@ -90,7 +90,7 @@ func (sw *sweeper) update() { // Calculate the bandwidth for all active meters. for i, m := range sw.meters { - m.Update(tdiff) + m.Update(tdiff, now) // Reset the rate, keep the total. if m.IsIdle() { sw.meters[i] = nil diff --git a/sweeper_test.go b/sweeper_test.go index 6e7da6d..344c16e 100644 --- a/sweeper_test.go +++ b/sweeper_test.go @@ -16,9 +16,9 @@ func init() { // regression test for libp2p/go-libp2p-core#65 func TestIdleInconsistency(t *testing.T) { r := new(MeterRegistry) - m1 := r.Get("first") - m2 := r.Get("second") - m3 := r.Get("third") + m1 := r.Get("first").(*Meter) + m2 := r.Get("second").(*Meter) + m3 := r.Get("third").(*Meter) m1.Mark(10) m2.Mark(20) From 2ba877feb259618d76debe88f225575f46840ce9 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Tue, 12 Dec 2023 21:38:46 +0300 Subject: [PATCH 10/11] Added compatibility with libp2p --- registry.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/registry.go b/registry.go index 9a77bf6..c709bba 100644 --- a/registry.go +++ b/registry.go @@ -66,11 +66,21 @@ func (r *MeterRegistry) Remove(name string) { } // ForEach calls the passed function for each registered meter. -func (r *MeterRegistry) ForEach(iterFunc func(string, MeterInterface)) { - r.meters.Range(func(k, v interface{}) bool { - iterFunc(k.(string), v.(MeterInterface)) - return true - }) +// +// Note: switch was added for compatibility reasons +func (r *MeterRegistry) ForEach(iterFunc interface{}) { + switch f := iterFunc.(type) { + case func(string, MeterInterface): + r.meters.Range(func(k, v interface{}) bool { + f(k.(string), v.(MeterInterface)) + return true + }) + case func(string, *Meter): + r.meters.Range(func(k, v interface{}) bool { + f(k.(string), v.(*Meter)) + return true + }) + } } // Clear removes all meters from the registry. From f6487e6a1bf6825c364357921126d4b6059ff0e5 Mon Sep 17 00:00:00 2001 From: Artem Vasilev Date: Mon, 18 Dec 2023 11:39:36 +0300 Subject: [PATCH 11/11] Returned 'Mark' method --- meter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/meter.go b/meter.go index bb645d7..24097d6 100644 --- a/meter.go +++ b/meter.go @@ -27,6 +27,7 @@ type Snapshot struct { type MeterInterface interface { Snapshot() Snapshot Update(tdiff time.Duration, now time.Time) + Mark(count uint64) IsIdle() bool SetIdle() SetActive()