Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix(providerquerymanager): minor channel cleanup
Browse files Browse the repository at this point in the history
Keep channels unblocked when cancelling a request -- refactored into a function. Also cancel the find-provider
context as soon as possible.
  • Loading branch information
hannahhoward committed Feb 5, 2019
1 parent b48b3c3 commit 44fe93e
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,22 +170,8 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
case <-pqm.ctx.Done():
return
case <-sessionCtx.Done():
pqm.providerQueryMessages <- &cancelRequestMessage{
incomingProviders: incomingProviders,
k: k,
}
// clear out any remaining providers, in case and "incoming provider"
// messages get processed before our cancel message
for {
select {
case _, ok := <-incomingProviders:
if !ok {
return
}
case <-pqm.ctx.Done():
return
}
}
pqm.cancelProviderRequest(k, incomingProviders)
return
case provider, ok := <-incomingProviders:
if !ok {
incomingProviders = nil
Expand All @@ -200,6 +186,27 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
return returnedProviders
}

// cancelProviderRequest notifies the manager that the query for k should be
// cancelled, while continuously draining incomingProviders so that no sender
// on that channel can ever block. It returns once incomingProviders is closed
// or the manager's context is done.
func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProviders chan peer.ID) {
	msg := &cancelRequestMessage{
		incomingProviders: incomingProviders,
		k:                 k,
	}
	msgChan := pqm.providerQueryMessages
	for {
		select {
		case msgChan <- msg:
			// Cancel message delivered; set the channel to nil so this
			// case can never fire again and the loop only drains.
			msgChan = nil
		case _, ok := <-incomingProviders:
			// Clear out any remaining providers, in case any "incoming
			// provider" messages get processed before our cancel message.
			if !ok {
				return
			}
		case <-pqm.ctx.Done():
			return
		}
	}
}

func (pqm *ProviderQueryManager) findProviderWorker() {
// findProviderWorker just cycles through incoming provider queries one
// at a time. We have six of these workers running at once
Expand All @@ -215,7 +222,6 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
pqm.timeoutMutex.RLock()
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
defer cancel()
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
for p := range providers {
Expand All @@ -237,6 +243,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
}
}(p)
}
cancel()
wg.Wait()
select {
case pqm.providerQueryMessages <- &finishedProviderQueryMessage{
Expand Down

0 comments on commit 44fe93e

Please sign in to comment.