Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Add Backoff Cache Discovery #26

Merged
merged 14 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package discovery

import (
"math"
"math/rand"
"time"
)

type BackoffFactory func() BackoffStrategy

// BackoffStrategy describes how backoff will be implemented. BackoffStratgies are stateful.
type BackoffStrategy interface {
// Delay calculates how long the next backoff duration should be, given the prior calls to Delay
Delay() time.Duration
// Reset clears the internal state of the BackoffStrategy
Reset()
}

// Jitter implementations taken roughly from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

// Jitter must return a duration between min and max. Min must be lower than, or equal to, max.
type Jitter func(duration, min, max time.Duration, rng *rand.Rand) time.Duration

// FullJitter returns a random number uniformly chose from the range [min, boundedDur].
// boundedDur is the duration bounded between min and max.
func FullJitter(duration, min, max time.Duration, rng *rand.Rand) time.Duration {
if duration <= min {
return min
}

normalizedDur := boundedDuration(duration, min, max) - min

return boundedDuration(time.Duration(rng.Int63n(int64(normalizedDur)))+min, min, max)
}

// NoJitter returns the duration bounded between min and max
func NoJitter(duration, min, max time.Duration, rng *rand.Rand) time.Duration {
return boundedDuration(duration, min, max)
}

type randomizedBackoff struct {
min time.Duration
max time.Duration
rng *rand.Rand
}

func (b *randomizedBackoff) BoundedDelay(duration time.Duration) time.Duration {
return boundedDuration(duration, b.min, b.max)
}

func boundedDuration(d, min, max time.Duration) time.Duration {
if d < min {
return min
}
if d > max {
return max
}
return d
}

type attemptBackoff struct {
attempt int
jitter Jitter
randomizedBackoff
}

func (b *attemptBackoff) Reset() {
b.attempt = 0
}

// NewFixedBackoff creates a BackoffFactory with a constant backoff duration
func NewFixedBackoff(delay time.Duration) BackoffFactory {
return func() BackoffStrategy {
return &fixedBackoff{delay: delay}
}
}

type fixedBackoff struct {
delay time.Duration
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

func (b *fixedBackoff) Delay() time.Duration {
return b.delay
}

func (b *fixedBackoff) Reset() {}

// NewPolynomialBackoff creates a BackoffFactory with backoff of the form c0*x^0, c1*x^1, ...cn*x^n where x is the attempt number
// jitter is the function for adding randomness around the backoff
// timeUnits are the units of time the polynomial is evaluated in
// polyCoefs is the array of polynomial coefficients from [c0, c1, ... cn]
func NewPolynomialBackoff(min, max time.Duration, jitter Jitter,
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &polynomialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
poly: polyCoefs,
}
}
}

type polynomialBackoff struct {
attemptBackoff
timeUnits time.Duration
poly []float64
}

func (b *polynomialBackoff) Delay() time.Duration {
var polySum float64
switch len(b.poly) {
case 0:
return 0
case 1:
polySum = b.poly[0]
default:
polySum = b.poly[0]
exp := 1
attempt := b.attempt
b.attempt++

for _, c := range b.poly[1:] {
exp *= attempt
polySum += float64(exp) * c
}
}
return b.jitter(time.Duration(float64(b.timeUnits)*polySum), b.min, b.max, b.rng)
}

// NewExponentialBackoff creates a BackoffFactory with backoff of the form base^x + offset where x is the attempt number
// jitter is the function for adding randomness around the backoff
// timeUnits are the units of time the base^x is evaluated in
func NewExponentialBackoff(min, max time.Duration, jitter Jitter,
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &exponentialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
base: base,
offset: offset,
}
}
}

type exponentialBackoff struct {
attemptBackoff
timeUnits time.Duration
base float64
offset time.Duration
}

func (b *exponentialBackoff) Delay() time.Duration {
attempt := b.attempt
b.attempt++
return b.jitter(
time.Duration(math.Pow(b.base, float64(attempt))*float64(b.timeUnits))+b.offset, b.min, b.max, b.rng)
}

// NewExponentialDecorrelatedJitter creates a BackoffFactory with backoff of the roughly of the form base^x where x is the attempt number.
// Delays start at the minimum duration and after each attempt delay = rand(min, delay * base), bounded by the max
// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for more information
func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return func() BackoffStrategy {
return &exponentialDecorrelatedJitter{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
base: base,
}
}
}

type exponentialDecorrelatedJitter struct {
randomizedBackoff
base float64
lastDelay time.Duration
}

func (b *exponentialDecorrelatedJitter) Delay() time.Duration {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
if b.lastDelay < b.min {
b.lastDelay = b.min
return b.lastDelay
}

nextMax := int64(float64(b.lastDelay) * b.base)
b.lastDelay = boundedDuration(time.Duration(b.rng.Int63n(nextMax-int64(b.min)))+b.min, b.min, b.max)
return b.lastDelay
}

func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 }
125 changes: 125 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package discovery

import (
"math/rand"
"testing"
"time"
)

func checkDelay(bkf BackoffStrategy, expected time.Duration, t *testing.T) {
t.Helper()
if calculated := bkf.Delay(); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}

func TestFixedBackoff(t *testing.T) {
startDelay := time.Second
delay := startDelay

bkf := NewFixedBackoff(delay)
delay *= 2
b1 := bkf()
delay *= 2
b2 := bkf()

if b1.Delay() != startDelay || b2.Delay() != startDelay {
t.Fatal("incorrect delay time")
}

if b1.Delay() != startDelay {
t.Fatal("backoff is stateful")
}

if b1.Reset(); b1.Delay() != startDelay {
t.Fatalf("Reset does something")
}
}

func TestPolynomialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Second || b2.Delay() != time.Second {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*5500, t)
checkDelay(b1, time.Millisecond*16500, t)
checkDelay(b1, time.Millisecond*33000, t)
checkDelay(b2, time.Millisecond*5500, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*5500, t)
}

func TestExponentialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Millisecond*650 || b2.Delay() != time.Millisecond*650 {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*1100, t)
checkDelay(b1, time.Millisecond*1850, t)
checkDelay(b1, time.Millisecond*2975, t)
checkDelay(b1, time.Microsecond*4662500, t)
checkDelay(b1, time.Second*7, t)
checkDelay(b2, time.Millisecond*1100, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*1100, t)
}

func minMaxJitterTest(jitter Jitter, t *testing.T) {
rng := rand.New(rand.NewSource(0))
if jitter(time.Nanosecond, time.Hour*10, time.Hour*20, rng) < time.Hour*10 {
t.Fatal("Min not working")
}
if jitter(time.Hour, time.Nanosecond, time.Nanosecond*10, rng) > time.Nanosecond*10 {
t.Fatal("Max not working")
}
}

func TestNoJitter(t *testing.T) {
minMaxJitterTest(NoJitter, t)
for i := 0; i < 10; i++ {
expected := time.Second * time.Duration(i)
if calculated := NoJitter(expected, time.Duration(0), time.Second*100, nil); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}
}

func TestFullJitter(t *testing.T) {
rng := rand.New(rand.NewSource(0))
minMaxJitterTest(FullJitter, t)
const numBuckets = 51
const multiplier = 10
const threshold = 20

histogram := make([]int, numBuckets)

for i := 0; i < (numBuckets-1)*multiplier; i++ {
started := time.Nanosecond * 50
calculated := FullJitter(started, 0, 100, rng)
histogram[calculated]++
}

for _, count := range histogram {
if count > threshold {
t.Fatal("jitter is not close to evenly spread")
}
}

if histogram[numBuckets-1] > 0 {
t.Fatal("jitter increased overall time")
}
}
Loading