Skip to content

Commit

Permalink
Merge pull request ipfs#262 from raulk/fix/dialqueue-enqchan
Browse files Browse the repository at this point in the history
dial queue: fix possible goroutine leak
  • Loading branch information
raulk authored Feb 19, 2019
2 parents 52b75dd + 4f0cf48 commit 5f67727
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type waitingCh struct {
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
sq := &dialQueue{
dq := &dialQueue{
dqParams: params,
nWorkers: params.config.minParallelism,
out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
Expand All @@ -121,10 +121,10 @@ func newDialQueue(params *dqParams) (*dialQueue, error) {
}

for i := 0; i < int(params.config.minParallelism); i++ {
go sq.worker()
go dq.worker()
}
go sq.control()
return sq, nil
go dq.control()
return dq, nil
}

func (dq *dialQueue) control() {
Expand Down Expand Up @@ -323,7 +323,14 @@ func (dq *dialQueue) worker() {
}
logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
waiting := len(dq.waitingCh)
dq.out.EnqChan <- p

// by the time we're done dialling, it's possible that the context is closed, in which case there will
// be nobody listening on dq.out.EnqChan and we could block forever.
select {
case dq.out.EnqChan <- p:
case <-dq.ctx.Done():
return
}
if waiting > 0 {
// we have somebody to deliver this value to, so no need to shrink.
continue
Expand Down

0 comments on commit 5f67727

Please sign in to comment.