diff --git a/backoff.go b/backoff.go index ecd9470..384a95c 100644 --- a/backoff.go +++ b/backoff.go @@ -3,6 +3,7 @@ package discovery import ( "math" "math/rand" + "sync" "time" ) @@ -90,7 +91,8 @@ func (b *fixedBackoff) Reset() {} // timeUnits are the units of time the polynomial is evaluated in // polyCoefs is the array of polynomial coefficients from [c0, c1, ... cn] func NewPolynomialBackoff(min, max time.Duration, jitter Jitter, - timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory { + timeUnits time.Duration, polyCoefs []float64, rngSrc rand.Source) BackoffFactory { + rng := rand.New(&lockedSource{src: rngSrc}) return func() BackoffStrategy { return &polynomialBackoff{ attemptBackoff: attemptBackoff{ @@ -138,7 +140,8 @@ func (b *polynomialBackoff) Delay() time.Duration { // jitter is the function for adding randomness around the backoff // timeUnits are the units of time the base^x is evaluated in func NewExponentialBackoff(min, max time.Duration, jitter Jitter, - timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory { + timeUnits time.Duration, base float64, offset time.Duration, rngSrc rand.Source) BackoffFactory { + rng := rand.New(&lockedSource{src: rngSrc}) return func() BackoffStrategy { return &exponentialBackoff{ attemptBackoff: attemptBackoff{ @@ -173,7 +176,8 @@ func (b *exponentialBackoff) Delay() time.Duration { // NewExponentialDecorrelatedJitter creates a BackoffFactory with backoff of the roughly of the form base^x where x is the attempt number. // Delays start at the minimum duration and after each attempt delay = rand(min, delay * base), bounded by the max // See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for more information -func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory { +func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rngSrc rand.Source) BackoffFactory { + rng := rand.New(&lockedSource{src: rngSrc}) return func() BackoffStrategy { return &exponentialDecorrelatedJitter{ randomizedBackoff: randomizedBackoff{ @@ -204,3 +208,21 @@ func (b *exponentialDecorrelatedJitter) Delay() time.Duration { } func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 } + +type lockedSource struct { + lk sync.Mutex + src rand.Source +} + +func (r *lockedSource) Int63() (n int64) { + r.lk.Lock() + n = r.src.Int63() + r.lk.Unlock() + return +} + +func (r *lockedSource) Seed(seed int64) { + r.lk.Lock() + r.src.Seed(seed) + r.lk.Unlock() +} diff --git a/backoff_test.go b/backoff_test.go index 655d720..1d0f0fc 100644 --- a/backoff_test.go +++ b/backoff_test.go @@ -1,6 +1,8 @@ package discovery import ( + "fmt" + "golang.org/x/sync/errgroup" "math/rand" "testing" "time" @@ -37,8 +39,7 @@ func TestFixedBackoff(t *testing.T) { } func TestPolynomialBackoff(t *testing.T) { - rng := rand.New(rand.NewSource(0)) - bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng) + bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rand.NewSource(0)) b1 := bkf() b2 := bkf() @@ -57,8 +58,7 @@ func TestPolynomialBackoff(t *testing.T) { } func TestExponentialBackoff(t *testing.T) { - rng := rand.New(rand.NewSource(0)) - bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng) + bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rand.NewSource(0)) b1 := bkf() b2 := bkf() @@ -123,3 +123,69 @@ func TestFullJitter(t *testing.T) { t.Fatal("jitter increased overall time") } } + +func TestManyBackoffFactory(t *testing.T) { + rngSource := rand.NewSource(0) + concurrent := 10 + + t.Run("Exponential", func(t *testing.T) { + testManyBackoffFactoryHelper(concurrent, + NewExponentialBackoff(time.Millisecond*650, time.Second*7, FullJitter, time.Second, 1.5, -time.Millisecond*400, rngSource), + ) + }) + t.Run("Polynomial", func(t *testing.T) { + testManyBackoffFactoryHelper(concurrent, + NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rngSource), + ) + }) + t.Run("Fixed", func(t *testing.T) { + testManyBackoffFactoryHelper(concurrent, + NewFixedBackoff(time.Second), + ) + }) +} + +func testManyBackoffFactoryHelper(concurrent int, bkf BackoffFactory) { + backoffCh := make(chan BackoffStrategy, concurrent) + + errGrp := errgroup.Group{} + for i := 0; i < concurrent; i++ { + errGrp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic %v", r) + } + }() + backoffCh <- bkf() + return + }) + } + if err := errGrp.Wait(); err != nil { + panic(err) + } + close(backoffCh) + + errGrp = errgroup.Group{} + for b := range backoffCh { + backoff := b + errGrp.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic %v", r) + } + }() + + for i := 0; i < 5; i++ { + for j := 0; j < 10; j++ { + backoff.Delay() + } + backoff.Reset() + } + return + }) + } + + if err := errGrp.Wait(); err != nil { + panic(err) + } +} diff --git a/backoffcache_test.go b/backoffcache_test.go index 5d7e5dd..7b0621d 100644 --- a/backoffcache_test.go +++ b/backoffcache_test.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "math/rand" "testing" "time" @@ -67,7 +68,7 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) { d2 := &mockDiscoveryClient{h2, discServer} bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter, - time.Millisecond*100, 2.5, 0, nil) + time.Millisecond*100, 2.5, 0, rand.NewSource(0)) dCache, err := NewBackoffDiscovery(d1, bkf) if err != nil { t.Fatal(err) @@ -101,7 +102,7 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { // Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms. bkf := NewExponentialBackoff(time.Millisecond*100, time.Second*10, NoJitter, - time.Millisecond*100, 2.5, 0, nil) + time.Millisecond*100, 2.5, 0, rand.NewSource(0)) dCache, err := NewBackoffDiscovery(d1, bkf) if err != nil { t.Fatal(err) diff --git a/go.mod b/go.mod index a9496aa..18b142d 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/libp2p/go-libp2p-peerstore v0.2.2 github.com/libp2p/go-libp2p-swarm v0.2.3 github.com/multiformats/go-multihash v0.0.13 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 ) go 1.13 diff --git a/go.sum b/go.sum index 38e4ee9..8b49f57 100644 --- a/go.sum +++ b/go.sum @@ -371,6 +371,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6Zh golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=