From 55de7d49cdccf32ffde5d1b2d4da4842beb4a168 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Mon, 3 Jun 2019 18:00:15 -0700 Subject: [PATCH] Merge pull request #133 from ipfs/feat/improve-provider-requests feat(sessions): add rebroadcasting, search backoff --- bitswap.go | 64 ++++++++---- bitswap_test.go | 4 +- bitswap_with_sessions_test.go | 4 +- session/session.go | 119 ++++++++++++++++------- session/session_test.go | 135 ++++++++++++++++++++++++-- sessionmanager/sessionmanager.go | 10 +- sessionmanager/sessionmanager_test.go | 32 +++--- 7 files changed, 284 insertions(+), 84 deletions(-) diff --git a/bitswap.go b/bitswap.go index 757e8be9..7000fee2 100644 --- a/bitswap.go +++ b/bitswap.go @@ -9,6 +9,7 @@ import ( "time" bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter" + delay "github.com/ipfs/go-ipfs-delay" decision "github.com/ipfs/go-bitswap/decision" bsgetter "github.com/ipfs/go-bitswap/getter" @@ -38,7 +39,8 @@ var _ exchange.SessionExchange = (*Bitswap)(nil) const ( // these requests take at _least_ two minutes at the moment. - provideTimeout = time.Minute * 3 + provideTimeout = time.Minute * 3 + defaultProvSearchDelay = time.Second ) var ( @@ -65,6 +67,20 @@ func ProvideEnabled(enabled bool) Option { } } +// ProviderSearchDelay overwrites the global provider search delay +func ProviderSearchDelay(newProvSearchDelay time.Duration) Option { + return func(bs *Bitswap) { + bs.provSearchDelay = newProvSearchDelay + } +} + +// RebroadcastDelay overwrites the global provider rebroadcast delay +func RebroadcastDelay(newRebroadcastDelay delay.D) Option { + return func(bs *Bitswap) { + bs.rebroadcastDelay = newRebroadcastDelay + } +} + // New initializes a BitSwap instance that communicates over the provided // BitSwapNetwork. This function registers the returned instance as the network // delegate. Runs until context is cancelled or bitswap.Close is called. @@ -99,8 +115,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory)) pqm := bspqm.New(ctx, network) - sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session { - return bssession.New(ctx, id, wm, pm, srs) + sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, + provSearchDelay time.Duration, + rebroadcastDelay delay.D) bssm.Session { + return bssession.New(ctx, id, wm, pm, srs, provSearchDelay, rebroadcastDelay) } sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager { return bsspm.New(ctx, id, network.ConnectionManager(), pqm) @@ -110,20 +128,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, } bs := &Bitswap{ - blockstore: bstore, - engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method - network: network, - process: px, - newBlocks: make(chan cid.Cid, HasBlockBufferSize), - provideKeys: make(chan cid.Cid, provideKeysBufferSize), - wm: wm, - pqm: pqm, - sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory), - counters: new(counters), - dupMetric: dupHist, - allMetric: allHist, - sentHistogram: sentHistogram, - provideEnabled: true, + blockstore: bstore, + engine: decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method + network: network, + process: px, + newBlocks: make(chan cid.Cid, HasBlockBufferSize), + provideKeys: make(chan cid.Cid, provideKeysBufferSize), + wm: wm, + pqm: pqm, + sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory), + counters: new(counters), + dupMetric: dupHist, + allMetric: allHist, + sentHistogram: sentHistogram, + provideEnabled: true, + provSearchDelay: defaultProvSearchDelay, + rebroadcastDelay: delay.Fixed(time.Minute), } // apply functional options before starting and running bitswap @@ -190,6 +210,12 @@ type Bitswap struct { // whether or not to make provide announcements provideEnabled bool + + // how long to wait before looking for providers in a session + provSearchDelay time.Duration + + // how often to rebroadcast providing requests to find more optimized providers + rebroadcastDelay delay.D } type counters struct { @@ -232,7 +258,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt { // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) { - session := bs.sm.NewSession(ctx) + session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay) return session.GetBlocks(ctx, keys) } @@ -398,5 +424,5 @@ func (bs *Bitswap) IsOnline() bool { // be more efficient in its requests to peers. If you are using a session // from go-blockservice, it will create a bitswap session automatically. func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { - return bs.sm.NewSession(ctx) + return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay) } diff --git a/bitswap_test.go b/bitswap_test.go index fd3066ab..10fbbf35 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -102,11 +102,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { - bssession.SetProviderSearchDelay(50 * time.Millisecond) - defer bssession.SetProviderSearchDelay(time.Second) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false)) + ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond)) defer ig.Close() hasBlock := ig.Next() diff --git a/bitswap_with_sessions_test.go b/bitswap_with_sessions_test.go index 50be52ca..9495ce6b 100644 --- a/bitswap_with_sessions_test.go +++ b/bitswap_with_sessions_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + bitswap "github.com/ipfs/go-bitswap" bssession "github.com/ipfs/go-bitswap/session" testinstance "github.com/ipfs/go-bitswap/testinstance" blocks "github.com/ipfs/go-block-format" @@ -161,9 +162,8 @@ func TestFetchNotConnected(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - bssession.SetProviderSearchDelay(10 * time.Millisecond) vnet := getVirtualNetwork() - ig := testinstance.NewTestInstanceGenerator(vnet) + ig := testinstance.NewTestInstanceGenerator(vnet, bitswap.ProviderSearchDelay(10*time.Millisecond)) defer ig.Close() bgen := blocksutil.NewBlockGenerator() diff --git a/session/session.go b/session/session.go index b57f472e..1e5a1d0f 100644 --- a/session/session.go +++ b/session/session.go @@ -2,6 +2,7 @@ package session import ( "context" + "math/rand" "time" lru "github.com/hashicorp/golang-lru" @@ -9,6 +10,7 @@ import ( notifications "github.com/ipfs/go-bitswap/notifications" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + delay "github.com/ipfs/go-ipfs-delay" logging "github.com/ipfs/go-log" loggables "github.com/libp2p/go-libp2p-loggables" peer "github.com/libp2p/go-libp2p-peer" @@ -75,14 +77,18 @@ type Session struct { tickDelayReqs chan time.Duration // do not touch outside run loop - tofetch *cidQueue - interest *lru.Cache - pastWants *cidQueue - liveWants map[cid.Cid]time.Time - tick *time.Timer - baseTickDelay time.Duration - latTotal time.Duration - fetchcnt int + tofetch *cidQueue + interest *lru.Cache + pastWants *cidQueue + liveWants map[cid.Cid]time.Time + tick *time.Timer + rebroadcast *time.Timer + baseTickDelay time.Duration + latTotal time.Duration + fetchcnt int + consecutiveTicks int + provSearchDelay time.Duration + rebroadcastDelay delay.D // identifiers notif notifications.PubSub uuid logging.Loggable @@ -91,25 +97,33 @@ type Session struct { // New creates a new bitswap session whose lifetime is bounded by the // given context. -func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session { +func New(ctx context.Context, + id uint64, + wm WantManager, + pm PeerManager, + srs RequestSplitter, + provSearchDelay time.Duration, + rebroadcastDelay delay.D) *Session { s := &Session{ - liveWants: make(map[cid.Cid]time.Time), - newReqs: make(chan []cid.Cid), - cancelKeys: make(chan []cid.Cid), - tofetch: newCidQueue(), - pastWants: newCidQueue(), - interestReqs: make(chan interestReq), - latencyReqs: make(chan chan time.Duration), - tickDelayReqs: make(chan time.Duration), - ctx: ctx, - wm: wm, - pm: pm, - srs: srs, - incoming: make(chan blkRecv), - notif: notifications.New(), - uuid: loggables.Uuid("GetBlockRequest"), - baseTickDelay: time.Millisecond * 500, - id: id, + liveWants: make(map[cid.Cid]time.Time), + newReqs: make(chan []cid.Cid), + cancelKeys: make(chan []cid.Cid), + tofetch: newCidQueue(), + pastWants: newCidQueue(), + interestReqs: make(chan interestReq), + latencyReqs: make(chan chan time.Duration), + tickDelayReqs: make(chan time.Duration), + ctx: ctx, + wm: wm, + pm: pm, + srs: srs, + incoming: make(chan blkRecv), + notif: notifications.New(), + uuid: loggables.Uuid("GetBlockRequest"), + baseTickDelay: time.Millisecond * 500, + id: id, + provSearchDelay: provSearchDelay, + rebroadcastDelay: rebroadcastDelay, } cache, _ := lru.New(2048) @@ -222,17 +236,11 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { } } -var provSearchDelay = time.Second - -// SetProviderSearchDelay overwrites the global provider search delay -func SetProviderSearchDelay(newProvSearchDelay time.Duration) { - provSearchDelay = newProvSearchDelay -} - // Session run loop -- everything function below here should not be called // of this loop func (s *Session) run(ctx context.Context) { - s.tick = time.NewTimer(provSearchDelay) + s.tick = time.NewTimer(s.provSearchDelay) + s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get()) for { select { case blk := <-s.incoming: @@ -247,6 +255,8 @@ func (s *Session) run(ctx context.Context) { s.handleCancel(keys) case <-s.tick.C: s.handleTick(ctx) + case <-s.rebroadcast.C: + s.handleRebroadcast(ctx) case lwchk := <-s.interestReqs: lwchk.resp <- s.cidIsWanted(lwchk.c) case resp := <-s.latencyReqs: @@ -310,12 +320,42 @@ func (s *Session) handleTick(ctx context.Context) { s.pm.RecordPeerRequests(nil, live) s.wm.WantBlocks(ctx, live, nil, s.id) - if len(live) > 0 { + // do no find providers on consecutive ticks + // -- just rely on periodic rebroadcast + if len(live) > 0 && (s.consecutiveTicks == 0) { s.pm.FindMorePeers(ctx, live[0]) } s.resetTick() + + if len(s.liveWants) > 0 { + s.consecutiveTicks++ + } +} + +func (s *Session) handleRebroadcast(ctx context.Context) { + + if len(s.liveWants) == 0 { + return + } + + // TODO: come up with a better strategy for determining when to search + // for new providers for blocks. + s.pm.FindMorePeers(ctx, s.randomLiveWant()) + + s.rebroadcast.Reset(s.rebroadcastDelay.Get()) } +func (s *Session) randomLiveWant() cid.Cid { + i := rand.Intn(len(s.liveWants)) + // picking a random live want + for k := range s.liveWants { + if i == 0 { + return k + } + i-- + } + return cid.Cid{} +} func (s *Session) handleShutdown() { s.tick.Stop() s.notif.Shutdown() @@ -347,6 +387,8 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { s.tofetch.Remove(c) } s.fetchcnt++ + // we've received new wanted blocks, so future ticks are not consecutive + s.consecutiveTicks = 0 s.notif.Publish(blk) toAdd := s.wantBudget() @@ -395,12 +437,15 @@ func (s *Session) averageLatency() time.Duration { } func (s *Session) resetTick() { + var tickDelay time.Duration if s.latTotal == 0 { - s.tick.Reset(provSearchDelay) + tickDelay = s.provSearchDelay } else { avLat := s.averageLatency() - s.tick.Reset(s.baseTickDelay + (3 * avLat)) + tickDelay = s.baseTickDelay + (3 * avLat) } + tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks) + s.tick.Reset(tickDelay) } func (s *Session) wantBudget() int { diff --git a/session/session_test.go b/session/session_test.go index 9f6aef54..553a38ef 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -6,12 +6,12 @@ import ( "testing" "time" - "github.com/ipfs/go-block-format" - bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter" "github.com/ipfs/go-bitswap/testutil" + blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" + delay "github.com/ipfs/go-ipfs-delay" peer "github.com/libp2p/go-libp2p-peer" ) @@ -42,12 +42,12 @@ func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, pee type fakePeerManager struct { lk sync.RWMutex peers []peer.ID - findMorePeersRequested chan struct{} + findMorePeersRequested chan cid.Cid } func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) { select { - case fpm.findMorePeersRequested <- struct{}{}: + case fpm.findMorePeersRequested <- k: case <-ctx.Done(): } } @@ -84,7 +84,7 @@ func TestSessionGetBlocks(t *testing.T) { fpm := &fakePeerManager{} frs := &fakeRequestSplitter{} id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, frs) + session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute)) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) var cids []cid.Cid @@ -193,10 +193,10 @@ func TestSessionFindMorePeers(t *testing.T) { wantReqs := make(chan wantReq, 1) cancelReqs := make(chan wantReq, 1) fwm := &fakeWantManager{wantReqs, cancelReqs} - fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)} + fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)} frs := &fakeRequestSplitter{} id := testutil.GenerateSessionID() - session := New(ctx, id, fwm, fpm, frs) + session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute)) session.SetBaseTickDelay(200 * time.Microsecond) blockGenerator := blocksutil.NewBlockGenerator() blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2) @@ -258,3 +258,124 @@ func TestSessionFindMorePeers(t *testing.T) { t.Fatal("Did not find more peers") } } + +func TestSessionFailingToGetFirstBlock(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + wantReqs := make(chan wantReq, 1) + cancelReqs := make(chan wantReq, 1) + fwm := &fakeWantManager{wantReqs, cancelReqs} + fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)} + frs := &fakeRequestSplitter{} + id := testutil.GenerateSessionID() + + session := New(ctx, id, fwm, fpm, frs, 10*time.Millisecond, delay.Fixed(100*time.Millisecond)) + blockGenerator := blocksutil.NewBlockGenerator() + blks := blockGenerator.Blocks(4) + var cids []cid.Cid + for _, block := range blks { + cids = append(cids, block.Cid()) + } + startTick := time.Now() + _, err := session.GetBlocks(ctx, cids) + if err != nil { + t.Fatal("error getting blocks") + } + + // clear the initial block of wants + select { + case <-wantReqs: + case <-ctx.Done(): + t.Fatal("Did not make first want request ") + } + + // verify a broadcast is made + select { + case receivedWantReq := <-wantReqs: + if len(receivedWantReq.cids) < len(cids) { + t.Fatal("did not rebroadcast whole live list") + } + if receivedWantReq.peers != nil { + t.Fatal("did not make a broadcast") + } + case <-ctx.Done(): + t.Fatal("Never rebroadcast want list") + } + + // wait for a request to get more peers to occur + select { + case k := <-fpm.findMorePeersRequested: + if testutil.IndexOf(blks, k) == -1 { + t.Fatal("did not rebroadcast an active want") + } + case <-ctx.Done(): + t.Fatal("Did not find more peers") + } + firstTickLength := time.Since(startTick) + + // wait for another broadcast to occur + select { + case receivedWantReq := <-wantReqs: + if len(receivedWantReq.cids) < len(cids) { + t.Fatal("did not rebroadcast whole live list") + } + if receivedWantReq.peers != nil { + t.Fatal("did not make a broadcast") + } + case <-ctx.Done(): + t.Fatal("Never rebroadcast want list") + } + startTick = time.Now() + // wait for another broadcast to occur + select { + case receivedWantReq := <-wantReqs: + if len(receivedWantReq.cids) < len(cids) { + t.Fatal("did not rebroadcast whole live list") + } + if receivedWantReq.peers != nil { + t.Fatal("did not make a broadcast") + } + case <-ctx.Done(): + t.Fatal("Never rebroadcast want list") + } + consecutiveTickLength := time.Since(startTick) + // tick should take longer + if firstTickLength > consecutiveTickLength { + t.Fatal("Should have increased tick length after first consecutive tick") + } + startTick = time.Now() + // wait for another broadcast to occur + select { + case receivedWantReq := <-wantReqs: + if len(receivedWantReq.cids) < len(cids) { + t.Fatal("did not rebroadcast whole live list") + } + if receivedWantReq.peers != nil { + t.Fatal("did not make a broadcast") + } + case <-ctx.Done(): + t.Fatal("Never rebroadcast want list") + } + secondConsecutiveTickLength := time.Since(startTick) + // tick should take longer + if consecutiveTickLength > secondConsecutiveTickLength { + t.Fatal("Should have increased tick length after first consecutive tick") + } + + // should not have looked for peers on consecutive ticks + select { + case <-fpm.findMorePeersRequested: + t.Fatal("Should not have looked for peers on consecutive tick") + default: + } + + // wait for rebroadcast to occur + select { + case k := <-fpm.findMorePeersRequested: + if testutil.IndexOf(blks, k) == -1 { + t.Fatal("did not rebroadcast an active want") + } + case <-ctx.Done(): + t.Fatal("Did not rebroadcast to find more peers") + } +} diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index ac1bb700..a645d5e4 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -3,9 +3,11 @@ package sessionmanager import ( "context" "sync" + "time" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + delay "github.com/ipfs/go-ipfs-delay" bssession "github.com/ipfs/go-bitswap/session" exchange "github.com/ipfs/go-ipfs-exchange-interface" @@ -27,7 +29,7 @@ type sesTrk struct { } // SessionFactory generates a new session for the SessionManager to track. -type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session +type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session // RequestSplitterFactory generates a new request splitter for a session. type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter @@ -64,13 +66,15 @@ func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory // NewSession initializes a session with the given context, and adds to the // session manager. -func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher { +func (sm *SessionManager) NewSession(ctx context.Context, + provSearchDelay time.Duration, + rebroadcastDelay delay.D) exchange.Fetcher { id := sm.GetNextSessionID() sessionctx, cancel := context.WithCancel(ctx) pm := sm.peerManagerFactory(sessionctx, id) srs := sm.requestSplitterFactory(sessionctx) - session := sm.sessionFactory(sessionctx, id, pm, srs) + session := sm.sessionFactory(sessionctx, id, pm, srs, provSearchDelay, rebroadcastDelay) tracked := sesTrk{session, pm, srs} sm.sessLk.Lock() sm.sessions = append(sm.sessions, tracked) diff --git a/sessionmanager/sessionmanager_test.go b/sessionmanager/sessionmanager_test.go index 1310ac97..ae8c2383 100644 --- a/sessionmanager/sessionmanager_test.go +++ b/sessionmanager/sessionmanager_test.go @@ -6,6 +6,7 @@ import ( "time" bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter" + delay "github.com/ipfs/go-ipfs-delay" bssession "github.com/ipfs/go-bitswap/session" @@ -53,7 +54,12 @@ func (frs *fakeRequestSplitter) RecordUniqueBlock() {} var nextInterestedIn bool -func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session { +func sessionFactory(ctx context.Context, + id uint64, + pm bssession.PeerManager, + srs bssession.RequestSplitter, + provSearchDelay time.Duration, + rebroadcastDelay delay.D) Session { return &fakeSession{ interested: nextInterestedIn, receivedBlock: false, @@ -83,18 +89,18 @@ func TestAddingSessions(t *testing.T) { nextInterestedIn = true currentID := sm.GetNextSessionID() - firstSession := sm.NewSession(ctx).(*fakeSession) + firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) if firstSession.id != firstSession.pm.id || firstSession.id != currentID+1 { t.Fatal("session does not have correct id set") } - secondSession := sm.NewSession(ctx).(*fakeSession) + secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) if secondSession.id != secondSession.pm.id || secondSession.id != firstSession.id+1 { t.Fatal("session does not have correct id set") } sm.GetNextSessionID() - thirdSession := sm.NewSession(ctx).(*fakeSession) + thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) if thirdSession.id != thirdSession.pm.id || thirdSession.id != secondSession.id+2 { t.Fatal("session does not have correct id set") @@ -117,11 +123,11 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) { block := blocks.NewBlock([]byte("block")) // we'll be interested in all blocks for this test nextInterestedIn = false - firstSession := sm.NewSession(ctx).(*fakeSession) + firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) nextInterestedIn = true - secondSession := sm.NewSession(ctx).(*fakeSession) + secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) nextInterestedIn = false - thirdSession := sm.NewSession(ctx).(*fakeSession) + thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) sm.ReceiveBlockFrom(p, block) if firstSession.receivedBlock || @@ -140,9 +146,9 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) { block := blocks.NewBlock([]byte("block")) // we'll be interested in all blocks for this test nextInterestedIn = true - firstSession := sm.NewSession(ctx).(*fakeSession) - secondSession := sm.NewSession(ctx).(*fakeSession) - thirdSession := sm.NewSession(ctx).(*fakeSession) + firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) + secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) + thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) cancel() // wait for sessions to get removed @@ -165,10 +171,10 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) { block := blocks.NewBlock([]byte("block")) // we'll be interested in all blocks for this test nextInterestedIn = true - firstSession := sm.NewSession(ctx).(*fakeSession) + firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) sessionCtx, sessionCancel := context.WithCancel(ctx) - secondSession := sm.NewSession(sessionCtx).(*fakeSession) - thirdSession := sm.NewSession(ctx).(*fakeSession) + secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) + thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession) sessionCancel() // wait for sessions to get removed