Skip to content

Commit

Permalink
implement rpc checking after confirmation delay
Browse files Browse the repository at this point in the history
  • Loading branch information
golangisfun123 committed Sep 25, 2024
1 parent 4a87e3d commit 0aa10b6
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 147 deletions.
3 changes: 2 additions & 1 deletion services/rfq/e2e/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
},
},
NativeToken: "ETH",
VolumeLimit: 10_000,
},
destBackendChainID: {
RFQAddress: i.manager.Get(i.GetTestContext(), i.destBackend, testutil.FastBridgeType).Address().String(),
Expand All @@ -304,6 +305,7 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
},
},
NativeToken: "ETH",
VolumeLimit: 10_000,
},
},
OmniRPCURL: i.omniServer,
Expand All @@ -329,7 +331,6 @@ func (i *IntegrationSuite) getRelayerConfig() relconfig.Config {
TokenPriceCacheTTLSeconds: 60,
},
RebalanceInterval: 0,
VolumeLimit: 10_000,
}
}

Expand Down
163 changes: 87 additions & 76 deletions services/rfq/relayer/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package limiter

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/synapsecns/sanguine/core/metrics"
omnirpc "github.com/synapsecns/sanguine/services/omnirpc/client"
"github.com/synapsecns/sanguine/ethergo/client"
"github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge"
"github.com/synapsecns/sanguine/services/rfq/relayer/quoter"
"github.com/synapsecns/sanguine/services/rfq/relayer/relconfig"
Expand All @@ -32,31 +34,30 @@ type Limiter interface {
}

type limiterImpl struct {
listener LatestBlockFetcher
metrics metrics.Handler
quoter quoter.Quoter
cfg relconfig.Config
tokenNames map[string]relconfig.TokenConfig
omnirpcClient omnirpc.RPCClient
listener LatestBlockFetcher
metrics metrics.Handler
quoter quoter.Quoter
cfg relconfig.Config
tokenNames map[string]relconfig.TokenConfig
evmClient client.EVM
}

// NewRateLimiter creates a new Limiter.
// TODO: implement the sliding window: queue up requests and process them in order if cumulative volume is above limit.
func NewRateLimiter(
cfg relconfig.Config,
l LatestBlockFetcher,
q quoter.Quoter,
metricHandler metrics.Handler,
tokens map[string]relconfig.TokenConfig,
omnirpcClient omnirpc.RPCClient,
evmClient client.EVM,
) Limiter {
return &limiterImpl{
listener: l,
metrics: metricHandler,
quoter: q,
cfg: cfg,
tokenNames: tokens,
omnirpcClient: omnirpcClient,
listener: l,
metrics: metricHandler,
quoter: q,
cfg: cfg,
tokenNames: tokens,
evmClient: evmClient,
}
}

