Skip to content

Commit

Permalink
miner: upstream alignment
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush committed Sep 27, 2024
1 parent 311dbd9 commit 669c17d
Showing 1 changed file with 101 additions and 35 deletions.
136 changes: 101 additions & 35 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/ava-labs/avalanchego/utils/timer/mockable"
Expand Down Expand Up @@ -61,12 +62,20 @@ const (
targetTxsSize = 1792 * units.KiB
)

// environment is the worker's current environment and holds all of the current state information.
var (
errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
)

// environment is the worker's current environment and holds all
// information of the sealing block generation.
type environment struct {
signer types.Signer
state *state.StateDB // apply state changes here
tcount int // tx count in cycle
gasPool *core.GasPool // available gas used to pack transactions
signer types.Signer
state *state.StateDB // apply state changes here
tcount int // tx count in cycle
gasPool *core.GasPool // available gas used to pack transactions
coinbase common.Address

parent *types.Header
header *types.Header
Expand All @@ -87,6 +96,13 @@ type environment struct {
start time.Time // Time that block building began
}

const (
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptTimeout
)

// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
Expand All @@ -97,12 +113,11 @@ type worker struct {
chain *core.BlockChain

// Feeds
// TODO remove since this will never be written to
pendingLogsFeed event.Feed

// Subscriptions
mux *event.TypeMux // TODO replace
mu sync.RWMutex // The lock used to protect the coinbase and extra fields
mux *event.TypeMux
mu sync.RWMutex // The lock used to protect the coinbase and extra fields
coinbase common.Address
clock *mockable.Clock // Allows us mock the clock for testing
beaconRoot *common.Hash // TODO: set to empty hash, retained for upstream compatibility and future use
Expand Down Expand Up @@ -131,6 +146,13 @@ func (w *worker) setEtherbase(addr common.Address) {
w.coinbase = addr
}

// etherbase retrieves the configured etherbase address.
func (w *worker) etherbase() common.Address {

Check failure on line 150 in miner/worker.go

View workflow job for this annotation

GitHub Actions / Lint

func `(*worker).etherbase` is unused (unused)
w.mu.RLock()
defer w.mu.RUnlock()
return w.coinbase
}

// commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateContext) (*types.Block, error) {
w.mu.RLock()
Expand Down Expand Up @@ -194,7 +216,7 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte
return nil, fmt.Errorf("failed to prepare header for mining: %w", err)
}

env, err := w.createCurrentEnvironment(predicateContext, parent, header, tstart)
env, err := w.makeEnv(predicateContext, parent, header, w.coinbase, tstart)
if err != nil {
return nil, fmt.Errorf("failed to create new current environment: %w", err)
}
Expand Down Expand Up @@ -251,43 +273,51 @@ func (w *worker) commitNewWork(predicateContext *precompileconfig.PredicateConte
plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee)

w.commitTransactions(env, plainTxs, blobTxs, env.header.Coinbase)
w.commitTransactions(env, plainTxs, blobTxs, nil)
}
if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee)

w.commitTransactions(env, plainTxs, blobTxs, env.header.Coinbase)
w.commitTransactions(env, plainTxs, blobTxs, nil)
}

return w.commit(env)
}

