From fde6560c1778ff43a70d76717cfb85a315f5787d Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 11:45:17 -0500 Subject: [PATCH 01/24] WIP: add relay ack cache and GetRelayAck endpoint --- services/rfq/api/config/config.go | 16 ++++++++++-- services/rfq/api/rest/server.go | 38 +++++++++++++++++++++++++++- services/rfq/relayer/relapi/model.go | 6 +++++ 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/services/rfq/api/config/config.go b/services/rfq/api/config/config.go index 66cb1d343c..d1885ca0a8 100644 --- a/services/rfq/api/config/config.go +++ b/services/rfq/api/config/config.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/jftuga/ellipsis" "gopkg.in/yaml.v2" @@ -21,8 +22,19 @@ type Config struct { Database DatabaseConfig `yaml:"database"` OmniRPCURL string `yaml:"omnirpc_url"` // bridges is a map of chainid->address - Bridges map[uint32]string `yaml:"bridges"` - Port string `yaml:"port"` + Bridges map[uint32]string `yaml:"bridges"` + Port string `yaml:"port"` + RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"` +} + +const defaultRelayAckTimeout = 5 * time.Second + +// GetRelayAckTimeout returns the relay ack timeout. +func (c Config) GetRelayAckTimeout() time.Duration { + if c.RelayAckTimeout == 0 { + return defaultRelayAckTimeout + } + return c.RelayAckTimeout } // LoadConfig loads the config from the given path. diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index e1a4a9c37d..972af38b17 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -25,6 +25,7 @@ import ( "github.com/synapsecns/sanguine/services/rfq/api/docs" "github.com/synapsecns/sanguine/services/rfq/api/model" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/relayer/relapi" ) // QuoterAPIServer is a struct that holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. @@ -37,6 +38,9 @@ type QuoterAPIServer struct { handler metrics.Handler fastBridgeContracts map[uint32]*fastbridge.FastBridge roleCache map[uint32]*ttlcache.Cache[string, bool] + // relayAckCache contains a set of transactionID values that reflect + // transactions that have been acked for relay + relayAckCache *ttlcache.Cache[string, bool] } // NewAPI holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. @@ -80,12 +84,21 @@ func NewAPI( ttlcache.WithTTL[string, bool](cacheInterval), ) roleCache := roles[chainID] - go roleCache.Start() + + // create the relay ack cache + relayAckCache := ttlcache.New[string, bool]( + ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()), + ttlcache.WithDisableTouchOnHit[string, bool](), + ) + go relayAckCache.Start() + go func() { <-ctx.Done() roleCache.Stop() + relayAckCache.Stop() }() + } return &QuoterAPIServer{ @@ -188,3 +201,26 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { c.Next() } } + +// GetRelayAck checks if a relay is pending or not. +func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) { + transactionID := c.Query("transaction_id") + if transactionID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'txID'"}) + return + } + + // If the tx id is already in the cache, it should not be relayed. + // Otherwise, insert into the cache. + ack := r.relayAckCache.Get(transactionID) + shouldRelay := ack == nil + if shouldRelay { + r.relayAckCache.Set(transactionID, true, ttlcache.DefaultTTL) + } + + resp := relapi.GetRelayAckResponse{ + TxID: transactionID, + ShouldRelay: shouldRelay, + } + c.JSON(http.StatusOK, resp) +} diff --git a/services/rfq/relayer/relapi/model.go b/services/rfq/relayer/relapi/model.go index 721c83d6c0..fbcb9e2f02 100644 --- a/services/rfq/relayer/relapi/model.go +++ b/services/rfq/relayer/relapi/model.go @@ -15,3 +15,9 @@ type GetTxRetryResponse struct { Nonce uint64 `json:"nonce"` GasAmount string `json:"gas_amount"` } + +// GetRelayAckResponse contains the schema for a POST /relay/ack response. +type GetRelayAckResponse struct { + TxID string `json:"tx_id"` + ShouldRelay bool `json:"should_relay"` +} From e3b690f70d9369a374c2a01c994824f17dcd776c Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 11:46:34 -0500 Subject: [PATCH 02/24] Feat: register AckRoute --- services/rfq/api/rest/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 972af38b17..df53260587 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -114,6 +114,7 @@ func NewAPI( // QuoteRoute is the API endpoint for handling quote related requests. const ( QuoteRoute = "/quotes" + AckRoute = "/ack" cacheInterval = time.Minute ) @@ -133,6 +134,7 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { // GET routes without the AuthMiddleware // engine.PUT("/quotes", h.ModifyQuote) engine.GET(QuoteRoute, h.GetQuotes) + engine.GET(AckRoute, r.GetRelayAck) r.engine = engine From 3984a359e7a8d415585b016cef049c2c340ba799 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 11:50:06 -0500 Subject: [PATCH 03/24] Feat: add ackMux --- services/rfq/api/rest/server.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index df53260587..ff9e420655 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" "github.com/ipfs/go-log" @@ -41,6 +42,8 @@ type QuoterAPIServer struct { // relayAckCache contains a set of transactionID values that reflect // transactions that have been acked for relay relayAckCache *ttlcache.Cache[string, bool] + // ackMux is a mutex used to ensure that only one transaction id can be acked at a time. + ackMux sync.Mutex } // NewAPI holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. @@ -85,22 +88,23 @@ func NewAPI( ) roleCache := roles[chainID] go roleCache.Start() - - // create the relay ack cache - relayAckCache := ttlcache.New[string, bool]( - ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()), - ttlcache.WithDisableTouchOnHit[string, bool](), - ) - go relayAckCache.Start() - go func() { <-ctx.Done() roleCache.Stop() - relayAckCache.Stop() }() - } + // create the relay ack cache + relayAckCache := ttlcache.New[string, bool]( + ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()), + ttlcache.WithDisableTouchOnHit[string, bool](), + ) + go relayAckCache.Start() + go func() { + <-ctx.Done() + relayAckCache.Stop() + }() + return &QuoterAPIServer{ cfg: cfg, db: store, @@ -108,6 +112,8 @@ func NewAPI( handler: handler, fastBridgeContracts: bridges, roleCache: roles, + relayAckCache: relayAckCache, + ackMux: sync.Mutex{}, }, nil } @@ -214,11 +220,13 @@ func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) { // If the tx id is already in the cache, it should not be relayed. // Otherwise, insert into the cache. + r.ackMux.Lock() ack := r.relayAckCache.Get(transactionID) shouldRelay := ack == nil if shouldRelay { r.relayAckCache.Set(transactionID, true, ttlcache.DefaultTTL) } + r.ackMux.Unlock() resp := relapi.GetRelayAckResponse{ TxID: transactionID, From f5155e6eddc2382e2913471c9f57a7e840214f23 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 15:19:32 -0500 Subject: [PATCH 04/24] Feat: add GetRelayAck test --- services/rfq/api/rest/server.go | 4 +-- services/rfq/api/rest/server_test.go | 45 ++++++++++++++++++++++++++ services/rfq/relayer/relapi/handler.go | 2 +- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index ff9e420655..3b057dc07d 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -212,9 +212,9 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { // GetRelayAck checks if a relay is pending or not. func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) { - transactionID := c.Query("transaction_id") + transactionID := c.Query("id") if transactionID == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'txID'"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'id'"}) return } diff --git a/services/rfq/api/rest/server_test.go b/services/rfq/api/rest/server_test.go index 16e6e11d30..f93274e170 100644 --- a/services/rfq/api/rest/server_test.go +++ b/services/rfq/api/rest/server_test.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/synapsecns/sanguine/ethergo/signer/wallet" "github.com/synapsecns/sanguine/services/rfq/api/model" + "github.com/synapsecns/sanguine/services/rfq/relayer/relapi" ) func (c *ServerSuite) TestNewQuoterAPIServer() { @@ -203,6 +204,50 @@ func (c *ServerSuite) TestPutAndGetQuoteByRelayer() { c.Assert().True(found, "Newly added quote not found") } +func (c *ServerSuite) TestGetAck() { + c.startQuoterAPIServer() + + // Send GET request + client := &http.Client{} + testTxID := "0x123" + req, err := http.NewRequestWithContext(c.GetTestContext(), http.MethodGet, fmt.Sprintf("http://localhost:%d/ack?id=%s", c.port, testTxID), nil) + c.Require().NoError(err) + resp, err := client.Do(req) + c.Require().NoError(err) + c.Equal(http.StatusOK, resp.StatusCode) + + // Expect ack with shouldRelay=true + var result relapi.GetRelayAckResponse + err = json.NewDecoder(resp.Body).Decode(&result) + c.Require().NoError(err) + expectedResult := relapi.GetRelayAckResponse{ + TxID: testTxID, + ShouldRelay: true, + } + c.Equal(expectedResult, result) + err = resp.Body.Close() + c.Require().NoError(err) + + // Send another request with same txID + req, err = http.NewRequestWithContext(c.GetTestContext(), http.MethodGet, fmt.Sprintf("http://localhost:%d/ack?id=%s", c.port, testTxID), nil) + c.Require().NoError(err) + resp, err = client.Do(req) + c.Require().NoError(err) + c.Equal(http.StatusOK, resp.StatusCode) + + // Expect ack with shouldRelay=false + err = json.NewDecoder(resp.Body).Decode(&result) + c.Require().NoError(err) + expectedResult = relapi.GetRelayAckResponse{ + TxID: testTxID, + ShouldRelay: false, + } + c.Equal(expectedResult, result) + err = resp.Body.Close() + c.Require().NoError(err) + c.GetTestContext().Done() +} + // startQuoterAPIServer starts the API server and waits for it to initialize. func (c *ServerSuite) startQuoterAPIServer() { go func() { diff --git a/services/rfq/relayer/relapi/handler.go b/services/rfq/relayer/relapi/handler.go index 2182db44e8..94514c9476 100644 --- a/services/rfq/relayer/relapi/handler.go +++ b/services/rfq/relayer/relapi/handler.go @@ -60,7 +60,7 @@ func (h *Handler) GetQuoteRequestStatusByTxHash(c *gin.Context) { func (h *Handler) GetQuoteRequestStatusByTxID(c *gin.Context) { txIDStr := c.Query("id") if txIDStr == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'txID'"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'id'"}) return } From f56104a474260f047cccd4bc8db2e9c6f802d79c Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 15:40:51 -0500 Subject: [PATCH 05/24] Feat: add GetRelayAck to UnauthenticatedClient --- services/rfq/api/client/client.go | 22 ++++++++++++++++++++++ services/rfq/api/model/response.go | 8 ++++++++ 2 files changed, 30 insertions(+) diff --git a/services/rfq/api/client/client.go b/services/rfq/api/client/client.go index 4ed3f80ee0..832acc3cfe 100644 --- a/services/rfq/api/client/client.go +++ b/services/rfq/api/client/client.go @@ -34,6 +34,7 @@ type UnauthenticatedClient interface { GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) GetSpecificQuote(ctx context.Context, q *model.GetQuoteSpecificRequest) ([]*model.GetQuoteResponse, error) GetQuoteByRelayerAddress(ctx context.Context, relayerAddr string) ([]*model.GetQuoteResponse, error) + GetRelayAck(ctx context.Context, txID string) (*model.GetRelayAckResponse, error) resty() *resty.Client } @@ -188,3 +189,24 @@ func (c *unauthenticatedClient) GetQuoteByRelayerAddress(ctx context.Context, re return quotes, nil } + +func (c *unauthenticatedClient) GetRelayAck(ctx context.Context, txID string) (*model.GetRelayAckResponse, error) { + var ack *model.GetRelayAckResponse + resp, err := c.rClient.R(). + SetContext(ctx). + SetQueryParams(map[string]string{ + "id": txID, + }). + SetResult(&ack). + Get(rest.AckRoute) + + if err != nil { + return nil, fmt.Errorf("error from server: %s %w", resp.Status(), err) + } + + if resp.IsError() { + return nil, fmt.Errorf("error from server: %s", resp.Status()) + } + + return ack, nil +} diff --git a/services/rfq/api/model/response.go b/services/rfq/api/model/response.go index a3fbc2fa47..598641a092 100644 --- a/services/rfq/api/model/response.go +++ b/services/rfq/api/model/response.go @@ -25,3 +25,11 @@ type GetQuoteResponse struct { // UpdatedAt is the time that the quote was last upserted UpdatedAt string `json:"updated_at"` } + +// GetRelayAckResponse contains the schema for a GET /relay/ack response. +type GetRelayAckResponse struct { + // TxID is the transaction ID + TransactionID string `json:"tx_id"` + // ShouldRelay is a boolean indicating whether the transaction should be relayed + ShouldRelay bool `json:"should_relay"` +} From b2012991b29117a0db9c023a5e85437b4ba10cfb Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 15:41:11 -0500 Subject: [PATCH 06/24] Feat: relayer fetches ack before updating to CommittedPending --- services/rfq/relayer/service/handlers.go | 13 ++++++++++++- services/rfq/relayer/service/relayer.go | 8 ++++++++ services/rfq/relayer/service/statushandler.go | 4 ++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/services/rfq/relayer/service/handlers.go b/services/rfq/relayer/service/handlers.go index 3b0ec386f4..2bd1e11b83 100644 --- a/services/rfq/relayer/service/handlers.go +++ b/services/rfq/relayer/service/handlers.go @@ -138,7 +138,7 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r return fmt.Errorf("could not get committable balance: %w", err) } - // if committableBalance > destAmount + // check if we have enough inventory to handle the request if committableBalance.Cmp(request.Transaction.DestAmount) < 0 { err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.NotEnoughInventory) if err != nil { @@ -146,6 +146,17 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r } return nil } + + // get ack from API to synchronize calls with other relayers and avoid reverts + resp, err := q.apiClient.GetRelayAck(ctx, hexutil.Encode(request.TransactionID[:])) + if err != nil { + return fmt.Errorf("could not get relay ack: %w", err) + } + if !resp.ShouldRelay { + span.SetAttributes(attribute.Bool("should_relay", false)) + return nil + } + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending) if err != nil { return fmt.Errorf("could not update request status: %w", err) diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index 276bcb869f..bb38e71833 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -21,6 +21,7 @@ import ( cctpSql "github.com/synapsecns/sanguine/services/cctp-relayer/db/sql" "github.com/synapsecns/sanguine/services/cctp-relayer/relayer" omniClient "github.com/synapsecns/sanguine/services/omnirpc/client" + rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/pricer" @@ -40,6 +41,7 @@ type Relayer struct { client omniClient.RPCClient chainListeners map[int]listener.ContractListener apiServer *relapi.RelayerAPIServer + apiClient rfqAPIClient.UnauthenticatedClient inventory inventory.Manager quoter quoter.Quoter submitter submitter.TransactionSubmitter @@ -120,6 +122,11 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi return nil, fmt.Errorf("could not get api server: %w", err) } + apiClient, err := rfqAPIClient.NewUnauthenticatedClient(metricHandler, cfg.GetRfqAPIURL()) + if err != nil { + return nil, fmt.Errorf("error creating RFQ API client: %w", err) + } + cache := ttlcache.New[common.Hash, bool](ttlcache.WithTTL[common.Hash, bool](time.Second * 30)) rel := Relayer{ db: store, @@ -134,6 +141,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi signer: sg, chainListeners: chainListeners, apiServer: apiServer, + apiClient: apiClient, } return &rel, nil } diff --git a/services/rfq/relayer/service/statushandler.go b/services/rfq/relayer/service/statushandler.go index 9864649446..d64320a4f3 100644 --- a/services/rfq/relayer/service/statushandler.go +++ b/services/rfq/relayer/service/statushandler.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/jellydator/ttlcache/v3" "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/relayer/chain" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/quoter" @@ -42,6 +43,8 @@ type QuoteRequestHandler struct { RelayerAddress common.Address // metrics is the metrics handler. metrics metrics.Handler + // apiClient is used to get acks before submitting a relay transaction. + apiClient client.UnauthenticatedClient } // Handler is the handler for a quote request. @@ -68,6 +71,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) metrics: r.metrics, RelayerAddress: r.signer.Address(), claimCache: r.claimCache, + apiClient: r.apiClient, } qr.handlers[reldb.Seen] = r.deadlineMiddleware(r.gasMiddleware(qr.handleSeen)) From 26fcd005321116a8a1956e025924917d04a69d8e Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Wed, 15 May 2024 15:48:42 -0500 Subject: [PATCH 07/24] [goreleaser] From 71651dfafaa150ec00051ff3b5a8f74122cf42ff Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 11:13:52 -0500 Subject: [PATCH 08/24] Feat: move GET /ack to PUT /ack --- services/rfq/api/client/client.go | 6 +++--- services/rfq/api/model/response.go | 4 ++-- services/rfq/api/rest/server.go | 18 +++++++++++------- services/rfq/api/rest/server_test.go | 6 +++--- services/rfq/relayer/relapi/model.go | 4 ++-- 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/services/rfq/api/client/client.go b/services/rfq/api/client/client.go index 832acc3cfe..27298bb1b4 100644 --- a/services/rfq/api/client/client.go +++ b/services/rfq/api/client/client.go @@ -34,7 +34,7 @@ type UnauthenticatedClient interface { GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) GetSpecificQuote(ctx context.Context, q *model.GetQuoteSpecificRequest) ([]*model.GetQuoteResponse, error) GetQuoteByRelayerAddress(ctx context.Context, relayerAddr string) ([]*model.GetQuoteResponse, error) - GetRelayAck(ctx context.Context, txID string) (*model.GetRelayAckResponse, error) + GetRelayAck(ctx context.Context, txID string) (*model.PutRelayAckResponse, error) resty() *resty.Client } @@ -190,8 +190,8 @@ func (c *unauthenticatedClient) GetQuoteByRelayerAddress(ctx context.Context, re return quotes, nil } -func (c *unauthenticatedClient) GetRelayAck(ctx context.Context, txID string) (*model.GetRelayAckResponse, error) { - var ack *model.GetRelayAckResponse +func (c *unauthenticatedClient) GetRelayAck(ctx context.Context, txID string) (*model.PutRelayAckResponse, error) { + var ack *model.PutRelayAckResponse resp, err := c.rClient.R(). SetContext(ctx). SetQueryParams(map[string]string{ diff --git a/services/rfq/api/model/response.go b/services/rfq/api/model/response.go index 598641a092..5fbdb5ba23 100644 --- a/services/rfq/api/model/response.go +++ b/services/rfq/api/model/response.go @@ -26,8 +26,8 @@ type GetQuoteResponse struct { UpdatedAt string `json:"updated_at"` } -// GetRelayAckResponse contains the schema for a GET /relay/ack response. -type GetRelayAckResponse struct { +// PutRelayAckResponse contains the schema for a PUT /relay/ack response. +type PutRelayAckResponse struct { // TxID is the transaction ID TransactionID string `json:"tx_id"` // ShouldRelay is a boolean indicating whether the transaction should be relayed diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 3b057dc07d..069bb58bb9 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -117,9 +117,10 @@ func NewAPI( }, nil } -// QuoteRoute is the API endpoint for handling quote related requests. const ( - QuoteRoute = "/quotes" + // QuoteRoute is the API endpoint for handling quote related requests. + QuoteRoute = "/quotes" + // AckRoute is the API endpoint for handling relay ack related requests. AckRoute = "/ack" cacheInterval = time.Minute ) @@ -133,14 +134,17 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { h := NewHandler(r.db) engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler)) - // Apply AuthMiddleware only to the PUT route + // Apply AuthMiddleware only to the PUT routes quotesPut := engine.Group(QuoteRoute) quotesPut.Use(r.AuthMiddleware()) quotesPut.PUT("", h.ModifyQuote) + ackPut := engine.Group(AckRoute) + ackPut.Use(r.AuthMiddleware()) + ackPut.PUT("", r.PutRelayAck) + // GET routes without the AuthMiddleware // engine.PUT("/quotes", h.ModifyQuote) engine.GET(QuoteRoute, h.GetQuotes) - engine.GET(AckRoute, r.GetRelayAck) r.engine = engine @@ -210,8 +214,8 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { } } -// GetRelayAck checks if a relay is pending or not. -func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) { +// PutRelayAck checks if a relay is pending or not. +func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { transactionID := c.Query("id") if transactionID == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'id'"}) @@ -228,7 +232,7 @@ func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) { } r.ackMux.Unlock() - resp := relapi.GetRelayAckResponse{ + resp := relapi.PutRelayAckResponse{ TxID: transactionID, ShouldRelay: shouldRelay, } diff --git a/services/rfq/api/rest/server_test.go b/services/rfq/api/rest/server_test.go index f93274e170..96b67a361a 100644 --- a/services/rfq/api/rest/server_test.go +++ b/services/rfq/api/rest/server_test.go @@ -217,10 +217,10 @@ func (c *ServerSuite) TestGetAck() { c.Equal(http.StatusOK, resp.StatusCode) // Expect ack with shouldRelay=true - var result relapi.GetRelayAckResponse + var result relapi.PutRelayAckResponse err = json.NewDecoder(resp.Body).Decode(&result) c.Require().NoError(err) - expectedResult := relapi.GetRelayAckResponse{ + expectedResult := relapi.PutRelayAckResponse{ TxID: testTxID, ShouldRelay: true, } @@ -238,7 +238,7 @@ func (c *ServerSuite) TestGetAck() { // Expect ack with shouldRelay=false err = json.NewDecoder(resp.Body).Decode(&result) c.Require().NoError(err) - expectedResult = relapi.GetRelayAckResponse{ + expectedResult = relapi.PutRelayAckResponse{ TxID: testTxID, ShouldRelay: false, } diff --git a/services/rfq/relayer/relapi/model.go b/services/rfq/relayer/relapi/model.go index fbcb9e2f02..96ba1e1dff 100644 --- a/services/rfq/relayer/relapi/model.go +++ b/services/rfq/relayer/relapi/model.go @@ -16,8 +16,8 @@ type GetTxRetryResponse struct { GasAmount string `json:"gas_amount"` } -// GetRelayAckResponse contains the schema for a POST /relay/ack response. -type GetRelayAckResponse struct { +// PutRelayAckResponse contains the schema for a POST /relay/ack response. +type PutRelayAckResponse struct { TxID string `json:"tx_id"` ShouldRelay bool `json:"should_relay"` } From 43b58f8d682ff52ab5be9f613751b2e3abe1824e Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 11:40:19 -0500 Subject: [PATCH 09/24] WIP: generalize AuthMiddleware() --- services/rfq/api/rest/server.go | 89 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 069bb58bb9..fcdfcb6272 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -161,59 +161,70 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { // AuthMiddleware is the Gin authentication middleware that authenticates requests using EIP191. func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { return func(c *gin.Context) { - var req model.PutQuoteRequest - if err := c.BindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - c.Abort() - return - } - - bridge, ok := r.fastBridgeContracts[uint32(req.DestChainID)] - if !ok { - c.JSON(http.StatusBadRequest, gin.H{"msg": "dest chain id not supported"}) - c.Abort() - return - } - - ops := &bind.CallOpts{Context: c} - relayerRole := crypto.Keccak256Hash([]byte("RELAYER_ROLE")) - - // authenticate relayer signature with EIP191 - deadline := time.Now().Unix() - 1000 // TODO: Replace with some type of r.cfg.AuthExpiryDelta - addressRecovered, err := EIP191Auth(c, deadline) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"msg": fmt.Sprintf("unable to authenticate relayer: %v", err)}) - c.Abort() - return - } - - hasRole := r.roleCache[uint32(req.DestChainID)].Get(addressRecovered.Hex()) - - if hasRole == nil || hasRole.IsExpired() { - has, err := bridge.HasRole(ops, relayerRole, addressRecovered) - if err == nil { - r.roleCache[uint32(req.DestChainID)].Set(addressRecovered.Hex(), has, cacheInterval) - } - - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"msg": "unable to check relayer role on-chain"}) + var addressRecovered common.Address + var err error + + switch c.Request.URL.Path { + case QuoteRoute: + var req model.PutQuoteRequest + if err := c.BindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) c.Abort() return - } else if !has { - c.JSON(http.StatusBadRequest, gin.H{"msg": "q.Relayer not an on-chain relayer"}) + } + addressRecovered, err = r.checkRole(c, uint32(req.DestChainID)) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) c.Abort() return } + c.Set("putRequest", &req) } // Log and pass to the next middleware if authentication succeeds // Store the request in context after binding and validation - c.Set("putRequest", &req) c.Set("relayerAddr", addressRecovered.Hex()) c.Next() } } +func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (addressRecovered common.Address, err error) { + bridge, ok := r.fastBridgeContracts[uint32(destChainID)] + if !ok { + err = fmt.Errorf("dest chain id not supported: %d", destChainID) + return addressRecovered, err + } + + ops := &bind.CallOpts{Context: c} + relayerRole := crypto.Keccak256Hash([]byte("RELAYER_ROLE")) + + // authenticate relayer signature with EIP191 + deadline := time.Now().Unix() - 1000 // TODO: Replace with some type of r.cfg.AuthExpiryDelta + addressRecovered, err = EIP191Auth(c, deadline) + if err != nil { + err = fmt.Errorf("unable to authenticate relayer: %v", err) + return addressRecovered, err + } + + hasRole := r.roleCache[uint32(destChainID)].Get(addressRecovered.Hex()) + + if hasRole == nil || hasRole.IsExpired() { + has, roleErr := bridge.HasRole(ops, relayerRole, addressRecovered) + if roleErr == nil { + r.roleCache[uint32(destChainID)].Set(addressRecovered.Hex(), has, cacheInterval) + } + + if roleErr != nil { + err = fmt.Errorf("unable to check relayer role on-chain") + return addressRecovered, err + } else if !has { + err = fmt.Errorf("q.Relayer not an on-chain relayer") + return addressRecovered, err + } + } + return addressRecovered, nil +} + // PutRelayAck checks if a relay is pending or not. func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { transactionID := c.Query("id") From b9fe8ca2d6c019eeb7eb5cd2f1b79fa8e6552a8e Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:01:18 -0500 Subject: [PATCH 10/24] Fix: working refactor for auth --- services/rfq/api/rest/server.go | 35 ++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index fcdfcb6272..570fb61d9c 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -161,28 +161,39 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { // AuthMiddleware is the Gin authentication middleware that authenticates requests using EIP191. func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { return func(c *gin.Context) { - var addressRecovered common.Address + var loggedRequest interface{} + var destChainID uint32 var err error + fmt.Printf("PATH: %v\n", c.Request.URL.Path) switch c.Request.URL.Path { case QuoteRoute: var req model.PutQuoteRequest - if err := c.BindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - c.Abort() - return + err = c.BindJSON(&req) + if err == nil { + destChainID = uint32(req.DestChainID) + loggedRequest = &req } - addressRecovered, err = r.checkRole(c, uint32(req.DestChainID)) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) - c.Abort() - return - } - c.Set("putRequest", &req) + default: + err = fmt.Errorf("unexpected request path: %s", c.Request.URL.Path) + } + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + c.Abort() + return + } + + // Authenticate and fetch the address from the request + addressRecovered, err := r.checkRole(c, destChainID) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) + c.Abort() + return } // Log and pass to the next middleware if authentication succeeds // Store the request in context after binding and validation + c.Set("putRequest", loggedRequest) c.Set("relayerAddr", addressRecovered.Hex()) c.Next() } From 1941b30d5bb0b8b6065a89d1fdf5d5dcc143e86c Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:06:55 -0500 Subject: [PATCH 11/24] Feat: add PutAckRequest and parse in auth middleware --- services/rfq/api/model/request.go | 6 ++++++ services/rfq/api/rest/server.go | 9 ++++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/services/rfq/api/model/request.go b/services/rfq/api/model/request.go index 865c1b4af6..f3ce466ce1 100644 --- a/services/rfq/api/model/request.go +++ b/services/rfq/api/model/request.go @@ -13,6 +13,12 @@ type PutQuoteRequest struct { DestFastBridgeAddress string `json:"dest_fast_bridge_address"` } +// PutAckRequest contains the schema for a PUT /ack request. +type PutAckRequest struct { + TxID string `json:"tx_id"` + DestChainID int `json:"dest_chain_id"` +} + // GetQuoteSpecificRequest contains the schema for a GET /quote request with specific params. type GetQuoteSpecificRequest struct { OriginChainID int `json:"originChainId"` diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 570fb61d9c..f39111b5c5 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -165,7 +165,7 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { var destChainID uint32 var err error - fmt.Printf("PATH: %v\n", c.Request.URL.Path) + // Parse the dest chain id from the request switch c.Request.URL.Path { case QuoteRoute: var req model.PutQuoteRequest @@ -174,6 +174,13 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { destChainID = uint32(req.DestChainID) loggedRequest = &req } + case AckRoute: + var req model.PutAckRequest + err = c.BindJSON(&req) + if err == nil { + destChainID = uint32(req.DestChainID) + loggedRequest = &req + } default: err = fmt.Errorf("unexpected request path: %s", c.Request.URL.Path) } From 3c47bfb3e7478a2db9c8fbd0b0d2643a671b049f Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:19:55 -0500 Subject: [PATCH 12/24] Feat: impl PUT /ack request with json req body --- services/rfq/api/client/client.go | 42 +++++++-------- services/rfq/api/rest/server.go | 17 +++--- services/rfq/api/rest/server_test.go | 53 ++++++++++++++----- services/rfq/relayer/service/handlers.go | 7 ++- services/rfq/relayer/service/relayer.go | 4 +- services/rfq/relayer/service/statushandler.go | 2 +- 6 files changed, 80 insertions(+), 45 deletions(-) diff --git a/services/rfq/api/client/client.go b/services/rfq/api/client/client.go index 27298bb1b4..2e878afaf2 100644 --- a/services/rfq/api/client/client.go +++ b/services/rfq/api/client/client.go @@ -26,6 +26,7 @@ import ( // It provides methods for creating, retrieving and updating quotes. type AuthenticatedClient interface { PutQuote(ctx context.Context, q *model.PutQuoteRequest) error + PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error) UnauthenticatedClient } @@ -34,7 +35,6 @@ type UnauthenticatedClient interface { GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) GetSpecificQuote(ctx context.Context, q *model.GetQuoteSpecificRequest) ([]*model.GetQuoteResponse, error) GetQuoteByRelayerAddress(ctx context.Context, relayerAddr string) ([]*model.GetQuoteResponse, error) - GetRelayAck(ctx context.Context, txID string) (*model.PutRelayAckResponse, error) resty() *resty.Client } @@ -125,6 +125,25 @@ func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutQuoteRequest) err return err } +func (c *clientImpl) PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error) { + var ack *model.PutRelayAckResponse + resp, err := c.rClient.R(). + SetContext(ctx). + SetBody(req). + SetResult(&ack). + Put(rest.AckRoute) + + if err != nil { + return nil, fmt.Errorf("error from server: %s %w", resp.Status(), err) + } + + if resp.IsError() { + return nil, fmt.Errorf("error from server: %s", resp.Status()) + } + + return ack, nil +} + // GetAllQuotes retrieves all quotes from the RFQ quoting API. func (c *unauthenticatedClient) GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) { var quotes []*model.GetQuoteResponse @@ -189,24 +208,3 @@ func (c *unauthenticatedClient) GetQuoteByRelayerAddress(ctx context.Context, re return quotes, nil } - -func (c *unauthenticatedClient) GetRelayAck(ctx context.Context, txID string) (*model.PutRelayAckResponse, error) { - var ack *model.PutRelayAckResponse - resp, err := c.rClient.R(). - SetContext(ctx). - SetQueryParams(map[string]string{ - "id": txID, - }). - SetResult(&ack). - Get(rest.AckRoute) - - if err != nil { - return nil, fmt.Errorf("error from server: %s %w", resp.Status(), err) - } - - if resp.IsError() { - return nil, fmt.Errorf("error from server: %s", resp.Status()) - } - - return ack, nil -} diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index f39111b5c5..535d234374 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -245,24 +245,29 @@ func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (address // PutRelayAck checks if a relay is pending or not. func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { - transactionID := c.Query("id") - if transactionID == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'id'"}) + req, exists := c.Get("putRequest") + if !exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "Request not found"}) + return + } + ackReq, ok := req.(*model.PutAckRequest) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request type"}) return } // If the tx id is already in the cache, it should not be relayed. // Otherwise, insert into the cache. r.ackMux.Lock() - ack := r.relayAckCache.Get(transactionID) + ack := r.relayAckCache.Get(ackReq.TxID) shouldRelay := ack == nil if shouldRelay { - r.relayAckCache.Set(transactionID, true, ttlcache.DefaultTTL) + r.relayAckCache.Set(ackReq.TxID, true, ttlcache.DefaultTTL) } r.ackMux.Unlock() resp := relapi.PutRelayAckResponse{ - TxID: transactionID, + TxID: ackReq.TxID, ShouldRelay: shouldRelay, } c.JSON(http.StatusOK, resp) diff --git a/services/rfq/api/rest/server_test.go b/services/rfq/api/rest/server_test.go index 96b67a361a..4a2e812136 100644 --- a/services/rfq/api/rest/server_test.go +++ b/services/rfq/api/rest/server_test.go @@ -45,7 +45,7 @@ func (c *ServerSuite) TestEIP191_SuccessfulSignature() { } // Perform a PUT request to the API server with the authorization header. - resp, err := c.sendPutRequest(header) + resp, err := c.sendPutQuoteRequest(header) if err != nil { c.Error(err) return @@ -78,7 +78,7 @@ func (c *ServerSuite) TestEIP191_UnsuccessfulSignature() { } // Perform a PUT request to the API server with the incorrect authorization header. - resp, err := c.sendPutRequest(header) + resp, err := c.sendPutQuoteRequest(header) if err != nil { c.Error(err) return @@ -105,7 +105,7 @@ func (c *ServerSuite) TestEIP191_SuccessfulPutSubmission() { c.Require().NoError(err) // Perform a PUT request to the API server with the authorization header. - resp, err := c.sendPutRequest(header) + resp, err := c.sendPutQuoteRequest(header) c.Require().NoError(err) defer func() { _ = resp.Body.Close() @@ -127,7 +127,7 @@ func (c *ServerSuite) TestPutAndGetQuote() { c.Require().NoError(err) // Send PUT request - putResp, err := c.sendPutRequest(header) + putResp, err := c.sendPutQuoteRequest(header) c.Require().NoError(err) defer func() { err = putResp.Body.Close() @@ -169,7 +169,7 @@ func (c *ServerSuite) TestPutAndGetQuoteByRelayer() { c.Require().NoError(err) // Send PUT request - putResp, err := c.sendPutRequest(header) + putResp, err := c.sendPutQuoteRequest(header) c.Require().NoError(err) defer func() { err = putResp.Body.Close() @@ -204,15 +204,14 @@ func (c *ServerSuite) TestPutAndGetQuoteByRelayer() { c.Assert().True(found, "Newly added quote not found") } -func (c *ServerSuite) TestGetAck() { +func (c *ServerSuite) TestPutAck() { c.startQuoterAPIServer() // Send GET request - client := &http.Client{} testTxID := "0x123" - req, err := http.NewRequestWithContext(c.GetTestContext(), http.MethodGet, fmt.Sprintf("http://localhost:%d/ack?id=%s", c.port, testTxID), nil) + header, err := c.prepareAuthHeader(c.testWallet) c.Require().NoError(err) - resp, err := client.Do(req) + resp, err := c.sendPutAckRequest(header, testTxID) c.Require().NoError(err) c.Equal(http.StatusOK, resp.StatusCode) @@ -229,9 +228,9 @@ func (c *ServerSuite) TestGetAck() { c.Require().NoError(err) // Send another request with same txID - req, err = http.NewRequestWithContext(c.GetTestContext(), http.MethodGet, fmt.Sprintf("http://localhost:%d/ack?id=%s", c.port, testTxID), nil) + header, err = c.prepareAuthHeader(c.testWallet) c.Require().NoError(err) - resp, err = client.Do(req) + resp, err = c.sendPutAckRequest(header, testTxID) c.Require().NoError(err) c.Equal(http.StatusOK, resp.StatusCode) @@ -277,8 +276,8 @@ func (c *ServerSuite) prepareAuthHeader(wallet wallet.Wallet) (string, error) { return now + ":" + signature, nil } -// sendPutRequest sends a PUT request to the server with the given authorization header. -func (c *ServerSuite) sendPutRequest(header string) (*http.Response, error) { +// sendPutQuoteRequest sends a PUT request to the server with the given authorization header. +func (c *ServerSuite) sendPutQuoteRequest(header string) (*http.Response, error) { // Prepare the PUT request with JSON data. client := &http.Client{} putData := model.PutQuoteRequest{ @@ -309,3 +308,31 @@ func (c *ServerSuite) sendPutRequest(header string) (*http.Response, error) { } return resp, nil } + +// sendPutAckRequest sends a PUT request to the server with the given authorization header. +func (c *ServerSuite) sendPutAckRequest(header string, txID string) (*http.Response, error) { + // Prepare the PUT request. + client := &http.Client{} + putData := model.PutAckRequest{ + TxID: txID, + DestChainID: 42161, + } + jsonData, err := json.Marshal(putData) + if err != nil { + return nil, fmt.Errorf("failed to marshal putData: %w", err) + } + + req, err := http.NewRequestWithContext(c.GetTestContext(), http.MethodPut, fmt.Sprintf("http://localhost:%d/ack", c.port), bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create PUT request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Add("Authorization", header) + + // Send the request to the server. + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send PUT request: %w", err) + } + return resp, nil +} diff --git a/services/rfq/relayer/service/handlers.go b/services/rfq/relayer/service/handlers.go index 2bd1e11b83..515b2f9699 100644 --- a/services/rfq/relayer/service/handlers.go +++ b/services/rfq/relayer/service/handlers.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/api/model" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/reldb" @@ -148,7 +149,11 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r } // get ack from API to synchronize calls with other relayers and avoid reverts - resp, err := q.apiClient.GetRelayAck(ctx, hexutil.Encode(request.TransactionID[:])) + req := model.PutAckRequest{ + TxID: hexutil.Encode(request.TransactionID[:]), + DestChainID: int(request.Transaction.DestChainId), + } + resp, err := q.apiClient.PutRelayAck(ctx, &req) if err != nil { return fmt.Errorf("could not get relay ack: %w", err) } diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index bb38e71833..ef3aeb41e8 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -41,7 +41,7 @@ type Relayer struct { client omniClient.RPCClient chainListeners map[int]listener.ContractListener apiServer *relapi.RelayerAPIServer - apiClient rfqAPIClient.UnauthenticatedClient + apiClient rfqAPIClient.AuthenticatedClient inventory inventory.Manager quoter quoter.Quoter submitter submitter.TransactionSubmitter @@ -122,7 +122,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi return nil, fmt.Errorf("could not get api server: %w", err) } - apiClient, err := rfqAPIClient.NewUnauthenticatedClient(metricHandler, cfg.GetRfqAPIURL()) + apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRfqAPIURL(), sg) if err != nil { return nil, fmt.Errorf("error creating RFQ API client: %w", err) } diff --git a/services/rfq/relayer/service/statushandler.go b/services/rfq/relayer/service/statushandler.go index d64320a4f3..a2bc5d72f6 100644 --- a/services/rfq/relayer/service/statushandler.go +++ b/services/rfq/relayer/service/statushandler.go @@ -44,7 +44,7 @@ type QuoteRequestHandler struct { // metrics is the metrics handler. metrics metrics.Handler // apiClient is used to get acks before submitting a relay transaction. - apiClient client.UnauthenticatedClient + apiClient client.AuthenticatedClient } // Handler is the handler for a quote request. From a682021feeebf18755b0b32cc941a15a90cb61f1 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:20:32 -0500 Subject: [PATCH 13/24] Feat: bump default timeout from 5 to 10 --- services/rfq/api/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/rfq/api/config/config.go b/services/rfq/api/config/config.go index d1885ca0a8..aab38d7b2a 100644 --- a/services/rfq/api/config/config.go +++ b/services/rfq/api/config/config.go @@ -27,7 +27,7 @@ type Config struct { RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"` } -const defaultRelayAckTimeout = 5 * time.Second +const defaultRelayAckTimeout = 10 * time.Second // GetRelayAckTimeout returns the relay ack timeout. func (c Config) GetRelayAckTimeout() time.Duration { From b56e82502d2f858ece2d7b4d1d87d62dafb20359 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:28:09 -0500 Subject: [PATCH 14/24] Feat: add swagger comments --- services/rfq/api/docs/docs.go | 31 ++++++++++++++++++++++++++++++ services/rfq/api/docs/swagger.json | 31 ++++++++++++++++++++++++++++++ services/rfq/api/docs/swagger.yaml | 20 +++++++++++++++++++ services/rfq/api/rest/server.go | 12 ++++++++++++ 4 files changed, 94 insertions(+) diff --git a/services/rfq/api/docs/docs.go b/services/rfq/api/docs/docs.go index b90c70e6ec..fdd79b976b 100644 --- a/services/rfq/api/docs/docs.go +++ b/services/rfq/api/docs/docs.go @@ -15,6 +15,37 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/ack": { + "put": { + "description": "cache an ack request to synchronize relayer actions.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "ack" + ], + "summary": "Relay ack", + "parameters": [ + { + "description": "query params", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/model.PutQuoteRequest" + } + } + ], + "responses": { + "200": { + "description": "OK" + } + } + } + }, "/quotes": { "get": { "description": "get quotes from all relayers.", diff --git a/services/rfq/api/docs/swagger.json b/services/rfq/api/docs/swagger.json index 7341e36f97..0f4707ab3f 100644 --- a/services/rfq/api/docs/swagger.json +++ b/services/rfq/api/docs/swagger.json @@ -4,6 +4,37 @@ "contact": {} }, "paths": { + "/ack": { + "put": { + "description": "cache an ack request to synchronize relayer actions.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "ack" + ], + "summary": "Relay ack", + "parameters": [ + { + "description": "query params", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/model.PutQuoteRequest" + } + } + ], + "responses": { + "200": { + "description": "OK" + } + } + } + }, "/quotes": { "get": { "description": "get quotes from all relayers.", diff --git a/services/rfq/api/docs/swagger.yaml b/services/rfq/api/docs/swagger.yaml index 3901126f16..a95f1dcddc 100644 --- a/services/rfq/api/docs/swagger.yaml +++ b/services/rfq/api/docs/swagger.yaml @@ -67,6 +67,26 @@ definitions: info: contact: {} paths: + /ack: + put: + consumes: + - application/json + description: cache an ack request to synchronize relayer actions. + parameters: + - description: query params + in: body + name: request + required: true + schema: + $ref: '#/definitions/model.PutQuoteRequest' + produces: + - application/json + responses: + "200": + description: OK + summary: Relay ack + tags: + - ack /quotes: get: consumes: diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 535d234374..85260e2643 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -244,6 +244,18 @@ func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (address } // PutRelayAck checks if a relay is pending or not. +// +// PUT /ack. +// @dev Protected Method: Authentication is handled through middleware in server.go. +// @Summary Relay ack +// @Schemes +// @Description cache an ack request to synchronize relayer actions. +// @Param request body model.PutQuoteRequest true "query params" +// @Tags ack +// @Accept json +// @Produce json +// @Success 200 +// @Router /ack [put]. func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { req, exists := c.Get("putRequest") if !exists { From c8cf4730a77ed5a3b2e737b610d17be879063d72 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:32:06 -0500 Subject: [PATCH 15/24] Cleanup: pass API client into quoter --- services/rfq/relayer/quoter/quoter.go | 8 ++------ services/rfq/relayer/quoter/quoter_test.go | 2 +- services/rfq/relayer/quoter/suite_test.go | 2 +- services/rfq/relayer/service/relayer.go | 12 ++++++------ 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/services/rfq/relayer/quoter/quoter.go b/services/rfq/relayer/quoter/quoter.go index f9b65ad30d..c3ed50f850 100644 --- a/services/rfq/relayer/quoter/quoter.go +++ b/services/rfq/relayer/quoter/quoter.go @@ -78,15 +78,11 @@ type Manager struct { } // NewQuoterManager creates a new QuoterManager. -func NewQuoterManager(config relconfig.Config, metricsHandler metrics.Handler, inventoryManager inventory.Manager, relayerSigner signer.Signer, feePricer pricer.FeePricer) (Quoter, error) { - apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricsHandler, config.GetRfqAPIURL(), relayerSigner) - if err != nil { - return nil, fmt.Errorf("error creating RFQ API client: %w", err) - } - +func NewQuoterManager(config relconfig.Config, metricsHandler metrics.Handler, inventoryManager inventory.Manager, relayerSigner signer.Signer, feePricer pricer.FeePricer, apiClient rfqAPIClient.AuthenticatedClient) (Quoter, error) { qt := make(map[string][]string) // fix any casing issues. + var err error for tokenID, destTokenIDs := range config.QuotableTokens { processedDestTokens := make([]string, len(destTokenIDs)) for i := range destTokenIDs { diff --git a/services/rfq/relayer/quoter/quoter_test.go b/services/rfq/relayer/quoter/quoter_test.go index fd5f4bed80..507b981e0d 100644 --- a/services/rfq/relayer/quoter/quoter_test.go +++ b/services/rfq/relayer/quoter/quoter_test.go @@ -231,7 +231,7 @@ func (s *QuoterSuite) setGasSufficiency(sufficient bool) { feePricer := pricer.NewFeePricer(s.config, clientFetcher, priceFetcher, metrics.NewNullHandler()) inventoryManager := new(inventoryMocks.Manager) inventoryManager.On(testsuite.GetFunctionName(inventoryManager.HasSufficientGas), mock.Anything, mock.Anything, mock.Anything).Return(sufficient, nil) - mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer) + mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer, nil) s.NoError(err) var ok bool diff --git a/services/rfq/relayer/quoter/suite_test.go b/services/rfq/relayer/quoter/suite_test.go index 119b7cf6fc..f58808dedc 100644 --- a/services/rfq/relayer/quoter/suite_test.go +++ b/services/rfq/relayer/quoter/suite_test.go @@ -116,7 +116,7 @@ func (s *QuoterSuite) SetupTest() { inventoryManager := new(inventoryMocks.Manager) inventoryManager.On(testsuite.GetFunctionName(inventoryManager.HasSufficientGas), mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer) + mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer, nil) s.NoError(err) var ok bool diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index ef3aeb41e8..33396ecec5 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -112,19 +112,19 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi priceFetcher := pricer.NewCoingeckoPriceFetcher(cfg.GetHTTPTimeout()) fp := pricer.NewFeePricer(cfg, omniClient, priceFetcher, metricHandler) - q, err := quoter.NewQuoterManager(cfg, metricHandler, im, sg, fp) + apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRfqAPIURL(), sg) if err != nil { - return nil, fmt.Errorf("could not get quoter") + return nil, fmt.Errorf("error creating RFQ API client: %w", err) } - apiServer, err := relapi.NewRelayerAPI(ctx, cfg, metricHandler, omniClient, store, sm) + q, err := quoter.NewQuoterManager(cfg, metricHandler, im, sg, fp, apiClient) if err != nil { - return nil, fmt.Errorf("could not get api server: %w", err) + return nil, fmt.Errorf("could not get quoter") } - apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRfqAPIURL(), sg) + apiServer, err := relapi.NewRelayerAPI(ctx, cfg, metricHandler, omniClient, store, sm) if err != nil { - return nil, fmt.Errorf("error creating RFQ API client: %w", err) + return nil, fmt.Errorf("could not get api server: %w", err) } cache := ttlcache.New[common.Hash, bool](ttlcache.WithTTL[common.Hash, bool](time.Second * 30)) From 7e85d350eb212f666790922f0400000c20cce4b2 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:39:13 -0500 Subject: [PATCH 16/24] Feat: return RelayerAddress in PutRelayAckResponse --- services/rfq/api/model/response.go | 2 ++ services/rfq/api/rest/server.go | 25 +++++++++++++++++-------- services/rfq/api/rest/server_test.go | 10 ++++++---- services/rfq/relayer/relapi/model.go | 5 +++-- 4 files changed, 28 insertions(+), 14 deletions(-) diff --git a/services/rfq/api/model/response.go b/services/rfq/api/model/response.go index 5fbdb5ba23..8e664f9e0e 100644 --- a/services/rfq/api/model/response.go +++ b/services/rfq/api/model/response.go @@ -32,4 +32,6 @@ type PutRelayAckResponse struct { TransactionID string `json:"tx_id"` // ShouldRelay is a boolean indicating whether the transaction should be relayed ShouldRelay bool `json:"should_relay"` + // RelayerAddress is the address of the relayer that is currently acked + RelayerAddress string `json:"relayer_address"` } diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 85260e2643..dfe7998d86 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -41,7 +41,7 @@ type QuoterAPIServer struct { roleCache map[uint32]*ttlcache.Cache[string, bool] // relayAckCache contains a set of transactionID values that reflect // transactions that have been acked for relay - relayAckCache *ttlcache.Cache[string, bool] + relayAckCache *ttlcache.Cache[string, string] // ackMux is a mutex used to ensure that only one transaction id can be acked at a time. ackMux sync.Mutex } @@ -95,9 +95,9 @@ func NewAPI( } // create the relay ack cache - relayAckCache := ttlcache.New[string, bool]( - ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()), - ttlcache.WithDisableTouchOnHit[string, bool](), + relayAckCache := ttlcache.New[string, string]( + ttlcache.WithTTL[string, string](cfg.GetRelayAckTimeout()), + ttlcache.WithDisableTouchOnHit[string, string](), ) go relayAckCache.Start() go func() { @@ -262,6 +262,12 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "Request not found"}) return } + rawRelayerAddr, exists := c.Get("relayerAddr") + if !exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"}) + return + } + relayerAddr := rawRelayerAddr.(string) ackReq, ok := req.(*model.PutAckRequest) if !ok { c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request type"}) @@ -269,18 +275,21 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { } // If the tx id is already in the cache, it should not be relayed. - // Otherwise, insert into the cache. + // Otherwise, insert the current relayer's address into the cache. r.ackMux.Lock() ack := r.relayAckCache.Get(ackReq.TxID) shouldRelay := ack == nil if shouldRelay { - r.relayAckCache.Set(ackReq.TxID, true, ttlcache.DefaultTTL) + r.relayAckCache.Set(ackReq.TxID, relayerAddr, ttlcache.DefaultTTL) + } else { + relayerAddr = ack.Value() } r.ackMux.Unlock() resp := relapi.PutRelayAckResponse{ - TxID: ackReq.TxID, - ShouldRelay: shouldRelay, + TxID: ackReq.TxID, + ShouldRelay: shouldRelay, + RelayerAddress: relayerAddr, } c.JSON(http.StatusOK, resp) } diff --git a/services/rfq/api/rest/server_test.go b/services/rfq/api/rest/server_test.go index 4a2e812136..52059820c7 100644 --- a/services/rfq/api/rest/server_test.go +++ b/services/rfq/api/rest/server_test.go @@ -220,8 +220,9 @@ func (c *ServerSuite) TestPutAck() { err = json.NewDecoder(resp.Body).Decode(&result) c.Require().NoError(err) expectedResult := relapi.PutRelayAckResponse{ - TxID: testTxID, - ShouldRelay: true, + TxID: testTxID, + ShouldRelay: true, + RelayerAddress: c.testWallet.Address().Hex(), } c.Equal(expectedResult, result) err = resp.Body.Close() @@ -238,8 +239,9 @@ func (c *ServerSuite) TestPutAck() { err = json.NewDecoder(resp.Body).Decode(&result) c.Require().NoError(err) expectedResult = relapi.PutRelayAckResponse{ - TxID: testTxID, - ShouldRelay: false, + TxID: testTxID, + ShouldRelay: false, + RelayerAddress: c.testWallet.Address().Hex(), } c.Equal(expectedResult, result) err = resp.Body.Close() diff --git a/services/rfq/relayer/relapi/model.go b/services/rfq/relayer/relapi/model.go index 96ba1e1dff..563a3ab5ba 100644 --- a/services/rfq/relayer/relapi/model.go +++ b/services/rfq/relayer/relapi/model.go @@ -18,6 +18,7 @@ type GetTxRetryResponse struct { // PutRelayAckResponse contains the schema for a POST /relay/ack response. type PutRelayAckResponse struct { - TxID string `json:"tx_id"` - ShouldRelay bool `json:"should_relay"` + TxID string `json:"tx_id"` + ShouldRelay bool `json:"should_relay"` + RelayerAddress string `json:"relayer_address"` } From d9037b479e0b85887398b6f90726f34499401a26 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:41:02 -0500 Subject: [PATCH 17/24] Cleanup: add clarifying comment --- services/rfq/api/rest/server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index dfe7998d86..10e04acb5e 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -244,6 +244,10 @@ func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (address } // PutRelayAck checks if a relay is pending or not. +// Note that the ack is not binding; that is, any relayer can still relay the transaction +// on chain if they ignore the response to this call. +// Also, this will not work if the API is run on multiple servers, since there is no inter-server +// communication to maintain the cache. // // PUT /ack. // @dev Protected Method: Authentication is handled through middleware in server.go. From e50f170e7205921dd94d93722255fb2c5b8751e4 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 12:41:04 -0500 Subject: [PATCH 18/24] [goreleaser] From 7e9975d5925d9a75d429c02df2fd91856457a2c6 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 14:12:12 -0500 Subject: [PATCH 19/24] Cleanup: lint --- services/rfq/api/rest/server.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index 10e04acb5e..c52992d8c1 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -207,7 +207,7 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { } func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (addressRecovered common.Address, err error) { - bridge, ok := r.fastBridgeContracts[uint32(destChainID)] + bridge, ok := r.fastBridgeContracts[destChainID] if !ok { err = fmt.Errorf("dest chain id not supported: %d", destChainID) return addressRecovered, err @@ -220,16 +220,16 @@ func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (address deadline := time.Now().Unix() - 1000 // TODO: Replace with some type of r.cfg.AuthExpiryDelta addressRecovered, err = EIP191Auth(c, deadline) if err != nil { - err = fmt.Errorf("unable to authenticate relayer: %v", err) + err = fmt.Errorf("unable to authenticate relayer: %w", err) return addressRecovered, err } - hasRole := r.roleCache[uint32(destChainID)].Get(addressRecovered.Hex()) + hasRole := r.roleCache[destChainID].Get(addressRecovered.Hex()) if hasRole == nil || hasRole.IsExpired() { has, roleErr := bridge.HasRole(ops, relayerRole, addressRecovered) if roleErr == nil { - r.roleCache[uint32(destChainID)].Set(addressRecovered.Hex(), has, cacheInterval) + r.roleCache[destChainID].Set(addressRecovered.Hex(), has, cacheInterval) } if roleErr != nil { @@ -271,7 +271,11 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"}) return } - relayerAddr := rawRelayerAddr.(string) + relayerAddr, ok := rawRelayerAddr.(string) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"}) + return + } ackReq, ok := req.(*model.PutAckRequest) if !ok { c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request type"}) From 86ff8eeaf2986465a723932e83b34042561c7fb1 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Thu, 16 May 2024 14:12:14 -0500 Subject: [PATCH 20/24] [goreleaser] From 8265f2025e2fc6d9fc4017bc0c6e92cad8e9b33c Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Fri, 17 May 2024 11:06:41 -0500 Subject: [PATCH 21/24] Cleanup: add tracing --- services/rfq/relayer/service/handlers.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/services/rfq/relayer/service/handlers.go b/services/rfq/relayer/service/handlers.go index 515b2f9699..8bbe5dc0c0 100644 --- a/services/rfq/relayer/service/handlers.go +++ b/services/rfq/relayer/service/handlers.go @@ -157,8 +157,13 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r if err != nil { return fmt.Errorf("could not get relay ack: %w", err) } + span.SetAttributes( + attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), + attribute.Bool("should_relay", resp.ShouldRelay), + attribute.String("relayer_address", resp.RelayerAddress), + ) if !resp.ShouldRelay { - span.SetAttributes(attribute.Bool("should_relay", false)) + span.AddEvent("not relaying due to ack") return nil } From 2327ba5789adc6adb8978edcc442bb6dbe703d7e Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Fri, 17 May 2024 11:06:43 -0500 Subject: [PATCH 22/24] [goreleaser] From 81c7791829b6bfc5cd8cec0c4da1fb644b388b1d Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Mon, 20 May 2024 10:13:02 -0500 Subject: [PATCH 23/24] Config: bump relay ack timeout to 30 seconds --- services/rfq/api/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/rfq/api/config/config.go b/services/rfq/api/config/config.go index aab38d7b2a..a6933cc4c2 100644 --- a/services/rfq/api/config/config.go +++ b/services/rfq/api/config/config.go @@ -27,7 +27,7 @@ type Config struct { RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"` } -const defaultRelayAckTimeout = 10 * time.Second +const defaultRelayAckTimeout = 30 * time.Second // GetRelayAckTimeout returns the relay ack timeout. func (c Config) GetRelayAckTimeout() time.Duration { From 85b3dc084491d0cf1c69e84f71e17e1f7bfa3a39 Mon Sep 17 00:00:00 2001 From: Daniel Wasserman Date: Mon, 20 May 2024 10:13:05 -0500 Subject: [PATCH 24/24] [goreleaser]