Skip to content

Commit

Permalink
Add X-chain block execution manager (ava-labs#2727)
Browse files Browse the repository at this point in the history
Co-authored-by: Dan Laine <daniel.laine@avalabs.org>
Co-authored-by: Chloe <99216251+coffeeavax@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 16, 2023
1 parent 459759d commit 1babaee
Show file tree
Hide file tree
Showing 9 changed files with 2,831 additions and 2 deletions.
4 changes: 3 additions & 1 deletion scripts/mocks.mockgen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ github.com/ava-labs/avalanchego/utils/filesystem=Reader=utils/filesystem/mock_io
github.com/ava-labs/avalanchego/utils/hashing=Hasher=utils/hashing/mock_hasher.go
github.com/ava-labs/avalanchego/utils/logging=Logger=utils/logging/mock_logger.go
github.com/ava-labs/avalanchego/utils/resource=User=utils/resource/mock_user.go
github.com/ava-labs/avalanchego/vms/avm/states=Chain=vms/avm/states/mock_states.go
github.com/ava-labs/avalanchego/vms/avm/blocks=Block=vms/avm/blocks/mock_block.go
github.com/ava-labs/avalanchego/vms/avm/states=Chain,State,Diff=vms/avm/states/mock_states.go
github.com/ava-labs/avalanchego/vms/avm/txs/mempool=Mempool=vms/avm/txs/mempool/mock_mempool.go
github.com/ava-labs/avalanchego/vms/components/avax=TransferableIn=vms/components/avax/mock_transferable_in.go
github.com/ava-labs/avalanchego/vms/components/avax=TransferableOut=vms/components/avax/mock_transferable_out.go
github.com/ava-labs/avalanchego/vms/components/verify=Verifiable=vms/components/verify/mock_verifiable.go
Expand Down
325 changes: 325 additions & 0 deletions vms/avm/blocks/executor/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package executor

import (
"context"
"errors"
"fmt"
"time"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/chains/atomic"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/vms/avm/blocks"
"github.com/ava-labs/avalanchego/vms/avm/states"
"github.com/ava-labs/avalanchego/vms/avm/txs/executor"
)

const SyncBound = 10 * time.Second

var (
_ snowman.Block = (*Block)(nil)

ErrTimestampBeyondSyncBound = errors.New("proposed timestamp is too far in the future relative to local time")
ErrEmptyBlock = errors.New("block contains no transactions")
ErrChildBlockEarlierThanParent = errors.New("proposed timestamp before current chain time")
ErrConflictingBlockTxs = errors.New("block contains conflicting transactions")
ErrIncorrectHeight = errors.New("block has incorrect height")
ErrBlockNotFound = errors.New("block not found")
)

// Exported for testing in avm package.
type Block struct {
blocks.Block
manager *manager
rejected bool
}

func (b *Block) Verify(context.Context) error {
blkID := b.ID()
if _, ok := b.manager.blkIDToState[blkID]; ok {
// This block has already been verified.
return nil
}

// Only allow timestamp to reasonably far forward
newChainTime := b.Timestamp()
now := b.manager.clk.Time()
maxNewChainTime := now.Add(SyncBound)
if newChainTime.After(maxNewChainTime) {
return fmt.Errorf(
"%w, proposed time (%s), local time (%s)",
ErrTimestampBeyondSyncBound,
newChainTime,
now,
)
}

txs := b.Txs()
if len(txs) == 0 {
return ErrEmptyBlock
}

// Syntactic verification is generally pretty fast, so we verify this first
// before performing any possible DB reads.
for _, tx := range txs {
err := tx.Unsigned.Visit(&executor.SyntacticVerifier{
Backend: b.manager.backend,
Tx: tx,
})
if err != nil {
txID := tx.ID()
b.manager.mempool.MarkDropped(txID, err)
return err
}
}

// Verify that the parent exists.
parentID := b.Parent()
parent, err := b.manager.GetStatelessBlock(parentID)
if err != nil {
return err
}

// Verify that currentBlkHeight = parentBlkHeight + 1.
expectedHeight := parent.Height() + 1
height := b.Height()
if expectedHeight != height {
return fmt.Errorf(
"%w: expected height %d, got %d",
ErrIncorrectHeight,
expectedHeight,
height,
)
}

stateDiff, err := states.NewDiff(parentID, b.manager)
if err != nil {
return err
}

parentChainTime := stateDiff.GetTimestamp()
// The proposed timestamp must not be before the parent's timestamp.
if newChainTime.Before(parentChainTime) {
return fmt.Errorf(
"%w: proposed timestamp (%s), chain time (%s)",
ErrChildBlockEarlierThanParent,
newChainTime,
parentChainTime,
)
}

stateDiff.SetTimestamp(newChainTime)

blockState := &blockState{
statelessBlock: b.Block,
onAcceptState: stateDiff,
atomicRequests: make(map[ids.ID]*atomic.Requests),
}

for _, tx := range txs {
// Verify that the tx is valid according to the current state of the
// chain.
err := tx.Unsigned.Visit(&executor.SemanticVerifier{
Backend: b.manager.backend,
State: stateDiff,
Tx: tx,
})
if err != nil {
txID := tx.ID()
b.manager.mempool.MarkDropped(txID, err)
return err
}

// Apply the txs state changes to the state.
//
// Note: This must be done inside the same loop as semantic verification
// to ensure that semantic verification correctly accounts for
// transactions that occurred earlier in the block.
executor := &executor.Executor{
Codec: b.manager.backend.Codec,
State: stateDiff,
Tx: tx,
}
err = tx.Unsigned.Visit(executor)
if err != nil {
txID := tx.ID()
b.manager.mempool.MarkDropped(txID, err)
return err
}

// Verify that the transaction we just executed didn't consume inputs
// that were already imported in a previous transaction.
if blockState.importedInputs.Overlaps(executor.Inputs) {
txID := tx.ID()
b.manager.mempool.MarkDropped(txID, ErrConflictingBlockTxs)
return ErrConflictingBlockTxs
}
blockState.importedInputs.Union(executor.Inputs)

// Now that the tx would be marked as accepted, we should add it to the
// state for the next transaction in the block.
stateDiff.AddTx(tx)

for chainID, txRequests := range executor.AtomicRequests {
// Add/merge in the atomic requests represented by [tx]
chainRequests, exists := blockState.atomicRequests[chainID]
if !exists {
blockState.atomicRequests[chainID] = txRequests
continue
}

chainRequests.PutRequests = append(chainRequests.PutRequests, txRequests.PutRequests...)
chainRequests.RemoveRequests = append(chainRequests.RemoveRequests, txRequests.RemoveRequests...)
}
}

// Verify that none of the transactions consumed any inputs that were
// already imported in a currently processing block.
err = b.manager.VerifyUniqueInputs(parentID, blockState.importedInputs)
if err != nil {
return err
}

// Now that the block has been executed, we can add the block data to the
// state diff.
stateDiff.SetLastAccepted(blkID)
stateDiff.AddBlock(b)

b.manager.blkIDToState[blkID] = blockState
b.manager.mempool.Remove(txs)
return nil
}

func (b *Block) Accept(context.Context) error {
blkID := b.ID()
defer b.manager.free(blkID)

b.manager.backend.Ctx.Log.Debug(
"accepting block",
zap.Stringer("blkID", blkID),
zap.Uint64("height", b.Height()),
zap.Stringer("parentID", b.Parent()),
)

txs := b.Txs()
for _, tx := range txs {
if err := b.manager.onAccept(tx); err != nil {
return fmt.Errorf(
"failed to mark tx %q as accepted: %w",
blkID,
err,
)
}
}

b.manager.lastAccepted = blkID
b.manager.mempool.Remove(txs)

blkState, ok := b.manager.blkIDToState[blkID]
if !ok {
return fmt.Errorf("%w: %s", ErrBlockNotFound, blkID)
}

// Update the state to reflect the changes made in [onAcceptState].
blkState.onAcceptState.Apply(b.manager.state)

defer b.manager.state.Abort()
batch, err := b.manager.state.CommitBatch()
if err != nil {
return fmt.Errorf(
"failed to stage state diff for block %s: %w",
blkID,
err,
)
}

// Note that this method writes [batch] to the database.
if err := b.manager.backend.Ctx.SharedMemory.Apply(blkState.atomicRequests, batch); err != nil {
return fmt.Errorf("failed to apply state diff to shared memory: %w", err)
}
return nil
}

func (b *Block) Reject(context.Context) error {
blkID := b.ID()
defer b.manager.free(blkID)

b.manager.backend.Ctx.Log.Verbo(
"rejecting block",
zap.Stringer("blkID", blkID),
zap.Uint64("height", b.Height()),
zap.Stringer("parentID", b.Parent()),
)

for _, tx := range b.Txs() {
if err := b.manager.VerifyTx(tx); err != nil {
b.manager.backend.Ctx.Log.Debug("dropping invalidated tx",
zap.Stringer("txID", tx.ID()),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
continue
}
if err := b.manager.mempool.Add(tx); err != nil {
b.manager.backend.Ctx.Log.Debug("dropping valid tx",
zap.Stringer("txID", tx.ID()),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
}
}

b.rejected = true
return nil
}

func (b *Block) Status() choices.Status {
// If this block's reference was rejected, we should report it as rejected.
//
// We don't persist the rejection, but that's fine. The consensus engine
// will hold the same reference to the block until it no longer needs it.
// After the consensus engine has released the reference to the block that
// was verified, it may get a new reference that isn't marked as rejected.
// The consensus engine may then try to issue the block, but will discover
// that it was rejected due to a conflicting block having been accepted.
if b.rejected {
return choices.Rejected
}

blkID := b.ID()
// If this block is the last accepted block, we don't need to go to disk to
// check the status.
if b.manager.lastAccepted == blkID {
return choices.Accepted
}
// Check if the block is in memory. If so, it's processing.
if _, ok := b.manager.blkIDToState[blkID]; ok {
return choices.Processing
}
// Block isn't in memory. Check in the database.
_, err := b.manager.state.GetBlock(blkID)
switch err {
case nil:
return choices.Accepted

case database.ErrNotFound:
// choices.Unknown means we don't have the bytes of the block.
// In this case, we do, so we return choices.Processing.
return choices.Processing

default:
// TODO: correctly report this error to the consensus engine.
b.manager.backend.Ctx.Log.Error(
"dropping unhandled database error",
zap.Error(err),
)
return choices.Processing
}
}
Loading

0 comments on commit 1babaee

Please sign in to comment.