Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #133 from ipfs/feat/improve-provider-requests
Browse files Browse the repository at this point in the history
feat(sessions): add rebroadcasting, search backoff
  • Loading branch information
hannahhoward authored and Stebalien committed Jul 4, 2019
1 parent acc22c2 commit 55de7d4
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 84 deletions.
64 changes: 45 additions & 19 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"

decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
Expand Down Expand Up @@ -38,7 +39,8 @@ var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second
)

var (
Expand All @@ -65,6 +67,20 @@ func ProvideEnabled(enabled bool) Option {
}
}

// ProviderSearchDelay overwrites the global provider search delay
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return func(bs *Bitswap) {
bs.provSearchDelay = newProvSearchDelay
}
}

// RebroadcastDelay overwrites the global provider rebroadcast delay
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
return func(bs *Bitswap) {
bs.rebroadcastDelay = newRebroadcastDelay
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
Expand Down Expand Up @@ -99,8 +115,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, provSearchDelay, rebroadcastDelay)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
Expand All @@ -110,20 +128,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

bs := &Bitswap{
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
blockstore: bstore,
engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method
network: network,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
}

// apply functional options before starting and running bitswap
Expand Down Expand Up @@ -190,6 +210,12 @@ type Bitswap struct {

// whether or not to make provide announcements
provideEnabled bool

// how long to wait before looking for providers in a session
provSearchDelay time.Duration

// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay delay.D
}

type counters struct {
Expand Down Expand Up @@ -232,7 +258,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
session := bs.sm.NewSession(ctx)
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}

Expand Down Expand Up @@ -398,5 +424,5 @@ func (bs *Bitswap) IsOnline() bool {
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
4 changes: 1 addition & 3 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}

func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
bssession.SetProviderSearchDelay(50 * time.Millisecond)
defer bssession.SetProviderSearchDelay(time.Second)
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond))
defer ig.Close()

hasBlock := ig.Next()
Expand Down
4 changes: 2 additions & 2 deletions bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -161,9 +162,8 @@ func TestFetchNotConnected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

bssession.SetProviderSearchDelay(10 * time.Millisecond)
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet)
ig := testinstance.NewTestInstanceGenerator(vnet, bitswap.ProviderSearchDelay(10*time.Millisecond))
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()

Expand Down
119 changes: 82 additions & 37 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package session

import (
"context"
"math/rand"
"time"

lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"
Expand Down Expand Up @@ -75,14 +77,18 @@ type Session struct {
tickDelayReqs chan time.Duration

// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
rebroadcast *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
consecutiveTicks int
provSearchDelay time.Duration
rebroadcastDelay delay.D
// identifiers
notif notifications.PubSub
uuid logging.Loggable
Expand All @@ -91,25 +97,33 @@ type Session struct {

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
func New(ctx context.Context,
id uint64,
wm WantManager,
pm PeerManager,
srs RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
provSearchDelay: provSearchDelay,
rebroadcastDelay: rebroadcastDelay,
}

cache, _ := lru.New(2048)
Expand Down Expand Up @@ -222,17 +236,11 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}

var provSearchDelay = time.Second

// SetProviderSearchDelay overwrites the global provider search delay
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
provSearchDelay = newProvSearchDelay
}

// Session run loop -- everything function below here should not be called
// of this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
s.tick = time.NewTimer(s.provSearchDelay)
s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get())
for {
select {
case blk := <-s.incoming:
Expand All @@ -247,6 +255,8 @@ func (s *Session) run(ctx context.Context) {
s.handleCancel(keys)
case <-s.tick.C:
s.handleTick(ctx)
case <-s.rebroadcast.C:
s.handleRebroadcast(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
Expand Down Expand Up @@ -310,12 +320,42 @@ func (s *Session) handleTick(ctx context.Context) {
s.pm.RecordPeerRequests(nil, live)
s.wm.WantBlocks(ctx, live, nil, s.id)

if len(live) > 0 {
// do no find providers on consecutive ticks
// -- just rely on periodic rebroadcast
if len(live) > 0 && (s.consecutiveTicks == 0) {
s.pm.FindMorePeers(ctx, live[0])
}
s.resetTick()

if len(s.liveWants) > 0 {
s.consecutiveTicks++
}
}

func (s *Session) handleRebroadcast(ctx context.Context) {

if len(s.liveWants) == 0 {
return
}

// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
s.pm.FindMorePeers(ctx, s.randomLiveWant())

s.rebroadcast.Reset(s.rebroadcastDelay.Get())
}

func (s *Session) randomLiveWant() cid.Cid {
i := rand.Intn(len(s.liveWants))
// picking a random live want
for k := range s.liveWants {
if i == 0 {
return k
}
i--
}
return cid.Cid{}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.notif.Shutdown()
Expand Down Expand Up @@ -347,6 +387,8 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
s.tofetch.Remove(c)
}
s.fetchcnt++
// we've received new wanted blocks, so future ticks are not consecutive
s.consecutiveTicks = 0
s.notif.Publish(blk)

toAdd := s.wantBudget()
Expand Down Expand Up @@ -395,12 +437,15 @@ func (s *Session) averageLatency() time.Duration {
}

func (s *Session) resetTick() {
var tickDelay time.Duration
if s.latTotal == 0 {
s.tick.Reset(provSearchDelay)
tickDelay = s.provSearchDelay
} else {
avLat := s.averageLatency()
s.tick.Reset(s.baseTickDelay + (3 * avLat))
tickDelay = s.baseTickDelay + (3 * avLat)
}
tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
s.tick.Reset(tickDelay)
}

func (s *Session) wantBudget() int {
Expand Down
Loading

0 comments on commit 55de7d4

Please sign in to comment.