This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat(sessions): add rebroadcasting, search backoff #133

Merged
merged 5 commits on Jun 4, 2019
64 changes: 45 additions & 19 deletions bitswap.go
@@ -9,6 +9,7 @@ import (
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"

decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
@@ -38,7 +39,8 @@ var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second
)

var (
@@ -65,6 +67,20 @@ func ProvideEnabled(enabled bool) Option {
}
}

// ProviderSearchDelay overwrites the global provider search delay
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return func(bs *Bitswap) {
bs.provSearchDelay = newProvSearchDelay
}
}

// RebroadcastDelay overwrites the global provider rebroadcast delay
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
return func(bs *Bitswap) {
bs.rebroadcastDelay = newRebroadcastDelay
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
@@ -99,8 +115,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, provSearchDelay, rebroadcastDelay)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
@@ -110,20 +128,22 @@
}

bs := &Bitswap{
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
}

// apply functional options before starting and running bitswap
@@ -190,6 +210,12 @@ type Bitswap struct {

// whether or not to make provide announcements
provideEnabled bool

// how long to wait before looking for providers in a session
provSearchDelay time.Duration

// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay delay.D
}

type counters struct {
@@ -232,7 +258,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
session := bs.sm.NewSession(ctx)
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}

@@ -398,5 +424,5 @@ func (bs *Bitswap) IsOnline() bool {
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
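
For reference, a minimal sketch of wiring the two new options together with the existing ProvideEnabled option. It mirrors the test-helper usage in the tests below; the test name and the chosen durations are illustrative assumptions, not part of this diff.

	// Hypothetical test sketch: the vnet/ig setup is copied from the existing
	// tests; the option list is the only point of interest here.
	func TestSessionTuningOptions(t *testing.T) {
		vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
		ig := testinstance.NewTestInstanceGenerator(vnet,
			bitswap.ProvideEnabled(false),
			bitswap.ProviderSearchDelay(50*time.Millisecond),
			bitswap.RebroadcastDelay(delay.Fixed(15*time.Second)),
		)
		defer ig.Close()
		_ = ig.Next() // one instance constructed with the options applied
	}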
4 changes: 1 addition & 3 deletions bitswap_test.go
@@ -102,11 +102,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
bssession.SetProviderSearchDelay(50 * time.Millisecond)
defer bssession.SetProviderSearchDelay(time.Second)
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond))
defer ig.Close()

hasBlock := ig.Next()
4 changes: 2 additions & 2 deletions bitswap_with_sessions_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
blocks "github.com/ipfs/go-block-format"
@@ -161,9 +162,8 @@ func TestFetchNotConnected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

bssession.SetProviderSearchDelay(10 * time.Millisecond)
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet)
ig := testinstance.NewTestInstanceGenerator(vnet, bitswap.ProviderSearchDelay(10*time.Millisecond))
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

119 changes: 82 additions & 37 deletions session/session.go
@@ -2,13 +2,15 @@ package session

import (
"context"
"math/rand"
"time"

lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
@@ -75,14 +77,18 @@ type Session struct {
tickDelayReqs chan time.Duration

// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
rebroadcast *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
consecutiveTicks int
provSearchDelay time.Duration
rebroadcastDelay delay.D
// identifiers
notif notifications.PubSub
uuid logging.Loggable
@@ -91,25 +97,33 @@

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
func New(ctx context.Context,
id uint64,
wm WantManager,
pm PeerManager,
srs RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
provSearchDelay: provSearchDelay,
rebroadcastDelay: rebroadcastDelay,
}

cache, _ := lru.New(2048)
@@ -222,17 +236,11 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}

var provSearchDelay = time.Second

// SetProviderSearchDelay overwrites the global provider search delay
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
provSearchDelay = newProvSearchDelay
}

// Session run loop -- every function below here should not be called outside
// of this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
s.tick = time.NewTimer(s.provSearchDelay)
s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get())
@Stebalien (Member) commented on Jun 11, 2019:

Nit: I'm not sure using delay.D is all that useful here as we always just reuse the same initial delay. delay.D is designed for random delays (we'd need to use a ticker and reset it with rebroadcast.Reset(s.rebroadcastDelay.NextWaitTime())).
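
A short sketch of the alternative this comment suggests (illustrative only; it swaps the fixed Get() call used in handleRebroadcast below for delay.D's NextWaitTime, which the comment itself references, so randomized delays are actually honored):

	// current behaviour: every reset reuses the same initial delay
	s.rebroadcast.Reset(s.rebroadcastDelay.Get())
	// suggested: draw a fresh wait time from the delay.D on each cycle
	s.rebroadcast.Reset(s.rebroadcastDelay.NextWaitTime())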

for {
select {
case blk := <-s.incoming:
@@ -247,6 +255,8 @@
s.handleCancel(keys)
case <-s.tick.C:
s.handleTick(ctx)
case <-s.rebroadcast.C:
s.handleRebroadcast(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
@@ -310,12 +320,42 @@ func (s *Session) handleTick(ctx context.Context) {
s.pm.RecordPeerRequests(nil, live)
s.wm.WantBlocks(ctx, live, nil, s.id)

if len(live) > 0 {
// do not find providers on consecutive ticks
// -- just rely on periodic rebroadcast
if len(live) > 0 && (s.consecutiveTicks == 0) {
s.pm.FindMorePeers(ctx, live[0])
}
s.resetTick()

if len(s.liveWants) > 0 {
s.consecutiveTicks++
}
}

func (s *Session) handleRebroadcast(ctx context.Context) {
Reviewer comment (Member):

This doesn't actually rebroadcast anything, does it? We should probably just call this something like handlePeriodicSearch.


if len(s.liveWants) == 0 {
return
}

// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
s.pm.FindMorePeers(ctx, s.randomLiveWant())

s.rebroadcast.Reset(s.rebroadcastDelay.Get())
}

func (s *Session) randomLiveWant() cid.Cid {
i := rand.Intn(len(s.liveWants))
// picking a random live want
for k := range s.liveWants {
if i == 0 {
return k
}
i--
}
return cid.Cid{}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.notif.Shutdown()
@@ -347,6 +387,8 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
s.tofetch.Remove(c)
}
s.fetchcnt++
// we've received new wanted blocks, so future ticks are not consecutive
s.consecutiveTicks = 0
s.notif.Publish(blk)

toAdd := s.wantBudget()
@@ -395,12 +437,15 @@ func (s *Session) averageLatency() time.Duration {
}

func (s *Session) resetTick() {
var tickDelay time.Duration
if s.latTotal == 0 {
s.tick.Reset(provSearchDelay)
tickDelay = s.provSearchDelay
} else {
avLat := s.averageLatency()
s.tick.Reset(s.baseTickDelay + (3 * avLat))
tickDelay = s.baseTickDelay + (3 * avLat)
}
tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
s.tick.Reset(tickDelay)
}

func (s *Session) wantBudget() int {
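
As a worked illustration of the new search backoff (not part of the diff): resetTick now scales the tick interval by (1 + consecutiveTicks), so a session that keeps ticking without receiving any wanted blocks backs off its want re-sends, and provider search on ticks is skipped after the first consecutive tick in favor of the periodic rebroadcast search. The values below are assumptions chosen only to show the scaling.

	// Standalone sketch mirroring the resetTick arithmetic above,
	// assuming baseTickDelay = 500ms and an average latency of 200ms.
	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		baseTickDelay := 500 * time.Millisecond
		avgLat := 200 * time.Millisecond
		for consecutiveTicks := 0; consecutiveTicks < 4; consecutiveTicks++ {
			tickDelay := (baseTickDelay + 3*avgLat) * time.Duration(1+consecutiveTicks)
			fmt.Println(tickDelay) // 1.1s, 2.2s, 3.3s, 4.4s
		}
	}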