Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix(decision): cleanup request queues #116

Merged
merged 2 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (tl *prq) Push(to peer.ID, entries ...wantlist.Entry) {
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
if !ok {
partner = newActivePartner()
partner = newActivePartner(to)
tl.pQueue.Push(partner)
tl.partners[to] = partner
}
Expand Down Expand Up @@ -136,7 +136,13 @@ func (tl *prq) Pop() *peerRequestTask {
break // and return |out|
}

tl.pQueue.Push(partner)
if partner.IsIdle() {
target := partner.target
delete(tl.partners, target)
delete(tl.frozen, target)
} else {
tl.pQueue.Push(partner)
}
return out
}

Expand Down Expand Up @@ -252,7 +258,7 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
}

type activePartner struct {

target peer.ID
// Active is the number of blocks this peer is currently being sent
// active must be locked around as it will be updated externally
activelk sync.Mutex
Expand All @@ -274,8 +280,9 @@ type activePartner struct {
taskQueue pq.PQ
}

func newActivePartner() *activePartner {
func newActivePartner(target peer.ID) *activePartner {
return &activePartner{
target: target,
taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: cid.NewSet(),
}
Expand Down Expand Up @@ -323,6 +330,7 @@ func (p *activePartner) StartTask(k cid.Cid) {
// TaskDone signals that a task was completed for this partner.
func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Lock()

p.activeBlocks.Remove(k)
p.active--
if p.active < 0 {
Expand All @@ -331,6 +339,12 @@ func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Unlock()
}

func (p *activePartner) IsIdle() bool {
p.activelk.Lock()
defer p.activelk.Unlock()
return p.requests == 0 && p.active == 0
}

// Index implements pq.Elem.
func (p *activePartner) Index() int {
return p.index
Expand Down
32 changes: 32 additions & 0 deletions decision/peer_request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) {
}
}
}

func TestCleaningUpQueues(t *testing.T) {
partner := testutil.RandPeerIDFatal(t)
var entries []wantlist.Entry
for i := 0; i < 5; i++ {
entries = append(entries, wantlist.Entry{Cid: cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))})
}

prq := newPRQ()

// push a block, pop a block, complete everything, should be removed
prq.Push(partner, entries...)
task := prq.Pop()
task.Done(task.Entries)
task = prq.Pop()

if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}

// push a block, remove each of its entries, should be removed
prq.Push(partner, entries...)
for _, entry := range entries {
prq.Remove(entry.Cid, partner)
}
task = prq.Pop()

if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}

}