diff --git a/chains/manager.go b/chains/manager.go index b8b051c0a058..47183a819a7a 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -885,9 +885,9 @@ func (m *manager) createAvalancheChain( avalancheBootstrapperConfig := avbootstrap.Config{ AllGetsServer: avaGetHandler, Ctx: ctx, - Beacons: vdrs, StartupTracker: startupTracker, Sender: avalancheMessageSender, + PeerTracker: peerTracker, AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived, VtxBlocked: vtxBlocker, TxBlocked: txBlocker, diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index d3028bc3bfce..b79c9c8cb5ff 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -6,6 +6,7 @@ package bootstrap import ( "context" "fmt" + "time" "go.uber.org/zap" @@ -18,7 +19,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/bimap" "github.com/ava-labs/avalanchego/utils/heap" - "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" ) @@ -38,6 +38,8 @@ const ( // maxOutstandingGetAncestorsRequests is the maximum number of GetAncestors // sent but not yet responded to/failed maxOutstandingGetAncestorsRequests = 10 + + epsilon = 1e-6 // small amount to add to time to avoid division by 0 ) var _ common.BootstrapableEngine = (*bootstrapper)(nil) @@ -58,7 +60,8 @@ func New( ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log), AppHandler: config.VM, - outstandingRequests: bimap.New[common.Request, ids.ID](), + outstandingRequests: bimap.New[common.Request, ids.ID](), + outstandingRequestTimes: make(map[common.Request]time.Time), processedCache: &cache.LRU[ids.ID, struct{}]{Size: cacheSize}, onFinished: onFinished, @@ -85,7 +88,8 @@ type bootstrapper struct { metrics // tracks which validators were asked for which containers in which requests - outstandingRequests *bimap.BiMap[common.Request, ids.ID] + outstandingRequests *bimap.BiMap[common.Request, ids.ID] + outstandingRequestTimes map[common.Request]time.Time // IDs of vertices that we will send a GetAncestors request for once we are // not at the max number of outstanding requests @@ -125,84 +129,76 @@ func (b *bootstrapper) Clear(context.Context) error { // response to a GetAncestors message to [nodeID] with request ID [requestID]. // Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors. func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error { + request := common.Request{ + NodeID: nodeID, + RequestID: requestID, + } + requestedVtxID, ok := b.outstandingRequests.DeleteKey(request) + if !ok { // this message isn't in response to a request we made + b.Ctx.Log.Debug("received unexpected Ancestors", + zap.Stringer("nodeID", nodeID), + zap.Uint32("requestID", requestID), + ) + return nil + } + requestTime := b.outstandingRequestTimes[request] + delete(b.outstandingRequestTimes, request) + lenVtxs := len(vtxs) if lenVtxs == 0 { b.Ctx.Log.Debug("Ancestors contains no vertices", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), ) - return b.GetAncestorsFailed(ctx, nodeID, requestID) + + b.PeerTracker.RegisterFailure(nodeID) + return b.fetch(ctx, requestedVtxID) } + if lenVtxs > b.Config.AncestorsMaxContainersReceived { + vtxs = vtxs[:b.Config.AncestorsMaxContainersReceived] + b.Ctx.Log.Debug("ignoring containers in Ancestors", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), zap.Int("numIgnored", lenVtxs-b.Config.AncestorsMaxContainersReceived), ) - - vtxs = vtxs[:b.Config.AncestorsMaxContainersReceived] } - requestedVtxID, requested := b.outstandingRequests.DeleteKey(common.Request{ - NodeID: nodeID, - RequestID: requestID, - }) - vtx, err := b.Manager.ParseVtx(ctx, vtxs[0]) // first vertex should be the one we requested in GetAncestors request + vtx, err := b.Manager.ParseVtx(ctx, vtxs[0]) if err != nil { - if !requested { - b.Ctx.Log.Debug("failed to parse unrequested vertex", - zap.Stringer("nodeID", nodeID), - zap.Uint32("requestID", requestID), - zap.Error(err), - ) - return nil - } - if b.Ctx.Log.Enabled(logging.Verbo) { - b.Ctx.Log.Verbo("failed to parse requested vertex", - zap.Stringer("nodeID", nodeID), - zap.Uint32("requestID", requestID), - zap.Stringer("vtxID", requestedVtxID), - zap.Binary("vtxBytes", vtxs[0]), - zap.Error(err), - ) - } else { - b.Ctx.Log.Debug("failed to parse requested vertex", - zap.Stringer("nodeID", nodeID), - zap.Uint32("requestID", requestID), - zap.Stringer("vtxID", requestedVtxID), - zap.Error(err), - ) - } - return b.fetch(ctx, requestedVtxID) - } - - vtxID := vtx.ID() - // If the vertex is neither the requested vertex nor a needed vertex, return early and re-fetch if necessary - if requested && requestedVtxID != vtxID { - b.Ctx.Log.Debug("received incorrect vertex", + b.Ctx.Log.Debug("failed to parse requested vertex", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), - zap.Stringer("vtxID", vtxID), + zap.Stringer("vtxID", requestedVtxID), + zap.Error(err), ) + + b.PeerTracker.RegisterFailure(nodeID) return b.fetch(ctx, requestedVtxID) } - if !requested && !b.outstandingRequests.HasValue(vtxID) && !b.needToFetch.Contains(vtxID) { - b.Ctx.Log.Debug("received un-needed vertex", + + if actualID := vtx.ID(); actualID != requestedVtxID { + b.Ctx.Log.Debug("received incorrect vertex", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), - zap.Stringer("vtxID", vtxID), + zap.Stringer("vtxID", actualID), ) - return nil + + b.PeerTracker.RegisterFailure(nodeID) + return b.fetch(ctx, requestedVtxID) } - // Do not remove from outstanding requests if this did not answer a specific outstanding request - // to ensure that real responses are not dropped in favor of potentially byzantine Ancestors messages that - // could force the node to bootstrap 1 vertex at a time. - b.needToFetch.Remove(vtxID) + b.needToFetch.Remove(requestedVtxID) + + // All vertices added to [verticesToProcess] have received transitive votes + // from the accepted frontier. + var ( + numBytes = len(vtxs[0]) + verticesToProcess = make([]avalanche.Vertex, 1, len(vtxs)) + ) + verticesToProcess[0] = vtx - // All vertices added to [processVertices] have received transitive votes from the accepted frontier - processVertices := make([]avalanche.Vertex, 1, len(vtxs)) // Process all of the valid vertices in this message - processVertices[0] = vtx parents, err := vtx.Parents() if err != nil { return err @@ -212,7 +208,7 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request eligibleVertices.Add(parent.ID()) } - for _, vtxBytes := range vtxs[1:] { // Parse/persist all the vertices + for _, vtxBytes := range vtxs[1:] { vtx, err := b.Manager.ParseVtx(ctx, vtxBytes) // Persists the vtx if err != nil { b.Ctx.Log.Debug("failed to parse vertex", @@ -220,12 +216,6 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request zap.Uint32("requestID", requestID), zap.Error(err), ) - b.Ctx.Log.Debug("failed to parse vertex", - zap.Stringer("nodeID", nodeID), - zap.Uint32("requestID", requestID), - zap.Binary("vtxBytes", vtxBytes), - zap.Error(err), - ) break } vtxID := vtx.ID() @@ -245,26 +235,41 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request for _, parent := range parents { eligibleVertices.Add(parent.ID()) } - processVertices = append(processVertices, vtx) + + numBytes += len(vtxBytes) + verticesToProcess = append(verticesToProcess, vtx) b.needToFetch.Remove(vtxID) // No need to fetch this vertex since we have it now } - return b.process(ctx, processVertices...) + // TODO: Calculate bandwidth based on the vertices that were persisted to + // disk. + var ( + requestLatency = time.Since(requestTime).Seconds() + epsilon + bandwidth = float64(numBytes) / requestLatency + ) + b.PeerTracker.RegisterResponse(nodeID, bandwidth) + + return b.process(ctx, verticesToProcess...) } func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { - vtxID, ok := b.outstandingRequests.DeleteKey(common.Request{ + request := common.Request{ NodeID: nodeID, RequestID: requestID, - }) + } + vtxID, ok := b.outstandingRequests.DeleteKey(request) if !ok { - b.Ctx.Log.Debug("skipping GetAncestorsFailed call", - zap.String("reason", "no matching outstanding request"), + b.Ctx.Log.Debug("unexpectedly called GetAncestorsFailed", zap.Stringer("nodeID", nodeID), zap.Uint32("requestID", requestID), ) return nil } + delete(b.outstandingRequestTimes, request) + + // This node timed out their request. + b.PeerTracker.RegisterFailure(nodeID) + // Send another request for the vertex return b.fetch(ctx, vtxID) } @@ -411,21 +416,25 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { continue } - validatorIDs, err := b.Config.Beacons.Sample(b.Ctx.SubnetID, 1) // validator to send request to - if err != nil { - return fmt.Errorf("dropping request for %s as there are no validators", vtxID) + nodeID, ok := b.PeerTracker.SelectPeer() + if !ok { + // If we aren't connected to any peers, we send a request to ourself + // which is guaranteed to fail. We send this message to use the + // message timeout as a retry mechanism. Once we are connected to + // another node again we will select them to sample from. + nodeID = b.Ctx.NodeID } - validatorID := validatorIDs[0] - b.requestID++ - b.outstandingRequests.Put( - common.Request{ - NodeID: validatorID, - RequestID: b.requestID, - }, - vtxID, - ) - b.Config.Sender.SendGetAncestors(ctx, validatorID, b.requestID, vtxID) // request vertex and ancestors + b.PeerTracker.RegisterRequest(nodeID) + + b.requestID++ + request := common.Request{ + NodeID: nodeID, + RequestID: b.requestID, + } + b.outstandingRequests.Put(request, vtxID) + b.outstandingRequestTimes[request] = time.Now() + b.Config.Sender.SendGetAncestors(ctx, nodeID, b.requestID, vtxID) // request vertex and ancestors } return b.checkFinish(ctx) } diff --git a/snow/engine/avalanche/bootstrap/bootstrapper_test.go b/snow/engine/avalanche/bootstrap/bootstrapper_test.go index 8d8c107383e9..d17bbd4743ec 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper_test.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper_test.go @@ -10,12 +10,13 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/ava-labs/avalanchego/database/memdb" "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/proto/pb/p2p" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/choices" "github.com/ava-labs/avalanchego/snow/consensus/avalanche" @@ -28,7 +29,11 @@ import ( "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/constants" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/version" + + p2ppb "github.com/ava-labs/avalanchego/proto/pb/p2p" ) var ( @@ -86,12 +91,23 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *vertex.Te avaGetHandler, err := getter.New(manager, sender, ctx.Log, time.Second, 2000, ctx.AvalancheRegisterer) require.NoError(err) + p2pTracker, err := p2p.NewPeerTracker( + logging.NoLog{}, + "", + prometheus.NewRegistry(), + nil, + version.CurrentApp, + ) + require.NoError(err) + + p2pTracker.Connected(peer, version.CurrentApp) + return Config{ AllGetsServer: avaGetHandler, Ctx: ctx, - Beacons: vdrs, StartupTracker: startupTracker, Sender: sender, + PeerTracker: p2pTracker, AncestorsMaxContainersReceived: 2000, VtxBlocked: vtxBlocker, TxBlocked: txBlocker, @@ -151,7 +167,7 @@ func TestBootstrapperSingleFrontier(t *testing.T) { config, func(context.Context, uint32) error { config.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, State: snow.NormalOp, }) return nil @@ -257,7 +273,7 @@ func TestBootstrapperByzantineResponses(t *testing.T) { config, func(context.Context, uint32) error { config.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, State: snow.NormalOp, }) return nil @@ -423,7 +439,7 @@ func TestBootstrapperTxDependencies(t *testing.T) { config, func(context.Context, uint32) error { config.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, State: snow.NormalOp, }) return nil @@ -546,7 +562,7 @@ func TestBootstrapperIncompleteAncestors(t *testing.T) { config, func(context.Context, uint32) error { config.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, + Type: p2ppb.EngineType_ENGINE_TYPE_AVALANCHE, State: snow.NormalOp, }) return nil @@ -620,111 +636,3 @@ func TestBootstrapperIncompleteAncestors(t *testing.T) { require.Equal(choices.Accepted, vtx1.Status()) require.Equal(choices.Accepted, vtx2.Status()) } - -func TestBootstrapperUnexpectedVertex(t *testing.T) { - require := require.New(t) - - config, peerID, sender, manager, vm := newConfig(t) - - vtxID0 := ids.Empty.Prefix(0) - vtxID1 := ids.Empty.Prefix(1) - - vtxBytes0 := []byte{0} - vtxBytes1 := []byte{1} - - vtx0 := &avalanche.TestVertex{ - TestDecidable: choices.TestDecidable{ - IDV: vtxID0, - StatusV: choices.Unknown, - }, - HeightV: 0, - BytesV: vtxBytes0, - } - vtx1 := &avalanche.TestVertex{ // vtx1 is the stop vertex - TestDecidable: choices.TestDecidable{ - IDV: vtxID1, - StatusV: choices.Unknown, - }, - ParentsV: []avalanche.Vertex{vtx0}, - HeightV: 1, - BytesV: vtxBytes1, - } - - config.StopVertexID = vtxID1 - bs, err := New( - config, - func(context.Context, uint32) error { - config.Ctx.State.Set(snow.EngineState{ - Type: p2p.EngineType_ENGINE_TYPE_AVALANCHE, - State: snow.NormalOp, - }) - return nil - }, - ) - require.NoError(err) - - parsedVtx0 := false - parsedVtx1 := false - manager.GetVtxF = func(_ context.Context, vtxID ids.ID) (avalanche.Vertex, error) { - switch vtxID { - case vtxID0: - if parsedVtx0 { - return vtx0, nil - } - return nil, errUnknownVertex - case vtxID1: - if parsedVtx1 { - return vtx1, nil - } - return nil, errUnknownVertex - default: - require.FailNow(errUnknownVertex.Error()) - return nil, errUnknownVertex - } - } - manager.ParseVtxF = func(_ context.Context, vtxBytes []byte) (avalanche.Vertex, error) { - switch { - case bytes.Equal(vtxBytes, vtxBytes0): - vtx0.StatusV = choices.Processing - parsedVtx0 = true - return vtx0, nil - case bytes.Equal(vtxBytes, vtxBytes1): - vtx1.StatusV = choices.Processing - parsedVtx1 = true - return vtx1, nil - default: - require.FailNow(errUnknownVertex.Error()) - return nil, errUnknownVertex - } - } - - requestIDs := map[ids.ID]uint32{} - sender.SendGetAncestorsF = func(_ context.Context, vdr ids.NodeID, reqID uint32, vtxID ids.ID) { - require.Equal(peerID, vdr) - requestIDs[vtxID] = reqID - } - - vm.CantSetState = false - require.NoError(bs.Start(context.Background(), 0)) // should request vtx1 - require.Contains(requestIDs, vtxID1) - - reqID := requestIDs[vtxID1] - clear(requestIDs) - require.NoError(bs.Ancestors(context.Background(), peerID, reqID, [][]byte{vtxBytes0})) - require.Contains(requestIDs, vtxID1) - - manager.EdgeF = func(context.Context) []ids.ID { - require.Equal(choices.Accepted, vtx1.Status()) - return []ids.ID{vtxID1} - } - - vm.LinearizeF = func(_ context.Context, stopVertexID ids.ID) error { - require.Equal(vtxID1, stopVertexID) - return nil - } - - require.NoError(bs.Ancestors(context.Background(), peerID, reqID, [][]byte{vtxBytes1, vtxBytes0})) - require.Equal(choices.Accepted, vtx0.Status()) - require.Equal(choices.Accepted, vtx1.Status()) - require.Equal(snow.NormalOp, config.Ctx.State.Get().State) -} diff --git a/snow/engine/avalanche/bootstrap/config.go b/snow/engine/avalanche/bootstrap/config.go index 8151967abe9d..ec1baccc390b 100644 --- a/snow/engine/avalanche/bootstrap/config.go +++ b/snow/engine/avalanche/bootstrap/config.go @@ -5,23 +5,25 @@ package bootstrap import ( "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/avalanche/bootstrap/queue" "github.com/ava-labs/avalanchego/snow/engine/avalanche/vertex" "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" - "github.com/ava-labs/avalanchego/snow/validators" ) type Config struct { common.AllGetsServer - Ctx *snow.ConsensusContext - Beacons validators.Manager + Ctx *snow.ConsensusContext StartupTracker tracker.Startup Sender common.Sender + // PeerTracker manages the set of nodes that we fetch the next block from. + PeerTracker *p2p.PeerTracker + // This node will only consider the first [AncestorsMaxContainersReceived] // containers in an ancestors message it receives. AncestorsMaxContainersReceived int diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index f5f906a1657c..2988ece9ba0e 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -421,7 +421,6 @@ func (b *Bootstrapper) fetch(ctx context.Context, blkID ids.ID) error { b.PeerTracker.RegisterRequest(nodeID) b.requestID++ - request := common.Request{ NodeID: nodeID, RequestID: b.requestID,