Skip to content

Commit

Permalink
Cleanup avalanche bootstrapping fetching (#2947)
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Buttolph <stephen@avalabs.org>
Co-authored-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
  • Loading branch information
StephenButtolph and joshua-kim authored Apr 18, 2024
1 parent 197179f commit eca19b7
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 199 deletions.
2 changes: 1 addition & 1 deletion chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,9 @@ func (m *manager) createAvalancheChain(
avalancheBootstrapperConfig := avbootstrap.Config{
AllGetsServer: avaGetHandler,
Ctx: ctx,
Beacons: vdrs,
StartupTracker: startupTracker,
Sender: avalancheMessageSender,
PeerTracker: peerTracker,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
VtxBlocked: vtxBlocker,
TxBlocked: txBlocker,
Expand Down
169 changes: 89 additions & 80 deletions snow/engine/avalanche/bootstrap/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package bootstrap
import (
"context"
"fmt"
"time"

"go.uber.org/zap"

Expand All @@ -18,7 +19,6 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/bimap"
"github.com/ava-labs/avalanchego/utils/heap"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)
Expand All @@ -38,6 +38,8 @@ const (
// maxOutstandingGetAncestorsRequests is the maximum number of GetAncestors
// sent but not yet responded to/failed
maxOutstandingGetAncestorsRequests = 10

epsilon = 1e-6 // small amount to add to time to avoid division by 0
)

var _ common.BootstrapableEngine = (*bootstrapper)(nil)
Expand All @@ -58,7 +60,8 @@ func New(
ChitsHandler: common.NewNoOpChitsHandler(config.Ctx.Log),
AppHandler: config.VM,

outstandingRequests: bimap.New[common.Request, ids.ID](),
outstandingRequests: bimap.New[common.Request, ids.ID](),
outstandingRequestTimes: make(map[common.Request]time.Time),

processedCache: &cache.LRU[ids.ID, struct{}]{Size: cacheSize},
onFinished: onFinished,
Expand All @@ -85,7 +88,8 @@ type bootstrapper struct {
metrics

// tracks which validators were asked for which containers in which requests
outstandingRequests *bimap.BiMap[common.Request, ids.ID]
outstandingRequests *bimap.BiMap[common.Request, ids.ID]
outstandingRequestTimes map[common.Request]time.Time

// IDs of vertices that we will send a GetAncestors request for once we are
// not at the max number of outstanding requests
Expand Down Expand Up @@ -125,84 +129,76 @@ func (b *bootstrapper) Clear(context.Context) error {
// response to a GetAncestors message to [nodeID] with request ID [requestID].
// Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors.
func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error {
request := common.Request{
NodeID: nodeID,
RequestID: requestID,
}
requestedVtxID, ok := b.outstandingRequests.DeleteKey(request)
if !ok { // this message isn't in response to a request we made
b.Ctx.Log.Debug("received unexpected Ancestors",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
)
return nil
}
requestTime := b.outstandingRequestTimes[request]
delete(b.outstandingRequestTimes, request)

lenVtxs := len(vtxs)
if lenVtxs == 0 {
b.Ctx.Log.Debug("Ancestors contains no vertices",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
)
return b.GetAncestorsFailed(ctx, nodeID, requestID)

b.PeerTracker.RegisterFailure(nodeID)
return b.fetch(ctx, requestedVtxID)
}

if lenVtxs > b.Config.AncestorsMaxContainersReceived {
vtxs = vtxs[:b.Config.AncestorsMaxContainersReceived]

b.Ctx.Log.Debug("ignoring containers in Ancestors",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Int("numIgnored", lenVtxs-b.Config.AncestorsMaxContainersReceived),
)

vtxs = vtxs[:b.Config.AncestorsMaxContainersReceived]
}

requestedVtxID, requested := b.outstandingRequests.DeleteKey(common.Request{
NodeID: nodeID,
RequestID: requestID,
})
vtx, err := b.Manager.ParseVtx(ctx, vtxs[0]) // first vertex should be the one we requested in GetAncestors request
vtx, err := b.Manager.ParseVtx(ctx, vtxs[0])
if err != nil {
if !requested {
b.Ctx.Log.Debug("failed to parse unrequested vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
return nil
}
if b.Ctx.Log.Enabled(logging.Verbo) {
b.Ctx.Log.Verbo("failed to parse requested vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Stringer("vtxID", requestedVtxID),
zap.Binary("vtxBytes", vtxs[0]),
zap.Error(err),
)
} else {
b.Ctx.Log.Debug("failed to parse requested vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Stringer("vtxID", requestedVtxID),
zap.Error(err),
)
}
return b.fetch(ctx, requestedVtxID)
}

vtxID := vtx.ID()
// If the vertex is neither the requested vertex nor a needed vertex, return early and re-fetch if necessary
if requested && requestedVtxID != vtxID {
b.Ctx.Log.Debug("received incorrect vertex",
b.Ctx.Log.Debug("failed to parse requested vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Stringer("vtxID", vtxID),
zap.Stringer("vtxID", requestedVtxID),
zap.Error(err),
)

b.PeerTracker.RegisterFailure(nodeID)
return b.fetch(ctx, requestedVtxID)
}
if !requested && !b.outstandingRequests.HasValue(vtxID) && !b.needToFetch.Contains(vtxID) {
b.Ctx.Log.Debug("received un-needed vertex",

if actualID := vtx.ID(); actualID != requestedVtxID {
b.Ctx.Log.Debug("received incorrect vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Stringer("vtxID", vtxID),
zap.Stringer("vtxID", actualID),
)
return nil

b.PeerTracker.RegisterFailure(nodeID)
return b.fetch(ctx, requestedVtxID)
}

// Do not remove from outstanding requests if this did not answer a specific outstanding request
// to ensure that real responses are not dropped in favor of potentially byzantine Ancestors messages that
// could force the node to bootstrap 1 vertex at a time.
b.needToFetch.Remove(vtxID)
b.needToFetch.Remove(requestedVtxID)

// All vertices added to [verticesToProcess] have received transitive votes
// from the accepted frontier.
var (
numBytes = len(vtxs[0])
verticesToProcess = make([]avalanche.Vertex, 1, len(vtxs))
)
verticesToProcess[0] = vtx

// All vertices added to [processVertices] have received transitive votes from the accepted frontier
processVertices := make([]avalanche.Vertex, 1, len(vtxs)) // Process all of the valid vertices in this message
processVertices[0] = vtx
parents, err := vtx.Parents()
if err != nil {
return err
Expand All @@ -212,20 +208,14 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
eligibleVertices.Add(parent.ID())
}

for _, vtxBytes := range vtxs[1:] { // Parse/persist all the vertices
for _, vtxBytes := range vtxs[1:] {
vtx, err := b.Manager.ParseVtx(ctx, vtxBytes) // Persists the vtx
if err != nil {
b.Ctx.Log.Debug("failed to parse vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Error(err),
)
b.Ctx.Log.Debug("failed to parse vertex",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Binary("vtxBytes", vtxBytes),
zap.Error(err),
)
break
}
vtxID := vtx.ID()
Expand All @@ -245,26 +235,41 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request
for _, parent := range parents {
eligibleVertices.Add(parent.ID())
}
processVertices = append(processVertices, vtx)

numBytes += len(vtxBytes)
verticesToProcess = append(verticesToProcess, vtx)
b.needToFetch.Remove(vtxID) // No need to fetch this vertex since we have it now
}

return b.process(ctx, processVertices...)
// TODO: Calculate bandwidth based on the vertices that were persisted to
// disk.
var (
requestLatency = time.Since(requestTime).Seconds() + epsilon
bandwidth = float64(numBytes) / requestLatency
)
b.PeerTracker.RegisterResponse(nodeID, bandwidth)

return b.process(ctx, verticesToProcess...)
}

func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
vtxID, ok := b.outstandingRequests.DeleteKey(common.Request{
request := common.Request{
NodeID: nodeID,
RequestID: requestID,
})
}
vtxID, ok := b.outstandingRequests.DeleteKey(request)
if !ok {
b.Ctx.Log.Debug("skipping GetAncestorsFailed call",
zap.String("reason", "no matching outstanding request"),
b.Ctx.Log.Debug("unexpectedly called GetAncestorsFailed",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
)
return nil
}
delete(b.outstandingRequestTimes, request)

// Our request to this node timed out.
b.PeerTracker.RegisterFailure(nodeID)

// Send another request for the vertex
return b.fetch(ctx, vtxID)
}
Expand Down Expand Up @@ -411,21 +416,25 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error {
continue
}

validatorIDs, err := b.Config.Beacons.Sample(b.Ctx.SubnetID, 1) // validator to send request to
if err != nil {
return fmt.Errorf("dropping request for %s as there are no validators", vtxID)
nodeID, ok := b.PeerTracker.SelectPeer()
if !ok {
// If we aren't connected to any peers, we send a request to ourselves,
// which is guaranteed to fail. We send this message to use the
// message timeout as a retry mechanism. Once we are connected to
// another node again, we will select that node instead.
nodeID = b.Ctx.NodeID
}
validatorID := validatorIDs[0]
b.requestID++

b.outstandingRequests.Put(
common.Request{
NodeID: validatorID,
RequestID: b.requestID,
},
vtxID,
)
b.Config.Sender.SendGetAncestors(ctx, validatorID, b.requestID, vtxID) // request vertex and ancestors
b.PeerTracker.RegisterRequest(nodeID)

b.requestID++
request := common.Request{
NodeID: nodeID,
RequestID: b.requestID,
}
b.outstandingRequests.Put(request, vtxID)
b.outstandingRequestTimes[request] = time.Now()
b.Config.Sender.SendGetAncestors(ctx, nodeID, b.requestID, vtxID) // request vertex and ancestors
}
return b.checkFinish(ctx)
}
Expand Down
Loading

0 comments on commit eca19b7

Please sign in to comment.