func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.PredicateContext, parent *types.Header, header *types.Header, tstart time.Time) (*environment, error) {
// makeEnv creates a new environment for the sealing block.
func (w *worker) makeEnv(predicateContext *precompileconfig.PredicateContext, parent *types.Header, header *types.Header, coinbase common.Address, tstart time.Time) (*environment, error) {
// Retrieve the parent state to execute on top and start a prefetcher for
// the miner to speed block sealing up a bit.
state, err := w.chain.StateAt(parent.Root)
if err != nil {
return nil, err
}
state.StartPrefetcher("miner", w.eth.BlockChain().CacheConfig().TriePrefetcherParallelism)
return &environment{
// Note the passed coinbase may be different with header.Coinbase.
env := &environment{
signer: types.MakeSigner(w.chainConfig, header.Number, header.Time),
state: state,
parent: parent,
coinbase: coinbase,
header: header,
parent: parent,
tcount: 0,
gasPool: new(core.GasPool).AddGas(header.GasLimit),
rules: w.chainConfig.Rules(header.Number, header.Time),
predicateContext: predicateContext,
predicateResults: predicate.NewResults(),
start: tstart,
}, nil
}
// Keep track of transactions which return errors so they can be removed
env.tcount = 0
return env, nil
}

func (w *worker) commitTransaction(env *environment, tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) {
if tx.Type() == types.BlobTxType {
return w.commitBlobTransaction(env, tx, coinbase)
return w.commitBlobTransaction(env, tx)
}
receipt, err := w.applyTransaction(env, tx, coinbase)
receipt, err := w.applyTransaction(env, tx)
if err != nil {
return nil, err
}
Expand All @@ -296,7 +326,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, coin
return receipt.Logs, nil
}

func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) {
sc := tx.BlobTxSidecar()
if sc == nil {
panic("blob transaction without blobs in miner")
Expand All @@ -308,7 +338,7 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction,
if (env.blobs+len(sc.Blobs))*params.BlobTxBlobGasPerBlob > params.MaxBlobGasPerBlock {
return nil, errors.New("max data blobs reached")
}
receipt, err := w.applyTransaction(env, tx, coinbase)
receipt, err := w.applyTransaction(env, tx)
if err != nil {
return nil, err
}
Expand All @@ -321,7 +351,7 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction,
}

// applyTransaction runs the transaction. If execution fails, state and gas pool are reverted.
func (w *worker) applyTransaction(env *environment, tx *types.Transaction, coinbase common.Address) (*types.Receipt, error) {
func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*types.Receipt, error) {
var (
snap = env.state.Snapshot()
gp = env.gasPool.Gas()
Expand All @@ -336,9 +366,9 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction, coinb
}
env.predicateResults.SetTxResults(tx.Hash(), results)

blockContext = core.NewEVMBlockContextWithPredicateResults(env.header, w.chain, &coinbase, env.predicateResults)
blockContext = core.NewEVMBlockContextWithPredicateResults(env.header, w.chain, &env.coinbase, env.predicateResults)
} else {
blockContext = core.NewEVMBlockContext(env.header, w.chain, &coinbase)
blockContext = core.NewEVMBlockContext(env.header, w.chain, &env.coinbase)
}

receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, blockContext, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig())
Expand All @@ -350,8 +380,20 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction, coinb
return receipt, err
}

func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, coinbase common.Address) {
func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
}
var coalescedLogs []*types.Log

for {
// Check interruption signal and abort building if it's fired.
if interrupt != nil {
if signal := interrupt.Load(); signal != commitInterruptNone {
return signalToErr(signal)
}
}
// If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
Expand All @@ -364,13 +406,6 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac
blobTxs.Clear()
// Fall though to pick up any plain txs
}
// If we don't have enough blob space for any further blob transactions,
// skip that list altogether
if !blobTxs.Empty() && env.blobs*params.BlobTxBlobGasPerBlob >= params.MaxBlobGasPerBlock {
log.Trace("Not enough blob space for further blob transactions")
blobTxs.Clear()
// Fall though to pick up any plain txs
}
// Retrieve the next transaction and abort if all done.
var (
ltx *txpool.LazyTransaction
Expand Down Expand Up @@ -431,18 +466,19 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac
txs.Pop()
continue
}

// Start executing the transaction
env.state.SetTxContext(tx.Hash(), env.tcount)

_, err := w.commitTransaction(env, tx, coinbase)
logs, err := w.commitTransaction(env, tx)
switch {
case errors.Is(err, core.ErrNonceTooLow):
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "hash", ltx.Hash, "sender", from, "nonce", tx.Nonce())
txs.Shift()

case errors.Is(err, nil):
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
env.tcount++
txs.Shift()

Expand All @@ -453,6 +489,22 @@ func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transac
txs.Pop()
}
}
if /*!w.isRunning() &&*/ len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that
// when we are sealing, the worker will regenerate a sealing block every 3 seconds.
// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.

// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
cpy := make([]*types.Log, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(types.Log)
*cpy[i] = *l
}
w.pendingLogsFeed.Send(cpy)
}
return nil
}

// commit runs any post-transaction state modifications, assembles the final block
Expand Down Expand Up @@ -510,8 +562,7 @@ func (w *worker) handleResult(env *environment, block *types.Block, createdAt ti
fees := totalFees(block, receipts)
feesInEther := new(big.Float).Quo(new(big.Float).SetInt(fees), big.NewFloat(params.Ether))
log.Info("Commit new mining work", "number", block.Number(), "hash", hash,
"uncles", 0, "txs", env.tcount,
"gas", block.GasUsed(), "fees", feesInEther,
"txs", env.tcount, "gas", block.GasUsed(), "fees", feesInEther,
"elapsed", common.PrettyDuration(time.Since(env.start)))

// Note: the miner no longer emits a NewMinedBlock event. Instead the caller
Expand Down Expand Up @@ -546,3 +597,18 @@ func totalFees(block *types.Block, receipts []*types.Receipt) *big.Int {
}
return feesWei
}

// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}

0 comments on commit 669c17d

Please sign in to comment.