Unify engines
The lifecycle of an avalanchego node can be separated into three main stages:

Stage I - If the VM supports it, a state sync occurs. Otherwise, this stage is skipped.
In the X-chain, this stage is replaced entirely by a custom bootstrapping phase that replicates and executes the X-chain DAG.

Stage II - Bootstrapping, which entails replicating blocks from other avalanchego nodes and executing them.

Stage III - The final form of an avalanchego node, in which it runs the snowman consensus protocol.

The phases are implemented by components called "engines":

- avalanche bootstrapping
- snowman
- state syncer
- snowman bootstrapper

The handler in snow/networking/handler/ is responsible for routing messages from the network to the correct engine.
All engines implement the same common.Engine interface, but that interface is the union of all operations across all engines.

Indeed, a message of type `m` dispatched by engine `e` often cannot be processed by a different engine `e'`.
For instance, a Chits message cannot be processed by any engine other than consensus, and a state summary query message is only relevant to the state sync engine.

To cope with this, each engine simply implements a no-op dispatcher for the messages it does not care about.
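
The no-op pattern described above can be sketched in Go roughly as follows. This is a minimal illustration, not the avalanchego code: the two-method `Engine` interface, the `noOpHandlers` type, and the string return values are all hypothetical simplifications of the much larger common.Engine interface.

```go
package main

import "fmt"

// Engine is a toy stand-in for a "union" interface: every engine must expose
// a handler for every message kind, whether or not it can use that message.
// The method set is hypothetical and far smaller than common.Engine.
type Engine interface {
	Chits(payload string) string
	StateSummaryFrontier(payload string) string
}

// noOpHandlers provides drop-the-message defaults that engines embed.
type noOpHandlers struct{ owner string }

func (n noOpHandlers) Chits(string) string {
	return n.owner + ": dropped Chits"
}

func (n noOpHandlers) StateSummaryFrontier(string) string {
	return n.owner + ": dropped StateSummaryFrontier"
}

// consensusEngine only cares about Chits; every other message falls through
// to the embedded no-op defaults.
type consensusEngine struct{ noOpHandlers }

func (consensusEngine) Chits(payload string) string {
	return "consensus: processed Chits " + payload
}

// stateSyncEngine only cares about state summary messages.
type stateSyncEngine struct{ noOpHandlers }

func (stateSyncEngine) StateSummaryFrontier(payload string) string {
	return "state sync: processed StateSummaryFrontier " + payload
}

func newConsensus() Engine {
	return consensusEngine{noOpHandlers{owner: "consensus"}}
}

func newStateSync() Engine {
	return stateSyncEngine{noOpHandlers{owner: "state sync"}}
}

func main() {
	for _, e := range []Engine{newConsensus(), newStateSync()} {
		fmt.Println(e.Chits("c1"))
		fmt.Println(e.StateSummaryFrontier("s1"))
	}
}
```

Each engine satisfies the full interface only because the embedded defaults absorb the messages it cannot use, so every engine type carries handlers that do nothing.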

The biggest problem with the existing structure is that the lifecycle of the engines imposes a strict one-way movement across the stages,
and no single component consolidates the transitions among them. Movement between stages happens through a callback given to each
engine at every stage but the final one (that is, in Stage I and Stage II).

The structure therefore makes it difficult to introduce movement from snowman consensus back to bootstrapping / state sync, or to have better control over the message dispatch.
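
To make the one-way movement concrete, here is a small Go sketch of callback chaining. The `stage` type and its fields are hypothetical; the sketch only mirrors the general shape of handing each engine the Start method of the engine that follows it.

```go
package main

import "fmt"

// stage is a hypothetical, stripped-down engine: it records that it started
// and, when it finishes, invokes the callback it was constructed with.
type stage struct {
	name   string
	onDone func(trace *[]string) // nil for the final stage
}

// Start runs the stage and then hands control to the next stage, if any.
func (s *stage) Start(trace *[]string) {
	*trace = append(*trace, s.name)
	if s.onDone != nil {
		s.onDone(trace)
	}
}

// buildLifecycle wires the stages back to front: each stage receives the
// Start method of the stage that follows it.
func buildLifecycle() *stage {
	consensus := &stage{name: "consensus"}
	bootstrapper := &stage{name: "bootstrapping", onDone: consensus.Start}
	stateSyncer := &stage{name: "state sync", onDone: bootstrapper.Start}
	return stateSyncer
}

// runLifecycle starts the first stage and returns the order in which the
// stages ran.
func runLifecycle() []string {
	var trace []string
	buildLifecycle().Start(&trace)
	return trace
}

func main() {
	fmt.Println(runLifecycle())
}
```

In this sketch the final stage holds no reference to any earlier stage, so there is nothing it could call to go back. That is the limitation the paragraph above describes.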

This commit unifies all engines into a single one under snow/engine/unified.

As a result, the implementation of the handler in snow/networking/handler/handler.go is now simpler,
as it only interacts with the unified engine, and it never needs to query the snow.EngineState.

The unified engine now takes care of the state transitions between the stages. Because all the code that dispatches messages to the right engine
now lives in the unified engine, it is more testable, and it lets us move among the stages in the same place where the current stage is consulted
to dispatch each message to the correct engine.
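
The idea of keeping dispatch and stage transitions in one place can be sketched in Go as below. This is an illustration under stated assumptions, not the API of snow/engine/unified: the `UnifiedEngine` type, its `Dispatch` and `Transition` methods, and the message-to-stage table are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Stage identifies where the node is in its lifecycle.
type Stage int

const (
	StateSyncing Stage = iota
	Bootstrapping
	Consensus
)

var errDropped = errors.New("message not relevant to the current stage")

// relevantStage maps a simplified, hypothetical message name to the only
// stage that can process it.
var relevantStage = map[string]Stage{
	"StateSummaryFrontier": StateSyncing,
	"Ancestors":            Bootstrapping,
	"Chits":                Consensus,
}

// UnifiedEngine owns both the current stage and the dispatch decision.
type UnifiedEngine struct {
	stage Stage
}

// Dispatch returns the stage that handles msg, or errDropped if msg is
// unknown or does not belong to the current stage.
func (u *UnifiedEngine) Dispatch(msg string) (Stage, error) {
	want, known := relevantStage[msg]
	if !known || want != u.stage {
		return u.stage, errDropped
	}
	return want, nil
}

// Transition moves to any stage, forwards or backwards.
func (u *UnifiedEngine) Transition(to Stage) {
	u.stage = to
}

func main() {
	u := &UnifiedEngine{stage: StateSyncing}
	_, err := u.Dispatch("Chits")
	fmt.Println("Chits while state syncing:", err)

	u.Transition(Consensus)
	_, err = u.Dispatch("Chits")
	fmt.Println("Chits in consensus:", err)

	u.Transition(Bootstrapping) // moving back is just another transition
	_, err = u.Dispatch("Ancestors")
	fmt.Println("Ancestors after moving back to bootstrapping:", err)
}
```

Because the stage lives next to the dispatch logic, a test can drive transitions and assert on routing without constructing any of the underlying engines.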

Signed-off-by: Yacov Manevich <yacov.manevich@avalabs.org>
yacovm committed Oct 16, 2024
1 parent 814ad2c commit d2505c2
Showing 36 changed files with 2,238 additions and 981 deletions.
156 changes: 71 additions & 85 deletions chains/manager.go
@@ -35,6 +35,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common/tracker"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/snow/engine/snowman/syncer"
"github.com/ava-labs/avalanchego/snow/engine/unified"
"github.com/ava-labs/avalanchego/snow/networking/handler"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
@@ -63,7 +64,6 @@ import (

p2ppb "github.com/ava-labs/avalanchego/proto/pb/p2p"
smcon "github.com/ava-labs/avalanchego/snow/consensus/snowman"
aveng "github.com/ava-labs/avalanchego/snow/engine/avalanche"
avbootstrap "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap"
avagetter "github.com/ava-labs/avalanchego/snow/engine/avalanche/getter"
smeng "github.com/ava-labs/avalanchego/snow/engine/snowman"
@@ -933,23 +933,13 @@ func (m *manager) createAvalancheChain(
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanMessageSender,
Validators: vdrs,
ConnectedValidators: connectedValidators,
Params: consensusParams,
Consensus: snowmanConsensus,
}
var snowmanEngine common.Engine
snowmanEngine, err = smeng.New(snowmanEngineConfig)
if err != nil {
return nil, fmt.Errorf("error initializing snowman engine: %w", err)
}

if m.TracingEnabled {
snowmanEngine = common.TraceEngine(snowmanEngine, m.Tracer)
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
@@ -968,18 +958,6 @@ func (m *manager) createAvalancheChain(
DB: blockBootstrappingDB,
VM: vmWrappingProposerVM,
}
var snowmanBootstrapper common.BootstrapableEngine
snowmanBootstrapper, err = smbootstrap.New(
bootstrapCfg,
snowmanEngine.Start,
)
if err != nil {
return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err)
}

if m.TracingEnabled {
snowmanBootstrapper = common.TraceBootstrapableEngine(snowmanBootstrapper, m.Tracer)
}

avaGetHandler, err := avagetter.New(
vtxManager,
@@ -993,12 +971,6 @@ func (m *manager) createAvalancheChain(
return nil, fmt.Errorf("couldn't initialize avalanche base message handler: %w", err)
}

// create engine gear
avalancheEngine := aveng.New(ctx, avaGetHandler, linearizableVM)
if m.TracingEnabled {
avalancheEngine = common.TraceEngine(avalancheEngine, m.Tracer)
}

// create bootstrap gear
avalancheBootstrapperConfig := avbootstrap.Config{
AllGetsServer: avaGetHandler,
@@ -1016,32 +988,33 @@ func (m *manager) createAvalancheChain(
avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID
}

var avalancheBootstrapper common.BootstrapableEngine
avalancheBootstrapper, err = avbootstrap.New(
avalancheBootstrapperConfig,
snowmanBootstrapper.Start,
avalancheMetrics,
)
ef := &unified.EngineFactory{
TracingEnabled: m.TracingEnabled,
GetServer: snowGetHandler,
AvaAncestorGetter: avaGetHandler,
AvaMetrics: avalancheMetrics,
Tracer: m.Tracer,
BootConfig: bootstrapCfg,
AvaBootConfig: avalancheBootstrapperConfig,
SnowmanConfig: snowmanEngineConfig,
Logger: ctx.Log,
}

ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE,
State: snow.Bootstrapping,
})

ue, err := unified.EngineFromEngines(ctx, ef, vm)
if err != nil {
return nil, fmt.Errorf("error initializing avalanche bootstrapper: %w", err)
return nil, fmt.Errorf("error initializing unified engine: %w", err)
}

engine := common.Engine(ue)
if m.TracingEnabled {
avalancheBootstrapper = common.TraceBootstrapableEngine(avalancheBootstrapper, m.Tracer)
engine = common.TraceEngine(ue, m.Tracer)
}

h.SetEngineManager(&handler.EngineManager{
Avalanche: &handler.Engine{
StateSyncer: nil,
Bootstrapper: avalancheBootstrapper,
Consensus: avalancheEngine,
},
Snowman: &handler.Engine{
StateSyncer: nil,
Bootstrapper: snowmanBootstrapper,
Consensus: snowmanEngine,
},
})
h.SetEngine(engine)

// Register health check for this chain
if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil {
@@ -1327,7 +1300,6 @@ func (m *manager) createSnowmanChain(
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: messageSender,
Validators: vdrs,
@@ -1336,15 +1308,6 @@ func (m *manager) createSnowmanChain(
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
var engine common.Engine
engine, err = smeng.New(engineConfig)
if err != nil {
return nil, fmt.Errorf("error initializing snowman engine: %w", err)
}

if m.TracingEnabled {
engine = common.TraceEngine(engine, m.Tracer)
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
@@ -1364,18 +1327,6 @@ func (m *manager) createSnowmanChain(
VM: vm,
Bootstrapped: bootstrapFunc,
}
var bootstrapper common.BootstrapableEngine
bootstrapper, err = smbootstrap.New(
bootstrapCfg,
engine.Start,
)
if err != nil {
return nil, fmt.Errorf("error initializing snowman bootstrapper: %w", err)
}

if m.TracingEnabled {
bootstrapper = common.TraceBootstrapableEngine(bootstrapper, m.Tracer)
}

// create state sync gear
stateSyncCfg, err := syncer.NewConfig(
@@ -1392,24 +1343,42 @@ func (m *manager) createSnowmanChain(
if err != nil {
return nil, fmt.Errorf("couldn't initialize state syncer configuration: %w", err)
}
stateSyncer := syncer.New(
stateSyncCfg,
bootstrapper.Start,
)

if m.TracingEnabled {
stateSyncer = common.TraceStateSyncer(stateSyncer, m.Tracer)
ef := &unified.EngineFactory{
TracingEnabled: m.TracingEnabled,
GetServer: snowGetHandler,
AvaAncestorGetter: invalidEngineAncestorsGetter{},
StateSync: hasStateSync(stateSyncCfg),
Tracer: m.Tracer,
BootConfig: bootstrapCfg,
SnowmanConfig: engineConfig,
StateSyncConfig: stateSyncCfg,
Logger: ctx.Log,
}

h.SetEngineManager(&handler.EngineManager{
Avalanche: nil,
Snowman: &handler.Engine{
StateSyncer: stateSyncer,
Bootstrapper: bootstrapper,
Consensus: engine,
},
ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN,
State: snow.StateSyncing,
})

if !ef.HasStateSync() {
ctx.State.Set(snow.EngineState{
Type: p2ppb.EngineType_ENGINE_TYPE_SNOWMAN,
State: snow.Bootstrapping,
})
}

ue, err := unified.EngineFromEngines(ctx, ef, vm)
if err != nil {
return nil, fmt.Errorf("error initializing unified engine: %w", err)
}

engine := common.Engine(ue)
if m.TracingEnabled {
engine = common.TraceEngine(ue, m.Tracer)
}
h.SetEngine(engine)

// Register health checks
if err := m.Health.RegisterHealthCheck(primaryAlias, h, ctx.SubnetID.String()); err != nil {
return nil, fmt.Errorf("couldn't add health check for chain %s: %w", primaryAlias, err)
@@ -1423,6 +1392,17 @@ func (m *manager) createSnowmanChain(
}, nil
}

func hasStateSync(stateSyncCfg syncer.Config) bool {
var hasStateSync bool
ssVM, isStateSyncable := stateSyncCfg.VM.(block.StateSyncableVM)
if isStateSyncable {
if enabled, err := ssVM.StateSyncEnabled(context.Background()); err == nil && enabled {
hasStateSync = true
}
}
return hasStateSync
}

func (m *manager) IsBootstrapped(id ids.ID) bool {
m.chainsLock.Lock()
chain, exists := m.chains[id]
@@ -1581,3 +1561,9 @@ func (m *manager) getOrMakeVMRegisterer(vmID ids.ID, chainAlias string) (metrics
)
return chainReg, err
}

type invalidEngineAncestorsGetter struct{}

func (invalidEngineAncestorsGetter) GetAncestors(_ context.Context, _ ids.NodeID, _ uint32, _ ids.ID, engineType p2ppb.EngineType) error {
return fmt.Errorf("this engine does not run %s", engineType)
}
1 change: 1 addition & 0 deletions scripts/mocks.mockgen.txt
@@ -11,6 +11,7 @@ github.com/ava-labs/avalanchego/snow/engine/snowman/block=BuildBlockWithContextC
github.com/ava-labs/avalanchego/snow/engine/snowman/block=ChainVM=snow/engine/snowman/block/blockmock/chain_vm.go
github.com/ava-labs/avalanchego/snow/engine/snowman/block=StateSyncableVM=snow/engine/snowman/block/blockmock/state_syncable_vm.go
github.com/ava-labs/avalanchego/snow/engine/snowman/block=WithVerifyContext=snow/engine/snowman/block/blockmock/with_verify_context.go
github.com/ava-labs/avalanchego/snow/engine/unified=Factory=snow/engine/unified/mocks/mock_factory.go
github.com/ava-labs/avalanchego/snow/networking/handler=Handler=snow/networking/handler/handlermock/handler.go
github.com/ava-labs/avalanchego/snow/networking/timeout=Manager=snow/networking/timeout/timeoutmock/manager.go
github.com/ava-labs/avalanchego/snow/networking/tracker=Targeter=snow/networking/tracker/trackermock/targeter.go
26 changes: 1 addition & 25 deletions snow/engine/avalanche/bootstrap/bootstrapper.go
@@ -13,7 +13,6 @@ import (

"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/consensus/avalanche"
@@ -43,7 +42,7 @@ const (
epsilon = 1e-6 // small amount to add to time to avoid division by 0
)

var _ common.BootstrapableEngine = (*Bootstrapper)(nil)
var _ common.AvalancheBootstrapableEngine = (*Bootstrapper)(nil)

func New(
config Config,
@@ -53,15 +52,6 @@ func New(
b := &Bootstrapper{
Config: config,

StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log),
AcceptedStateSummaryHandler: common.NewNoOpAcceptedStateSummaryHandler(config.Ctx.Log),
AcceptedFrontierHandler: common.NewNoOpAcceptedFrontierHandler(config.Ctx.Log),
AcceptedHandler: common.NewNoOpAcceptedHandler(config.Ctx.Log),
PutHandler: common.NewNoOpPutHandler(config.Ctx.Log),
QueryHandler: common.NewNoOpQueryHandler(config.Ctx.Log),
ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log),
AppHandler: config.VM,

outstandingRequests: bimap.New[common.Request, ids.ID](),
outstandingRequestTimes: make(map[common.Request]time.Time),

@@ -77,16 +67,6 @@ type Bootstrapper struct {
Config
common.Halter

// list of NoOpsHandler for messages dropped by Bootstrapper
common.StateSummaryFrontierHandler
common.AcceptedStateSummaryHandler
common.AcceptedFrontierHandler
common.AcceptedHandler
common.PutHandler
common.QueryHandler
common.ChitsHandler
common.AppHandler

metrics

// tracks which validators were asked for which containers in which requests
@@ -320,10 +300,6 @@ func (*Bootstrapper) Notify(context.Context, common.Message) error {
func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error {
b.Ctx.Log.Info("starting bootstrap")

b.Ctx.State.Set(snow.EngineState{
Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE,
State: snow.Bootstrapping,
})
if err := b.VM.SetState(ctx, snow.Bootstrapping); err != nil {
return fmt.Errorf("failed to notify VM that bootstrapping has started: %w",
err)
1 change: 0 additions & 1 deletion snow/engine/avalanche/bootstrap/bootstrapper_test.go
@@ -617,7 +617,6 @@ func TestBootstrapperIncompleteAncestors(t *testing.T) {
require.Equal(vtxID1, requested)

require.NoError(bs.Ancestors(context.Background(), peerID, *reqIDPtr, [][]byte{vtxBytes1})) // Provide vtx1; should request vtx0
require.Equal(snow.Bootstrapping, bs.Context().State.Get().State)
require.Equal(vtxID0, requested)

manager.StopVertexAcceptedF = func(context.Context) (bool, error) {