diff --git a/core/scripts/chaincli/handler/debug.go b/core/scripts/chaincli/handler/debug.go index 0075862d95d..fec8c6cd414 100644 --- a/core/scripts/chaincli/handler/debug.go +++ b/core/scripts/chaincli/handler/debug.go @@ -22,17 +22,12 @@ import ( ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" - evm21 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21" - "github.com/smartcontractkit/chainlink/core/scripts/chaincli/config" "github.com/smartcontractkit/chainlink/core/scripts/common" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/models" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding" - "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams" "github.com/smartcontractkit/chainlink/v2/core/utils" bigmath "github.com/smartcontractkit/chainlink/v2/core/utils/big_math" @@ -41,7 +36,12 @@ import ( const ( ConditionTrigger uint8 = iota LogTrigger + + blockNumber = "blockNumber" expectedTypeAndVersion = "KeeperRegistry 2.1.0" + feedIdHex = "feedIdHex" + feedIDs = "feedIDs" + timestamp = "timestamp" ) var packer = encoding.NewAbiPacker() @@ -125,8 +125,6 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { var checkResult iregistry21.CheckUpkeep var blockNum uint64 var performData []byte - var workID [32]byte - var trigger ocr2keepers.Trigger upkeepNeeded := false // check upkeep if triggerType == ConditionTrigger { @@ -179,8 +177,7 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { } // check that tx for this upkeep / tx was not already performed message(fmt.Sprintf("LogTrigger{blockNum: %d, blockHash: %s, txHash: %s, logIndex: %d}", blockNum, receipt.BlockHash.Hex(), txHash, logIndex)) - trigger = mustAutomationTrigger(txHash, logIndex, blockNum, receipt.BlockHash) - workID = mustUpkeepWorkID(upkeepID, trigger) + workID := mustUpkeepWorkID(upkeepID, blockNum, receipt.BlockHash, txHash, logIndex) message(fmt.Sprintf("workID computed: %s", hex.EncodeToString(workID[:]))) hasKey, err := keeperRegistry21.HasDedupKey(latestCallOpts, workID) if err != nil { @@ -232,82 +229,73 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { if checkResult.UpkeepFailureReason != 0 { message(fmt.Sprintf("checkUpkeep failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason)) } - if checkResult.UpkeepFailureReason == uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { - mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey} - mercuryConfig := evm21.NewMercuryConfig(mc, core.StreamsCompatibleABI) - lggr, _ := logger.NewLogger() - blockSub := &blockSubscriber{k.client} - streams := streams.NewStreamsLookup(packer, mercuryConfig, blockSub, k.rpcClient, keeperRegistry21, lggr) + // TODO use the new streams lookup lib + //mc := &models.MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey} + //mercuryConfig := evm.NewMercuryConfig(mc, core.StreamsCompatibleABI) + //lggr, _ := logger.NewLogger() + //blockSub := &blockSubscriber{k.client} + //_ = streams.NewStreamsLookup(packer, mercuryConfig, blockSub, keeperRegistry21, k.rpcClient, lggr) streamsLookupErr, err := packer.DecodeStreamsLookupRequest(checkResult.PerformData) if err == nil { message("upkeep reverted with StreamsLookup") message(fmt.Sprintf("StreamsLookup data: {FeedParamKey: %s, Feeds: %v, TimeParamKey: %s, Time: %d, ExtraData: %s}", streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time.Uint64(), hexutil.Encode(streamsLookupErr.ExtraData))) - - streamsLookup := &mercury.StreamsLookup{ - StreamsLookupError: &mercury.StreamsLookupError{ - FeedParamKey: streamsLookupErr.FeedParamKey, - Feeds: streamsLookupErr.Feeds, - TimeParamKey: streamsLookupErr.TimeParamKey, - Time: streamsLookupErr.Time, - ExtraData: streamsLookupErr.ExtraData, - }, - UpkeepId: upkeepID, - Block: blockNum, - } - - if streamsLookup.IsMercuryV02() { + if streamsLookupErr.FeedParamKey == feedIdHex && streamsLookupErr.TimeParamKey == blockNumber { message("using mercury lookup v0.2") - // check if upkeep is allowed to use mercury v0.2 - _, _, _, allowed, err := streams.AllowedToUseMercury(latestCallOpts, upkeepID) + // handle v0.2 + cfg, err := keeperRegistry21.GetUpkeepPrivilegeConfig(triggerCallOpts, upkeepID) if err != nil { - failUnknown("failed to check if upkeep is allowed to use mercury", err) + failUnknown("failed to get upkeep privilege config ", err) + } + allowed := false + if len(cfg) > 0 { + var privilegeConfig streams.UpkeepPrivilegeConfig + if err := json.Unmarshal(cfg, &privilegeConfig); err != nil { + failUnknown("failed to unmarshal privilege config ", err) + } + allowed = privilegeConfig.MercuryEnabled } if !allowed { resolveIneligible("upkeep reverted with StreamsLookup but is not allowed to access streams") } - } else if streamsLookup.IsMercuryV03() { + } else if streamsLookupErr.FeedParamKey != feedIDs || streamsLookupErr.TimeParamKey != timestamp { // handle v0.3 - message("using mercury lookup v0.3") - } else { resolveIneligible("upkeep reverted with StreamsLookup but the configuration is invalid") + } else { + message("using mercury lookup v0.3") } + streamsLookup := &StreamsLookup{streamsLookupErr.FeedParamKey, streamsLookupErr.Feeds, streamsLookupErr.TimeParamKey, streamsLookupErr.Time, streamsLookupErr.ExtraData, upkeepID, blockNum} if k.cfg.MercuryLegacyURL == "" || k.cfg.MercuryURL == "" || k.cfg.MercuryID == "" || k.cfg.MercuryKey == "" { failCheckConfig("Mercury configs not set properly, check your MERCURY_LEGACY_URL, MERCURY_URL, MERCURY_ID and MERCURY_KEY", nil) } - - // do mercury request - automationCheckResult := mustAutomationCheckResult(upkeepID, checkResult, trigger) - values, err := streams.DoMercuryRequest(ctx, streamsLookup, &automationCheckResult) - - if automationCheckResult.IneligibilityReason == uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) { + handler := NewMercuryLookupHandler(&MercuryCredentials{k.cfg.MercuryLegacyURL, k.cfg.MercuryURL, k.cfg.MercuryID, k.cfg.MercuryKey}, k.rpcClient) + state, failureReason, values, _, err := handler.doMercuryRequest(ctx, streamsLookup) + if failureReason == UpkeepFailureReasonInvalidRevertDataInput { resolveIneligible("upkeep used invalid revert data") } - if automationCheckResult.PipelineExecutionState == uint8(mercury.InvalidMercuryRequest) { + if state == InvalidMercuryRequest { resolveIneligible("the mercury request data is invalid") } if err != nil { - resolveIneligible("failed to DoMercuryRequest") + failCheckConfig("failed to do mercury request ", err) } - - // do checkCallback - err = streams.CheckCallback(ctx, values, streamsLookup, &automationCheckResult) + callbackResult, err := keeperRegistry21.CheckCallback(triggerCallOpts, upkeepID, values, streamsLookup.extraData) if err != nil { failUnknown("failed to execute mercury callback ", err) } - if automationCheckResult.IneligibilityReason != 0 { - message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", automationCheckResult.IneligibilityReason)) + if callbackResult.UpkeepFailureReason != 0 { + message(fmt.Sprintf("checkCallback failed with UpkeepFailureReason %d", checkResult.UpkeepFailureReason)) } - upkeepNeeded, performData = automationCheckResult.Eligible, automationCheckResult.PerformData - // do tenderly simulations for checkCallback - rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.ExtraData) + upkeepNeeded, performData = callbackResult.UpkeepNeeded, callbackResult.PerformData + // do tenderly simulations + rawCall, err := core.RegistryABI.Pack("checkCallback", upkeepID, values, streamsLookup.extraData) if err != nil { failUnknown("failed to pack raw checkCallback call", err) } addLink("checkCallback simulation", tenderlySimLink(k.cfg, chainID, blockNum, rawCall, registryAddress)) - rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.ExtraData) + rawCall, err = core.StreamsCompatibleABI.Pack("checkCallback", values, streamsLookup.extraData) if err != nil { failUnknown("failed to pack raw checkCallback (direct) call", err) } @@ -329,23 +317,6 @@ func (k *Keeper) Debug(ctx context.Context, args []string) { } } -func mustAutomationCheckResult(upkeepID *big.Int, checkResult iregistry21.CheckUpkeep, trigger ocr2keepers.Trigger) ocr2keepers.CheckResult { - upkeepIdentifier := mustUpkeepIdentifier(upkeepID) - checkResult2 := ocr2keepers.CheckResult{ - Eligible: checkResult.UpkeepNeeded, - IneligibilityReason: checkResult.UpkeepFailureReason, - UpkeepID: upkeepIdentifier, - Trigger: trigger, - WorkID: core.UpkeepWorkID(upkeepIdentifier, trigger), - GasAllocated: 0, - PerformData: checkResult.PerformData, - FastGasWei: checkResult.FastGasWei, - LinkNative: checkResult.LinkNative, - } - - return checkResult2 -} - type blockSubscriber struct { ethClient *ethclient.Client } @@ -399,27 +370,9 @@ func packTriggerData(log *types.Log, blockTime uint64) ([]byte, error) { return b, nil } -func mustUpkeepWorkID(upkeepID *big.Int, trigger ocr2keepers.Trigger) [32]byte { - upkeepIdentifier := mustUpkeepIdentifier(upkeepID) - - workID := core.UpkeepWorkID(upkeepIdentifier, trigger) - workIDBytes, err := hex.DecodeString(workID) - if err != nil { - failUnknown("failed to decode workID", err) - } - - var result [32]byte - copy(result[:], workIDBytes[:]) - return result -} - -func mustUpkeepIdentifier(upkeepID *big.Int) ocr2keepers.UpkeepIdentifier { - upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{} - upkeepIdentifier.FromBigInt(upkeepID) - return *upkeepIdentifier -} - -func mustAutomationTrigger(txHash [32]byte, logIndex int64, blockNum uint64, blockHash [32]byte) ocr2keepers.Trigger { +func mustUpkeepWorkID(upkeepID *big.Int, blockNum uint64, blockHash [32]byte, txHash [32]byte, logIndex int64) [32]byte { + // TODO - this is a copy of the code in core.UpkeepWorkID + // We should refactor that code to be more easily exported ex not rely on Trigger structs trigger := ocr2keepers.Trigger{ LogTriggerExtension: &ocr2keepers.LogTriggerExtension{ TxHash: txHash, @@ -428,7 +381,16 @@ func mustAutomationTrigger(txHash [32]byte, logIndex int64, blockNum uint64, blo BlockHash: blockHash, }, } - return trigger + upkeepIdentifier := &ocr2keepers.UpkeepIdentifier{} + upkeepIdentifier.FromBigInt(upkeepID) + workID := core.UpkeepWorkID(*upkeepIdentifier, trigger) + workIDBytes, err := hex.DecodeString(workID) + if err != nil { + failUnknown("failed to decode workID", err) + } + var result [32]byte + copy(result[:], workIDBytes[:]) + return result } func message(msg string) { @@ -440,11 +402,11 @@ func warning(msg string) { } func resolveIneligible(msg string) { - exit(fmt.Sprintf("✅ %s: this upkeep is not currently eligible", msg), nil, 0) + exit(fmt.Sprintf("✅ %s: this upkeep is not currently elligible", msg), nil, 0) } func resolveEligible() { - exit("❌ this upkeep is currently eligible", nil, 0) + exit("❌ this upkeep is currently elligible", nil, 0) } func rerun(msg string, err error) { @@ -545,3 +507,5 @@ func tenderlySimLink(cfg *config.Config, chainID int64, blockNumber uint64, inpu } return common.TenderlySimLink(responseJSON.Simulation.Id) } + +// TODO - link to performUpkeep tx if exists diff --git a/core/scripts/chaincli/handler/mercury_lookup_handler.go b/core/scripts/chaincli/handler/mercury_lookup_handler.go new file mode 100644 index 00000000000..1bd4b2e183c --- /dev/null +++ b/core/scripts/chaincli/handler/mercury_lookup_handler.go @@ -0,0 +1,534 @@ +package handler + +import ( + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "math/big" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/avast/retry-go" + ethabi "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/rpc" + "github.com/pkg/errors" +) + +// MercuryLookupHandler is responsible for initiating the calls to the Mercury server +// to determine whether the upkeeps are eligible +type MercuryLookupHandler struct { + credentials *MercuryCredentials + httpClient HttpClient + rpcClient *rpc.Client +} + +func NewMercuryLookupHandler( + credentials *MercuryCredentials, + rpcClient *rpc.Client, +) *MercuryLookupHandler { + return &MercuryLookupHandler{ + credentials: credentials, + httpClient: http.DefaultClient, + rpcClient: rpcClient, + } +} + +type MercuryVersion string + +type StreamsLookup struct { + feedParamKey string + feeds []string + timeParamKey string + time *big.Int + extraData []byte + upkeepId *big.Int + block uint64 +} + +//go:generate mockery --quiet --name HttpClient --output ./mocks/ --case=underscore +type HttpClient interface { + Do(req *http.Request) (*http.Response, error) +} + +type MercuryCredentials struct { + LegacyURL string + URL string + ClientID string + ClientKey string +} + +func (mc *MercuryCredentials) Validate() bool { + return mc.URL != "" && mc.ClientID != "" && mc.ClientKey != "" +} + +type MercuryData struct { + Index int + Error error + Retryable bool + Bytes [][]byte + State PipelineExecutionState +} + +// MercuryV02Response represents a JSON structure used by Mercury v0.2 +type MercuryV02Response struct { + ChainlinkBlob string `json:"chainlinkBlob"` +} + +// MercuryV03Response represents a JSON structure used by Mercury v0.3 +type MercuryV03Response struct { + Reports []MercuryV03Report `json:"reports"` +} + +type MercuryV03Report struct { + FeedID string `json:"feedID"` // feed id in hex encoded + ValidFromTimestamp uint32 `json:"validFromTimestamp"` + ObservationsTimestamp uint32 `json:"observationsTimestamp"` + FullReport string `json:"fullReport"` // the actual hex encoded mercury report of this feed, can be sent to verifier +} + +const ( + // DefaultAllowListExpiration decides how long an upkeep's allow list info will be valid for. + DefaultAllowListExpiration = 20 * time.Minute + // CleanupInterval decides when the expired items in cache will be deleted. + CleanupInterval = 25 * time.Minute +) + +const ( + ApplicationJson = "application/json" + BlockNumber = "blockNumber" // valid for v0.2 + FeedIDs = "feedIDs" // valid for v0.3 + FeedIdHex = "feedIdHex" // valid for v0.2 + HeaderAuthorization = "Authorization" + HeaderContentType = "Content-Type" + HeaderTimestamp = "X-Authorization-Timestamp" + HeaderSignature = "X-Authorization-Signature-SHA256" + HeaderUpkeepId = "X-Authorization-Upkeep-Id" + MercuryPathV2 = "/client?" // only used to access mercury v0.2 server + MercuryBatchPathV3 = "/api/v1/reports/bulk?" // only used to access mercury v0.3 server + RetryDelay = 500 * time.Millisecond + Timestamp = "timestamp" // valid for v0.3 + TotalAttempt = 3 + UserId = "userId" +) + +type UpkeepFailureReason uint8 +type PipelineExecutionState uint8 + +const ( + // upkeep failure onchain reasons + UpkeepFailureReasonNone UpkeepFailureReason = 0 + UpkeepFailureReasonUpkeepCancelled UpkeepFailureReason = 1 + UpkeepFailureReasonUpkeepPaused UpkeepFailureReason = 2 + UpkeepFailureReasonTargetCheckReverted UpkeepFailureReason = 3 + UpkeepFailureReasonUpkeepNotNeeded UpkeepFailureReason = 4 + UpkeepFailureReasonPerformDataExceedsLimit UpkeepFailureReason = 5 + UpkeepFailureReasonInsufficientBalance UpkeepFailureReason = 6 + UpkeepFailureReasonMercuryCallbackReverted UpkeepFailureReason = 7 + UpkeepFailureReasonRevertDataExceedsLimit UpkeepFailureReason = 8 + UpkeepFailureReasonRegistryPaused UpkeepFailureReason = 9 + // leaving a gap here for more onchain failure reasons in the future + // upkeep failure offchain reasons + UpkeepFailureReasonMercuryAccessNotAllowed UpkeepFailureReason = 32 + UpkeepFailureReasonTxHashNoLongerExists UpkeepFailureReason = 33 + UpkeepFailureReasonInvalidRevertDataInput UpkeepFailureReason = 34 + UpkeepFailureReasonSimulationFailed UpkeepFailureReason = 35 + UpkeepFailureReasonTxHashReorged UpkeepFailureReason = 36 + + // pipeline execution error + NoPipelineError PipelineExecutionState = 0 + CheckBlockTooOld PipelineExecutionState = 1 + CheckBlockInvalid PipelineExecutionState = 2 + RpcFlakyFailure PipelineExecutionState = 3 + MercuryFlakyFailure PipelineExecutionState = 4 + PackUnpackDecodeFailed PipelineExecutionState = 5 + MercuryUnmarshalError PipelineExecutionState = 6 + InvalidMercuryRequest PipelineExecutionState = 7 + InvalidMercuryResponse PipelineExecutionState = 8 // this will only happen if Mercury server sends bad responses + UpkeepNotAuthorized PipelineExecutionState = 9 +) + +// UpkeepPrivilegeConfig represents the administrative offchain config for each upkeep. It can be set by s_upkeepPrivilegeManager +// role on the registry. Upkeeps allowed to use Mercury server will have this set to true. +type UpkeepPrivilegeConfig struct { + MercuryEnabled bool `json:"mercuryEnabled"` +} + +// generateHMAC calculates a user HMAC for Mercury server authentication. +func (mlh *MercuryLookupHandler) generateHMAC(method string, path string, body []byte, clientId string, secret string, ts int64) string { + bodyHash := sha256.New() + bodyHash.Write(body) + hashString := fmt.Sprintf("%s %s %s %s %d", + method, + path, + hex.EncodeToString(bodyHash.Sum(nil)), + clientId, + ts) + signedMessage := hmac.New(sha256.New, []byte(secret)) + signedMessage.Write([]byte(hashString)) + userHmac := hex.EncodeToString(signedMessage.Sum(nil)) + return userHmac +} + +// singleFeedRequest sends a v0.2 Mercury request for a single feed report. +func (mlh *MercuryLookupHandler) singleFeedRequest(ctx context.Context, ch chan<- MercuryData, index int, ml *StreamsLookup) { + q := url.Values{ + ml.feedParamKey: {ml.feeds[index]}, + ml.timeParamKey: {ml.time.String()}, + } + mercuryURL := mlh.credentials.LegacyURL + reqUrl := fmt.Sprintf("%s%s%s", mercuryURL, MercuryPathV2, q.Encode()) + // mlh.logger.Debugf("request URL for upkeep %s feed %s: %s", ml.upkeepId.String(), ml.feeds[index], reqUrl) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) + if err != nil { + ch <- MercuryData{Index: index, Error: err, Retryable: false, State: InvalidMercuryRequest} + return + } + + ts := time.Now().UTC().UnixMilli() + signature := mlh.generateHMAC(http.MethodGet, MercuryPathV2+q.Encode(), []byte{}, mlh.credentials.ClientID, mlh.credentials.ClientKey, ts) + req.Header.Set(HeaderContentType, ApplicationJson) + req.Header.Set(HeaderAuthorization, mlh.credentials.ClientID) + req.Header.Set(HeaderTimestamp, strconv.FormatInt(ts, 10)) + req.Header.Set(HeaderSignature, signature) + + // in the case of multiple retries here, use the last attempt's data + state := NoPipelineError + retryable := false + sent := false + retryErr := retry.Do( + func() error { + retryable = false + resp, err1 := mlh.httpClient.Do(req) + if err1 != nil { + // mlh.logger.Errorw("StreamsLookup GET request failed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "feed", ml.feeds[index], "error", err1) + retryable = true + state = MercuryFlakyFailure + return err1 + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + // mlh.logger.Errorf("Encountered error when closing the body of the response in single feed: %s", err) + } + }(resp.Body) + + body, err1 := io.ReadAll(resp.Body) + if err1 != nil { + retryable = false + state = InvalidMercuryResponse + return err1 + } + + if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError { + // mlh.logger.Errorw("StreamsLookup received retryable status code", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "statusCode", resp.StatusCode, "feed", ml.feeds[index]) + retryable = true + state = MercuryFlakyFailure + return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10)) + } else if resp.StatusCode != http.StatusOK { + retryable = false + state = InvalidMercuryRequest + return fmt.Errorf("StreamsLookup upkeep %s block %s received status code %d for feed %s", ml.upkeepId.String(), ml.time.String(), resp.StatusCode, ml.feeds[index]) + } + + // mlh.logger.Debugf("at block %s upkeep %s received status code %d from mercury v0.2 with BODY=%s", ml.time.String(), ml.upkeepId.String(), resp.StatusCode, hexutil.Encode(body)) + + var m MercuryV02Response + err1 = json.Unmarshal(body, &m) + if err1 != nil { + // mlh.logger.Errorw("StreamsLookup failed to unmarshal body to MercuryResponse", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "feed", ml.feeds[index], "error", err1) + retryable = false + state = MercuryUnmarshalError + return err1 + } + blobBytes, err1 := hexutil.Decode(m.ChainlinkBlob) + if err1 != nil { + // mlh.logger.Errorw("StreamsLookup failed to decode chainlinkBlob for feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "blob", m.ChainlinkBlob, "feed", ml.feeds[index], "error", err1) + retryable = false + state = InvalidMercuryResponse + return err1 + } + ch <- MercuryData{ + Index: index, + Bytes: [][]byte{blobBytes}, + Retryable: false, + State: NoPipelineError, + } + sent = true + return nil + }, + // only retry when the error is 404 Not Found or 500 Internal Server Error + retry.RetryIf(func(err error) bool { + return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) + }), + retry.Context(ctx), + retry.Delay(RetryDelay), + retry.Attempts(TotalAttempt)) + + if !sent { + md := MercuryData{ + Index: index, + Bytes: [][]byte{}, + Retryable: retryable, + Error: fmt.Errorf("failed to request feed for %s: %w", ml.feeds[index], retryErr), + State: state, + } + ch <- md + } +} + +// multiFeedsRequest sends a Mercury v0.3 request for a multi-feed report +func (mlh *MercuryLookupHandler) multiFeedsRequest(ctx context.Context, ch chan<- MercuryData, ml *StreamsLookup) { + params := fmt.Sprintf("%s=%s&%s=%s", FeedIDs, strings.Join(ml.feeds, ","), Timestamp, ml.time.String()) + reqUrl := fmt.Sprintf("%s%s%s", mlh.credentials.URL, MercuryBatchPathV3, params) + // mlh.logger.Debugf("request URL for upkeep %s userId %s: %s", ml.upkeepId.String(), mlh.credentials.ClientID, reqUrl) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil) + if err != nil { + ch <- MercuryData{Index: 0, Error: err, Retryable: false, State: InvalidMercuryRequest} + return + } + + ts := time.Now().UTC().UnixMilli() + signature := mlh.generateHMAC(http.MethodGet, MercuryBatchPathV3+params, []byte{}, mlh.credentials.ClientID, mlh.credentials.ClientKey, ts) + req.Header.Set(HeaderContentType, ApplicationJson) + // username here is often referred to as user id + req.Header.Set(HeaderAuthorization, mlh.credentials.ClientID) + req.Header.Set(HeaderTimestamp, strconv.FormatInt(ts, 10)) + req.Header.Set(HeaderSignature, signature) + // mercury will inspect authorization headers above to make sure this user (in automation's context, this node) is eligible to access mercury + // and if it has an automation role. it will then look at this upkeep id to check if it has access to all the requested feeds. + req.Header.Set(HeaderUpkeepId, ml.upkeepId.String()) + + // in the case of multiple retries here, use the last attempt's data + state := NoPipelineError + retryable := false + sent := false + retryErr := retry.Do( + func() error { + retryable = false + resp, err1 := mlh.httpClient.Do(req) + if err1 != nil { + // mlh.logger.Errorw("StreamsLookup GET request fails for multi feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "error", err1) + retryable = true + state = MercuryFlakyFailure + return err1 + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + // mlh.logger.Errorf("Encountered error when closing the body of the response in the multi feed: %s", err) + } + }(resp.Body) + body, err1 := io.ReadAll(resp.Body) + if err1 != nil { + retryable = false + state = InvalidMercuryResponse + return err1 + } + + // mlh.logger.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) + if resp.StatusCode == http.StatusUnauthorized { + retryable = false + state = UpkeepNotAuthorized + return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) + } else if resp.StatusCode == http.StatusBadRequest { + retryable = false + state = InvalidMercuryRequest + return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by invalid format of timestamp", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) + } else if resp.StatusCode == http.StatusInternalServerError { + retryable = true + state = MercuryFlakyFailure + return fmt.Errorf("%d", http.StatusInternalServerError) + } else if resp.StatusCode == 420 { + // in 0.3, this will happen when missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds + retryable = false + state = InvalidMercuryRequest + return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by missing/malformed query args, missing or bad required headers, non-existent feeds, or no permissions for feeds", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) + } else if resp.StatusCode != http.StatusOK { + retryable = false + state = InvalidMercuryRequest + return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", ml.time.String(), ml.upkeepId.String(), resp.StatusCode) + } + + var response MercuryV03Response + err1 = json.Unmarshal(body, &response) + if err1 != nil { + // mlh.logger.Errorw("StreamsLookup failed to unmarshal body to MercuryResponse for multi feed", "upkeepID", ml.upkeepId.String(), "time", ml.time.String(), "error", err1) + retryable = false + state = MercuryUnmarshalError + return err1 + } + // in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract + // hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated + if len(response.Reports) != len(ml.feeds) { + // TODO: AUTO-5044: calculate what reports are missing and log a warning + retryable = true + state = MercuryFlakyFailure + return fmt.Errorf("%d", http.StatusNotFound) + } + var reportBytes [][]byte + for _, rsp := range response.Reports { + b, err := hexutil.Decode(rsp.FullReport) + if err != nil { + retryable = false + state = InvalidMercuryResponse + return err + } + reportBytes = append(reportBytes, b) + } + ch <- MercuryData{ + Index: 0, + Bytes: reportBytes, + Retryable: false, + State: NoPipelineError, + } + sent = true + return nil + }, + // only retry when the error is 404 Not Found or 500 Internal Server Error + retry.RetryIf(func(err error) bool { + return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) + }), + retry.Context(ctx), + retry.Delay(RetryDelay), + retry.Attempts(TotalAttempt)) + + if !sent { + md := MercuryData{ + Index: 0, + Bytes: [][]byte{}, + Retryable: retryable, + Error: retryErr, + State: state, + } + ch <- md + } +} + +// doMercuryRequest sends requests to Mercury API to retrieve ChainlinkBlob. +func (mlh *MercuryLookupHandler) doMercuryRequest(ctx context.Context, ml *StreamsLookup) (PipelineExecutionState, UpkeepFailureReason, [][]byte, bool, error) { + var isMercuryV03 bool + resultLen := len(ml.feeds) + ch := make(chan MercuryData, resultLen) + if len(ml.feeds) == 0 { + return NoPipelineError, UpkeepFailureReasonInvalidRevertDataInput, nil, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", ml.feedParamKey, ml.timeParamKey, ml.feeds) + } + if ml.feedParamKey == FeedIdHex && ml.timeParamKey == BlockNumber { + // only v0.2 + for i := range ml.feeds { + go mlh.singleFeedRequest(ctx, ch, i, ml) + } + } else if ml.feedParamKey == FeedIDs && ml.timeParamKey == Timestamp { + // only v0.3 + resultLen = 1 + isMercuryV03 = true + ch = make(chan MercuryData, resultLen) + go mlh.multiFeedsRequest(ctx, ch, ml) + } else { + return NoPipelineError, UpkeepFailureReasonInvalidRevertDataInput, nil, false, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", ml.feedParamKey, ml.timeParamKey, ml.feeds) + } + + var reqErr error + results := make([][]byte, len(ml.feeds)) + retryable := true + allSuccess := true + // in v0.2, use the last execution error as the state, if no execution errors, state will be no error + state := NoPipelineError + for i := 0; i < resultLen; i++ { + m := <-ch + if m.Error != nil { + if reqErr == nil { + reqErr = errors.New(m.Error.Error()) + } else { + reqErr = errors.New(reqErr.Error() + m.Error.Error()) + } + retryable = retryable && m.Retryable + allSuccess = false + if m.State != NoPipelineError { + state = m.State + } + continue + } + if isMercuryV03 { + results = m.Bytes + } else { + results[m.Index] = m.Bytes[0] + } + } + // only retry when not all successful AND none are not retryable + return state, UpkeepFailureReasonNone, results, retryable && !allSuccess, reqErr +} + +// decodeStreamsLookup decodes the revert error StreamsLookup(string feedParamKey, string[] feeds, string timeParamKey, uint256 time, byte[] extraData) +// func (mlh *MercuryLookupHandler) decodeStreamsLookup(data []byte) (*StreamsLookup, error) { +// e := mlh.mercuryConfig.Abi.Errors["StreamsLookup"] +// unpack, err := e.Unpack(data) +// if err != nil { +// return nil, fmt.Errorf("unpack error: %w", err) +// } +// errorParameters := unpack.([]interface{}) + +// return &StreamsLookup{ +// feedParamKey: *abi.ConvertType(errorParameters[0], new(string)).(*string), +// feeds: *abi.ConvertType(errorParameters[1], new([]string)).(*[]string), +// timeParamKey: *abi.ConvertType(errorParameters[2], new(string)).(*string), +// time: *abi.ConvertType(errorParameters[3], new(*big.Int)).(**big.Int), +// extraData: *abi.ConvertType(errorParameters[4], new([]byte)).(*[]byte), +// }, nil +// } + +// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if +// this upkeep is allowed to use Mercury service. +// func (mlh *MercuryLookupHandler) allowedToUseMercury(upkeep models.Upkeep) (bool, error) { +// allowed, ok := mlh.mercuryConfig.AllowListCache.Get(upkeep.Admin.Hex()) +// if ok { +// return allowed.(bool), nil +// } + +// if upkeep.UpkeepPrivilegeConfig == nil { +// return false, fmt.Errorf("the upkeep privilege config was not retrieved for upkeep with ID %s", upkeep.UpkeepID) +// } + +// if len(upkeep.UpkeepPrivilegeConfig) == 0 { +// return false, fmt.Errorf("the upkeep privilege config is empty") +// } + +// var a UpkeepPrivilegeConfig +// err := json.Unmarshal(upkeep.UpkeepPrivilegeConfig, &a) +// if err != nil { +// return false, fmt.Errorf("failed to unmarshal privilege config for upkeep ID %s: %v", upkeep.UpkeepID, err) +// } + +// mlh.mercuryConfig.AllowListCache.Set(upkeep.Admin.Hex(), a.MercuryEnabled, cache.DefaultExpiration) +// return a.MercuryEnabled, nil +// } + +func (mlh *MercuryLookupHandler) CheckCallback(ctx context.Context, values [][]byte, lookup *StreamsLookup, registryABI ethabi.ABI, registryAddress common.Address) (hexutil.Bytes, error) { + payload, err := registryABI.Pack("checkCallback", lookup.upkeepId, values, lookup.extraData) + if err != nil { + return nil, err + } + + var theBytes hexutil.Bytes + args := map[string]interface{}{ + "to": registryAddress.Hex(), + "data": hexutil.Bytes(payload), + } + + // call checkCallback function at the block which OCR3 has agreed upon + err = mlh.rpcClient.CallContext(ctx, &theBytes, "eth_call", args, hexutil.EncodeUint64(lookup.block)) + if err != nil { + return nil, err + } + return theBytes, nil +} diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 89ddc54ca7e..91e26caef69 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -7,6 +7,7 @@ replace github.com/smartcontractkit/chainlink/v2 => ../../ require ( github.com/ava-labs/coreth v0.12.1 + github.com/avast/retry-go v3.0.0+incompatible github.com/docker/docker v24.0.7+incompatible github.com/docker/go-connections v0.4.0 github.com/ethereum/go-ethereum v1.12.0 @@ -18,6 +19,7 @@ require ( github.com/montanaflynn/stats v0.7.1 github.com/olekukonko/tablewriter v0.0.5 github.com/pelletier/go-toml/v2 v2.1.0 + github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.1 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 @@ -234,7 +236,6 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pressly/goose/v3 v3.16.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 5f0e22fba03..1622f1d24df 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -150,6 +150,8 @@ github.com/ava-labs/avalanchego v1.10.1 h1:lBeamJ1iNq+p2oKg2nAs+A65m8vhSDjkiTDbw github.com/ava-labs/avalanchego v1.10.1/go.mod h1:ZvSXWlbkUKlbk3BsWx29a+8eVHe/WBsOxh55BSGoeRk= github.com/ava-labs/coreth v0.12.1 h1:EWSkFGHGVUxmu1pnSK/2pdcxaAVHbGspHqO3Ag+i7sA= github.com/ava-labs/coreth v0.12.1/go.mod h1:/5x54QlIKjlPebkdzTA5ic9wXdejbWOnQosztkv9jxo= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/avast/retry-go/v4 v4.5.1 h1:AxIx0HGi4VZ3I02jr78j5lZ3M6x1E0Ivxa6b0pUUh7o= github.com/avast/retry-go/v4 v4.5.1/go.mod h1:/sipNsvNB3RRuT5iNcb6h73nw3IBmXJ/H3XrCQYSOpc= github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index cb9e2dd6752..aec23431921 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -16,6 +16,7 @@ import ( "github.com/patrickmn/go-cache" ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v3/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1" @@ -91,9 +92,8 @@ func NewStreamsLookup( // Lookup looks through check upkeep results looking for any that need off chain lookup func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult { lookups := map[int]*mercury.StreamsLookup{} - for _, checkResult := range checkResults { - copyCheckResult := checkResult - s.buildResult(ctx, ©CheckResult, lookups) + for i, checkResult := range checkResults { + s.buildResult(ctx, i, checkResult, checkResults, lookups) } var wg sync.WaitGroup @@ -101,7 +101,7 @@ func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckRe wg.Add(1) func(i int, lookup *mercury.StreamsLookup) { s.threadCtrl.Go(func(ctx context.Context) { - s.doLookup(ctx, &wg, lookup, &checkResults[i]) + s.doLookup(ctx, &wg, lookup, i, checkResults) }) }(i, lookup) } @@ -112,7 +112,7 @@ func (s *streams) Lookup(ctx context.Context, checkResults []ocr2keepers.CheckRe } // buildResult checks if the upkeep is allowed by Mercury and builds a streams lookup request from the check result -func (s *streams) buildResult(ctx context.Context, checkResult *ocr2keepers.CheckResult, lookups map[int]*mercury.StreamsLookup) { +func (s *streams) buildResult(ctx context.Context, i int, checkResult ocr2keepers.CheckResult, checkResults []ocr2keepers.CheckResult, lookups map[int]*mercury.StreamsLookup) { lookupLggr := s.lggr.With("where", "StreamsLookup") if checkResult.IneligibilityReason != uint8(mercury.MercuryUpkeepFailureReasonTargetCheckReverted) { // Streams Lookup only works when upkeep target check reverts @@ -129,7 +129,7 @@ func (s *streams) buildResult(ctx context.Context, checkResult *ocr2keepers.Chec // Try to decode the revert error into streams lookup format. User upkeeps can revert with any reason, see if they // tried to call mercury - lookupLggr.Infof("at block %d upkeep %s trying to DecodeStreamsLookupRequest performData=%s", block, upkeepId, hexutil.Encode(checkResult.PerformData)) + lookupLggr.Infof("at block %d upkeep %s trying to DecodeStreamsLookupRequest performData=%s", block, upkeepId, hexutil.Encode(checkResults[i].PerformData)) streamsLookupErr, err := s.packer.DecodeStreamsLookupRequest(checkResult.PerformData) if err != nil { lookupLggr.Debugf("at block %d upkeep %s DecodeStreamsLookupRequest failed: %v", block, upkeepId, err) @@ -139,7 +139,7 @@ func (s *streams) buildResult(ctx context.Context, checkResult *ocr2keepers.Chec streamsLookupResponse := &mercury.StreamsLookup{StreamsLookupError: streamsLookupErr} if len(streamsLookupResponse.Feeds) == 0 { - checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) + checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) lookupLggr.Debugf("at block %s upkeep %s has empty feeds array", block, upkeepId) return } @@ -148,21 +148,21 @@ func (s *streams) buildResult(ctx context.Context, checkResult *ocr2keepers.Chec if streamsLookupResponse.IsMercuryV02() { // check permission on the registry for mercury v0.2 opts := s.buildCallOpts(ctx, block) - if state, reason, retryable, allowed, err := s.AllowedToUseMercury(opts, upkeepId.BigInt()); err != nil { + if state, reason, retryable, allowed, err := s.allowedToUseMercury(opts, upkeepId.BigInt()); err != nil { lookupLggr.Warnf("at block %s upkeep %s failed to query mercury allow list: %s", block, upkeepId, err) - checkResult.PipelineExecutionState = uint8(state) - checkResult.IneligibilityReason = uint8(reason) - checkResult.Retryable = retryable + checkResults[i].PipelineExecutionState = uint8(state) + checkResults[i].IneligibilityReason = uint8(reason) + checkResults[i].Retryable = retryable return } else if !allowed { lookupLggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId) - checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryAccessNotAllowed) + checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryAccessNotAllowed) return } } else if streamsLookupResponse.IsMercuryVersionUnkown() { // if mercury version cannot be determined, set failure reason lookupLggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId) - checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) + checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput) return } @@ -171,103 +171,71 @@ func (s *streams) buildResult(ctx context.Context, checkResult *ocr2keepers.Chec // in the revert for mercury v0.2, which is denoted by time in the struct bc starting from v0.3, only timestamp will be supported streamsLookupResponse.Block = uint64(block.Int64()) lookupLggr.Infof("at block %d upkeep %s DecodeStreamsLookupRequest feedKey=%s timeKey=%s feeds=%v time=%s extraData=%s", block, upkeepId, streamsLookupResponse.FeedParamKey, streamsLookupResponse.TimeParamKey, streamsLookupResponse.Feeds, streamsLookupResponse.Time, hexutil.Encode(streamsLookupResponse.ExtraData)) - lookups[len(lookups)] = streamsLookupResponse + lookups[i] = streamsLookupResponse } -func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *mercury.StreamsLookup, checkResult *ocr2keepers.CheckResult) { +func (s *streams) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *mercury.StreamsLookup, i int, checkResults []ocr2keepers.CheckResult) { defer wg.Done() - values, err := s.DoMercuryRequest(ctx, lookup, checkResult) - if err != nil { - s.lggr.Errorf("at block %d upkeep %s requested time %s DoMercuryRequest err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) - } + state, reason, values, retryable, retryInterval, err := mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) + pluginRetryKey := generatePluginRetryKey(checkResults[i].WorkID, lookup.Block) - if err := s.CheckCallback(ctx, values, lookup, checkResult); err != nil { - s.lggr.Errorf("at block %d upkeep %s requested time %s CheckCallback err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) + if lookup.IsMercuryV02() { + state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) + } else if lookup.IsMercuryV03() { + state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) } -} -func (s *streams) CheckCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup, checkResult *ocr2keepers.CheckResult) error { - payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) if err != nil { - s.lggr.Errorf("at block %d upkeep %s checkCallback packing err: %s", lookup.Block, lookup.UpkeepId, err.Error()) - checkResult.Retryable = false - checkResult.PipelineExecutionState = uint8(mercury.PackUnpackDecodeFailed) - return err + s.lggr.Errorf("at block %d upkeep %s requested time %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.Block, lookup.UpkeepId, lookup.Time, retryable, retryInterval, err.Error()) + checkResults[i].Retryable = retryable + checkResults[i].RetryInterval = retryInterval + checkResults[i].PipelineExecutionState = uint8(state) + checkResults[i].IneligibilityReason = uint8(reason) + return } - var mercuryBytes hexutil.Bytes - args := map[string]interface{}{ - "to": s.registry.Address().Hex(), - "data": hexutil.Bytes(payload), + for j, v := range values { + s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) } - // call checkCallback function at the block which OCR3 has agreed upon - if err = s.client.CallContext(ctx, &mercuryBytes, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { + state, retryable, mercuryBytes, err := s.checkCallback(ctx, values, lookup) + if err != nil { s.lggr.Errorf("at block %d upkeep %s checkCallback err: %s", lookup.Block, lookup.UpkeepId, err.Error()) - checkResult.Retryable = true - checkResult.PipelineExecutionState = uint8(mercury.RpcFlakyFailure) - return err + checkResults[i].Retryable = retryable + checkResults[i].PipelineExecutionState = uint8(state) + return } - s.lggr.Infof("at block %d upkeep %s requested time %s checkCallback mercuryBytes: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(mercuryBytes)) unpackCallBackState, needed, performData, failureReason, _, err := s.packer.UnpackCheckCallbackResult(mercuryBytes) if err != nil { s.lggr.Errorf("at block %d upkeep %s requested time %s UnpackCheckCallbackResult err: %s", lookup.Block, lookup.UpkeepId, lookup.Time, err.Error()) - checkResult.PipelineExecutionState = unpackCallBackState - return err + checkResults[i].PipelineExecutionState = unpackCallBackState + return } if failureReason == uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) { - checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) + checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonMercuryCallbackReverted) s.lggr.Debugf("at block %d upkeep %s requested time %s mercury callback reverts", lookup.Block, lookup.UpkeepId, lookup.Time) - return fmt.Errorf("at block %d upkeep %s requested time %s mercury callback reverts", lookup.Block, lookup.UpkeepId, lookup.Time) - + return } if !needed { - checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonUpkeepNotNeeded) + checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonUpkeepNotNeeded) s.lggr.Debugf("at block %d upkeep %s requested time %s callback reports upkeep not needed", lookup.Block, lookup.UpkeepId, lookup.Time) - return fmt.Errorf("at block %d upkeep %s requested time %s callback reports upkeep not needed", lookup.Block, lookup.UpkeepId, lookup.Time) + return } - checkResult.IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonNone) - checkResult.Eligible = true - checkResult.PerformData = performData + checkResults[i].IneligibilityReason = uint8(mercury.MercuryUpkeepFailureReasonNone) + checkResults[i].Eligible = true + checkResults[i].PerformData = performData s.lggr.Infof("at block %d upkeep %s requested time %s successful with perform data: %s", lookup.Block, lookup.UpkeepId, lookup.Time, hexutil.Encode(performData)) - - return nil -} - -func (s *streams) DoMercuryRequest(ctx context.Context, lookup *mercury.StreamsLookup, checkResult *ocr2keepers.CheckResult) ([][]byte, error) { - state, reason, values, retryable, retryInterval, err := mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0*time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", lookup.FeedParamKey, lookup.TimeParamKey, lookup.Feeds) - pluginRetryKey := generatePluginRetryKey(checkResult.WorkID, lookup.Block) - - if lookup.IsMercuryV02() { - state, reason, values, retryable, retryInterval, err = s.v02Client.DoRequest(ctx, lookup, pluginRetryKey) - } else if lookup.IsMercuryV03() { - state, reason, values, retryable, retryInterval, err = s.v03Client.DoRequest(ctx, lookup, pluginRetryKey) - } - - if err != nil { - s.lggr.Errorf("at block %d upkeep %s requested time %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.Block, lookup.UpkeepId, lookup.Time, retryable, retryInterval, err.Error()) - checkResult.Retryable = retryable - checkResult.RetryInterval = retryInterval - checkResult.PipelineExecutionState = uint8(state) - checkResult.IneligibilityReason = uint8(reason) - return nil, err - } - - for j, v := range values { - s.lggr.Infof("at block %d upkeep %s requested time %s doMercuryRequest values[%d]: %s", lookup.Block, lookup.UpkeepId, lookup.Time, j, hexutil.Encode(v)) - } - return values, nil } -// AllowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if +// allowedToUseMercury retrieves upkeep's administrative offchain config and decode a mercuryEnabled bool to indicate if // this upkeep is allowed to use Mercury service. -func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state mercury.MercuryUpkeepState, reason mercury.MercuryUpkeepFailureReason, retryable bool, allow bool, err error) { +func (s *streams) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state mercury.MercuryUpkeepState, reason mercury.MercuryUpkeepFailureReason, retryable bool, allow bool, err error) { allowed, ok := s.mercuryConfig.IsUpkeepAllowed(upkeepId.String()) if ok { return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonNone, false, allowed.(bool), nil @@ -287,6 +255,7 @@ func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s "data": hexutil.Bytes(payload), } + // call checkCallback function at the block which OCR3 has agreed upon if err = s.client.CallContext(opts.Context, &resultBytes, "eth_call", args, hexutil.EncodeBig(opts.BlockNumber)); err != nil { return mercury.RpcFlakyFailure, mercury.MercuryUpkeepFailureReasonNone, true, false, fmt.Errorf("failed to get upkeep privilege config: %v", err) } @@ -312,6 +281,26 @@ func (s *streams) AllowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (s return mercury.NoPipelineError, mercury.MercuryUpkeepFailureReasonNone, false, privilegeConfig.MercuryEnabled, nil } +func (s *streams) checkCallback(ctx context.Context, values [][]byte, lookup *mercury.StreamsLookup) (mercury.MercuryUpkeepState, bool, hexutil.Bytes, error) { + payload, err := s.abi.Pack("checkCallback", lookup.UpkeepId, values, lookup.ExtraData) + if err != nil { + return mercury.PackUnpackDecodeFailed, false, nil, err + } + + var b hexutil.Bytes + args := map[string]interface{}{ + "to": s.registry.Address().Hex(), + "data": hexutil.Bytes(payload), + } + + // call checkCallback function at the block which OCR3 has agreed upon + if err := s.client.CallContext(ctx, &b, "eth_call", args, hexutil.EncodeUint64(lookup.Block)); err != nil { + return mercury.RpcFlakyFailure, true, nil, err + } + + return mercury.NoPipelineError, false, b, nil +} + func (s *streams) buildCallOpts(ctx context.Context, block *big.Int) *bind.CallOpts { opts := bind.CallOpts{ Context: ctx, diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go index 2475244b4d0..abcc37dca18 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go @@ -126,7 +126,6 @@ func TestStreams_CheckCallback(t *testing.T) { tests := []struct { name string lookup *mercury.StreamsLookup - input []ocr2keepers.CheckResult values [][]byte statusCode int @@ -154,9 +153,6 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, - input: []ocr2keepers.CheckResult{ - {}, - }, values: values, statusCode: http.StatusOK, callbackResp: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 48, 120, 48, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -189,9 +185,6 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, - input: []ocr2keepers.CheckResult{ - {}, - }, values: values, statusCode: http.StatusOK, callbackResp: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -223,9 +216,6 @@ func TestStreams_CheckCallback(t *testing.T) { UpkeepId: upkeepId, Block: bn, }, - input: []ocr2keepers.CheckResult{ - {}, - }, values: values, statusCode: http.StatusOK, callbackResp: []byte{}, @@ -265,10 +255,10 @@ func TestStreams_CheckCallback(t *testing.T) { }).Once() s.client = client - err = s.CheckCallback(testutils.Context(t), tt.values, tt.lookup, &tt.input[0]) - tt.wantErr(t, err, fmt.Sprintf("Error assertion failed: %v", tt.name)) - assert.Equal(t, uint8(tt.state), tt.input[0].PipelineExecutionState) - assert.Equal(t, tt.retryable, tt.input[0].Retryable) + state, retryable, _, err := s.checkCallback(testutils.Context(t), tt.values, tt.lookup) + tt.wantErr(t, err, fmt.Sprintf("Error asserion failed: %v", tt.name)) + assert.Equal(t, tt.state, state) + assert.Equal(t, tt.retryable, retryable) }) } } @@ -444,7 +434,7 @@ func TestStreams_AllowedToUseMercury(t *testing.T) { BlockNumber: big.NewInt(10), } - state, reason, retryable, allowed, err := s.AllowedToUseMercury(opts, upkeepId) + state, reason, retryable, allowed, err := s.allowedToUseMercury(opts, upkeepId) assert.Equal(t, tt.err, err) assert.Equal(t, tt.allowed, allowed) assert.Equal(t, tt.state, state)