Skip to content

Commit

Permalink
Engine API: NewPayload fails with a "context canceled" error in Curre…
Browse files Browse the repository at this point in the history
…nt/GetHeader (#9786) (#9894)

* improved logging
* check ctx in ServeHTTP: The context might be cancelled if the client's
connection was closed while waiting for ServeHTTP.
* If execution API returns ExecutionStatus_Busy, limit retry attempts to
10 seconds. This timeout must be lower than a typical client timeout (30
sec), in order to give the client feedback about the server status.
* If execution API returns ExecutionStatus_Busy, increase retry delay
from 10 ms to 100 ms to avoid stalling ourselves with multiple busy
loops. IMO this delay should be higher (e.g. 1 sec). Ideally we
shouldn't do polling at all, but doing a blocking ctx call requires
rearchitecting the ExecutionStatus_Busy logic.

see #9786
  • Loading branch information
battlmonstr committed May 2, 2024
1 parent 8f2cdbe commit 24645c6
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 73 deletions.
18 changes: 17 additions & 1 deletion erigon-lib/common/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,27 @@

package common

import "errors"
import (
"errors"

"golang.org/x/net/context"
)

var ErrStopped = errors.New("stopped")
var ErrUnwind = errors.New("unwound")

// FastContextErr is faster than ctx.Err() because usually it doesn't lock an internal mutex.
// It locks it only if the context is done and at the first call.
// See implementation of cancelCtx in context/context.go.
func FastContextErr(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

func Stopped(ch <-chan struct{}) error {
if ch == nil {
return nil
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ require (
go.opentelemetry.io/otel v1.8.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/net v0.24.0
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
// will return nil err if context is cancelled (may appear to acquire the semaphore)
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
db.trackTxEnd()
return nil, semErr
return nil, fmt.Errorf("mdbx.MdbxKV.BeginRo: roTxsLimiter error %w", semErr)
}

defer func() {
Expand Down
7 changes: 4 additions & 3 deletions erigon-lib/kv/remotedb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"runtime"
"unsafe"

"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (db *DB) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
}

if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
return nil, semErr
return nil, fmt.Errorf("remotedb.DB.BeginRo: roTxsLimiter error %w", semErr)
}

defer func() {
Expand Down
13 changes: 12 additions & 1 deletion rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (

"github.com/golang-jwt/jwt/v4"
jsoniter "github.com/json-iterator/go"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
)

const (
Expand Down Expand Up @@ -237,6 +239,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// until EOF, writes the response to w, and orders the server to process a
// single request.
ctx := r.Context()

// The context might be cancelled if the client's connection was closed while waiting for ServeHTTP.
if libcommon.FastContextErr(ctx) != nil {
// TODO: introduce an log message for all possible cases
// s.logger.Warn("rpc.Server.ServeHTTP: client connection was lost. Check if the server is able to keep up with the request rate.", "url", r.URL.String())
w.WriteHeader(http.StatusServiceUnavailable)
return
}

ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
ctx = context.WithValue(ctx, "scheme", r.Proto)
ctx = context.WithValue(ctx, "local", r.Host)
Expand Down
2 changes: 2 additions & 0 deletions turbo/execution/eth1/block_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (e *EthereumExecutionModule) evictOldBuilders() {
// Missing: NewPayload, AssembleBlock
func (e *EthereumExecutionModule) AssembleBlock(ctx context.Context, req *execution.AssembleBlockRequest) (*execution.AssembleBlockResponse, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.AssembleBlock: ExecutionStatus_Busy")
return &execution.AssembleBlockResponse{
Id: 0,
Busy: true,
Expand Down Expand Up @@ -108,6 +109,7 @@ func blockValue(br *types.BlockWithReceipts, baseFee *uint256.Int) *uint256.Int

func (e *EthereumExecutionModule) GetAssembledBlock(ctx context.Context, req *execution.GetAssembledBlockRequest) (*execution.GetAssembledBlockResponse, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.GetAssembledBlock: ExecutionStatus_Busy")
return &execution.GetAssembledBlockResponse{
Busy: true,
}, nil
Expand Down
48 changes: 13 additions & 35 deletions turbo/execution/eth1/eth1_chain_reader.go/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func (c ChainReaderWriterEth1) FrozenBlocks(ctx context.Context) uint64 {
return ret.FrozenBlocks
}

const retryTimeout = 10 * time.Millisecond

func (c ChainReaderWriterEth1) InsertBlocksAndWait(ctx context.Context, blocks []*types.Block) error {
request := &execution.InsertBlocksRequest{
Blocks: eth1_utils.ConvertBlocksToRPC(blocks),
Expand All @@ -281,22 +279,26 @@ func (c ChainReaderWriterEth1) InsertBlocksAndWait(ctx context.Context, blocks [
if err != nil {
return err
}
retryInterval := time.NewTicker(retryTimeout)
defer retryInterval.Stop()

// limit the number of retries
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

for response.Result == execution.ExecutionStatus_Busy {
const retryDelay = 100 * time.Millisecond
select {
case <-retryInterval.C:
response, err = c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
case <-time.After(retryDelay):
case <-ctx.Done():
return ctx.Err()
}

response, err = c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
}
if response.Result != execution.ExecutionStatus_Success {
return fmt.Errorf("insertHeadersAndWait: invalid code recieved from execution module: %s", response.Result.String())
return fmt.Errorf("InsertBlocksAndWait: executionModule.InsertBlocks ExecutionStatus = %s", response.Result.String())
}
return nil
}
Expand All @@ -321,31 +323,7 @@ func (c ChainReaderWriterEth1) InsertBlocks(ctx context.Context, blocks []*types

func (c ChainReaderWriterEth1) InsertBlockAndWait(ctx context.Context, block *types.Block) error {
blocks := []*types.Block{block}
request := &execution.InsertBlocksRequest{
Blocks: eth1_utils.ConvertBlocksToRPC(blocks),
}

response, err := c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
retryInterval := time.NewTicker(retryTimeout)
defer retryInterval.Stop()
for response.Result == execution.ExecutionStatus_Busy {
select {
case <-retryInterval.C:
response, err = c.executionModule.InsertBlocks(ctx, request)
if err != nil {
return err
}
case <-ctx.Done():
return context.Canceled
}
}
if response.Result != execution.ExecutionStatus_Success {
return fmt.Errorf("insertHeadersAndWait: invalid code recieved from execution module: %s", response.Result.String())
}
return c.InsertBlocksAndWait(ctx, []*types.Block{block})
return c.InsertBlocksAndWait(ctx, blocks)
}

func (c ChainReaderWriterEth1) ValidateChain(ctx context.Context, hash libcommon.Hash, number uint64) (execution.ExecutionStatus, *string, libcommon.Hash, error) {
Expand Down
9 changes: 6 additions & 3 deletions turbo/execution/eth1/ethereum_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"errors"
"math/big"

"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv/dbutils"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/math"
Expand Down Expand Up @@ -155,6 +156,7 @@ func (e *EthereumExecutionModule) canonicalHash(ctx context.Context, tx kv.Tx, b

func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execution.ValidationRequest) (*execution.ValidationReceipt, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.ValidateChain: ExecutionStatus_Busy")
return &execution.ValidationReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
ValidationStatus: execution.ExecutionStatus_Busy,
Expand Down Expand Up @@ -258,6 +260,7 @@ func (e *EthereumExecutionModule) Start(ctx context.Context) {

func (e *EthereumExecutionModule) Ready(context.Context, *emptypb.Empty) (*execution.ReadyResponse, error) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.Ready: ExecutionStatus_Busy")
return &execution.ReadyResponse{Ready: false}, nil
}
defer e.semaphore.Release(1)
Expand Down
1 change: 1 addition & 0 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func writeForkChoiceHashes(tx kv.RwTx, blockHash, safeHash, finalizedHash common

func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, originalBlockHash, safeHash, finalizedHash common.Hash, outcomeCh chan forkchoiceOutcome) {
if !e.semaphore.TryAcquire(1) {
e.logger.Warn("ethereumExecutionModule.updateForkChoice: ExecutionStatus_Busy")
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(common.Hash{}),
Status: execution.ExecutionStatus_Busy,
Expand Down
Loading

0 comments on commit 24645c6

Please sign in to comment.