Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify common.Engine API #3406

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,8 @@ func (m *manager) createAvalancheChain(
return nil, err
}

var halter common.Halter

// Asynchronously passes messages from the network to the consensus engine
h, err := handler.New(
ctx,
Expand All @@ -900,6 +902,7 @@ func (m *manager) createAvalancheChain(
connectedValidators,
peerTracker,
handlerReg,
halter.Halt,
)
if err != nil {
return nil, fmt.Errorf("error initializing network handler: %w", err)
Expand Down Expand Up @@ -950,6 +953,7 @@ func (m *manager) createAvalancheChain(

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
ShouldHalt: halter.Halted,
NonVerifyingParse: block.ParseFunc(proposerVM.ParseLocalBlock),
AllGetsServer: snowGetHandler,
Ctx: ctx,
Expand Down Expand Up @@ -1012,7 +1016,8 @@ func (m *manager) createAvalancheChain(
avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID
}

avalancheBootstrapper, err := avbootstrap.New(
var avalancheBootstrapper common.BootstrapableEngine
avalancheBootstrapper, err = avbootstrap.New(
avalancheBootstrapperConfig,
snowmanBootstrapper.Start,
avalancheMetrics,
Expand Down Expand Up @@ -1277,6 +1282,8 @@ func (m *manager) createSnowmanChain(
return nil, err
}

var halter common.Halter

// Asynchronously passes messages from the network to the consensus engine
h, err := handler.New(
ctx,
Expand All @@ -1289,6 +1296,7 @@ func (m *manager) createSnowmanChain(
connectedValidators,
peerTracker,
handlerReg,
halter.Halt,
)
if err != nil {
return nil, fmt.Errorf("couldn't initialize message handler: %w", err)
Expand Down Expand Up @@ -1340,6 +1348,7 @@ func (m *manager) createSnowmanChain(

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
ShouldHalt: halter.Halted,
NonVerifyingParse: block.ParseFunc(proposerVM.ParseLocalBlock),
AllGetsServer: snowGetHandler,
Ctx: ctx,
Expand Down
46 changes: 23 additions & 23 deletions snow/engine/avalanche/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ const (
epsilon = 1e-6 // small amount to add to time to avoid division by 0
)

var _ common.BootstrapableEngine = (*bootstrapper)(nil)
var _ common.BootstrapableEngine = (*Bootstrapper)(nil)

func New(
config Config,
onFinished func(ctx context.Context, lastReqID uint32) error,
reg prometheus.Registerer,
) (common.BootstrapableEngine, error) {
b := &bootstrapper{
) (*Bootstrapper, error) {
b := &Bootstrapper{
Config: config,

StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log),
Expand All @@ -72,12 +72,12 @@ func New(
}

// Note: To align with the Snowman invariant, it should be guaranteed the VM is
// not used until after the bootstrapper has been Started.
type bootstrapper struct {
// not used until after the Bootstrapper has been Started.
type Bootstrapper struct {
Config
common.Halter

// list of NoOpsHandler for messages dropped by bootstrapper
// list of NoOpsHandler for messages dropped by Bootstrapper
common.StateSummaryFrontierHandler
common.AcceptedStateSummaryHandler
common.AcceptedFrontierHandler
Expand Down Expand Up @@ -107,11 +107,11 @@ type bootstrapper struct {
onFinished func(ctx context.Context, lastReqID uint32) error
}

func (b *bootstrapper) Context() *snow.ConsensusContext {
func (b *Bootstrapper) Context() *snow.ConsensusContext {
return b.Ctx
}

func (b *bootstrapper) Clear(context.Context) error {
func (b *Bootstrapper) Clear(context.Context) error {
b.Ctx.Lock.Lock()
defer b.Ctx.Lock.Unlock()

Expand All @@ -130,7 +130,7 @@ func (b *bootstrapper) Clear(context.Context) error {
// Ancestors handles the receipt of multiple containers. Should be received in
// response to a GetAncestors message to [nodeID] with request ID [requestID].
// Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors.
func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error {
func (b *Bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error {
request := common.Request{
NodeID: nodeID,
RequestID: requestID,
Expand Down Expand Up @@ -254,7 +254,7 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
return b.process(ctx, verticesToProcess...)
}

func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
func (b *Bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
request := common.Request{
NodeID: nodeID,
RequestID: requestID,
Expand All @@ -276,7 +276,7 @@ func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID
return b.fetch(ctx, vtxID)
}

func (b *bootstrapper) Connected(
func (b *Bootstrapper) Connected(
ctx context.Context,
nodeID ids.NodeID,
nodeVersion *version.Application,
Expand All @@ -288,36 +288,36 @@ func (b *bootstrapper) Connected(
return b.StartupTracker.Connected(ctx, nodeID, nodeVersion)
}

func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
func (b *Bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error {
if err := b.VM.Disconnected(ctx, nodeID); err != nil {
return err
}

return b.StartupTracker.Disconnected(ctx, nodeID)
}

func (*bootstrapper) Timeout(context.Context) error {
func (*Bootstrapper) Timeout(context.Context) error {
return nil
}

func (*bootstrapper) Gossip(context.Context) error {
func (*Bootstrapper) Gossip(context.Context) error {
return nil
}

func (b *bootstrapper) Shutdown(ctx context.Context) error {
b.Ctx.Log.Info("shutting down bootstrapper")
func (b *Bootstrapper) Shutdown(ctx context.Context) error {
b.Ctx.Log.Info("shutting down Bootstrapper")

b.Ctx.Lock.Lock()
defer b.Ctx.Lock.Unlock()

return b.VM.Shutdown(ctx)
}

func (*bootstrapper) Notify(context.Context, common.Message) error {
func (*Bootstrapper) Notify(context.Context, common.Message) error {
return nil
}

func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error {
func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error {
b.Ctx.Log.Info("starting bootstrap")

b.Ctx.State.Set(snow.EngineState{
Expand Down Expand Up @@ -388,7 +388,7 @@ func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error {
return b.startSyncing(ctx, nil)
}

func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
func (b *Bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
b.Ctx.Lock.Lock()
defer b.Ctx.Lock.Unlock()

Expand All @@ -403,7 +403,7 @@ func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) {
// Add the vertices in [vtxIDs] to the set of vertices that we need to fetch,
// and then fetch vertices (and their ancestors) until either there are no more
// to fetch or we are at the maximum number of outstanding requests.
func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error {
func (b *Bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error {
b.needToFetch.Add(vtxIDs...)
for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < maxOutstandingGetAncestorsRequests {
vtxID, _ := b.needToFetch.Pop() // Length checked in predicate above
Expand Down Expand Up @@ -442,7 +442,7 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error {
}

// Process the vertices in [vtxs].
func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error {
func (b *Bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error {
// Vertices that we need to process prioritized by vertices that are unknown
// or the furthest down the DAG. Unknown vertices are prioritized to ensure
// that once we have made it below a certain height in DAG traversal we do
Expand Down Expand Up @@ -565,7 +565,7 @@ func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) er
}

// startSyncing starts bootstrapping. Process the vertices in [accepterContainerIDs].
func (b *bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []ids.ID) error {
func (b *Bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []ids.ID) error {
pendingContainerIDs := b.VtxBlocked.MissingIDs()
// Append the list of accepted container IDs to pendingContainerIDs to ensure
// we iterate over every container that must be traversed.
Expand All @@ -592,7 +592,7 @@ func (b *bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []

// checkFinish repeatedly executes pending transactions and requests new frontier blocks until there aren't any new ones
// after which it finishes the bootstrap process
func (b *bootstrapper) checkFinish(ctx context.Context) error {
func (b *Bootstrapper) checkFinish(ctx context.Context) error {
// If we still need to fetch vertices, we can't finish
if len(b.VtxBlocked.MissingIDs()) > 0 {
return nil
Expand Down
12 changes: 0 additions & 12 deletions snow/engine/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/ava-labs/avalanchego/api/health"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/set"
)
Expand All @@ -23,9 +22,6 @@ import (
type Engine interface {
Handler

// Return the context of the chain this engine is working on
Context() *snow.ConsensusContext

// Start engine operations from given request ID
Start(ctx context.Context, startReqID uint32) error

Expand Down Expand Up @@ -425,14 +421,6 @@ type InternalHandler interface {
// Gossip to the network a container on the accepted frontier
Gossip(context.Context) error

// Halt this engine.
//
// This function will be called before the environment starts exiting. This
// function is special, in that it does not expect the chain's context lock
// to be held before calling this function. This function also does not
// require the engine to have been started.
Halt(context.Context)

// Shutdown this engine.
//
// This function will be called when the environment is exiting.
Expand Down
9 changes: 3 additions & 6 deletions snow/engine/common/halter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@

package common

import (
"context"
"sync/atomic"
)
import "sync/atomic"

var _ Haltable = (*Halter)(nil)

type Haltable interface {
Halt(context.Context)
Halt()
Halted() bool
}

type Halter struct {
halted uint32
}

func (h *Halter) Halt(context.Context) {
func (h *Halter) Halt() {
atomic.StoreUint32(&h.halted, 1)
}

Expand Down
7 changes: 0 additions & 7 deletions snow/engine/common/no_ops_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,6 @@ func (nop *noOpInternalHandler) Gossip(context.Context) error {
return nil
}

func (nop *noOpInternalHandler) Halt(context.Context) {
nop.log.Debug("dropping request",
zap.String("reason", "unhandled by this gear"),
zap.String("messageOp", "halt"),
)
}

func (nop *noOpInternalHandler) Shutdown(context.Context) error {
nop.log.Debug("dropping request",
zap.String("reason", "unhandled by this gear"),
Expand Down
12 changes: 0 additions & 12 deletions snow/engine/common/traced_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"go.opentelemetry.io/otel/attribute"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
Expand Down Expand Up @@ -344,13 +343,6 @@ func (e *tracedEngine) Gossip(ctx context.Context) error {
return e.engine.Gossip(ctx)
}

func (e *tracedEngine) Halt(ctx context.Context) {
ctx, span := e.tracer.Start(ctx, "tracedEngine.Halt")
defer span.End()

e.engine.Halt(ctx)
}

func (e *tracedEngine) Shutdown(ctx context.Context) error {
ctx, span := e.tracer.Start(ctx, "tracedEngine.Shutdown")
defer span.End()
Expand All @@ -367,10 +359,6 @@ func (e *tracedEngine) Notify(ctx context.Context, msg Message) error {
return e.engine.Notify(ctx, msg)
}

func (e *tracedEngine) Context() *snow.ConsensusContext {
return e.engine.Context()
}

func (e *tracedEngine) Start(ctx context.Context, startReqID uint32) error {
ctx, span := e.tracer.Start(ctx, "tracedEngine.Start", oteltrace.WithAttributes(
attribute.Int64("requestID", int64(startReqID)),
Expand Down
26 changes: 0 additions & 26 deletions snow/engine/enginetest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,6 @@ func (e *Engine) Start(ctx context.Context, startReqID uint32) error {
return errStart
}

func (e *Engine) Context() *snow.ConsensusContext {
if e.ContextF != nil {
return e.ContextF()
}
if !e.CantContext {
return nil
}
if e.T != nil {
require.FailNow(e.T, "Unexpectedly called Context")
}
return nil
}

func (e *Engine) Timeout(ctx context.Context) error {
if e.TimeoutF != nil {
return e.TimeoutF(ctx)
Expand All @@ -228,19 +215,6 @@ func (e *Engine) Gossip(ctx context.Context) error {
return errGossip
}

func (e *Engine) Halt(ctx context.Context) {
if e.HaltF != nil {
e.HaltF(ctx)
return
}
if !e.CantHalt {
return
}
if e.T != nil {
require.FailNow(e.T, "Unexpectedly called Halt")
}
}

func (e *Engine) Shutdown(ctx context.Context) error {
if e.ShutdownF != nil {
return e.ShutdownF(ctx)
Expand Down
Loading