Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFQ: failed rebalance does not block chain listener #2430

Merged
merged 7 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 45 additions & 36 deletions ethergo/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
"context"
"errors"
"fmt"
db2 "github.com/synapsecns/sanguine/ethergo/listener/db"
"math/big"
"time"

listenerDB "github.com/synapsecns/sanguine/ethergo/listener/db"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -17,7 +18,6 @@
"github.com/synapsecns/sanguine/ethergo/client"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)

// ContractListener listens for chain events and calls HandleLog.
Expand All @@ -39,7 +39,7 @@
client client.EVM
address common.Address
initialBlock uint64
store db2.ChainListenerDB
store listenerDB.ChainListenerDB
handler metrics.Handler
backoff *backoff.Backoff
// IMPORTANT! These fields cannot be used until they has been set. They are NOT
Expand All @@ -52,11 +52,11 @@
var (
logger = log.Logger("chainlistener-logger")
// ErrNoLatestBlockForChainID is returned when no block exists for the chain.
ErrNoLatestBlockForChainID = db2.ErrNoLatestBlockForChainID
ErrNoLatestBlockForChainID = listenerDB.ErrNoLatestBlockForChainID
)

// NewChainListener creates a new chain listener.
func NewChainListener(omnirpcClient client.EVM, store db2.ChainListenerDB, address common.Address, initialBlock uint64, handler metrics.Handler) (ContractListener, error) {
func NewChainListener(omnirpcClient client.EVM, store listenerDB.ChainListenerDB, address common.Address, initialBlock uint64, handler metrics.Handler) (ContractListener, error) {
return &chainListener{
handler: handler,
address: address,
Expand Down Expand Up @@ -108,7 +108,13 @@
c.pollInterval = defaultPollInterval

// Note: in the case of an error, you don't have to handle the poll interval by calling b.duration.
var endBlock uint64
defer func() {
span.SetAttributes(
attribute.Int64("start_block", int64(c.startBlock)),
attribute.Int64("end_block", int64(endBlock)),
attribute.Int64("latest_block", int64(c.latestBlock)),
)
metrics.EndSpanWithErr(span, err)
if err != nil {
c.backoff.Attempt()
Expand All @@ -130,7 +136,7 @@

// Handle if the listener is more than one get logs range behind the head
// Note: this does not cover the edge case of a reorg that includes a new tx
endBlock := c.latestBlock
endBlock = c.latestBlock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling the edge case where c.startBlock + maxGetLogsRange exceeds the maximum value of uint64 to prevent overflow.

- endBlock = c.startBlock + maxGetLogsRange
+ if c.startBlock > math.MaxUint64 - maxGetLogsRange {
+     endBlock = math.MaxUint64
+ } else {
+     endBlock = c.startBlock + maxGetLogsRange
+ }

This change ensures that the calculation of endBlock does not overflow, which could potentially cause the application to behave unexpectedly or miss logs.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
endBlock = c.latestBlock
if c.startBlock > math.MaxUint64 - maxGetLogsRange {
endBlock = math.MaxUint64
} else {
endBlock = c.startBlock + maxGetLogsRange
}

lastUnconfirmedBlock := c.latestBlock
if c.startBlock+maxGetLogsRange < c.latestBlock {
endBlock = c.startBlock + maxGetLogsRange
Expand All @@ -154,7 +160,7 @@

err = c.store.PutLatestBlock(ctx, c.chainID, endBlock)
if err != nil {
return fmt.Errorf("could not put lastest block: %w", err)
return fmt.Errorf("could not put latest block: %w", err)
}

c.startBlock = lastUnconfirmedBlock
Expand All @@ -166,40 +172,28 @@
ctx, span := c.handler.Tracer().Start(parentCtx, "getMetadata")

defer func() {
span.SetAttributes(
attribute.Int64("start_block", int64(startBlock)),
attribute.Int64("last_indexed", int64(lastIndexed)),
attribute.Int(metrics.ChainID, int(chainID)),
)
metrics.EndSpanWithErr(span, err)
}()

// TODO: consider some kind of backoff here in case rpcs are down at boot.
// this becomes more of an issue as we add more chains
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
// TODO: one thing I've been going back and forth on is whether or not this method should be chain aware
// passing in the chain ID would allow us to pull everything directly from the config, but be less testable
// for now, this is probably the best solution for testability, but it's certainly a bit annoying we need to do
// an rpc call in order to get the chain id
//
rpcChainID, err := c.client.ChainID(ctx)
if err != nil {
return fmt.Errorf("could not get chain ID: %w", err)
}
chainID = rpcChainID.Uint64()

lastIndexed, err = c.store.LatestBlockForChain(ctx, chainID)
// Workaround: TODO remove
if errors.Is(err, ErrNoLatestBlockForChainID) || err != nil && err.Error() == ErrNoLatestBlockForChainID.Error() {
// TODO: consider making this negative 1, requires type change
lastIndexed = 0
return nil
}
if err != nil {
return fmt.Errorf("could not get the latest block for chainID: %w", err)
}
return nil
})
// TODO: one thing I've been going back and forth on is whether or not this method should be chain aware
// passing in the chain ID would allow us to pull everything directly from the config, but be less testable
// for now, this is probably the best solution for testability, but it's certainly a bit annoying we need to do
// an rpc call in order to get the chain id
//
rpcChainID, err := c.client.ChainID(ctx)
if err != nil {
return 0, 0, fmt.Errorf("could not get chain ID: %w", err)
}

Check warning on line 191 in ethergo/listener/listener.go

View check run for this annotation

Codecov / codecov/patch

ethergo/listener/listener.go#L190-L191

Added lines #L190 - L191 were not covered by tests
Comment on lines +190 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added lines related to error handling and chain ID retrieval were not covered by tests. It's crucial to add unit tests for these changes to ensure the new logic works as expected and does not introduce regressions.

Also applies to: 196-196, 218-219

chainID = rpcChainID.Uint64()

err = g.Wait()
lastIndexed, err = c.getLastIndexed(ctx, chainID)
if err != nil {
return 0, 0, fmt.Errorf("could not get metadata: %w", err)
return 0, 0, fmt.Errorf("could not get last indexed: %w", err)

Check warning on line 196 in ethergo/listener/listener.go

View check run for this annotation

Codecov / codecov/patch

ethergo/listener/listener.go#L196

Added line #L196 was not covered by tests
}

if lastIndexed > c.startBlock {
Expand All @@ -211,6 +205,21 @@
return startBlock, chainID, nil
}

// TODO: consider some kind of backoff here in case rpcs are down at boot.
// this becomes more of an issue as we add more chains

Check failure on line 209 in ethergo/listener/listener.go

View workflow job for this annotation

GitHub Actions / Lint (ethergo)

Comment should end in a period (godot)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment should end in a period to follow Go documentation conventions.

- // this becomes more of an issue as we add more chains
+ // This becomes more of an issue as we add more chains.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// this becomes more of an issue as we add more chains
// This becomes more of an issue as we add more chains.

func (c chainListener) getLastIndexed(ctx context.Context, chainID uint64) (lastIndexed uint64, err error) {
lastIndexed, err = c.store.LatestBlockForChain(ctx, chainID)
// Workaround: TODO remove
if errors.Is(err, ErrNoLatestBlockForChainID) || err != nil && err.Error() == ErrNoLatestBlockForChainID.Error() {
// TODO: consider making this negative 1, requires type change
return 0, nil
}
if err != nil {
return 0, fmt.Errorf("could not get the latest block for chainID: %w", err)
}

Check warning on line 219 in ethergo/listener/listener.go

View check run for this annotation

Codecov / codecov/patch

ethergo/listener/listener.go#L218-L219

Added lines #L218 - L219 were not covered by tests
return lastIndexed, nil
}

func newBackoffConfig() *backoff.Backoff {
return &backoff.Backoff{
Factor: 2,
Expand Down
2 changes: 1 addition & 1 deletion services/rfq/relayer/service/chainindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
func (r *Relayer) handleDepositClaimed(ctx context.Context, event *fastbridge.FastBridgeBridgeDepositClaimed, chainID int) error {
err := r.inventory.Rebalance(ctx, chainID, event.Token)
if err != nil {
return fmt.Errorf("could not rebalance: %w", err)
logger.Errorf("could not rebalance: %w", err)

Check warning on line 208 in services/rfq/relayer/service/chainindexer.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/relayer/service/chainindexer.go#L208

Added line #L208 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace the %w verb with %v when logging errors.

- logger.Errorf("could not rebalance: %w", err)
+ logger.Errorf("could not rebalance: %v", err)

The %w verb is used for wrapping errors when using fmt.Errorf to allow errors to be unwrapped later. For logging purposes, %v or %s should be used instead, as wrapping is not necessary.


Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
logger.Errorf("could not rebalance: %w", err)
logger.Errorf("could not rebalance: %v", err)

}
err = r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.ClaimCompleted)
if err != nil {
Expand Down
Loading