Expand All @@ -69,33 +70,37 @@ func (l *limiterImpl) IsAllowed(ctx context.Context, request *reldb.QuoteRequest
defer func() {
metrics.EndSpanWithErr(span, err)
}()

// if enough confirmations, allow because reorgs are rare at this point
hasEnoughConfirmations, err := l.hasEnoughConfirmations(ctx, request)
// if not enough confirmations, check volume. if under limit, allow.
underVolumeLimit, err := l.isUnderVolumeLimit(ctx, request)
if err != nil {
return false, fmt.Errorf("could not check confirmations: %w", err)
return false, fmt.Errorf("could not check volume limit: %w", err)

Check warning on line 76 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L76

Added line #L76 was not covered by tests
}
if hasEnoughConfirmations {
if underVolumeLimit {
return true, nil
}

// if not enough confirmations, check volume
withinSize, err := l.withinSizeLimit(ctx, request)
// if enough confirmations, allow because reorgs are rare at this point.
hasEnoughConfirmations, err := l.hasEnoughConfirmations(ctx, request)
if err != nil {
return false, fmt.Errorf("could not check volume limit: %w", err)
return false, fmt.Errorf("could not check confirmations: %w", err)
}

Check warning on line 86 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L85-L86

Added lines #L85 - L86 were not covered by tests
if hasEnoughConfirmations {
// we need to check if the receipt exists, parse the events from it, check for possible reverts,
// and has the correct fields in case of a reorg. then, and only then, we can be sure a reorg will not
// revert this.
receiptFieldsMatch, err := l.checkReceipt(ctx, request)
if err != nil {
return false, fmt.Errorf("could not check receipt: %w", err)
}

Check warning on line 94 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L93-L94

Added lines #L93 - L94 were not covered by tests
return receiptFieldsMatch, nil
}

span.SetAttributes(
attribute.Bool("has_enough_confirmations", hasEnoughConfirmations),
attribute.Bool("within_size_limit", withinSize),
attribute.Bool("within_size_limit", underVolumeLimit),
)

receiptFieldsMatch, err := l.checkReceipt(ctx, request)
if err != nil {
return false, fmt.Errorf("could not check receipt: %w", err)
}

return withinSize && receiptFieldsMatch, nil
return false, nil
}

func (l *limiterImpl) hasEnoughConfirmations(ctx context.Context, request *reldb.QuoteRequest) (_ bool, err error) {
Expand All @@ -109,10 +114,7 @@ func (l *limiterImpl) hasEnoughConfirmations(ctx context.Context, request *reldb
return false, fmt.Errorf("could not get block number: %w", err)
}

requiredConfirmations, err := l.cfg.GetFinalityConfirmations(int(request.Transaction.OriginChainId))
if err != nil {
return false, fmt.Errorf("could not get required confirmations from config: %w", err)
}
requiredConfirmations := l.cfg.GetLimitConfirmations(int(request.Transaction.OriginChainId))

actualConfirmations := currentBlockNumber - request.BlockNumber
hasEnoughConfirmations := actualConfirmations >= requiredConfirmations
Expand All @@ -128,8 +130,8 @@ func (l *limiterImpl) hasEnoughConfirmations(ctx context.Context, request *reldb
return hasEnoughConfirmations, nil
}

func (l *limiterImpl) withinSizeLimit(ctx context.Context, request *reldb.QuoteRequest) (_ bool, err error) {
ctx, span := l.metrics.Tracer().Start(ctx, "limiter.withinSizeLimit")
func (l *limiterImpl) isUnderVolumeLimit(ctx context.Context, request *reldb.QuoteRequest) (_ bool, err error) {
ctx, span := l.metrics.Tracer().Start(ctx, "limiter.underVolumeLimitLimit")
defer func() {
metrics.EndSpanWithErr(span, err)
}()
Expand All @@ -145,14 +147,14 @@ func (l *limiterImpl) withinSizeLimit(ctx context.Context, request *reldb.QuoteR
return false, fmt.Errorf("could not get USD amount of token: %w", err)
}

withinSizeLimit := tokenPrice.Cmp(volumeLimit) < 0
underVolumeLimitLimit := tokenPrice.Cmp(volumeLimit) < 0
span.SetAttributes(
attribute.String("volume_limit", volumeLimit.String()),
attribute.String("token_price", tokenPrice.String()),
attribute.Bool("within_size_limit", withinSizeLimit),
attribute.Bool("within_size_limit", underVolumeLimitLimit),
)

return withinSizeLimit, nil
return underVolumeLimitLimit, nil
}

// getRequestVolumeOfToken returns the volume of the token in USD. This value is NOT human readable.
Expand Down Expand Up @@ -195,55 +197,64 @@ func (l *limiterImpl) getRequestVolumeOfToken(
return product, nil
}

func (l *limiterImpl) checkReceipt(ctx context.Context, request *reldb.QuoteRequest) (bool, error) {
confirmationsClient, err := l.omnirpcClient.GetConfirmationsClient(
ctx,
int(request.Transaction.OriginChainId),
int(l.cfg.GetRPCConfirmations()),
)
if err != nil {
return false, fmt.Errorf("could not get confirmations client: %w", err)
}
// checkReceipt checks if the receipt exists and has the correct fields in the case a reorg happened.
func (l *limiterImpl) checkReceipt(ctx context.Context, request *reldb.QuoteRequest) (_ bool, err error) {
// Make sure receipt exists and has the correct fields in case of a reorg.
// Note: https://community.infura.io/t/does-eth-gettransactionreceipt-respond-to-re-orged-transactions/7765
// "You will get a tx receipt back but, as you note, there is a small chance of a reorg.
// In a re-org the shorter side chain (one block usually, two occasionally) will have all its tx’s reverted and
// placed back in the mempool. Calling eth_getTransactionReceipt at this point will return null, until the tx is
// added to a new block and validated."
ctx, span := l.metrics.Tracer().Start(ctx, "limiter.checkReceipt")
defer func() {
metrics.EndSpanWithErr(span, err)
}()

// make sure receipt exists and has the correct fields in case of a reorg
receipt, err := confirmationsClient.TransactionReceipt(ctx, request.OriginTxHash)
receipt, err := l.evmClient.TransactionReceipt(ctx, request.OriginTxHash)
if err != nil {
return false, fmt.Errorf("could not check for receipt: %w", err)
}

// not sure if this is needed.
if receipt.Logs[0] == nil {
return false, fmt.Errorf("no logs in receipt")
if errors.Is(err, ethereum.NotFound) {
return false, nil
}
return false, fmt.Errorf("could not get transaction receipt: %w", err)

Check warning on line 218 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L215-L218

Added lines #L215 - L218 were not covered by tests
}
log := receipt.Logs[0]

// nonce check
if log.Topics[1] != request.TransactionID {
return false, fmt.Errorf("incorrect transactionID got %s expected %s", log.Topics[1].String(), hexutil.Encode((request.TransactionID[:])))
rfqAddr, err := l.cfg.GetRFQAddress(int(request.Transaction.OriginChainId))
if err != nil {
return false, fmt.Errorf("could not get RFQ address: %w", err)
}

Check warning on line 224 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L223-L224

Added lines #L223 - L224 were not covered by tests

parser, err := fastbridge.NewParser(common.HexToAddress(""))
parser, err := fastbridge.NewParser(rfqAddr)
if err != nil {
return false, fmt.Errorf("could not create parser: %w", err)
}

Check warning on line 229 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L228-L229

Added lines #L228 - L229 were not covered by tests

_, parsedEvent, ok := parser.ParseEvent(*log)
if !ok {
return false, fmt.Errorf("could not parse event")
}
for _, log := range receipt.Logs {
_, parsedEvent, ok := parser.ParseEvent(*log)
if !ok {
continue

Check warning on line 234 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L234

Added line #L234 was not covered by tests
}

switch event := parsedEvent.(type) {
case *fastbridge.FastBridgeBridgeRequested:
return rfqFieldsMatch(request, event), nil
default:
return false, fmt.Errorf("failed to decode event: unknown event")
event, ok := parsedEvent.(*fastbridge.FastBridgeBridgeRequested)
if ok {
return rfqFieldsMatch(request, event) && !log.Removed, nil
}
}
span.SetAttributes(
attribute.String("receipt txHash", receipt.TxHash.Hex()),
attribute.String("receipt log address", receipt.ContractAddress.String()),
attribute.String("receipt data", hex.EncodeToString(receipt.Logs[0].Data)),
)

return false, nil

Check warning on line 248 in services/rfq/relayer/limiter/limiter.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/limiter/limiter.go#L242-L248

Added lines #L242 - L248 were not covered by tests
}

// TODO: is this exhaustive?
func rfqFieldsMatch(request *reldb.QuoteRequest, event *fastbridge.FastBridgeBridgeRequested) bool {
return request.TransactionID == event.TransactionId &&
request.Sender.String() == event.Sender.String() &&
request.Transaction.OriginAmount.String() == event.OriginAmount.String() &&
request.Transaction.DestAmount.String() == event.DestAmount.String() &&
request.Transaction.OriginToken.String() == event.OriginToken.String()
transactionIDMatch := request.TransactionID == event.TransactionId
senderMatch := request.Sender.String() == event.Sender.String()
originAmountMatch := request.Transaction.OriginAmount.String() == event.OriginAmount.String()
destAmountMatch := request.Transaction.DestAmount.String() == event.DestAmount.String()
originTokenMatch := request.Transaction.OriginToken.String() == event.OriginToken.String()

return transactionIDMatch && senderMatch && originAmountMatch && destAmountMatch && originTokenMatch
}
Loading

0 comments on commit 0aa10b6

Please sign in to comment.