Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
added configurable channel buffer sizes to the backoff cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Oct 13, 2019
1 parent f88f93e commit c839e68
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions backoffcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package discovery

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -16,14 +17,50 @@ type BackoffDiscovery struct {
stratFactory BackoffFactory
peerCache map[string]*backoffCache
peerCacheMux sync.RWMutex

parallelBufSz int
returnedBufSz int
}

func NewBackoffDiscovery(disc discovery.Discovery, stratFactory BackoffFactory) (discovery.Discovery, error) {
return &BackoffDiscovery{
type BackoffDiscoveryOption func(*BackoffDiscovery) error

func NewBackoffDiscovery(disc discovery.Discovery, stratFactory BackoffFactory, opts ...BackoffDiscoveryOption) (discovery.Discovery, error) {
b := &BackoffDiscovery{
disc: disc,
stratFactory: stratFactory,
peerCache: make(map[string]*backoffCache),
}, nil

parallelBufSz: 32,
returnedBufSz: 32,
}

for _, opt := range opts {
if err := opt(b); err != nil {
return nil, err
}
}

return b, nil
}

func WithBackoffDiscoverySimultaneousQueryBufferSize(size int) BackoffDiscoveryOption {
return func(b *BackoffDiscovery) error {
if size < 0 {
return fmt.Errorf("cannot set size to be smaller than 0")
}
b.parallelBufSz = size
return nil
}
}

func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption {
return func(b *BackoffDiscovery) error {
if size < 0 {
return fmt.Errorf("cannot set size to be smaller than 0")
}
b.returnedBufSz = size
return nil
}
}

type backoffCache struct {
Expand Down Expand Up @@ -119,8 +156,8 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
}

// Setup receiver channel for receiving peers from ongoing requests
evtCh := make(chan peer.AddrInfo, 32)
pch := make(chan peer.AddrInfo, 32)
evtCh := make(chan peer.AddrInfo, d.parallelBufSz)
pch := make(chan peer.AddrInfo, d.returnedBufSz)
rcvPeers := make([]peer.AddrInfo, 0, 32)
for _, ai := range c.peers {
rcvPeers = append(rcvPeers, ai)
Expand Down

0 comments on commit c839e68

Please sign in to comment.