From 393cf505c6af94ee205b20c54d9520e4575e068e Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 6 Sep 2019 16:43:45 -0400 Subject: [PATCH] added initial backoff cache discovery --- backoff.go | 181 ++++++++++++++++++++++++++ backoff_test.go | 125 ++++++++++++++++++ backoffcache.go | 257 +++++++++++++++++++++++++++++++++++++ backoffcache_test.go | 297 +++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + 5 files changed, 861 insertions(+) create mode 100644 backoff.go create mode 100644 backoff_test.go create mode 100644 backoffcache.go create mode 100644 backoffcache_test.go diff --git a/backoff.go b/backoff.go new file mode 100644 index 0000000..15dd780 --- /dev/null +++ b/backoff.go @@ -0,0 +1,181 @@ +package discovery + +import ( + "math" + "math/rand" + "time" +) + +type BackoffFactory func() BackoffStrategy + +type BackoffStrategy interface { + Delay() time.Duration + Reset() +} + +// Jitter implementations taken roughly from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + +// Jitter must return a duration between min and max. Min must be lower than, or equal to, max. +type Jitter func(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration + +func FullJitter(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration { + if duration <= min { + return min + } + + normalizedDur := boundedDuration(duration, min, max) - min + + return boundedDuration(time.Duration(rng.Int63n(int64(normalizedDur)))+min, min, max) +} + +func NoJitter(duration time.Duration, min time.Duration, max time.Duration, rng *rand.Rand) time.Duration { + return boundedDuration(duration, min, max) +} + +type randomizedBackoff struct { + min time.Duration + max time.Duration + rng *rand.Rand +} + +func (b *randomizedBackoff) BoundedDelay(duration time.Duration) time.Duration { + return boundedDuration(duration, b.min, b.max) +} + +func boundedDuration(d time.Duration, min time.Duration, max time.Duration) time.Duration { + if d < min { + return min + } + if d > max { + return max + } + return d +} + +type attemptBackoff struct { + attempt int + jitter Jitter + randomizedBackoff +} + +func (b *attemptBackoff) Reset() { + b.attempt = 0 +} + +func NewFixedBackoffFactory(delay time.Duration) BackoffFactory { + return func() BackoffStrategy { + return &fixedBackoff{delay: delay} + } +} + +type fixedBackoff struct { + delay time.Duration +} + +func (b *fixedBackoff) Delay() time.Duration { + return b.delay +} + +func (b *fixedBackoff) Reset() {} + +func NewPolynomialBackoffFactory(min, max time.Duration, jitter Jitter, + timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory { + return func() BackoffStrategy { + return &polynomialBackoff{ + attemptBackoff: attemptBackoff{ + randomizedBackoff: randomizedBackoff{ + min: min, + max: max, + rng: rng, + }, + jitter: jitter, + }, + timeUnits: timeUnits, + poly: polyCoefs, + } + } +} + +type polynomialBackoff struct { + attemptBackoff + timeUnits time.Duration + poly []float64 +} + +func (b *polynomialBackoff) Delay() time.Duration { + polySum := b.poly[0] + exp := 1 + attempt := b.attempt + b.attempt++ + + for _, c := range b.poly[1:] { + exp *= attempt + polySum += float64(exp) * c + } + return b.jitter(time.Duration(float64(b.timeUnits)*polySum), b.min, b.max, b.rng) +} + +func NewExponentialBackoffFactory(min, max time.Duration, jitter Jitter, + timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory { + return func() BackoffStrategy { + return &exponentialBackoff{ + attemptBackoff: attemptBackoff{ + randomizedBackoff: randomizedBackoff{ + min: min, + max: max, + rng: rng, + }, + jitter: jitter, + }, + timeUnits: timeUnits, + base: base, + offset: offset, + } + } +} + +type exponentialBackoff struct { + attemptBackoff + timeUnits time.Duration + base float64 + offset time.Duration +} + +func (b *exponentialBackoff) Delay() time.Duration { + attempt := b.attempt + b.attempt++ + return b.jitter( + time.Duration(math.Pow(b.base, float64(attempt))*float64(b.timeUnits))+b.offset, b.min, b.max, b.rng) +} + +func NewExponentialDecorrelatedJitterFactory(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory { + return func() BackoffStrategy { + return &exponentialDecorrelatedJitter{ + randomizedBackoff: randomizedBackoff{ + min: min, + max: max, + rng: rng, + }, + base: base, + } + } +} + +type exponentialDecorrelatedJitter struct { + randomizedBackoff + base float64 + lastDelay time.Duration +} + +func (b *exponentialDecorrelatedJitter) Delay() time.Duration { + if b.lastDelay < b.min { + b.lastDelay = b.min + return b.lastDelay + } + + nextMax := int64(float64(b.lastDelay) * b.base) + b.lastDelay = boundedDuration(time.Duration(b.rng.Int63n(nextMax-int64(b.min)))+b.min, b.min, b.max) + return b.lastDelay +} + +func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 } diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 0000000..45cbb52 --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,125 @@ +package discovery + +import ( + "math/rand" + "testing" + "time" +) + +func checkDelay(bkf BackoffStrategy, expected time.Duration, t *testing.T) { + t.Helper() + if calculated := bkf.Delay(); calculated != expected { + t.Fatalf("expected %v, got %v", expected, calculated) + } +} + +func TestFixedBackoff(t *testing.T) { + startDelay := time.Second + delay := startDelay + + bkf := NewFixedBackoffFactory(delay) + delay *= 2 + b1 := bkf() + delay *= 2 + b2 := bkf() + + if b1.Delay() != startDelay || b2.Delay() != startDelay { + t.Fatal("incorrect delay time") + } + + if b1.Delay() != startDelay { + t.Fatal("backoff is stateful") + } + + if b1.Reset(); b1.Delay() != startDelay { + t.Fatalf("Reset does something") + } +} + +func TestPolynomialBackoff(t *testing.T) { + rng := rand.New(rand.NewSource(0)) + bkf := NewPolynomialBackoffFactory(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng) + b1 := bkf() + b2 := bkf() + + if b1.Delay() != time.Second || b2.Delay() != time.Second { + t.Fatal("incorrect delay time") + } + + checkDelay(b1, time.Millisecond*5500, t) + checkDelay(b1, time.Millisecond*16500, t) + checkDelay(b1, time.Millisecond*33000, t) + checkDelay(b2, time.Millisecond*5500, t) + + b1.Reset() + b1.Delay() + checkDelay(b1, time.Millisecond*5500, t) +} + +func TestExponentialBackoff(t *testing.T) { + rng := rand.New(rand.NewSource(0)) + bkf := NewExponentialBackoffFactory(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng) + b1 := bkf() + b2 := bkf() + + if b1.Delay() != time.Millisecond*650 || b2.Delay() != time.Millisecond*650 { + t.Fatal("incorrect delay time") + } + + checkDelay(b1, time.Millisecond*1100, t) + checkDelay(b1, time.Millisecond*1850, t) + checkDelay(b1, time.Millisecond*2975, t) + checkDelay(b1, time.Microsecond*4662500, t) + checkDelay(b1, time.Second*7, t) + checkDelay(b2, time.Millisecond*1100, t) + + b1.Reset() + b1.Delay() + checkDelay(b1, time.Millisecond*1100, t) +} + +func minMaxJitterTest(jitter Jitter, t *testing.T) { + rng := rand.New(rand.NewSource(0)) + if jitter(time.Nanosecond, time.Hour*10, time.Hour*20, rng) < time.Hour*10 { + t.Fatal("Min not working") + } + if jitter(time.Hour, time.Nanosecond, time.Nanosecond*10, rng) > time.Nanosecond*10 { + t.Fatal("Max not working") + } +} + +func TestNoJitter(t *testing.T) { + minMaxJitterTest(NoJitter, t) + for i := 0; i < 10; i++ { + expected := time.Second * time.Duration(i) + if calculated := NoJitter(expected, time.Duration(0), time.Second*100, nil); calculated != expected { + t.Fatalf("expected %v, got %v", expected, calculated) + } + } +} + +func TestFullJitter(t *testing.T) { + rng := rand.New(rand.NewSource(0)) + minMaxJitterTest(FullJitter, t) + const numBuckets = 51 + const multiplier = 10 + const threshold = 20 + + histogram := make([]int, numBuckets) + + for i := 0; i < (numBuckets-1)*multiplier; i++ { + started := time.Nanosecond * 50 + calculated := FullJitter(started, 0, 100, rng) + histogram[calculated]++ + } + + for _, count := range histogram { + if count > threshold { + t.Fatal("jitter is not close to evenly spread") + } + } + + if histogram[numBuckets-1] > 0 { + t.Fatal("jitter increased overall time") + } +} diff --git a/backoffcache.go b/backoffcache.go new file mode 100644 index 0000000..89651d3 --- /dev/null +++ b/backoffcache.go @@ -0,0 +1,257 @@ +package discovery + +import ( + "context" + "sync" + "time" + + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-peerstore/addr" +) + +// BackoffDiscovery is an implementation of discovery that caches peer data and attenuates repeated queries +type BackoffDiscovery struct { + disc discovery.Discovery + strat BackoffFactory + peerCache map[string]*backoffCache + peerCacheMux sync.RWMutex +} + +func NewBackoffDiscovery(disc discovery.Discovery, strat BackoffFactory) (discovery.Discovery, error) { + return &BackoffDiscovery{ + disc: disc, + strat: strat, + peerCache: make(map[string]*backoffCache), + }, nil +} + +type backoffCache struct { + nextDiscover time.Time + prevPeers map[peer.ID]peer.AddrInfo + + peers map[peer.ID]peer.AddrInfo + sendingChs map[chan peer.AddrInfo]int + + ongoing bool + strat BackoffStrategy + mux sync.Mutex +} + +func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + return d.disc.Advertise(ctx, ns, opts...) +} + +func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + // Get options + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return nil, err + } + + // Get cached peers + d.peerCacheMux.RLock() + c, ok := d.peerCache[ns] + d.peerCacheMux.RUnlock() + + /* + Overall plan: + If it's time to look for peers, look for peers, then return them + If it's not time then return cache + If it's time to look for peers, but we have already started looking. Get up to speed with ongoing request + */ + + // Setup cache if we don't have one yet + if !ok { + pc := &backoffCache{ + nextDiscover: time.Time{}, + prevPeers: make(map[peer.ID]peer.AddrInfo), + peers: make(map[peer.ID]peer.AddrInfo), + sendingChs: make(map[chan peer.AddrInfo]int), + strat: d.strat(), + } + d.peerCacheMux.Lock() + c, ok = d.peerCache[ns] + + if !ok { + d.peerCache[ns] = pc + c = pc + } + + d.peerCacheMux.Unlock() + } + + c.mux.Lock() + defer c.mux.Unlock() + + findPeers := !ok + timeExpired := false + if !findPeers { + timeExpired = time.Now().After(c.nextDiscover) + findPeers = timeExpired && !c.ongoing + } + + // If we should find peers then setup a dispatcher channel for dispatching incoming peers + if findPeers { + pch, err := d.disc.FindPeers(ctx, ns, opts...) + if err != nil { + return nil, err + } + + c.ongoing = true + + go func() { + defer func() { + c.mux.Lock() + + for ch := range c.sendingChs { + close(ch) + } + + // If the peer addresses have changed reset the backoff + if checkUpdates(c.prevPeers, c.peers) { + c.strat.Reset() + c.prevPeers = c.peers + } + c.nextDiscover = time.Now().Add(c.strat.Delay()) + + c.ongoing = false + c.peers = make(map[peer.ID]peer.AddrInfo) + c.sendingChs = make(map[chan peer.AddrInfo]int) + c.mux.Unlock() + }() + + for { + select { + case ai, ok := <-pch: + if !ok { + return + } + c.mux.Lock() + + // If we receive the same peer multiple times return the address union + var sendAi peer.AddrInfo + if prevAi, ok := c.peers[ai.ID]; ok { + if combinedAi := mergeAddrInfos(prevAi, ai); combinedAi != nil { + sendAi = *combinedAi + } else { + c.mux.Unlock() + continue + } + } else { + sendAi = ai + } + + c.peers[ai.ID] = sendAi + + for ch, rem := range c.sendingChs { + ch <- sendAi + if rem == 1 { + close(ch) + delete(c.sendingChs, ch) + break + } else if rem > 0 { + rem-- + } + } + + c.mux.Unlock() + case <-ctx.Done(): + return + } + } + }() + // If it's not yet time to search again then return cached peers + } else if !timeExpired { + chLen := options.Limit + + if chLen == 0 { + chLen = len(c.prevPeers) + } else if chLen > len(c.prevPeers) { + chLen = len(c.prevPeers) + } + pch := make(chan peer.AddrInfo, chLen) + for _, ai := range c.prevPeers { + pch <- ai + } + close(pch) + return pch, nil + } + + // Setup receiver channel for receiving peers from ongoing requests + + evtCh := make(chan peer.AddrInfo, 32) + pch := make(chan peer.AddrInfo, 8) + rcvPeers := make([]peer.AddrInfo, 0, 32) + for _, ai := range c.peers { + rcvPeers = append(rcvPeers, ai) + } + c.sendingChs[evtCh] = options.Limit + + go func() { + defer close(pch) + + for { + select { + case ai, ok := <-evtCh: + if ok { + rcvPeers = append(rcvPeers, ai) + + sentAll := true + sendPeers: + for i, p := range rcvPeers { + select { + case pch <- p: + default: + rcvPeers = rcvPeers[i:] + sentAll = false + break sendPeers + } + } + if sentAll { + rcvPeers = []peer.AddrInfo{} + } + } else { + for _, p := range rcvPeers { + select { + case pch <- p: + case <-ctx.Done(): + return + } + } + return + } + case <-ctx.Done(): + return + } + } + }() + + return pch, nil +} + +func mergeAddrInfos(prevAi, newAi peer.AddrInfo) *peer.AddrInfo { + combinedAddrs := addr.UniqueSource(addr.Slice(prevAi.Addrs), addr.Slice(newAi.Addrs)).Addrs() + if len(combinedAddrs) > len(prevAi.Addrs) { + combinedAi := &peer.AddrInfo{ID: prevAi.ID, Addrs: combinedAddrs} + return combinedAi + } + return nil +} + +func checkUpdates(orig, update map[peer.ID]peer.AddrInfo) bool { + if len(orig) != len(update) { + return true + } + for p, ai := range update { + if prevAi, ok := orig[p]; ok { + if combinedAi := mergeAddrInfos(prevAi, ai); combinedAi != nil { + return true + } + } else { + return true + } + } + return false +} diff --git a/backoffcache_test.go b/backoffcache_test.go new file mode 100644 index 0000000..096e971 --- /dev/null +++ b/backoffcache_test.go @@ -0,0 +1,297 @@ +package discovery + +import ( + "context" + "sync" + "testing" + "time" + + bhost "github.com/libp2p/go-libp2p-blankhost" + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" +) + +type mockDiscoveryServer struct { + mx sync.Mutex + db map[string]map[peer.ID]*discoveryRegistration +} + +type discoveryRegistration struct { + info peer.AddrInfo + expiration time.Time +} + +func newDiscoveryServer() *mockDiscoveryServer { + return &mockDiscoveryServer{ + db: make(map[string]map[peer.ID]*discoveryRegistration), + } +} + +func (s *mockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.Duration) (time.Duration, error) { + s.mx.Lock() + defer s.mx.Unlock() + + peers, ok := s.db[ns] + if !ok { + peers = make(map[peer.ID]*discoveryRegistration) + s.db[ns] = peers + } + peers[info.ID] = &discoveryRegistration{info, time.Now().Add(ttl)} + return ttl, nil +} + +func (s *mockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrInfo, error) { + s.mx.Lock() + defer s.mx.Unlock() + + peers, ok := s.db[ns] + if !ok || len(peers) == 0 { + emptyCh := make(chan peer.AddrInfo) + close(emptyCh) + return emptyCh, nil + } + + count := len(peers) + if limit != 0 && count > limit { + count = limit + } + + iterTime := time.Now() + ch := make(chan peer.AddrInfo, count) + numSent := 0 + for p, reg := range peers { + if numSent == count { + break + } + if iterTime.After(reg.expiration) { + delete(peers, p) + continue + } + + numSent++ + ch <- reg.info + } + close(ch) + + return ch, nil +} + +func (s *mockDiscoveryServer) hasPeerRecord(ns string, pid peer.ID) bool { + s.mx.Lock() + defer s.mx.Unlock() + + if peers, ok := s.db[ns]; ok { + _, ok := peers[pid] + return ok + } + return false +} + +type mockDiscoveryClient struct { + host host.Host + server *mockDiscoveryServer +} + +func (d *mockDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return 0, err + } + + return d.server.Advertise(ns, *host.InfoFromHost(d.host), options.Ttl) +} + +func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + var options discovery.Options + err := options.Apply(opts...) + if err != nil { + return nil, err + } + + return d.server.FindPeers(ns, options.Limit) +} + +type delayedDiscovery struct { + disc discovery.Discovery + delay time.Duration +} + +func (d *delayedDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { + return d.disc.Advertise(ctx, ns, opts...) +} + +func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { + dch, err := d.disc.FindPeers(ctx, ns, opts...) + if err != nil { + return nil, err + } + + ch := make(chan peer.AddrInfo, 32) + go func() { + defer close(ch) + for ai := range dch { + ch <- ai + time.Sleep(d.delay) + } + }() + + return ch, nil +} + +func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) { + t.Helper() + peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(10)) + if err != nil { + t.Fatal(err) + } + + peerset := make(map[peer.ID]struct{}) + for p := range peerCh { + peerset[p.ID] = struct{}{} + } + + if len(peerset) != count { + t.Fatalf("Was supposed to find %d, found %d instead", count, len(peerset)) + } +} + +func TestBackoffDiscoverySingleBackoff(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + discServer := newDiscoveryServer() + + h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + d1 := &mockDiscoveryClient{h1, discServer} + d2 := &mockDiscoveryClient{h2, discServer} + + bkf := NewExponentialBackoffFactory(time.Millisecond*100, time.Second*10, NoJitter, + time.Millisecond*100, 2.5, 0, nil) + dCache, err := NewBackoffDiscovery(d1, bkf) + if err != nil { + t.Fatal(err) + } + + const ns = "test" + + // try adding a peer then find it + d1.Advertise(ctx, ns, discovery.TTL(time.Hour)) + assertNumPeers(t, ctx, dCache, ns, 1) + + // add a new peer and make sure it is still hidden by the caching layer + d2.Advertise(ctx, ns, discovery.TTL(time.Hour)) + assertNumPeers(t, ctx, dCache, ns, 1) + + // wait for cache to expire and check for the new peer + time.Sleep(time.Millisecond * 110) + assertNumPeers(t, ctx, dCache, ns, 2) +} + +func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + discServer := newDiscoveryServer() + + h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + d1 := &mockDiscoveryClient{h1, discServer} + d2 := &mockDiscoveryClient{h2, discServer} + + // Startup delay is 0ms. First backoff after finding data is 100ms, second backoff is 250ms. + bkf := NewExponentialBackoffFactory(time.Millisecond*100, time.Second*10, NoJitter, + time.Millisecond*100, 2.5, 0, nil) + dCache, err := NewBackoffDiscovery(d1, bkf) + if err != nil { + t.Fatal(err) + } + + const ns = "test" + + // try adding a peer then find it + d1.Advertise(ctx, ns, discovery.TTL(time.Hour)) + assertNumPeers(t, ctx, dCache, ns, 1) + + // wait a little to make sure the extra request doesn't modify the backoff + time.Sleep(time.Millisecond * 50) //50 < 100 + assertNumPeers(t, ctx, dCache, ns, 1) + + // wait for backoff to expire and check if we increase it + time.Sleep(time.Millisecond * 60) // 50+60 > 100 + assertNumPeers(t, ctx, dCache, ns, 1) + + d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400)) + + time.Sleep(time.Millisecond * 150) //150 < 250 + assertNumPeers(t, ctx, dCache, ns, 1) + + time.Sleep(time.Millisecond * 150) //150 + 150 > 250 + assertNumPeers(t, ctx, dCache, ns, 2) + + // check that the backoff has been reset + // also checks that we can decrease our peer count (i.e. not just growing a set) + time.Sleep(time.Millisecond * 110) //110 > 100, also 150+150+110>400 + assertNumPeers(t, ctx, dCache, ns, 1) +} + +func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + discServer := newDiscoveryServer() + + // Testing with n larger than most internal buffer sizes (32) + n := 40 + advertisers := make([]discovery.Discovery, n) + + for i := 0; i < n; i++ { + h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + advertisers[i] = &mockDiscoveryClient{h, discServer} + } + + d1 := &delayedDiscovery{advertisers[0], time.Millisecond * 10} + + bkf := NewFixedBackoffFactory(time.Millisecond * 200) + dCache, err := NewBackoffDiscovery(d1, bkf) + if err != nil { + t.Fatal(err) + } + + const ns = "test" + + for _, a := range advertisers { + if _, err := a.Advertise(ctx, ns, discovery.TTL(time.Hour)); err != nil { + t.Fatal(err) + } + } + + ch1, err := dCache.FindPeers(ctx, ns) + if err != nil { + t.Fatal(err) + } + + _ = <-ch1 + ch2, err := dCache.FindPeers(ctx, ns) + if err != nil { + t.Fatal(err) + } + + szCh2 := 0 + for ai := range ch2 { + _ = ai + szCh2++ + } + + szCh1 := 1 + for _ = range ch1 { + szCh1++ + } + + if szCh1 != n && szCh2 != n { + t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n) + } +} diff --git a/go.mod b/go.mod index f8ea495..4c5c436 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ require ( github.com/ipfs/go-log v0.0.1 github.com/libp2p/go-libp2p-blankhost v0.1.1 github.com/libp2p/go-libp2p-core v0.0.1 + github.com/libp2p/go-libp2p-peerstore v0.1.0 github.com/libp2p/go-libp2p-swarm v0.1.0 github.com/multiformats/go-multihash v0.0.5 )