Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix some naming nits and broadcast on search #139

Merged
merged 3 commits into from
Jun 12, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 54 additions & 53 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ type Session struct {
tickDelayReqs chan time.Duration

// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
rebroadcast *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
consecutiveTicks int
provSearchDelay time.Duration
rebroadcastDelay delay.D
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
idleTick *time.Timer
periodicSearchTimer *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
consecutiveTicks int
initialSearchDelay time.Duration
periodicSearchDelay delay.D
// identifiers
notif notifications.PubSub
uuid logging.Loggable
Expand All @@ -102,28 +102,28 @@ func New(ctx context.Context,
wm WantManager,
pm PeerManager,
srs RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) *Session {
initialSearchDelay time.Duration,
periodicSearchDelay delay.D) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
provSearchDelay: provSearchDelay,
rebroadcastDelay: rebroadcastDelay,
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
initialSearchDelay: initialSearchDelay,
periodicSearchDelay: periodicSearchDelay,
}

cache, _ := lru.New(2048)
Expand Down Expand Up @@ -239,8 +239,8 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
// Session run loop -- everything function below here should not be called
// of this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(s.provSearchDelay)
s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get())
s.idleTick = time.NewTimer(s.initialSearchDelay)
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
for {
select {
case blk := <-s.incoming:
Expand All @@ -253,10 +253,10 @@ func (s *Session) run(ctx context.Context) {
s.handleNewRequest(ctx, keys)
case keys := <-s.cancelKeys:
s.handleCancel(keys)
case <-s.tick.C:
s.handleTick(ctx)
case <-s.rebroadcast.C:
s.handleRebroadcast(ctx)
case <-s.idleTick.C:
s.handleIdleTick(ctx)
case <-s.periodicSearchTimer.C:
s.handlePeriodicSearch(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
Expand All @@ -271,15 +271,15 @@ func (s *Session) run(ctx context.Context) {
}

func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
s.tick.Stop()
s.idleTick.Stop()

if blk.from != "" {
s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
}

s.receiveBlock(ctx, blk.blk)

s.resetTick()
s.resetIdleTick()
}

func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
Expand Down Expand Up @@ -307,7 +307,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
}

func (s *Session) handleTick(ctx context.Context) {
func (s *Session) handleIdleTick(ctx context.Context) {

live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
Expand All @@ -321,28 +321,29 @@ func (s *Session) handleTick(ctx context.Context) {
s.wm.WantBlocks(ctx, live, nil, s.id)

// do no find providers on consecutive ticks
// -- just rely on periodic rebroadcast
// -- just rely on periodic search widening
if len(live) > 0 && (s.consecutiveTicks == 0) {
s.pm.FindMorePeers(ctx, live[0])
}
s.resetTick()
s.resetIdleTick()

if len(s.liveWants) > 0 {
s.consecutiveTicks++
}
}

func (s *Session) handleRebroadcast(ctx context.Context) {

if len(s.liveWants) == 0 {
func (s *Session) handlePeriodicSearch(ctx context.Context) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is now a rebroadcast, but, IMO, periodic search is still more accurate.

randomWant := s.randomLiveWant()
if !randomWant.Defined() {
return
}

// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
s.pm.FindMorePeers(ctx, s.randomLiveWant())
s.pm.FindMorePeers(ctx, randomWant)
s.wm.WantBlocks(ctx, []cid.Cid{randomWant}, nil, s.id)

s.rebroadcast.Reset(s.rebroadcastDelay.Get())
s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}

func (s *Session) randomLiveWant() cid.Cid {
Expand All @@ -357,7 +358,7 @@ func (s *Session) randomLiveWant() cid.Cid {
return cid.Cid{}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.idleTick.Stop()
s.notif.Shutdown()

live := make([]cid.Cid, 0, len(s.liveWants))
Expand Down Expand Up @@ -436,16 +437,16 @@ func (s *Session) averageLatency() time.Duration {
return s.latTotal / time.Duration(s.fetchcnt)
}

func (s *Session) resetTick() {
func (s *Session) resetIdleTick() {
var tickDelay time.Duration
if s.latTotal == 0 {
tickDelay = s.provSearchDelay
tickDelay = s.initialSearchDelay
} else {
avLat := s.averageLatency()
tickDelay = s.baseTickDelay + (3 * avLat)
}
tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
s.tick.Reset(tickDelay)
s.idleTick.Reset(tickDelay)
}

func (s *Session) wantBudget() int {
Expand Down