Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

WIP improve provider request rate limiting #138

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 47 additions & 41 deletions providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,43 @@ func (pqm *ProviderQueryManager) cancelProviderRequest(k cid.Cid, incomingProvid
}
}

func (pqm *ProviderQueryManager) findProviders(fpr *findProviderRequest) {
k := fpr.k
log.Debugf("Beginning Find Provider Request for cid: %s", k.String())
pqm.timeoutMutex.RLock()
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := pqm.network.ConnectTo(findProviderCtx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
return
}
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
k: k,
p: p,
}:
case <-pqm.ctx.Done():
return
}
}(p)
}
cancel()
wg.Wait()
select {
case pqm.providerQueryMessages <- &finishedProviderQueryMessage{
k: k,
}:
case <-pqm.ctx.Done():
}
}

func (pqm *ProviderQueryManager) findProviderWorker() {
// findProviderWorker just cycles through incoming provider queries one
// at a time. We have six of these workers running at once
Expand All @@ -226,40 +263,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
if !ok {
return
}
k := fpr.k
log.Debugf("Beginning Find Provider Request for cid: %s", k.String())
pqm.timeoutMutex.RLock()
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := pqm.network.ConnectTo(findProviderCtx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
return
}
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
k: k,
p: p,
}:
case <-pqm.ctx.Done():
return
}
}(p)
}
cancel()
wg.Wait()
select {
case pqm.providerQueryMessages <- &finishedProviderQueryMessage{
k: k,
}:
case <-pqm.ctx.Done():
}
pqm.findProviders(fpr)
case <-pqm.ctx.Done():
return
}
Expand All @@ -271,7 +275,7 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() {
// buffer for incoming provider queries and dispatches to the find
// provider workers as they become available
// based on: https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
var providerQueryRequestBuffer []*findProviderRequest
/*var providerQueryRequestBuffer []*findProviderRequest
nextProviderQuery := func() *findProviderRequest {
if len(providerQueryRequestBuffer) == 0 {
return nil
Expand All @@ -283,17 +287,19 @@ func (pqm *ProviderQueryManager) providerRequestBufferWorker() {
return nil
}
return pqm.providerRequestsProcessing
}
}*/

for {
select {
case incomingRequest, ok := <-pqm.incomingFindProviderRequests:
if !ok {
return
}
providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest)
case outgoingRequests() <- nextProviderQuery():
providerQueryRequestBuffer = providerQueryRequestBuffer[1:]

/* providerQueryRequestBuffer = append(providerQueryRequestBuffer, incomingRequest)
case outgoingRequests() <- nextProviderQuery():
providerQueryRequestBuffer = providerQueryRequestBuffer[1:]*/
go pqm.findProviders(incomingRequest)
case <-pqm.ctx.Done():
return
}
Expand All @@ -313,9 +319,9 @@ func (pqm *ProviderQueryManager) run() {
defer pqm.cleanupInProcessRequests()

go pqm.providerRequestBufferWorker()
for i := 0; i < maxInProcessRequests; i++ {
/*for i := 0; i < maxInProcessRequests; i++ {
go pqm.findProviderWorker()
}
}*/

for {
select {
Expand Down
4 changes: 2 additions & 2 deletions providerquerymanager/providerquerymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) {

}

func TestRateLimitingRequests(t *testing.T) {
/*func TestRateLimitingRequests(t *testing.T) {
peers := testutil.GeneratePeers(10)
fpn := &fakeProviderNetwork{
peersFound: peers,
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestRateLimitingRequests(t *testing.T) {
if fpn.queriesMade != maxInProcessRequests+1 {
t.Fatal("Did not make all seperate requests")
}
}
}*/

func TestFindProviderTimeout(t *testing.T) {
peers := testutil.GeneratePeers(10)
Expand Down