From a2c78b06f96fee295eb4819b75b20ef52abc9f6f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 11 Jun 2019 14:49:28 -0700 Subject: [PATCH] feat(providerquerymanager): do not rate limit requests remove rate limit on simultaneous requests for more providers License: MIT Signed-off-by: Jakub Sztandera --- providerquerymanager/providerquerymanager.go | 88 ++++++++++--------- .../providerquerymanager_test.go | 4 +- 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/providerquerymanager/providerquerymanager.go b/providerquerymanager/providerquerymanager.go index e1f77edf..05c202a9 100644 --- a/providerquerymanager/providerquerymanager.go +++ b/providerquerymanager/providerquerymanager.go @@ -216,6 +216,43 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProvid } } +func (pqm *ProviderQueryManager) findProviders(fpr *findProviderRequest) { + k := fpr.k + log.Debugf("Beginning Find Provider Request for cid: %s", k.String()) + pqm.timeoutMutex.RLock() + findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) + pqm.timeoutMutex.RUnlock() + providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) + wg := &sync.WaitGroup{} + for p := range providers { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + err := pqm.network.ConnectTo(findProviderCtx, p) + if err != nil { + log.Debugf("failed to connect to provider %s: %s", p, err) + return + } + select { + case pqm.providerQueryMessages <- &receivedProviderMessage{ + k: k, + p: p, + }: + case <-pqm.ctx.Done(): + return + } + }(p) + } + cancel() + wg.Wait() + select { + case pqm.providerQueryMessages <- &finishedProviderQueryMessage{ + k: k, + }: + case <-pqm.ctx.Done(): + } +} + func (pqm *ProviderQueryManager) findProviderWorker() { // findProviderWorker just cycles through incoming provider queries one // at a time. We have six of these workers running at once @@ -226,40 +263,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { if !ok { return } - k := fpr.k - log.Debugf("Beginning Find Provider Request for cid: %s", k.String()) - pqm.timeoutMutex.RLock() - findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout) - pqm.timeoutMutex.RUnlock() - providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders) - wg := &sync.WaitGroup{} - for p := range providers { - wg.Add(1) - go func(p peer.ID) { - defer wg.Done() - err := pqm.network.ConnectTo(findProviderCtx, p) - if err != nil { - log.Debugf("failed to connect to provider %s: %s", p, err) - return - } - select { - case pqm.providerQueryMessages <- &receivedProviderMessage{ - k: k, - p: p, - }: - case <-pqm.ctx.Done(): - return - } - }(p) - } - cancel() - wg.Wait() - select { - case pqm.providerQueryMessages <- &finishedProviderQueryMessage{ - k: k, - }: - case <-pqm.ctx.Done(): - } + pqm.findProviders(fpr) case <-pqm.ctx.Done(): return } @@ -271,7 +275,7 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() { // buffer for incoming provider queries and dispatches to the find // provider workers as they become available // based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd - var providerQueryRequestBuffer []*findProviderRequest + /*var providerQueryRequestBuffer []*findProviderRequest nextProviderQuery := func() *findProviderRequest { if len(providerQueryRequestBuffer) == 0 { return nil @@ -283,7 +287,7 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() { return nil } return pqm.providerRequestsProcessing - } + }*/ for { select { @@ -291,9 +295,11 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() { if !ok { return } - providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest) - case outgoingRequests() <- nextProviderQuery(): - providerQueryRequestBuffer = providerQueryRequestBuffer[1:] + + /* providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest) + case outgoingRequests() <- nextProviderQuery(): + providerQueryRequestBuffer = providerQueryRequestBuffer[1:]*/ + go pqm.findProviders(incomingRequest) case <-pqm.ctx.Done(): return } @@ -313,9 +319,9 @@ func (pqm *ProviderQueryManager) run() { defer pqm.cleanupInProcessRequests() go pqm.providerRequestBufferWorker() - for i := 0; i < maxInProcessRequests; i++ { + /*for i := 0; i < maxInProcessRequests; i++ { go pqm.findProviderWorker() - } + }*/ for { select { diff --git a/providerquerymanager/providerquerymanager_test.go b/providerquerymanager/providerquerymanager_test.go index 689c5ec2..33aedd54 100644 --- a/providerquerymanager/providerquerymanager_test.go +++ b/providerquerymanager/providerquerymanager_test.go @@ -243,7 +243,7 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) { } -func TestRateLimitingRequests(t *testing.T) { +/*func TestRateLimitingRequests(t *testing.T) { peers := testutil.GeneratePeers(10) fpn := &fakeProviderNetwork{ peersFound: peers, @@ -279,7 +279,7 @@ func TestRateLimitingRequests(t *testing.T) { if fpn.queriesMade != maxInProcessRequests+1 { t.Fatal("Did not make all seperate requests") } -} +}*/ func TestFindProviderTimeout(t *testing.T) { peers := testutil.GeneratePeers(10)