From 44fe93eaf18f1997d2490f687e62427212d62d08 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 5 Feb 2019 10:56:16 -0800 Subject: [PATCH] fix(providerquerymanager): minor channel cleanup Keep channels unblocked in cancelling request -- refactored to function. Also cancel find provider context as soon as it can be. --- providerquerymanager/providerquerymanager.go | 41 ++++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/providerquerymanager/providerquerymanager.go b/providerquerymanager/providerquerymanager.go index b84463a7..1a431214 100644 --- a/providerquerymanager/providerquerymanager.go +++ b/providerquerymanager/providerquerymanager.go @@ -170,22 +170,8 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k case <-pqm.ctx.Done(): return case <-sessionCtx.Done(): - pqm.providerQueryMessages <- &cancelRequestMessage{ - incomingProviders: incomingProviders, - k: k, - } - // clear out any remaining providers, in case and "incoming provider" - // messages get processed before our cancel message - for { - select { - case _, ok := <-incomingProviders: - if !ok { - return - } - case <-pqm.ctx.Done(): - return - } - } + pqm.cancelProviderRequest(k, incomingProviders) + return case provider, ok := <-incomingProviders: if !ok { incomingProviders = nil @@ -200,6 +186,27 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k return returnedProviders } +func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProviders chan peer.ID) { + cancelMessageChannel := pqm.providerQueryMessages + for { + select { + case cancelMessageChannel <- &cancelRequestMessage{ + incomingProviders: incomingProviders, + k: k, + }: + cancelMessageChannel = nil + // clear out any remaining providers, in case and "incoming provider" + // messages get processed before our cancel message + case _, ok := <-incomingProviders: + if !ok { + return + } + case <-pqm.ctx.Done(): + return + } + } +} + func (pqm *ProviderQueryManager) findProviderWorker() { // findProviderWorker just cycles through incoming provider queries one // at a time. We have six of these workers running at once @@ -215,7 +222,6 @@ func (pqm *ProviderQueryManager) findProviderWorker() { pqm.timeoutMutex.RLock() findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) pqm.timeoutMutex.RUnlock() - defer cancel() providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) wg := &sync.WaitGroup{} for p := range providers { @@ -237,6 +243,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { } }(p) } + cancel() wg.Wait() select { case pqm.providerQueryMessages <- &finishedProviderQueryMessage{