Skip to content

Commit

Permalink
revert(relayer): potential deadlock (#3036)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwasse authored Aug 19, 2024
1 parent 7976f7a commit c26fce3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 55 deletions.
53 changes: 10 additions & 43 deletions services/rfq/relayer/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,36 +162,6 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r
return nil
}

// check balance and mark it as CommitPending
err = q.commitPendingBalance(ctx, span, request)
if err != nil {
return fmt.Errorf("could not commit pending balance: %w", err)
}

// immediately forward the request to handleCommitPending
span.AddEvent("forwarding to handleCommitPending")
fwdErr := q.Forward(ctx, request)
if fwdErr != nil {
logger.Errorf("could not forward to handle commit pending: %w", fwdErr)
span.AddEvent("could not forward to handle commit pending")
}

return nil
}

// commitPendingBalance locks the balance and marks the request as CommitPending.
func (q *QuoteRequestHandler) commitPendingBalance(ctx context.Context, span trace.Span, request reldb.QuoteRequest) (err error) {
// lock the consumed balance
key := getBalanceMtxKey(q.Dest.ChainID, request.Transaction.DestToken)
span.SetAttributes(attribute.String("balance_lock_key", key))
unlocker, ok := q.balanceMtx.TryLock(key)
if !ok {
// balance is locked due to concurrent request, try again later
span.SetAttributes(attribute.Bool("locked", true))
return nil
}
defer unlocker.Unlock()

// get destination committable balance
committableBalance, err := q.Inventory.GetCommittableBalance(ctx, int(q.Dest.ChainID), request.Transaction.DestToken)
if errors.Is(err, inventory.ErrUnsupportedChain) {
Expand Down Expand Up @@ -241,6 +211,14 @@ func (q *QuoteRequestHandler) commitPendingBalance(ctx context.Context, span tra
return fmt.Errorf("could not update request status: %w", err)
}

// immediately forward the request to handleCommitPending
span.AddEvent("forwarding to handleCommitPending")
fwdErr := q.Forward(ctx, request)
if fwdErr != nil {
logger.Errorf("could not forward to handle commit pending: %w", fwdErr)
span.AddEvent("could not forward to handle commit pending")
}

return nil
}

Expand Down Expand Up @@ -509,23 +487,12 @@ func (q *QuoteRequestHandler) handleProofPosted(ctx context.Context, span trace.
// Error Handlers Only from this point below.
//
// handleNotEnoughInventory handles the not enough inventory status.
func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, span trace.Span, request reldb.QuoteRequest) (err error) {
// acquire balance lock
key := getBalanceMtxKey(q.Dest.ChainID, request.Transaction.DestToken)
span.SetAttributes(attribute.String("balance_lock_key", key))
unlocker, ok := q.balanceMtx.TryLock(key)
if !ok {
// balance is locked due to concurrent request, try again later
span.SetAttributes(attribute.Bool("locked", true))
return nil
}
defer unlocker.Unlock()

// commit destination balance
func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, _ trace.Span, request reldb.QuoteRequest) (err error) {
committableBalance, err := q.Inventory.GetCommittableBalance(ctx, int(q.Dest.ChainID), request.Transaction.DestToken)
if err != nil {
return fmt.Errorf("could not get committable balance: %w", err)
}
// if committableBalance > destAmount
if committableBalance.Cmp(request.Transaction.DestAmount) > 0 {
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending, &request.Status)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions services/rfq/relayer/service/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ type Relayer struct {
decimalsCache *xsync.MapOf[string, *uint8]
// semaphore is used to limit the number of concurrent requests
semaphore *semaphore.Weighted
// handlerMtx is used to synchronize handling of relay requests, keyed on transaction ID
handlerMtx mapmutex.StringMapMutex
// balanceMtx is used to synchronize balance requests, keyed on a chainID and tokenAddress pair
balanceMtx mapmutex.StringMapMutex
// handlerMtx is used to synchronize handling of relay requests
handlerMtx mapmutex.StringMapMutex
otelRecorder iOtelRecorder
}

Expand Down Expand Up @@ -167,7 +165,6 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi
apiClient: apiClient,
semaphore: semaphore.NewWeighted(maxConcurrentRequests),
handlerMtx: mapmutex.NewStringMapMutex(),
balanceMtx: mapmutex.NewStringMapMutex(),
otelRecorder: otelRecorder,
}
return &rel, nil
Expand Down
7 changes: 0 additions & 7 deletions services/rfq/relayer/service/statushandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ type QuoteRequestHandler struct {
mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error
// handlerMtx is the mutex for relaying.
handlerMtx mapmutex.StringMapMutex
// balanceMtx is the mutex for balances.
balanceMtx mapmutex.StringMapMutex
}

func getBalanceMtxKey(chainID uint32, token common.Address) string {
return fmt.Sprintf("%d-%s", chainID, token.Hex())
}

// Handler is the handler for a quote request.
Expand Down Expand Up @@ -87,7 +81,6 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest)
apiClient: r.apiClient,
mutexMiddlewareFunc: r.mutexMiddleware,
handlerMtx: r.handlerMtx,
balanceMtx: r.balanceMtx,
}

// wrap in deadline middleware since the relay has not yet happened
Expand Down

0 comments on commit c26fce3

Please sign in to comment.