-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFQ: failed rebalance does not block chain listener #2430
Changes from 6 commits
ef2ea02
7f308e8
bebced3
e398414
3610ad6
fccc667
b8f6b6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,10 +4,11 @@ | |||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
db2 "github.com/synapsecns/sanguine/ethergo/listener/db" | ||||||
"math/big" | ||||||
"time" | ||||||
|
||||||
listenerDB "github.com/synapsecns/sanguine/ethergo/listener/db" | ||||||
|
||||||
"github.com/ethereum/go-ethereum" | ||||||
"github.com/ethereum/go-ethereum/common" | ||||||
"github.com/ethereum/go-ethereum/core/types" | ||||||
|
@@ -17,7 +18,6 @@ | |||||
"github.com/synapsecns/sanguine/ethergo/client" | ||||||
"go.opentelemetry.io/otel/attribute" | ||||||
"go.opentelemetry.io/otel/trace" | ||||||
"golang.org/x/sync/errgroup" | ||||||
) | ||||||
|
||||||
// ContractListener listens for chain events and calls HandleLog. | ||||||
|
@@ -39,7 +39,7 @@ | |||||
client client.EVM | ||||||
address common.Address | ||||||
initialBlock uint64 | ||||||
store db2.ChainListenerDB | ||||||
store listenerDB.ChainListenerDB | ||||||
handler metrics.Handler | ||||||
backoff *backoff.Backoff | ||||||
// IMPORTANT! These fields cannot be used until they has been set. They are NOT | ||||||
|
@@ -52,11 +52,11 @@ | |||||
var ( | ||||||
logger = log.Logger("chainlistener-logger") | ||||||
// ErrNoLatestBlockForChainID is returned when no block exists for the chain. | ||||||
ErrNoLatestBlockForChainID = db2.ErrNoLatestBlockForChainID | ||||||
ErrNoLatestBlockForChainID = listenerDB.ErrNoLatestBlockForChainID | ||||||
) | ||||||
|
||||||
// NewChainListener creates a new chain listener. | ||||||
func NewChainListener(omnirpcClient client.EVM, store db2.ChainListenerDB, address common.Address, initialBlock uint64, handler metrics.Handler) (ContractListener, error) { | ||||||
func NewChainListener(omnirpcClient client.EVM, store listenerDB.ChainListenerDB, address common.Address, initialBlock uint64, handler metrics.Handler) (ContractListener, error) { | ||||||
return &chainListener{ | ||||||
handler: handler, | ||||||
address: address, | ||||||
|
@@ -108,7 +108,13 @@ | |||||
c.pollInterval = defaultPollInterval | ||||||
|
||||||
// Note: in the case of an error, you don't have to handle the poll interval by calling b.duration. | ||||||
var endBlock uint64 | ||||||
defer func() { | ||||||
span.SetAttributes( | ||||||
attribute.Int64("start_block", int64(c.startBlock)), | ||||||
attribute.Int64("end_block", int64(endBlock)), | ||||||
attribute.Int64("latest_block", int64(c.latestBlock)), | ||||||
) | ||||||
metrics.EndSpanWithErr(span, err) | ||||||
if err != nil { | ||||||
c.backoff.Attempt() | ||||||
|
@@ -130,7 +136,7 @@ | |||||
|
||||||
// Handle if the listener is more than one get logs range behind the head | ||||||
// Note: this does not cover the edge case of a reorg that includes a new tx | ||||||
endBlock := c.latestBlock | ||||||
endBlock = c.latestBlock | ||||||
lastUnconfirmedBlock := c.latestBlock | ||||||
if c.startBlock+maxGetLogsRange < c.latestBlock { | ||||||
endBlock = c.startBlock + maxGetLogsRange | ||||||
|
@@ -154,7 +160,7 @@ | |||||
|
||||||
err = c.store.PutLatestBlock(ctx, c.chainID, endBlock) | ||||||
if err != nil { | ||||||
return fmt.Errorf("could not put lastest block: %w", err) | ||||||
return fmt.Errorf("could not put latest block: %w", err) | ||||||
} | ||||||
|
||||||
c.startBlock = lastUnconfirmedBlock | ||||||
|
@@ -166,40 +172,28 @@ | |||||
ctx, span := c.handler.Tracer().Start(parentCtx, "getMetadata") | ||||||
|
||||||
defer func() { | ||||||
span.SetAttributes( | ||||||
attribute.Int64("start_block", int64(startBlock)), | ||||||
attribute.Int64("last_indexed", int64(lastIndexed)), | ||||||
attribute.Int(metrics.ChainID, int(chainID)), | ||||||
) | ||||||
metrics.EndSpanWithErr(span, err) | ||||||
}() | ||||||
|
||||||
// TODO: consider some kind of backoff here in case rpcs are down at boot. | ||||||
// this becomes more of an issue as we add more chains | ||||||
g, ctx := errgroup.WithContext(ctx) | ||||||
g.Go(func() error { | ||||||
// TODO: one thing I've been going back and forth on is whether or not this method should be chain aware | ||||||
// passing in the chain ID would allow us to pull everything directly from the config, but be less testable | ||||||
// for now, this is probably the best solution for testability, but it's certainly a bit annoying we need to do | ||||||
// an rpc call in order to get the chain id | ||||||
// | ||||||
rpcChainID, err := c.client.ChainID(ctx) | ||||||
if err != nil { | ||||||
return fmt.Errorf("could not get chain ID: %w", err) | ||||||
} | ||||||
chainID = rpcChainID.Uint64() | ||||||
|
||||||
lastIndexed, err = c.store.LatestBlockForChain(ctx, chainID) | ||||||
// Workaround: TODO remove | ||||||
if errors.Is(err, ErrNoLatestBlockForChainID) || err != nil && err.Error() == ErrNoLatestBlockForChainID.Error() { | ||||||
// TODO: consider making this negative 1, requires type change | ||||||
lastIndexed = 0 | ||||||
return nil | ||||||
} | ||||||
if err != nil { | ||||||
return fmt.Errorf("could not get the latest block for chainID: %w", err) | ||||||
} | ||||||
return nil | ||||||
}) | ||||||
// TODO: one thing I've been going back and forth on is whether or not this method should be chain aware | ||||||
// passing in the chain ID would allow us to pull everything directly from the config, but be less testable | ||||||
// for now, this is probably the best solution for testability, but it's certainly a bit annoying we need to do | ||||||
// an rpc call in order to get the chain id | ||||||
// | ||||||
rpcChainID, err := c.client.ChainID(ctx) | ||||||
if err != nil { | ||||||
return 0, 0, fmt.Errorf("could not get chain ID: %w", err) | ||||||
} | ||||||
Comment on lines
+190
to
+191
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The added lines related to error handling and chain ID retrieval were not covered by tests. It's crucial to add unit tests for these changes to ensure the new logic works as expected and does not introduce regressions. Also applies to: 196-196, 218-219 |
||||||
chainID = rpcChainID.Uint64() | ||||||
|
||||||
err = g.Wait() | ||||||
lastIndexed, err = c.getLastIndexed(ctx, chainID) | ||||||
if err != nil { | ||||||
return 0, 0, fmt.Errorf("could not get metadata: %w", err) | ||||||
return 0, 0, fmt.Errorf("could not get last indexed: %w", err) | ||||||
} | ||||||
|
||||||
if lastIndexed > c.startBlock { | ||||||
|
@@ -211,6 +205,21 @@ | |||||
return startBlock, chainID, nil | ||||||
} | ||||||
|
||||||
// TODO: consider some kind of backoff here in case rpcs are down at boot. | ||||||
// this becomes more of an issue as we add more chains | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment should end in a period to follow Go documentation conventions. - // this becomes more of an issue as we add more chains
+ // This becomes more of an issue as we add more chains. Committable suggestion
Suggested change
|
||||||
func (c chainListener) getLastIndexed(ctx context.Context, chainID uint64) (lastIndexed uint64, err error) { | ||||||
lastIndexed, err = c.store.LatestBlockForChain(ctx, chainID) | ||||||
// Workaround: TODO remove | ||||||
if errors.Is(err, ErrNoLatestBlockForChainID) || err != nil && err.Error() == ErrNoLatestBlockForChainID.Error() { | ||||||
// TODO: consider making this negative 1, requires type change | ||||||
return 0, nil | ||||||
} | ||||||
if err != nil { | ||||||
return 0, fmt.Errorf("could not get the latest block for chainID: %w", err) | ||||||
} | ||||||
return lastIndexed, nil | ||||||
} | ||||||
|
||||||
func newBackoffConfig() *backoff.Backoff { | ||||||
return &backoff.Backoff{ | ||||||
Factor: 2, | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -205,7 +205,7 @@ | |||||
func (r *Relayer) handleDepositClaimed(ctx context.Context, event *fastbridge.FastBridgeBridgeDepositClaimed, chainID int) error { | ||||||
err := r.inventory.Rebalance(ctx, chainID, event.Token) | ||||||
if err != nil { | ||||||
return fmt.Errorf("could not rebalance: %w", err) | ||||||
logger.Errorf("could not rebalance: %w", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace the - logger.Errorf("could not rebalance: %w", err)
+ logger.Errorf("could not rebalance: %v", err) The Committable suggestion
Suggested change
|
||||||
} | ||||||
err = r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.ClaimCompleted) | ||||||
if err != nil { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling the edge case where
c.startBlock + maxGetLogsRange
exceeds the maximum value ofuint64
to prevent overflow.This change ensures that the calculation of
endBlock
does not overflow, which could potentially cause the application to behave unexpectedly or miss logs.Committable suggestion