diff --git a/services/rfq/api/client/client.go b/services/rfq/api/client/client.go index 4ed3f80ee0..2e878afaf2 100644 --- a/services/rfq/api/client/client.go +++ b/services/rfq/api/client/client.go @@ -26,6 +26,7 @@ import ( // It provides methods for creating, retrieving and updating quotes. type AuthenticatedClient interface { PutQuote(ctx context.Context, q *model.PutQuoteRequest) error + PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error) UnauthenticatedClient } @@ -124,6 +125,25 @@ func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutQuoteRequest) err return err } +func (c *clientImpl) PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error) { + var ack *model.PutRelayAckResponse + resp, err := c.rClient.R(). + SetContext(ctx). + SetBody(req). + SetResult(&ack). + Put(rest.AckRoute) + + if err != nil { + return nil, fmt.Errorf("error from server: %s %w", resp.Status(), err) + } + + if resp.IsError() { + return nil, fmt.Errorf("error from server: %s", resp.Status()) + } + + return ack, nil +} + // GetAllQuotes retrieves all quotes from the RFQ quoting API. func (c *unauthenticatedClient) GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) { var quotes []*model.GetQuoteResponse diff --git a/services/rfq/api/config/config.go b/services/rfq/api/config/config.go index 66cb1d343c..a6933cc4c2 100644 --- a/services/rfq/api/config/config.go +++ b/services/rfq/api/config/config.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/jftuga/ellipsis" "gopkg.in/yaml.v2" @@ -21,8 +22,19 @@ type Config struct { Database DatabaseConfig `yaml:"database"` OmniRPCURL string `yaml:"omnirpc_url"` // bridges is a map of chainid->address - Bridges map[uint32]string `yaml:"bridges"` - Port string `yaml:"port"` + Bridges map[uint32]string `yaml:"bridges"` + Port string `yaml:"port"` + RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"` +} + +const defaultRelayAckTimeout = 30 * time.Second + +// GetRelayAckTimeout returns the relay ack timeout. +func (c Config) GetRelayAckTimeout() time.Duration { + if c.RelayAckTimeout == 0 { + return defaultRelayAckTimeout + } + return c.RelayAckTimeout } // LoadConfig loads the config from the given path. diff --git a/services/rfq/api/docs/docs.go b/services/rfq/api/docs/docs.go index b90c70e6ec..fdd79b976b 100644 --- a/services/rfq/api/docs/docs.go +++ b/services/rfq/api/docs/docs.go @@ -15,6 +15,37 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/ack": { + "put": { + "description": "cache an ack request to synchronize relayer actions.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "ack" + ], + "summary": "Relay ack", + "parameters": [ + { + "description": "query params", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/model.PutQuoteRequest" + } + } + ], + "responses": { + "200": { + "description": "OK" + } + } + } + }, "/quotes": { "get": { "description": "get quotes from all relayers.", diff --git a/services/rfq/api/docs/swagger.json b/services/rfq/api/docs/swagger.json index 7341e36f97..0f4707ab3f 100644 --- a/services/rfq/api/docs/swagger.json +++ b/services/rfq/api/docs/swagger.json @@ -4,6 +4,37 @@ "contact": {} }, "paths": { + "/ack": { + "put": { + "description": "cache an ack request to synchronize relayer actions.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "ack" + ], + "summary": "Relay ack", + "parameters": [ + { + "description": "query params", + "name": "request", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/model.PutQuoteRequest" + } + } + ], + "responses": { + "200": { + "description": "OK" + } + } + } + }, "/quotes": { "get": { "description": "get quotes from all relayers.", diff --git a/services/rfq/api/docs/swagger.yaml b/services/rfq/api/docs/swagger.yaml index 3901126f16..a95f1dcddc 100644 --- a/services/rfq/api/docs/swagger.yaml +++ b/services/rfq/api/docs/swagger.yaml @@ -67,6 +67,26 @@ definitions: info: contact: {} paths: + /ack: + put: + consumes: + - application/json + description: cache an ack request to synchronize relayer actions. + parameters: + - description: query params + in: body + name: request + required: true + schema: + $ref: '#/definitions/model.PutQuoteRequest' + produces: + - application/json + responses: + "200": + description: OK + summary: Relay ack + tags: + - ack /quotes: get: consumes: diff --git a/services/rfq/api/model/request.go b/services/rfq/api/model/request.go index 865c1b4af6..f3ce466ce1 100644 --- a/services/rfq/api/model/request.go +++ b/services/rfq/api/model/request.go @@ -13,6 +13,12 @@ type PutQuoteRequest struct { DestFastBridgeAddress string `json:"dest_fast_bridge_address"` } +// PutAckRequest contains the schema for a PUT /ack request. +type PutAckRequest struct { + TxID string `json:"tx_id"` + DestChainID int `json:"dest_chain_id"` +} + // GetQuoteSpecificRequest contains the schema for a GET /quote request with specific params. type GetQuoteSpecificRequest struct { OriginChainID int `json:"originChainId"` diff --git a/services/rfq/api/model/response.go b/services/rfq/api/model/response.go index a3fbc2fa47..8e664f9e0e 100644 --- a/services/rfq/api/model/response.go +++ b/services/rfq/api/model/response.go @@ -25,3 +25,13 @@ type GetQuoteResponse struct { // UpdatedAt is the time that the quote was last upserted UpdatedAt string `json:"updated_at"` } + +// PutRelayAckResponse contains the schema for a PUT /relay/ack response. +type PutRelayAckResponse struct { + // TxID is the transaction ID + TransactionID string `json:"tx_id"` + // ShouldRelay is a boolean indicating whether the transaction should be relayed + ShouldRelay bool `json:"should_relay"` + // RelayerAddress is the address of the relayer that is currently acked + RelayerAddress string `json:"relayer_address"` +} diff --git a/services/rfq/api/rest/server.go b/services/rfq/api/rest/server.go index e1a4a9c37d..c52992d8c1 100644 --- a/services/rfq/api/rest/server.go +++ b/services/rfq/api/rest/server.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" "github.com/ipfs/go-log" @@ -25,6 +26,7 @@ import ( "github.com/synapsecns/sanguine/services/rfq/api/docs" "github.com/synapsecns/sanguine/services/rfq/api/model" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" + "github.com/synapsecns/sanguine/services/rfq/relayer/relapi" ) // QuoterAPIServer is a struct that holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. @@ -37,6 +39,11 @@ type QuoterAPIServer struct { handler metrics.Handler fastBridgeContracts map[uint32]*fastbridge.FastBridge roleCache map[uint32]*ttlcache.Cache[string, bool] + // relayAckCache contains a set of transactionID values that reflect + // transactions that have been acked for relay + relayAckCache *ttlcache.Cache[string, string] + // ackMux is a mutex used to ensure that only one transaction id can be acked at a time. + ackMux sync.Mutex } // NewAPI holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. @@ -80,7 +87,6 @@ func NewAPI( ttlcache.WithTTL[string, bool](cacheInterval), ) roleCache := roles[chainID] - go roleCache.Start() go func() { <-ctx.Done() @@ -88,6 +94,17 @@ func NewAPI( }() } + // create the relay ack cache + relayAckCache := ttlcache.New[string, string]( + ttlcache.WithTTL[string, string](cfg.GetRelayAckTimeout()), + ttlcache.WithDisableTouchOnHit[string, string](), + ) + go relayAckCache.Start() + go func() { + <-ctx.Done() + relayAckCache.Stop() + }() + return &QuoterAPIServer{ cfg: cfg, db: store, @@ -95,12 +112,16 @@ func NewAPI( handler: handler, fastBridgeContracts: bridges, roleCache: roles, + relayAckCache: relayAckCache, + ackMux: sync.Mutex{}, }, nil } -// QuoteRoute is the API endpoint for handling quote related requests. const ( - QuoteRoute = "/quotes" + // QuoteRoute is the API endpoint for handling quote related requests. + QuoteRoute = "/quotes" + // AckRoute is the API endpoint for handling relay ack related requests. + AckRoute = "/ack" cacheInterval = time.Minute ) @@ -113,10 +134,14 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { h := NewHandler(r.db) engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler)) - // Apply AuthMiddleware only to the PUT route + // Apply AuthMiddleware only to the PUT routes quotesPut := engine.Group(QuoteRoute) quotesPut.Use(r.AuthMiddleware()) quotesPut.PUT("", h.ModifyQuote) + ackPut := engine.Group(AckRoute) + ackPut.Use(r.AuthMiddleware()) + ackPut.PUT("", r.PutRelayAck) + // GET routes without the AuthMiddleware // engine.PUT("/quotes", h.ModifyQuote) engine.GET(QuoteRoute, h.GetQuotes) @@ -136,55 +161,143 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error { // AuthMiddleware is the Gin authentication middleware that authenticates requests using EIP191. func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc { return func(c *gin.Context) { - var req model.PutQuoteRequest - if err := c.BindJSON(&req); err != nil { + var loggedRequest interface{} + var destChainID uint32 + var err error + + // Parse the dest chain id from the request + switch c.Request.URL.Path { + case QuoteRoute: + var req model.PutQuoteRequest + err = c.BindJSON(&req) + if err == nil { + destChainID = uint32(req.DestChainID) + loggedRequest = &req + } + case AckRoute: + var req model.PutAckRequest + err = c.BindJSON(&req) + if err == nil { + destChainID = uint32(req.DestChainID) + loggedRequest = &req + } + default: + err = fmt.Errorf("unexpected request path: %s", c.Request.URL.Path) + } + if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) c.Abort() return } - bridge, ok := r.fastBridgeContracts[uint32(req.DestChainID)] - if !ok { - c.JSON(http.StatusBadRequest, gin.H{"msg": "dest chain id not supported"}) + // Authenticate and fetch the address from the request + addressRecovered, err := r.checkRole(c, destChainID) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()}) c.Abort() return } - ops := &bind.CallOpts{Context: c} - relayerRole := crypto.Keccak256Hash([]byte("RELAYER_ROLE")) + // Log and pass to the next middleware if authentication succeeds + // Store the request in context after binding and validation + c.Set("putRequest", loggedRequest) + c.Set("relayerAddr", addressRecovered.Hex()) + c.Next() + } +} - // authenticate relayer signature with EIP191 - deadline := time.Now().Unix() - 1000 // TODO: Replace with some type of r.cfg.AuthExpiryDelta - addressRecovered, err := EIP191Auth(c, deadline) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"msg": fmt.Sprintf("unable to authenticate relayer: %v", err)}) - c.Abort() - return - } +func (r *QuoterAPIServer) checkRole(c *gin.Context, destChainID uint32) (addressRecovered common.Address, err error) { + bridge, ok := r.fastBridgeContracts[destChainID] + if !ok { + err = fmt.Errorf("dest chain id not supported: %d", destChainID) + return addressRecovered, err + } - hasRole := r.roleCache[uint32(req.DestChainID)].Get(addressRecovered.Hex()) + ops := &bind.CallOpts{Context: c} + relayerRole := crypto.Keccak256Hash([]byte("RELAYER_ROLE")) - if hasRole == nil || hasRole.IsExpired() { - has, err := bridge.HasRole(ops, relayerRole, addressRecovered) - if err == nil { - r.roleCache[uint32(req.DestChainID)].Set(addressRecovered.Hex(), has, cacheInterval) - } + // authenticate relayer signature with EIP191 + deadline := time.Now().Unix() - 1000 // TODO: Replace with some type of r.cfg.AuthExpiryDelta + addressRecovered, err = EIP191Auth(c, deadline) + if err != nil { + err = fmt.Errorf("unable to authenticate relayer: %w", err) + return addressRecovered, err + } - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"msg": "unable to check relayer role on-chain"}) - c.Abort() - return - } else if !has { - c.JSON(http.StatusBadRequest, gin.H{"msg": "q.Relayer not an on-chain relayer"}) - c.Abort() - return - } + hasRole := r.roleCache[destChainID].Get(addressRecovered.Hex()) + + if hasRole == nil || hasRole.IsExpired() { + has, roleErr := bridge.HasRole(ops, relayerRole, addressRecovered) + if roleErr == nil { + r.roleCache[destChainID].Set(addressRecovered.Hex(), has, cacheInterval) } - // Log and pass to the next middleware if authentication succeeds - // Store the request in context after binding and validation - c.Set("putRequest", &req) - c.Set("relayerAddr", addressRecovered.Hex()) - c.Next() + if roleErr != nil { + err = fmt.Errorf("unable to check relayer role on-chain") + return addressRecovered, err + } else if !has { + err = fmt.Errorf("q.Relayer not an on-chain relayer") + return addressRecovered, err + } + } + return addressRecovered, nil +} + +// PutRelayAck checks if a relay is pending or not. +// Note that the ack is not binding; that is, any relayer can still relay the transaction +// on chain if they ignore the response to this call. +// Also, this will not work if the API is run on multiple servers, since there is no inter-server +// communication to maintain the cache. +// +// PUT /ack. +// @dev Protected Method: Authentication is handled through middleware in server.go. +// @Summary Relay ack +// @Schemes +// @Description cache an ack request to synchronize relayer actions. +// @Param request body model.PutQuoteRequest true "query params" +// @Tags ack +// @Accept json +// @Produce json +// @Success 200 +// @Router /ack [put]. +func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) { + req, exists := c.Get("putRequest") + if !exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "Request not found"}) + return + } + rawRelayerAddr, exists := c.Get("relayerAddr") + if !exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"}) + return + } + relayerAddr, ok := rawRelayerAddr.(string) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"}) + return + } + ackReq, ok := req.(*model.PutAckRequest) + if !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request type"}) + return + } + + // If the tx id is already in the cache, it should not be relayed. + // Otherwise, insert the current relayer's address into the cache. + r.ackMux.Lock() + ack := r.relayAckCache.Get(ackReq.TxID) + shouldRelay := ack == nil + if shouldRelay { + r.relayAckCache.Set(ackReq.TxID, relayerAddr, ttlcache.DefaultTTL) + } else { + relayerAddr = ack.Value() + } + r.ackMux.Unlock() + + resp := relapi.PutRelayAckResponse{ + TxID: ackReq.TxID, + ShouldRelay: shouldRelay, + RelayerAddress: relayerAddr, } + c.JSON(http.StatusOK, resp) } diff --git a/services/rfq/api/rest/server_test.go b/services/rfq/api/rest/server_test.go index 16e6e11d30..52059820c7 100644 --- a/services/rfq/api/rest/server_test.go +++ b/services/rfq/api/rest/server_test.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/synapsecns/sanguine/ethergo/signer/wallet" "github.com/synapsecns/sanguine/services/rfq/api/model" + "github.com/synapsecns/sanguine/services/rfq/relayer/relapi" ) func (c *ServerSuite) TestNewQuoterAPIServer() { @@ -44,7 +45,7 @@ func (c *ServerSuite) TestEIP191_SuccessfulSignature() { } // Perform a PUT request to the API server with the authorization header. - resp, err := c.sendPutRequest(header) + resp, err := c.sendPutQuoteRequest(header) if err != nil { c.Error(err) return @@ -77,7 +78,7 @@ func (c *ServerSuite) TestEIP191_UnsuccessfulSignature() { } // Perform a PUT request to the API server with the incorrect authorization header. - resp, err := c.sendPutRequest(header) + resp, err := c.sendPutQuoteRequest(header) if err != nil { c.Error(err) return @@ -104,7 +105,7 @@ func (c *ServerSuite) TestEIP191_SuccessfulPutSubmission() { c.Require().NoError(err) // Perform a PUT request to the API server with the authorization header. - resp, err := c.sendPutRequest(header) + resp, err := c.sendPutQuoteRequest(header) c.Require().NoError(err) defer func() { _ = resp.Body.Close() @@ -126,7 +127,7 @@ func (c *ServerSuite) TestPutAndGetQuote() { c.Require().NoError(err) // Send PUT request - putResp, err := c.sendPutRequest(header) + putResp, err := c.sendPutQuoteRequest(header) c.Require().NoError(err) defer func() { err = putResp.Body.Close() @@ -168,7 +169,7 @@ func (c *ServerSuite) TestPutAndGetQuoteByRelayer() { c.Require().NoError(err) // Send PUT request - putResp, err := c.sendPutRequest(header) + putResp, err := c.sendPutQuoteRequest(header) c.Require().NoError(err) defer func() { err = putResp.Body.Close() @@ -203,6 +204,51 @@ func (c *ServerSuite) TestPutAndGetQuoteByRelayer() { c.Assert().True(found, "Newly added quote not found") } +func (c *ServerSuite) TestPutAck() { + c.startQuoterAPIServer() + + // Send GET request + testTxID := "0x123" + header, err := c.prepareAuthHeader(c.testWallet) + c.Require().NoError(err) + resp, err := c.sendPutAckRequest(header, testTxID) + c.Require().NoError(err) + c.Equal(http.StatusOK, resp.StatusCode) + + // Expect ack with shouldRelay=true + var result relapi.PutRelayAckResponse + err = json.NewDecoder(resp.Body).Decode(&result) + c.Require().NoError(err) + expectedResult := relapi.PutRelayAckResponse{ + TxID: testTxID, + ShouldRelay: true, + RelayerAddress: c.testWallet.Address().Hex(), + } + c.Equal(expectedResult, result) + err = resp.Body.Close() + c.Require().NoError(err) + + // Send another request with same txID + header, err = c.prepareAuthHeader(c.testWallet) + c.Require().NoError(err) + resp, err = c.sendPutAckRequest(header, testTxID) + c.Require().NoError(err) + c.Equal(http.StatusOK, resp.StatusCode) + + // Expect ack with shouldRelay=false + err = json.NewDecoder(resp.Body).Decode(&result) + c.Require().NoError(err) + expectedResult = relapi.PutRelayAckResponse{ + TxID: testTxID, + ShouldRelay: false, + RelayerAddress: c.testWallet.Address().Hex(), + } + c.Equal(expectedResult, result) + err = resp.Body.Close() + c.Require().NoError(err) + c.GetTestContext().Done() +} + // startQuoterAPIServer starts the API server and waits for it to initialize. func (c *ServerSuite) startQuoterAPIServer() { go func() { @@ -232,8 +278,8 @@ func (c *ServerSuite) prepareAuthHeader(wallet wallet.Wallet) (string, error) { return now + ":" + signature, nil } -// sendPutRequest sends a PUT request to the server with the given authorization header. -func (c *ServerSuite) sendPutRequest(header string) (*http.Response, error) { +// sendPutQuoteRequest sends a PUT request to the server with the given authorization header. +func (c *ServerSuite) sendPutQuoteRequest(header string) (*http.Response, error) { // Prepare the PUT request with JSON data. client := &http.Client{} putData := model.PutQuoteRequest{ @@ -264,3 +310,31 @@ func (c *ServerSuite) sendPutRequest(header string) (*http.Response, error) { } return resp, nil } + +// sendPutAckRequest sends a PUT request to the server with the given authorization header. +func (c *ServerSuite) sendPutAckRequest(header string, txID string) (*http.Response, error) { + // Prepare the PUT request. + client := &http.Client{} + putData := model.PutAckRequest{ + TxID: txID, + DestChainID: 42161, + } + jsonData, err := json.Marshal(putData) + if err != nil { + return nil, fmt.Errorf("failed to marshal putData: %w", err) + } + + req, err := http.NewRequestWithContext(c.GetTestContext(), http.MethodPut, fmt.Sprintf("http://localhost:%d/ack", c.port), bytes.NewBuffer(jsonData)) + if err != nil { + return nil, fmt.Errorf("failed to create PUT request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Add("Authorization", header) + + // Send the request to the server. + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send PUT request: %w", err) + } + return resp, nil +} diff --git a/services/rfq/relayer/quoter/quoter.go b/services/rfq/relayer/quoter/quoter.go index f9b65ad30d..c3ed50f850 100644 --- a/services/rfq/relayer/quoter/quoter.go +++ b/services/rfq/relayer/quoter/quoter.go @@ -78,15 +78,11 @@ type Manager struct { } // NewQuoterManager creates a new QuoterManager. -func NewQuoterManager(config relconfig.Config, metricsHandler metrics.Handler, inventoryManager inventory.Manager, relayerSigner signer.Signer, feePricer pricer.FeePricer) (Quoter, error) { - apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricsHandler, config.GetRfqAPIURL(), relayerSigner) - if err != nil { - return nil, fmt.Errorf("error creating RFQ API client: %w", err) - } - +func NewQuoterManager(config relconfig.Config, metricsHandler metrics.Handler, inventoryManager inventory.Manager, relayerSigner signer.Signer, feePricer pricer.FeePricer, apiClient rfqAPIClient.AuthenticatedClient) (Quoter, error) { qt := make(map[string][]string) // fix any casing issues. + var err error for tokenID, destTokenIDs := range config.QuotableTokens { processedDestTokens := make([]string, len(destTokenIDs)) for i := range destTokenIDs { diff --git a/services/rfq/relayer/quoter/quoter_test.go b/services/rfq/relayer/quoter/quoter_test.go index fd5f4bed80..507b981e0d 100644 --- a/services/rfq/relayer/quoter/quoter_test.go +++ b/services/rfq/relayer/quoter/quoter_test.go @@ -231,7 +231,7 @@ func (s *QuoterSuite) setGasSufficiency(sufficient bool) { feePricer := pricer.NewFeePricer(s.config, clientFetcher, priceFetcher, metrics.NewNullHandler()) inventoryManager := new(inventoryMocks.Manager) inventoryManager.On(testsuite.GetFunctionName(inventoryManager.HasSufficientGas), mock.Anything, mock.Anything, mock.Anything).Return(sufficient, nil) - mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer) + mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer, nil) s.NoError(err) var ok bool diff --git a/services/rfq/relayer/quoter/suite_test.go b/services/rfq/relayer/quoter/suite_test.go index 119b7cf6fc..f58808dedc 100644 --- a/services/rfq/relayer/quoter/suite_test.go +++ b/services/rfq/relayer/quoter/suite_test.go @@ -116,7 +116,7 @@ func (s *QuoterSuite) SetupTest() { inventoryManager := new(inventoryMocks.Manager) inventoryManager.On(testsuite.GetFunctionName(inventoryManager.HasSufficientGas), mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer) + mgr, err := quoter.NewQuoterManager(s.config, metrics.NewNullHandler(), inventoryManager, nil, feePricer, nil) s.NoError(err) var ok bool diff --git a/services/rfq/relayer/relapi/handler.go b/services/rfq/relayer/relapi/handler.go index 2182db44e8..94514c9476 100644 --- a/services/rfq/relayer/relapi/handler.go +++ b/services/rfq/relayer/relapi/handler.go @@ -60,7 +60,7 @@ func (h *Handler) GetQuoteRequestStatusByTxHash(c *gin.Context) { func (h *Handler) GetQuoteRequestStatusByTxID(c *gin.Context) { txIDStr := c.Query("id") if txIDStr == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'txID'"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'id'"}) return } diff --git a/services/rfq/relayer/relapi/model.go b/services/rfq/relayer/relapi/model.go index 721c83d6c0..563a3ab5ba 100644 --- a/services/rfq/relayer/relapi/model.go +++ b/services/rfq/relayer/relapi/model.go @@ -15,3 +15,10 @@ type GetTxRetryResponse struct { Nonce uint64 `json:"nonce"` GasAmount string `json:"gas_amount"` } + +// PutRelayAckResponse contains the schema for a POST /relay/ack response. +type PutRelayAckResponse struct { + TxID string `json:"tx_id"` + ShouldRelay bool `json:"should_relay"` + RelayerAddress string `json:"relayer_address"` +} diff --git a/services/rfq/relayer/service/handlers.go b/services/rfq/relayer/service/handlers.go index 3b0ec386f4..8bbe5dc0c0 100644 --- a/services/rfq/relayer/service/handlers.go +++ b/services/rfq/relayer/service/handlers.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/api/model" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/reldb" @@ -138,7 +139,7 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r return fmt.Errorf("could not get committable balance: %w", err) } - // if committableBalance > destAmount + // check if we have enough inventory to handle the request if committableBalance.Cmp(request.Transaction.DestAmount) < 0 { err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.NotEnoughInventory) if err != nil { @@ -146,6 +147,26 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r } return nil } + + // get ack from API to synchronize calls with other relayers and avoid reverts + req := model.PutAckRequest{ + TxID: hexutil.Encode(request.TransactionID[:]), + DestChainID: int(request.Transaction.DestChainId), + } + resp, err := q.apiClient.PutRelayAck(ctx, &req) + if err != nil { + return fmt.Errorf("could not get relay ack: %w", err) + } + span.SetAttributes( + attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), + attribute.Bool("should_relay", resp.ShouldRelay), + attribute.String("relayer_address", resp.RelayerAddress), + ) + if !resp.ShouldRelay { + span.AddEvent("not relaying due to ack") + return nil + } + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending) if err != nil { return fmt.Errorf("could not update request status: %w", err) diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index 276bcb869f..33396ecec5 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -21,6 +21,7 @@ import ( cctpSql "github.com/synapsecns/sanguine/services/cctp-relayer/db/sql" "github.com/synapsecns/sanguine/services/cctp-relayer/relayer" omniClient "github.com/synapsecns/sanguine/services/omnirpc/client" + rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/pricer" @@ -40,6 +41,7 @@ type Relayer struct { client omniClient.RPCClient chainListeners map[int]listener.ContractListener apiServer *relapi.RelayerAPIServer + apiClient rfqAPIClient.AuthenticatedClient inventory inventory.Manager quoter quoter.Quoter submitter submitter.TransactionSubmitter @@ -110,7 +112,12 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi priceFetcher := pricer.NewCoingeckoPriceFetcher(cfg.GetHTTPTimeout()) fp := pricer.NewFeePricer(cfg, omniClient, priceFetcher, metricHandler) - q, err := quoter.NewQuoterManager(cfg, metricHandler, im, sg, fp) + apiClient, err := rfqAPIClient.NewAuthenticatedClient(metricHandler, cfg.GetRfqAPIURL(), sg) + if err != nil { + return nil, fmt.Errorf("error creating RFQ API client: %w", err) + } + + q, err := quoter.NewQuoterManager(cfg, metricHandler, im, sg, fp, apiClient) if err != nil { return nil, fmt.Errorf("could not get quoter") } @@ -134,6 +141,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi signer: sg, chainListeners: chainListeners, apiServer: apiServer, + apiClient: apiClient, } return &rel, nil } diff --git a/services/rfq/relayer/service/statushandler.go b/services/rfq/relayer/service/statushandler.go index 9864649446..a2bc5d72f6 100644 --- a/services/rfq/relayer/service/statushandler.go +++ b/services/rfq/relayer/service/statushandler.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/jellydator/ttlcache/v3" "github.com/synapsecns/sanguine/core/metrics" + "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/relayer/chain" "github.com/synapsecns/sanguine/services/rfq/relayer/inventory" "github.com/synapsecns/sanguine/services/rfq/relayer/quoter" @@ -42,6 +43,8 @@ type QuoteRequestHandler struct { RelayerAddress common.Address // metrics is the metrics handler. metrics metrics.Handler + // apiClient is used to get acks before submitting a relay transaction. + apiClient client.AuthenticatedClient } // Handler is the handler for a quote request. @@ -68,6 +71,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) metrics: r.metrics, RelayerAddress: r.signer.Address(), claimCache: r.claimCache, + apiClient: r.apiClient, } qr.handlers[reldb.Seen] = r.deadlineMiddleware(r.gasMiddleware(qr.handleSeen))