diff --git a/p2p/discovery/backoff/backoffcache.go b/p2p/discovery/backoff/backoffcache.go index 7d238195a5..9aacfdadc7 100644 --- a/p2p/discovery/backoff/backoffcache.go +++ b/p2p/discovery/backoff/backoffcache.go @@ -21,6 +21,8 @@ type BackoffDiscovery struct { parallelBufSz int returnedBufSz int + + clock clock } type BackoffDiscoveryOption func(*BackoffDiscovery) error @@ -33,6 +35,8 @@ func NewBackoffDiscovery(disc discovery.Discovery, stratFactory BackoffFactory, parallelBufSz: 32, returnedBufSz: 32, + + clock: realClock{}, } for _, opt := range opts { @@ -68,6 +72,24 @@ func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption { } } +type clock interface { + Now() time.Time +} + +type realClock struct{} + +func (c realClock) Now() time.Time { + return time.Now() +} + +// withClock lets you override the default time.Now() call. Useful for tests. +func withClock(c clock) BackoffDiscoveryOption { + return func(b *BackoffDiscovery) error { + b.clock = c + return nil + } +} + type backoffCache struct { // strat is assigned on creation and not written to strat BackoffStrategy @@ -78,6 +100,8 @@ type backoffCache struct { peers map[peer.ID]peer.AddrInfo sendingChs map[chan peer.AddrInfo]int ongoing bool + + clock clock } func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { @@ -112,6 +136,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis peers: make(map[peer.ID]peer.AddrInfo), sendingChs: make(map[chan peer.AddrInfo]int), strat: d.stratFactory(), + clock: d.clock, } d.peerCacheMux.Lock() @@ -128,7 +153,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis c.mux.Lock() defer c.mux.Unlock() - timeExpired := time.Now().After(c.nextDiscover) + timeExpired := d.clock.Now().After(c.nextDiscover) // If it's not yet time to search again and no searches are in progress then return cached peers if !(timeExpired || c.ongoing) { @@ -180,19 +205,19 @@ func findPeerDispatcher(ctx context.Context, c *backoffCache, pch <-chan peer.Ad defer func() { c.mux.Lock() - for ch := range c.sendingChs { - close(ch) - } - // If the peer addresses have changed reset the backoff if checkUpdates(c.prevPeers, c.peers) { c.strat.Reset() c.prevPeers = c.peers } - c.nextDiscover = time.Now().Add(c.strat.Delay()) + c.nextDiscover = c.clock.Now().Add(c.strat.Delay()) c.ongoing = false c.peers = make(map[peer.ID]peer.AddrInfo) + + for ch := range c.sendingChs { + close(ch) + } c.sendingChs = make(map[chan peer.AddrInfo]int) c.mux.Unlock() }() @@ -221,13 +246,9 @@ func findPeerDispatcher(ctx context.Context, c *backoffCache, pch <-chan peer.Ad c.peers[ai.ID] = sendAi for ch, rem := range c.sendingChs { - ch <- sendAi - if rem == 1 { - close(ch) - delete(c.sendingChs, ch) - break - } else if rem > 0 { - rem-- + if rem > 0 { + ch <- sendAi + c.sendingChs[ch] = rem - 1 } } diff --git a/p2p/discovery/backoff/backoffcache_test.go b/p2p/discovery/backoff/backoffcache_test.go index 2d674c7438..15b998af3f 100644 --- a/p2p/discovery/backoff/backoffcache_test.go +++ b/p2p/discovery/backoff/backoffcache_test.go @@ -3,7 +3,6 @@ package backoff import ( "context" "math/rand" - "os" "testing" "time" @@ -13,18 +12,14 @@ import ( "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/peer" -) -func scaleDuration(t time.Duration) time.Duration { - if os.Getenv("CI") != "" { - return 3 * t - } - return t -} + mockClock "github.com/benbjohnson/clock" +) type delayedDiscovery struct { disc discovery.Discovery delay time.Duration + clock *mockClock.Mock } func (d *delayedDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { @@ -38,11 +33,25 @@ func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis } ch := make(chan peer.AddrInfo, 32) + doneCh := make(chan struct{}) go func() { defer close(ch) + defer close(doneCh) for ai := range dch { ch <- ai - time.Sleep(d.delay) + d.clock.Sleep(d.delay) + } + }() + + // Tick the clock forward to advance the sleep above + go func() { + for { + select { + case <-doneCh: + return + default: + d.clock.Add(d.delay) + } } }() @@ -75,7 +84,8 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discServer := mocks.NewDiscoveryServer() + clock := mockClock.NewMock() + discServer := mocks.NewDiscoveryServer(clock) h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) defer h1.Close() @@ -85,15 +95,15 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) { d2 := mocks.NewDiscoveryClient(h2, discServer) bkf := NewExponentialBackoff( - scaleDuration(time.Millisecond*100), - scaleDuration(time.Second*10), + time.Millisecond*100, + time.Second*10, NoJitter, - scaleDuration(time.Millisecond*100), + time.Millisecond*100, 2.5, 0, rand.NewSource(0), ) - dCache, err := NewBackoffDiscovery(d1, bkf) + dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock)) if err != nil { t.Fatal(err) } @@ -102,22 +112,27 @@ func TestBackoffDiscoverySingleBackoff(t *testing.T) { // try adding a peer then find it d1.Advertise(ctx, ns, discovery.TTL(time.Hour)) + // Advance clock by one step + clock.Add(1) assertNumPeers(t, ctx, dCache, ns, 1) // add a new peer and make sure it is still hidden by the caching layer d2.Advertise(ctx, ns, discovery.TTL(time.Hour)) + // Advance clock by one step + clock.Add(1) assertNumPeers(t, ctx, dCache, ns, 1) // wait for cache to expire and check for the new peer - time.Sleep(scaleDuration(time.Millisecond * 110)) + clock.Add(time.Millisecond * 110) assertNumPeers(t, ctx, dCache, ns, 2) } func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { + clock := mockClock.NewMock() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discServer := mocks.NewDiscoveryServer() + discServer := mocks.NewDiscoveryServer(clock) h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) defer h1.Close() @@ -128,15 +143,15 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { // Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms. bkf := NewExponentialBackoff( - scaleDuration(time.Millisecond*100), - scaleDuration(time.Second*10), + time.Millisecond*100, + time.Second*10, NoJitter, - scaleDuration(time.Millisecond*100), + time.Millisecond*100, 2.5, 0, rand.NewSource(0), ) - dCache, err := NewBackoffDiscovery(d1, bkf) + dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock)) if err != nil { t.Fatal(err) } @@ -144,28 +159,30 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { const ns = "test" // try adding a peer then find it - d1.Advertise(ctx, ns, discovery.TTL(scaleDuration(time.Hour))) + d1.Advertise(ctx, ns, discovery.TTL(time.Hour)) + // Advance clock by one step + clock.Add(1) assertNumPeers(t, ctx, dCache, ns, 1) // wait a little to make sure the extra request doesn't modify the backoff - time.Sleep(scaleDuration(time.Millisecond * 50)) // 50 < 100 + clock.Add(time.Millisecond * 50) // 50 < 100 assertNumPeers(t, ctx, dCache, ns, 1) // wait for backoff to expire and check if we increase it - time.Sleep(scaleDuration(time.Millisecond * 60)) // 50+60 > 100 + clock.Add(time.Millisecond * 60) // 50+60 > 100 assertNumPeers(t, ctx, dCache, ns, 1) - d2.Advertise(ctx, ns, discovery.TTL(scaleDuration(time.Millisecond*400))) + d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400)) - time.Sleep(scaleDuration(time.Millisecond * 150)) // 150 < 250 + clock.Add(time.Millisecond * 150) // 150 < 250 assertNumPeers(t, ctx, dCache, ns, 1) - time.Sleep(scaleDuration(time.Millisecond * 150)) // 150 + 150 > 250 + clock.Add(time.Millisecond * 150) // 150 + 150 > 250 assertNumPeers(t, ctx, dCache, ns, 2) // check that the backoff has been reset // also checks that we can decrease our peer count (i.e. not just growing a set) - time.Sleep(scaleDuration(time.Millisecond * 110)) // 110 > 100, also 150+150+110>400 + clock.Add(time.Millisecond * 110) // 110 > 100, also 150+150+110>400 assertNumPeers(t, ctx, dCache, ns, 1) } @@ -173,7 +190,8 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discServer := mocks.NewDiscoveryServer() + clock := mockClock.NewMock() + discServer := mocks.NewDiscoveryServer(clock) // Testing with n larger than most internal buffer sizes (32) n := 40 @@ -185,10 +203,10 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { advertisers[i] = mocks.NewDiscoveryClient(h, discServer) } - d1 := &delayedDiscovery{advertisers[0], scaleDuration(time.Millisecond * 10)} + d1 := &delayedDiscovery{advertisers[0], time.Millisecond * 10, clock} - bkf := NewFixedBackoff(scaleDuration(time.Millisecond * 200)) - dCache, err := NewBackoffDiscovery(d1, bkf) + bkf := NewFixedBackoff(time.Millisecond * 200) + dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock)) if err != nil { t.Fatal(err) } @@ -200,6 +218,8 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { t.Fatal(err) } } + // Advance clock by one step + clock.Add(1) ch1, err := dCache.FindPeers(ctx, ns) if err != nil { @@ -232,7 +252,8 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - discServer := mocks.NewDiscoveryServer() + clock := mockClock.NewMock() + discServer := mocks.NewDiscoveryServer(clock) // Testing with n larger than most internal buffer sizes (32) n := 40 @@ -247,10 +268,10 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) { h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) d1 := mocks.NewDiscoveryClient(h1, discServer) - discoveryInterval := scaleDuration(time.Millisecond * 10) + discoveryInterval := time.Millisecond * 10 bkf := NewFixedBackoff(discoveryInterval) - dCache, err := NewBackoffDiscovery(d1, bkf) + dCache, err := NewBackoffDiscovery(d1, bkf, withClock(clock)) if err != nil { t.Fatal(err) } @@ -261,6 +282,8 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) { for i := 0; i < n; i++ { advertisers[i].Advertise(ctx, ns, discovery.TTL(time.Hour)) } + // Advance clock by one step + clock.Add(1) // Request all peers, all will be present assertNumPeersWithLimit(t, ctx, dCache, ns, n, n) @@ -269,7 +292,7 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) { assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1) // Wait a little time but don't allow cache to expire - time.Sleep(discoveryInterval / 10) + clock.Add(discoveryInterval / 10) // Request peers with a lower limit this time using cache // Here we are testing that the cache logic does not block when there are more peers known than the limit requested @@ -277,7 +300,7 @@ func TestBackoffDiscoveryCacheCapacity(t *testing.T) { assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1) // Wait for next discovery so next request will bypass cache - time.Sleep(scaleDuration(time.Millisecond * 100)) + clock.Add(time.Millisecond * 100) // Ask for all peers again assertNumPeersWithLimit(t, ctx, dCache, ns, n, n) diff --git a/p2p/discovery/mocks/mocks.go b/p2p/discovery/mocks/mocks.go index 1cf97dd60b..8b31ead06e 100644 --- a/p2p/discovery/mocks/mocks.go +++ b/p2p/discovery/mocks/mocks.go @@ -10,9 +10,14 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +type clock interface { + Now() time.Time +} + type MockDiscoveryServer struct { - mx sync.Mutex - db map[string]map[peer.ID]*discoveryRegistration + mx sync.Mutex + db map[string]map[peer.ID]*discoveryRegistration + clock clock } type discoveryRegistration struct { @@ -20,9 +25,10 @@ type discoveryRegistration struct { expiration time.Time } -func NewDiscoveryServer() *MockDiscoveryServer { +func NewDiscoveryServer(clock clock) *MockDiscoveryServer { return &MockDiscoveryServer{ - db: make(map[string]map[peer.ID]*discoveryRegistration), + db: make(map[string]map[peer.ID]*discoveryRegistration), + clock: clock, } } @@ -35,7 +41,7 @@ func (s *MockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time. peers = make(map[peer.ID]*discoveryRegistration) s.db[ns] = peers } - peers[info.ID] = &discoveryRegistration{info, time.Now().Add(ttl)} + peers[info.ID] = &discoveryRegistration{info, s.clock.Now().Add(ttl)} return ttl, nil } @@ -55,7 +61,7 @@ func (s *MockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrI count = limit } - iterTime := time.Now() + iterTime := s.clock.Now() ch := make(chan peer.AddrInfo, count) numSent := 0 for p, reg := range peers { diff --git a/p2p/discovery/routing/routing_test.go b/p2p/discovery/routing/routing_test.go index 3f88297237..d24a5818e6 100644 --- a/p2p/discovery/routing/routing_test.go +++ b/p2p/discovery/routing/routing_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/p2p/discovery/mocks" "github.com/libp2p/go-libp2p/p2p/discovery/util" bhost "github.com/libp2p/go-libp2p/p2p/host/blank" @@ -115,7 +116,8 @@ func TestDiscoveryRouting(t *testing.T) { h1 := bhost.NewBlankHost(swarmt.GenSwarm(t)) h2 := bhost.NewBlankHost(swarmt.GenSwarm(t)) - dserver := mocks.NewDiscoveryServer() + clock := clock.NewMock() + dserver := mocks.NewDiscoveryServer(clock) d1 := mocks.NewDiscoveryClient(h1, dserver) d2 := mocks.NewDiscoveryClient(h2, dserver)