diff --git a/chains/manager.go b/chains/manager.go index a5375d6aacbc..61e40f789ddf 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -888,6 +888,8 @@ func (m *manager) createAvalancheChain( return nil, err } + var halter common.Halter + // Asynchronously passes messages from the network to the consensus engine h, err := handler.New( ctx, @@ -900,6 +902,7 @@ func (m *manager) createAvalancheChain( connectedValidators, peerTracker, handlerReg, + halter.Halt, ) if err != nil { return nil, fmt.Errorf("error initializing network handler: %w", err) @@ -950,6 +953,7 @@ func (m *manager) createAvalancheChain( // create bootstrap gear bootstrapCfg := smbootstrap.Config{ + ShouldHalt: halter.Halted, NonVerifyingParse: block.ParseFunc(proposerVM.ParseLocalBlock), AllGetsServer: snowGetHandler, Ctx: ctx, @@ -1012,7 +1016,8 @@ func (m *manager) createAvalancheChain( avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID } - avalancheBootstrapper, err := avbootstrap.New( + var avalancheBootstrapper common.BootstrapableEngine + avalancheBootstrapper, err = avbootstrap.New( avalancheBootstrapperConfig, snowmanBootstrapper.Start, avalancheMetrics, @@ -1277,6 +1282,8 @@ func (m *manager) createSnowmanChain( return nil, err } + var halter common.Halter + // Asynchronously passes messages from the network to the consensus engine h, err := handler.New( ctx, @@ -1289,6 +1296,7 @@ func (m *manager) createSnowmanChain( connectedValidators, peerTracker, handlerReg, + halter.Halt, ) if err != nil { return nil, fmt.Errorf("couldn't initialize message handler: %w", err) @@ -1340,6 +1348,7 @@ func (m *manager) createSnowmanChain( // create bootstrap gear bootstrapCfg := smbootstrap.Config{ + ShouldHalt: halter.Halted, NonVerifyingParse: block.ParseFunc(proposerVM.ParseLocalBlock), AllGetsServer: snowGetHandler, Ctx: ctx, diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index 00f9ab64a458..e8393274c0f8 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -43,14 +43,14 @@ const ( epsilon = 1e-6 // small amount to add to time to avoid division by 0 ) -var _ common.BootstrapableEngine = (*bootstrapper)(nil) +var _ common.BootstrapableEngine = (*Bootstrapper)(nil) func New( config Config, onFinished func(ctx context.Context, lastReqID uint32) error, reg prometheus.Registerer, -) (common.BootstrapableEngine, error) { - b := &bootstrapper{ +) (*Bootstrapper, error) { + b := &Bootstrapper{ Config: config, StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log), @@ -72,12 +72,12 @@ func New( } // Note: To align with the Snowman invariant, it should be guaranteed the VM is -// not used until after the bootstrapper has been Started. -type bootstrapper struct { +// not used until after the Bootstrapper has been Started. +type Bootstrapper struct { Config common.Halter - // list of NoOpsHandler for messages dropped by bootstrapper + // list of NoOpsHandler for messages dropped by Bootstrapper common.StateSummaryFrontierHandler common.AcceptedStateSummaryHandler common.AcceptedFrontierHandler @@ -107,11 +107,11 @@ type bootstrapper struct { onFinished func(ctx context.Context, lastReqID uint32) error } -func (b *bootstrapper) Context() *snow.ConsensusContext { +func (b *Bootstrapper) Context() *snow.ConsensusContext { return b.Ctx } -func (b *bootstrapper) Clear(context.Context) error { +func (b *Bootstrapper) Clear(context.Context) error { b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() @@ -130,7 +130,7 @@ func (b *bootstrapper) Clear(context.Context) error { // Ancestors handles the receipt of multiple containers. Should be received in // response to a GetAncestors message to [nodeID] with request ID [requestID]. // Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors. -func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error { +func (b *Bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error { request := common.Request{ NodeID: nodeID, RequestID: requestID, @@ -254,7 +254,7 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request return b.process(ctx, verticesToProcess...) } -func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { +func (b *Bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { request := common.Request{ NodeID: nodeID, RequestID: requestID, @@ -276,7 +276,7 @@ func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID return b.fetch(ctx, vtxID) } -func (b *bootstrapper) Connected( +func (b *Bootstrapper) Connected( ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application, @@ -288,7 +288,7 @@ func (b *bootstrapper) Connected( return b.StartupTracker.Connected(ctx, nodeID, nodeVersion) } -func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error { +func (b *Bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error { if err := b.VM.Disconnected(ctx, nodeID); err != nil { return err } @@ -296,16 +296,16 @@ func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) erro return b.StartupTracker.Disconnected(ctx, nodeID) } -func (*bootstrapper) Timeout(context.Context) error { +func (*Bootstrapper) Timeout(context.Context) error { return nil } -func (*bootstrapper) Gossip(context.Context) error { +func (*Bootstrapper) Gossip(context.Context) error { return nil } -func (b *bootstrapper) Shutdown(ctx context.Context) error { - b.Ctx.Log.Info("shutting down bootstrapper") +func (b *Bootstrapper) Shutdown(ctx context.Context) error { + b.Ctx.Log.Info("shutting down Bootstrapper") b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() @@ -313,11 +313,11 @@ func (b *bootstrapper) Shutdown(ctx context.Context) error { return b.VM.Shutdown(ctx) } -func (*bootstrapper) Notify(context.Context, common.Message) error { +func (*Bootstrapper) Notify(context.Context, common.Message) error { return nil } -func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error { +func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error { b.Ctx.Log.Info("starting bootstrap") b.Ctx.State.Set(snow.EngineState{ @@ -388,7 +388,7 @@ func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error { return b.startSyncing(ctx, nil) } -func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) { +func (b *Bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) { b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() @@ -403,7 +403,7 @@ func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) { // Add the vertices in [vtxIDs] to the set of vertices that we need to fetch, // and then fetch vertices (and their ancestors) until either there are no more // to fetch or we are at the maximum number of outstanding requests. -func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { +func (b *Bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { b.needToFetch.Add(vtxIDs...) for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < maxOutstandingGetAncestorsRequests { vtxID, _ := b.needToFetch.Pop() // Length checked in predicate above @@ -442,7 +442,7 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { } // Process the vertices in [vtxs]. -func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error { +func (b *Bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error { // Vertices that we need to process prioritized by vertices that are unknown // or the furthest down the DAG. Unknown vertices are prioritized to ensure // that once we have made it below a certain height in DAG traversal we do @@ -565,7 +565,7 @@ func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) er } // startSyncing starts bootstrapping. Process the vertices in [accepterContainerIDs]. -func (b *bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []ids.ID) error { +func (b *Bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []ids.ID) error { pendingContainerIDs := b.VtxBlocked.MissingIDs() // Append the list of accepted container IDs to pendingContainerIDs to ensure // we iterate over every container that must be traversed. @@ -592,7 +592,7 @@ func (b *bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs [] // checkFinish repeatedly executes pending transactions and requests new frontier blocks until there aren't any new ones // after which it finishes the bootstrap process -func (b *bootstrapper) checkFinish(ctx context.Context) error { +func (b *Bootstrapper) checkFinish(ctx context.Context) error { // If we still need to fetch vertices, we can't finish if len(b.VtxBlocked.MissingIDs()) > 0 { return nil diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go index 537e33c5f192..b99b586f131f 100644 --- a/snow/engine/common/engine.go +++ b/snow/engine/common/engine.go @@ -9,7 +9,6 @@ import ( "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/set" ) @@ -23,9 +22,6 @@ import ( type Engine interface { Handler - // Return the context of the chain this engine is working on - Context() *snow.ConsensusContext - // Start engine operations from given request ID Start(ctx context.Context, startReqID uint32) error @@ -425,14 +421,6 @@ type InternalHandler interface { // Gossip to the network a container on the accepted frontier Gossip(context.Context) error - // Halt this engine. - // - // This function will be called before the environment starts exiting. This - // function is special, in that it does not expect the chain's context lock - // to be held before calling this function. This function also does not - // require the engine to have been started. - Halt(context.Context) - // Shutdown this engine. // // This function will be called when the environment is exiting. diff --git a/snow/engine/common/halter.go b/snow/engine/common/halter.go index 1fcea981d2e4..d46e85b181df 100644 --- a/snow/engine/common/halter.go +++ b/snow/engine/common/halter.go @@ -3,15 +3,12 @@ package common -import ( - "context" - "sync/atomic" -) +import "sync/atomic" var _ Haltable = (*Halter)(nil) type Haltable interface { - Halt(context.Context) + Halt() Halted() bool } @@ -19,7 +16,7 @@ type Halter struct { halted uint32 } -func (h *Halter) Halt(context.Context) { +func (h *Halter) Halt() { atomic.StoreUint32(&h.halted, 1) } diff --git a/snow/engine/common/no_ops_handlers.go b/snow/engine/common/no_ops_handlers.go index 0b2247ab3cde..33ea424c968f 100644 --- a/snow/engine/common/no_ops_handlers.go +++ b/snow/engine/common/no_ops_handlers.go @@ -355,13 +355,6 @@ func (nop *noOpInternalHandler) Gossip(context.Context) error { return nil } -func (nop *noOpInternalHandler) Halt(context.Context) { - nop.log.Debug("dropping request", - zap.String("reason", "unhandled by this gear"), - zap.String("messageOp", "halt"), - ) -} - func (nop *noOpInternalHandler) Shutdown(context.Context) error { nop.log.Debug("dropping request", zap.String("reason", "unhandled by this gear"), diff --git a/snow/engine/common/traced_engine.go b/snow/engine/common/traced_engine.go index 4574f0c3364a..a1ca48ad8293 100644 --- a/snow/engine/common/traced_engine.go +++ b/snow/engine/common/traced_engine.go @@ -10,7 +10,6 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/trace" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" @@ -344,13 +343,6 @@ func (e *tracedEngine) Gossip(ctx context.Context) error { return e.engine.Gossip(ctx) } -func (e *tracedEngine) Halt(ctx context.Context) { - ctx, span := e.tracer.Start(ctx, "tracedEngine.Halt") - defer span.End() - - e.engine.Halt(ctx) -} - func (e *tracedEngine) Shutdown(ctx context.Context) error { ctx, span := e.tracer.Start(ctx, "tracedEngine.Shutdown") defer span.End() @@ -367,10 +359,6 @@ func (e *tracedEngine) Notify(ctx context.Context, msg Message) error { return e.engine.Notify(ctx, msg) } -func (e *tracedEngine) Context() *snow.ConsensusContext { - return e.engine.Context() -} - func (e *tracedEngine) Start(ctx context.Context, startReqID uint32) error { ctx, span := e.tracer.Start(ctx, "tracedEngine.Start", oteltrace.WithAttributes( attribute.Int64("requestID", int64(startReqID)), diff --git a/snow/engine/enginetest/engine.go b/snow/engine/enginetest/engine.go index 29c577f60325..1ee1b7206590 100644 --- a/snow/engine/enginetest/engine.go +++ b/snow/engine/enginetest/engine.go @@ -189,19 +189,6 @@ func (e *Engine) Start(ctx context.Context, startReqID uint32) error { return errStart } -func (e *Engine) Context() *snow.ConsensusContext { - if e.ContextF != nil { - return e.ContextF() - } - if !e.CantContext { - return nil - } - if e.T != nil { - require.FailNow(e.T, "Unexpectedly called Context") - } - return nil -} - func (e *Engine) Timeout(ctx context.Context) error { if e.TimeoutF != nil { return e.TimeoutF(ctx) @@ -228,19 +215,6 @@ func (e *Engine) Gossip(ctx context.Context) error { return errGossip } -func (e *Engine) Halt(ctx context.Context) { - if e.HaltF != nil { - e.HaltF(ctx) - return - } - if !e.CantHalt { - return - } - if e.T != nil { - require.FailNow(e.T, "Unexpectedly called Halt") - } -} - func (e *Engine) Shutdown(ctx context.Context) error { if e.ShutdownF != nil { return e.ShutdownF(ctx) diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index 966bdcf67d11..bece6f32a1b2 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -68,7 +68,7 @@ var ( // called, so it must be guaranteed the VM is not used until after Start. type Bootstrapper struct { Config - common.Halter + shouldHalt func() bool *metrics // list of NoOpsHandler for messages dropped by bootstrapper @@ -120,6 +120,7 @@ type Bootstrapper struct { func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) error) (*Bootstrapper, error) { metrics, err := newMetrics(config.Ctx.Registerer) return &Bootstrapper{ + shouldHalt: config.ShouldHalt, nonVerifyingParser: config.NonVerifyingParse, Config: config, metrics: metrics, @@ -649,7 +650,7 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { numToExecute := b.tree.Len() err = execute( ctx, - b, + b.ShouldHalt, log, b.DB, &parseAcceptor{ @@ -673,7 +674,7 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { lastAccepted.Height(), ) } - if b.Halted() { + if b.shouldHalt() { return nil } diff --git a/snow/engine/snowman/bootstrap/bootstrapper_test.go b/snow/engine/snowman/bootstrap/bootstrapper_test.go index e35eb1d9990a..14024dc65fb6 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper_test.go +++ b/snow/engine/snowman/bootstrap/bootstrapper_test.go @@ -20,6 +20,7 @@ import ( "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/consensus/snowman/snowmantest" + "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/engine/snowman/block/blocktest" @@ -35,7 +36,7 @@ import ( var errUnknownBlock = errors.New("unknown block") -func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest.VM) { +func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest.VM, func()) { require := require.New(t) snowCtx := snowtest.Context(t, snowtest.CChainID) @@ -89,7 +90,10 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest peerTracker.Connected(peer, version.CurrentApp) + var halter common.Halter + return Config{ + ShouldHalt: halter.Halted, NonVerifyingParse: vm.ParseBlock, AllGetsServer: snowGetHandler, Ctx: ctx, @@ -103,7 +107,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest AncestorsMaxContainersReceived: 2000, DB: memdb.New(), VM: vm, - }, peer, sender, vm + }, peer, sender, vm, halter.Halt } func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { @@ -140,6 +144,9 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { require.NoError(err) cfg := Config{ + ShouldHalt: func() bool { + return false + }, AllGetsServer: snowGetHandler, Ctx: ctx, Beacons: peers, @@ -213,7 +220,7 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { func TestBootstrapperSingleFrontier(t *testing.T) { require := require.New(t) - config, _, _, vm := newConfig(t) + config, _, _, vm, _ := newConfig(t) blks := snowmantest.BuildChain(1) initializeVMWithBlockchain(vm, blks) @@ -241,7 +248,7 @@ func TestBootstrapperSingleFrontier(t *testing.T) { func TestBootstrapperUnknownByzantineResponse(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -286,7 +293,7 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { func TestBootstrapperPartialFetch(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(4) initializeVMWithBlockchain(vm, blks) @@ -336,7 +343,7 @@ func TestBootstrapperPartialFetch(t *testing.T) { func TestBootstrapperEmptyResponse(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -384,7 +391,7 @@ func TestBootstrapperEmptyResponse(t *testing.T) { func TestBootstrapperAncestors(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(4) initializeVMWithBlockchain(vm, blks) @@ -429,7 +436,7 @@ func TestBootstrapperAncestors(t *testing.T) { func TestBootstrapperFinalized(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(3) initializeVMWithBlockchain(vm, blks) @@ -471,7 +478,7 @@ func TestBootstrapperFinalized(t *testing.T) { func TestRestartBootstrapping(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(5) initializeVMWithBlockchain(vm, blks) @@ -532,7 +539,7 @@ func TestRestartBootstrapping(t *testing.T) { func TestBootstrapOldBlockAfterStateSync(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -575,7 +582,7 @@ func TestBootstrapOldBlockAfterStateSync(t *testing.T) { func TestBootstrapContinueAfterHalt(t *testing.T) { require := require.New(t) - config, _, _, vm := newConfig(t) + config, _, _, vm, halt := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -594,7 +601,7 @@ func TestBootstrapContinueAfterHalt(t *testing.T) { getBlockF := vm.GetBlockF vm.GetBlockF = func(ctx context.Context, blkID ids.ID) (snowman.Block, error) { - bs.Halt(ctx) + halt() return getBlockF(ctx, blkID) } @@ -672,6 +679,9 @@ func TestBootstrapNoParseOnNew(t *testing.T) { peerTracker.Connected(peer, version.CurrentApp) config := Config{ + ShouldHalt: func() bool { + return false + }, AllGetsServer: snowGetHandler, Ctx: ctx, Beacons: peers, @@ -702,7 +712,7 @@ func TestBootstrapNoParseOnNew(t *testing.T) { func TestBootstrapperReceiveStaleAncestorsMessage(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(3) initializeVMWithBlockchain(vm, blks) @@ -745,7 +755,7 @@ func TestBootstrapperReceiveStaleAncestorsMessage(t *testing.T) { func TestBootstrapperRollbackOnSetState(t *testing.T) { require := require.New(t) - config, _, _, vm := newConfig(t) + config, _, _, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) diff --git a/snow/engine/snowman/bootstrap/config.go b/snow/engine/snowman/bootstrap/config.go index d501e37f5499..1211e68ebbb6 100644 --- a/snow/engine/snowman/bootstrap/config.go +++ b/snow/engine/snowman/bootstrap/config.go @@ -42,4 +42,6 @@ type Config struct { NonVerifyingParse block.ParseFunc Bootstrapped func() + + ShouldHalt func() bool } diff --git a/snow/engine/snowman/bootstrap/storage.go b/snow/engine/snowman/bootstrap/storage.go index bc5488e3870e..1ddbb5731a3e 100644 --- a/snow/engine/snowman/bootstrap/storage.go +++ b/snow/engine/snowman/bootstrap/storage.go @@ -13,7 +13,6 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap/interval" "github.com/ava-labs/avalanchego/utils/logging" @@ -127,7 +126,7 @@ func process( // TODO: Replace usage of haltable with context cancellation. func execute( ctx context.Context, - haltable common.Haltable, + shouldHalt func() bool, log logging.Func, db database.Database, nonVerifyingParser block.Parser, @@ -172,7 +171,7 @@ func execute( var ( numProcessed = totalNumberToProcess - tree.Len() - halted = haltable.Halted() + halted = shouldHalt() ) if numProcessed >= minBlocksToCompact && !halted { log("compacting database after executing blocks...") @@ -196,7 +195,7 @@ func execute( zap.Uint64("numToExecute", totalNumberToProcess), ) - for !haltable.Halted() && iterator.Next() { + for !shouldHalt() && iterator.Next() { blkBytes := iterator.Value() blk, err := nonVerifyingParser.ParseBlock(ctx, blkBytes) if err != nil { diff --git a/snow/engine/snowman/bootstrap/storage_test.go b/snow/engine/snowman/bootstrap/storage_test.go index 44ac1621226d..cafb79514921 100644 --- a/snow/engine/snowman/bootstrap/storage_test.go +++ b/snow/engine/snowman/bootstrap/storage_test.go @@ -221,7 +221,7 @@ func TestExecute(t *testing.T) { unhalted := &common.Halter{} halted := &common.Halter{} - halted.Halt(context.Background()) + halted.Halt() tests := []struct { name string @@ -269,7 +269,7 @@ func TestExecute(t *testing.T) { require.NoError(execute( context.Background(), - test.haltable, + test.haltable.Halted, logging.NoLog{}.Info, db, parser, diff --git a/snow/engine/snowman/engine.go b/snow/engine/snowman/engine.go index bdc791c2cf4c..0998a9e9f56d 100644 --- a/snow/engine/snowman/engine.go +++ b/snow/engine/snowman/engine.go @@ -434,8 +434,6 @@ func (*Engine) Timeout(context.Context) error { return nil } -func (*Engine) Halt(context.Context) {} - func (e *Engine) Shutdown(ctx context.Context) error { e.Ctx.Log.Info("shutting down consensus engine") diff --git a/snow/engine/snowman/syncer/state_syncer.go b/snow/engine/snowman/syncer/state_syncer.go index 2fea946f3106..76e647e73a64 100644 --- a/snow/engine/snowman/syncer/state_syncer.go +++ b/snow/engine/snowman/syncer/state_syncer.go @@ -109,10 +109,6 @@ func New( } } -func (ss *stateSyncer) Context() *snow.ConsensusContext { - return ss.Ctx -} - func (ss *stateSyncer) Start(ctx context.Context, startReqID uint32) error { ss.Ctx.Log.Info("starting state sync") @@ -604,8 +600,6 @@ func (ss *stateSyncer) Shutdown(ctx context.Context) error { return ss.VM.Shutdown(ctx) } -func (*stateSyncer) Halt(context.Context) {} - func (*stateSyncer) Timeout(context.Context) error { return nil } diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 73e40ceee60f..9ab6c614c582 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -77,6 +77,8 @@ type Handler interface { // handler passes incoming messages from the network to the consensus engine. // (Actually, it receives the incoming messages from a ChainRouter, but same difference.) type handler struct { + haltBootstrapping func() + metrics *metrics // Useful for faking time in tests @@ -138,20 +140,22 @@ func New( peerTracker commontracker.Peers, p2pTracker *p2p.PeerTracker, reg prometheus.Registerer, + haltBootstrapping func(), ) (Handler, error) { h := &handler{ - ctx: ctx, - validators: validators, - msgFromVMChan: msgFromVMChan, - preemptTimeouts: subnet.OnBootstrapCompleted(), - gossipFrequency: gossipFrequency, - timeouts: make(chan struct{}, 1), - closingChan: make(chan struct{}), - closed: make(chan struct{}), - resourceTracker: resourceTracker, - subnet: subnet, - peerTracker: peerTracker, - p2pTracker: p2pTracker, + haltBootstrapping: haltBootstrapping, + ctx: ctx, + validators: validators, + msgFromVMChan: msgFromVMChan, + preemptTimeouts: subnet.OnBootstrapCompleted(), + gossipFrequency: gossipFrequency, + timeouts: make(chan struct{}, 1), + closingChan: make(chan struct{}), + closed: make(chan struct{}), + resourceTracker: resourceTracker, + subnet: subnet, + peerTracker: peerTracker, + p2pTracker: p2pTracker, } h.asyncMessagePool.SetLimit(threadPoolSize) @@ -316,7 +320,7 @@ func (h *handler) RegisterTimeout(d time.Duration) { // Note: It is possible for Stop to be called before/concurrently with Start. // // Invariant: Stop must never block. -func (h *handler) Stop(ctx context.Context) { +func (h *handler) Stop(_ context.Context) { h.closeOnce.Do(func() { h.startClosingTime = h.clock.Time() @@ -325,25 +329,7 @@ func (h *handler) Stop(ctx context.Context) { h.syncMessageQueue.Shutdown() h.asyncMessageQueue.Shutdown() close(h.closingChan) - - // TODO: switch this to use a [context.Context] with a cancel function. - // - // Don't process any more bootstrap messages. If a dispatcher is - // processing a bootstrap message, stop. We do this because if we - // didn't, and the engine was in the middle of executing state - // transitions during bootstrapping, we wouldn't be able to grab - // [h.ctx.Lock] until the engine finished executing state transitions, - // which may take a long time. As a result, the router would time out on - // shutting down this chain. - state := h.ctx.State.Get() - bootstrapper, ok := h.engineManager.Get(state.Type).Get(snow.Bootstrapping) - if !ok { - h.ctx.Log.Error("bootstrapping engine doesn't exist", - zap.Stringer("type", state.Type), - ) - return - } - bootstrapper.Halt(ctx) + h.haltBootstrapping() }) } diff --git a/snow/networking/handler/handler_test.go b/snow/networking/handler/handler_test.go index a2b5aa9acc8b..661ba17300fb 100644 --- a/snow/networking/handler/handler_test.go +++ b/snow/networking/handler/handler_test.go @@ -77,6 +77,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) handler := handlerIntf.(*handler) @@ -183,6 +184,7 @@ func TestHandlerClosesOnError(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) handler := handlerIntf.(*handler) @@ -285,6 +287,7 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) handler := handlerIntf.(*handler) @@ -375,6 +378,7 @@ func TestHandlerDispatchInternal(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -550,6 +554,7 @@ func TestDynamicEngineTypeDispatch(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -632,6 +637,7 @@ func TestHandlerStartError(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) diff --git a/snow/networking/handler/health_test.go b/snow/networking/handler/health_test.go index aa082b042321..4e7e5732b8de 100644 --- a/snow/networking/handler/health_test.go +++ b/snow/networking/handler/health_test.go @@ -93,6 +93,7 @@ func TestHealthCheckSubnet(t *testing.T) { peerTracker, p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index d2770ce1d211..149be6dc7b52 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -114,6 +114,7 @@ func TestShutdown(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -239,6 +240,7 @@ func TestConnectedAfterShutdownErrorLogRegression(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -371,6 +373,7 @@ func TestShutdownTimesOut(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -539,6 +542,7 @@ func TestRouterTimeout(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -1070,6 +1074,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -1235,6 +1240,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -1486,6 +1492,7 @@ func newChainRouterTest(t *testing.T) (*ChainRouter, *enginetest.Engine) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(t, err) diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 79b9ca0ba087..1f7469575690 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -140,6 +140,7 @@ func TestTimeout(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -398,6 +399,7 @@ func TestReliableMessages(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -554,6 +556,7 @@ func TestReliableMessagesToMyself(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 9c87556e7603..2d1740a650e0 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1319,6 +1319,9 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { require.NoError(err) bootstrapConfig := bootstrap.Config{ + ShouldHalt: func() bool { + return false + }, NonVerifyingParse: vm.ParseBlock, AllGetsServer: snowGetHandler, Ctx: consensusCtx, @@ -1353,6 +1356,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { tracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err)