From c783e018cd986b3773e8d7d2ede9d2be5e9495fa Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 11 Jun 2019 17:04:50 -0700 Subject: [PATCH 1/3] fix(session): obey delay function when searching for more providers --- session/session.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/session/session.go b/session/session.go index 6ac47470..1db2abc3 100644 --- a/session/session.go +++ b/session/session.go @@ -240,7 +240,7 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { // of this loop func (s *Session) run(ctx context.Context) { s.tick = time.NewTimer(s.provSearchDelay) - s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get()) + s.rebroadcast = time.NewTimer(s.rebroadcastDelay.NextWaitTime()) for { select { case blk := <-s.incoming: @@ -342,7 +342,7 @@ func (s *Session) handleRebroadcast(ctx context.Context) { // for new providers for blocks. s.pm.FindMorePeers(ctx, s.randomLiveWant()) - s.rebroadcast.Reset(s.rebroadcastDelay.Get()) + s.rebroadcast.Reset(s.rebroadcastDelay.NextWaitTime()) } func (s *Session) randomLiveWant() cid.Cid { From 2a00256b53fa695b161431a7a4502e08cf627cf7 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 11 Jun 2019 17:11:06 -0700 Subject: [PATCH 2/3] nit(session): improve naming --- session/session.go | 100 ++++++++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/session/session.go b/session/session.go index 1db2abc3..04fd2bbd 100644 --- a/session/session.go +++ b/session/session.go @@ -77,18 +77,18 @@ type Session struct { tickDelayReqs chan time.Duration // do not touch outside run loop - tofetch *cidQueue - interest *lru.Cache - pastWants *cidQueue - liveWants map[cid.Cid]time.Time - tick *time.Timer - rebroadcast *time.Timer - baseTickDelay time.Duration - latTotal time.Duration - fetchcnt int - consecutiveTicks int - provSearchDelay time.Duration - rebroadcastDelay delay.D + tofetch *cidQueue + interest *lru.Cache + pastWants *cidQueue + liveWants map[cid.Cid]time.Time + idleTick *time.Timer + periodicSearchTimer *time.Timer + baseTickDelay time.Duration + latTotal time.Duration + fetchcnt int + consecutiveTicks int + initialSearchDelay time.Duration + periodicSearchDelay delay.D // identifiers notif notifications.PubSub uuid logging.Loggable @@ -102,28 +102,28 @@ func New(ctx context.Context, wm WantManager, pm PeerManager, srs RequestSplitter, - provSearchDelay time.Duration, - rebroadcastDelay delay.D) *Session { + initialSearchDelay time.Duration, + periodicSearchDelay delay.D) *Session { s := &Session{ - liveWants: make(map[cid.Cid]time.Time), - newReqs: make(chan []cid.Cid), - cancelKeys: make(chan []cid.Cid), - tofetch: newCidQueue(), - pastWants: newCidQueue(), - interestReqs: make(chan interestReq), - latencyReqs: make(chan chan time.Duration), - tickDelayReqs: make(chan time.Duration), - ctx: ctx, - wm: wm, - pm: pm, - srs: srs, - incoming: make(chan blkRecv), - notif: notifications.New(), - uuid: loggables.Uuid("GetBlockRequest"), - baseTickDelay: time.Millisecond * 500, - id: id, - provSearchDelay: provSearchDelay, - rebroadcastDelay: rebroadcastDelay, + liveWants: make(map[cid.Cid]time.Time), + newReqs: make(chan []cid.Cid), + cancelKeys: make(chan []cid.Cid), + tofetch: newCidQueue(), + pastWants: newCidQueue(), + interestReqs: make(chan interestReq), + latencyReqs: make(chan chan time.Duration), + tickDelayReqs: make(chan time.Duration), + ctx: ctx, + wm: wm, + pm: pm, + srs: srs, + incoming: make(chan blkRecv), + notif: notifications.New(), + uuid: loggables.Uuid("GetBlockRequest"), + baseTickDelay: time.Millisecond * 500, + id: id, + initialSearchDelay: initialSearchDelay, + periodicSearchDelay: periodicSearchDelay, } cache, _ := lru.New(2048) @@ -239,8 +239,8 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) { // Session run loop -- everything function below here should not be called // of this loop func (s *Session) run(ctx context.Context) { - s.tick = time.NewTimer(s.provSearchDelay) - s.rebroadcast = time.NewTimer(s.rebroadcastDelay.NextWaitTime()) + s.idleTick = time.NewTimer(s.initialSearchDelay) + s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime()) for { select { case blk := <-s.incoming: @@ -253,10 +253,10 @@ func (s *Session) run(ctx context.Context) { s.handleNewRequest(ctx, keys) case keys := <-s.cancelKeys: s.handleCancel(keys) - case <-s.tick.C: - s.handleTick(ctx) - case <-s.rebroadcast.C: - s.handleRebroadcast(ctx) + case <-s.idleTick.C: + s.handleIdleTick(ctx) + case <-s.periodicSearchTimer.C: + s.handlePeriodicSearch(ctx) case lwchk := <-s.interestReqs: lwchk.resp <- s.cidIsWanted(lwchk.c) case resp := <-s.latencyReqs: @@ -271,7 +271,7 @@ func (s *Session) run(ctx context.Context) { } func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) { - s.tick.Stop() + s.idleTick.Stop() if blk.from != "" { s.pm.RecordPeerResponse(blk.from, blk.blk.Cid()) @@ -279,7 +279,7 @@ func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) { s.receiveBlock(ctx, blk.blk) - s.resetTick() + s.resetIdleTick() } func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) { @@ -307,7 +307,7 @@ func (s *Session) handleCancel(keys []cid.Cid) { } } -func (s *Session) handleTick(ctx context.Context) { +func (s *Session) handleIdleTick(ctx context.Context) { live := make([]cid.Cid, 0, len(s.liveWants)) now := time.Now() @@ -321,18 +321,18 @@ func (s *Session) handleTick(ctx context.Context) { s.wm.WantBlocks(ctx, live, nil, s.id) // do no find providers on consecutive ticks - // -- just rely on periodic rebroadcast + // -- just rely on periodic search widening if len(live) > 0 && (s.consecutiveTicks == 0) { s.pm.FindMorePeers(ctx, live[0]) } - s.resetTick() + s.resetIdleTick() if len(s.liveWants) > 0 { s.consecutiveTicks++ } } -func (s *Session) handleRebroadcast(ctx context.Context) { +func (s *Session) handlePeriodicSearch(ctx context.Context) { if len(s.liveWants) == 0 { return @@ -342,7 +342,7 @@ func (s *Session) handleRebroadcast(ctx context.Context) { // for new providers for blocks. s.pm.FindMorePeers(ctx, s.randomLiveWant()) - s.rebroadcast.Reset(s.rebroadcastDelay.NextWaitTime()) + s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime()) } func (s *Session) randomLiveWant() cid.Cid { @@ -357,7 +357,7 @@ func (s *Session) randomLiveWant() cid.Cid { return cid.Cid{} } func (s *Session) handleShutdown() { - s.tick.Stop() + s.idleTick.Stop() s.notif.Shutdown() live := make([]cid.Cid, 0, len(s.liveWants)) @@ -436,16 +436,16 @@ func (s *Session) averageLatency() time.Duration { return s.latTotal / time.Duration(s.fetchcnt) } -func (s *Session) resetTick() { +func (s *Session) resetIdleTick() { var tickDelay time.Duration if s.latTotal == 0 { - tickDelay = s.provSearchDelay + tickDelay = s.initialSearchDelay } else { avLat := s.averageLatency() tickDelay = s.baseTickDelay + (3 * avLat) } tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks) - s.tick.Reset(tickDelay) + s.idleTick.Reset(tickDelay) } func (s *Session) wantBudget() int { From eb28a2e1cb5a345f7be48329b7a18fcb1702183a Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 11 Jun 2019 17:18:02 -0700 Subject: [PATCH 3/3] feat(session): when periodically searching, broadcast want to connected peers This fixes the case where: 1. I start downloading something. 2. A friend jumps on our WiFi. 3. Our IPFS daemons connect via local discovery. 4. I never notice that they have the file I'm looking for because I'm already downloading it from a peer. 5. The peer I'm downloading from is _really_ slow. --- session/session.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/session/session.go b/session/session.go index 04fd2bbd..f10d9605 100644 --- a/session/session.go +++ b/session/session.go @@ -333,14 +333,15 @@ func (s *Session) handleIdleTick(ctx context.Context) { } func (s *Session) handlePeriodicSearch(ctx context.Context) { - - if len(s.liveWants) == 0 { + randomWant := s.randomLiveWant() + if !randomWant.Defined() { return } // TODO: come up with a better strategy for determining when to search // for new providers for blocks. - s.pm.FindMorePeers(ctx, s.randomLiveWant()) + s.pm.FindMorePeers(ctx, randomWant) + s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id) s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime()) }