Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
added initial backoff cache discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Sep 6, 2019
1 parent d248d63 commit 393cf50
Show file tree
Hide file tree
Showing 5 changed files with 861 additions and 0 deletions.
181 changes: 181 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package discovery

import (
"math"
"math/rand"
"time"
)

type BackoffFactory func() BackoffStrategy

type BackoffStrategy interface {
Delay() time.Duration
Reset()
}

// Jitter implementations taken roughly from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

// Jitter must return a duration between min and max. Min must be lower than, or equal to, max.
type Jitter func(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration

func FullJitter(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration {
if duration <= min {
return min
}

normalizedDur := boundedDuration(duration, min, max) - min

return boundedDuration(time.Duration(rng.Int63n(int64(normalizedDur)))+min, min, max)
}

func NoJitter(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration {
return boundedDuration(duration, min, max)
}

type randomizedBackoff struct {
min time.Duration
max time.Duration
rng *rand.Rand
}

func (b *randomizedBackoff) BoundedDelay(duration time.Duration) time.Duration {
return boundedDuration(duration, b.min, b.max)
}

func boundedDuration(d time.Duration, min time.Duration, max time.Duration) time.Duration {
if d < min {
return min
}
if d > max {
return max
}
return d
}

type attemptBackoff struct {
attempt int
jitter Jitter
randomizedBackoff
}

func (b *attemptBackoff) Reset() {
b.attempt = 0
}

func NewFixedBackoffFactory(delay time.Duration) BackoffFactory {
return func() BackoffStrategy {
return &fixedBackoff{delay: delay}
}
}

type fixedBackoff struct {
delay time.Duration
}

func (b *fixedBackoff) Delay() time.Duration {
return b.delay
}

func (b *fixedBackoff) Reset() {}

func NewPolynomialBackoffFactory(min, max time.Duration, jitter Jitter,
timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &polynomialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
poly: polyCoefs,
}
}
}

type polynomialBackoff struct {
attemptBackoff
timeUnits time.Duration
poly []float64
}

func (b *polynomialBackoff) Delay() time.Duration {
polySum := b.poly[0]
exp := 1
attempt := b.attempt
b.attempt++

for _, c := range b.poly[1:] {
exp *= attempt
polySum += float64(exp) * c
}
return b.jitter(time.Duration(float64(b.timeUnits)*polySum), b.min, b.max, b.rng)
}

func NewExponentialBackoffFactory(min, max time.Duration, jitter Jitter,
timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &exponentialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
base: base,
offset: offset,
}
}
}

type exponentialBackoff struct {
attemptBackoff
timeUnits time.Duration
base float64
offset time.Duration
}

func (b *exponentialBackoff) Delay() time.Duration {
attempt := b.attempt
b.attempt++
return b.jitter(
time.Duration(math.Pow(b.base, float64(attempt))*float64(b.timeUnits))+b.offset, b.min, b.max, b.rng)
}

func NewExponentialDecorrelatedJitterFactory(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &exponentialDecorrelatedJitter{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
base: base,
}
}
}

type exponentialDecorrelatedJitter struct {
randomizedBackoff
base float64
lastDelay time.Duration
}

func (b *exponentialDecorrelatedJitter) Delay() time.Duration {
if b.lastDelay < b.min {
b.lastDelay = b.min
return b.lastDelay
}

nextMax := int64(float64(b.lastDelay) * b.base)
b.lastDelay = boundedDuration(time.Duration(b.rng.Int63n(nextMax-int64(b.min)))+b.min, b.min, b.max)
return b.lastDelay
}

func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 }
125 changes: 125 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package discovery

import (
"math/rand"
"testing"
"time"
)

func checkDelay(bkf BackoffStrategy, expected time.Duration, t *testing.T) {
t.Helper()
if calculated := bkf.Delay(); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}

func TestFixedBackoff(t *testing.T) {
startDelay := time.Second
delay := startDelay

bkf := NewFixedBackoffFactory(delay)
delay *= 2
b1 := bkf()
delay *= 2
b2 := bkf()

if b1.Delay() != startDelay || b2.Delay() != startDelay {
t.Fatal("incorrect delay time")
}

if b1.Delay() != startDelay {
t.Fatal("backoff is stateful")
}

if b1.Reset(); b1.Delay() != startDelay {
t.Fatalf("Reset does something")
}
}

func TestPolynomialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewPolynomialBackoffFactory(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Second || b2.Delay() != time.Second {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*5500, t)
checkDelay(b1, time.Millisecond*16500, t)
checkDelay(b1, time.Millisecond*33000, t)
checkDelay(b2, time.Millisecond*5500, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*5500, t)
}

func TestExponentialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewExponentialBackoffFactory(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Millisecond*650 || b2.Delay() != time.Millisecond*650 {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*1100, t)
checkDelay(b1, time.Millisecond*1850, t)
checkDelay(b1, time.Millisecond*2975, t)
checkDelay(b1, time.Microsecond*4662500, t)
checkDelay(b1, time.Second*7, t)
checkDelay(b2, time.Millisecond*1100, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*1100, t)
}

func minMaxJitterTest(jitter Jitter, t *testing.T) {
rng := rand.New(rand.NewSource(0))
if jitter(time.Nanosecond, time.Hour*10, time.Hour*20, rng) < time.Hour*10 {
t.Fatal("Min not working")
}
if jitter(time.Hour, time.Nanosecond, time.Nanosecond*10, rng) > time.Nanosecond*10 {
t.Fatal("Max not working")
}
}

func TestNoJitter(t *testing.T) {
minMaxJitterTest(NoJitter, t)
for i := 0; i < 10; i++ {
expected := time.Second * time.Duration(i)
if calculated := NoJitter(expected, time.Duration(0), time.Second*100, nil); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}
}

func TestFullJitter(t *testing.T) {
rng := rand.New(rand.NewSource(0))
minMaxJitterTest(FullJitter, t)
const numBuckets = 51
const multiplier = 10
const threshold = 20

histogram := make([]int, numBuckets)

for i := 0; i < (numBuckets-1)*multiplier; i++ {
started := time.Nanosecond * 50
calculated := FullJitter(started, 0, 100, rng)
histogram[calculated]++
}

for _, count := range histogram {
if count > threshold {
t.Fatal("jitter is not close to evenly spread")
}
}

if histogram[numBuckets-1] > 0 {
t.Fatal("jitter increased overall time")
}
}
Loading

0 comments on commit 393cf50

Please sign in to comment.