Skip to content

Commit

Permalink
Small cleanup in mercury v0.3 request code (#11442)
Browse files Browse the repository at this point in the history
  • Loading branch information
infiloop2 authored and Borja Aranda committed Dec 14, 2023
1 parent 1b20f8a commit 73696e7
Showing 1 changed file with 13 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package v03
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -62,42 +61,34 @@ func NewClient(mercuryConfig mercury.MercuryConfigProvider, httpClient mercury.H
}

func (c *client) DoRequest(ctx context.Context, streamsLookup *mercury.StreamsLookup, pluginRetryKey string) (mercury.MercuryUpkeepState, mercury.MercuryUpkeepFailureReason, [][]byte, bool, time.Duration, error) {
resultLen := len(streamsLookup.Feeds)
ch := make(chan mercury.MercuryData, resultLen)
if len(streamsLookup.Feeds) == 0 {
return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", streamsLookup.FeedParamKey, streamsLookup.TimeParamKey, streamsLookup.Feeds)
}
resultLen = 1
resultLen := 1 // Only 1 multi-feed request is made for all feeds
ch := make(chan mercury.MercuryData, resultLen)
c.threadCtrl.Go(func(ctx context.Context) {
c.multiFeedsRequest(ctx, ch, streamsLookup)
})

var reqErr error
var retryInterval time.Duration
results := make([][]byte, len(streamsLookup.Feeds))
retryable := true
allSuccess := true
retryable := false
state := mercury.NoPipelineError

for i := 0; i < resultLen; i++ {
m := <-ch
if m.Error != nil {
reqErr = errors.Join(reqErr, m.Error)
retryable = retryable && m.Retryable
allSuccess = false
if m.State != mercury.NoPipelineError {
state = m.State
}
continue
m := <-ch
if m.Error != nil {
reqErr = m.Error
retryable = m.Retryable
state = m.State
if retryable {
retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)
}
} else {
results = m.Bytes
}
// only retry when not all successful AND none are not retryable
if retryable && !allSuccess {
retryInterval = mercury.CalculateRetryConfigFn(pluginRetryKey, c.mercuryConfig)
}
// only retry when not all successful AND none are not retryable
return state, mercury.MercuryUpkeepFailureReasonNone, results, retryable && !allSuccess, retryInterval, reqErr

return state, mercury.MercuryUpkeepFailureReasonNone, results, retryable, retryInterval, reqErr
}

func (c *client) multiFeedsRequest(ctx context.Context, ch chan<- mercury.MercuryData, sl *mercury.StreamsLookup) {
Expand Down

0 comments on commit 73696e7

Please sign in to comment.