Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate Range Availibility #13587

Merged
merged 9 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
}

log.Debugln("Registering Sync Service")
if err := beacon.registerSyncService(beacon.initialSyncComplete); err != nil {
if err := beacon.registerSyncService(beacon.initialSyncComplete, bfs); err != nil {
return nil, err
}

Expand Down Expand Up @@ -719,7 +719,7 @@ func (b *BeaconNode) registerPOWChainService() error {
return b.services.RegisterService(web3Service)
}

func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}) error {
func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFillStore *backfill.Store) error {
var web3Service *execution.Service
if err := b.services.FetchService(&web3Service); err != nil {
return err
Expand Down Expand Up @@ -758,6 +758,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}) erro
regularsync.WithStateNotifier(b),
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
)
return b.services.RegisterService(rs)
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/types/rpc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ var (
ErrInvalidRequest = errors.New("invalid range, step or count")
ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch")
ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS")
ErrResourceUnavailable = errors.New("resource requested unavailable")
)
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ go_library(
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync/backfill/coverage:go_default_library",
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cache/lru:go_default_library",
Expand Down
10 changes: 10 additions & 0 deletions beacon-chain/sync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill/coverage"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
)

Expand Down Expand Up @@ -170,3 +171,12 @@ func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return nil
}
}

// WithAvailableBlocker allows the sync package to access the current
// status of backfill.
func WithAvailableBlocker(avb coverage.AvailableBlocker) Option {
return func(s *Service) error {
s.availableBlocker = avb
return nil
}
}
12 changes: 12 additions & 0 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
tracing.AnnotateError(span, err)
return err
}
available := s.validateRangeAvailability(rp)
if !available {
log.Debug("error in validating range availability")
s.writeErrorResponseToStream(responseCodeResourceUnavailable, p2ptypes.ErrResourceUnavailable.Error(), stream)
tracing.AnnotateError(span, err)
return nil
}

blockLimiter, err := s.rateLimiter.topicCollector(string(stream.Protocol()))
if err != nil {
Expand Down Expand Up @@ -126,6 +133,11 @@ func validateRangeRequest(r *pb.BeaconBlocksByRangeRequest, current primitives.S
return rp, nil
}

func (s *Service) validateRangeAvailability(rp rangeParams) bool {
startBlock := rp.start
return s.availableBlocker.AvailableBlock(startBlock)
}

func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream")
defer span.End()
Expand Down
37 changes: 23 additions & 14 deletions beacon-chain/sync/rpc_beacon_blocks_by_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) {

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) {

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
Expand Down Expand Up @@ -244,7 +244,8 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) {
clock: clock,
executionPayloadReconstructor: mockEngine,
},
rateLimiter: newRateLimiter(p1),
rateLimiter: newRateLimiter(p1),
availableBlocker: mockBlocker{avail: true},
}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
Expand Down Expand Up @@ -314,7 +315,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) {

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
Expand Down Expand Up @@ -380,7 +381,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) {
}

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
Expand Down Expand Up @@ -472,7 +473,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {

capacity := int64(flags.Get().BlockBatchLimit * 3)
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}

pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
Expand All @@ -499,7 +500,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {

capacity := int64(flags.Get().BlockBatchLimit * 3)
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}

pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
Expand Down Expand Up @@ -530,7 +531,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {

capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor)
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
Expand Down Expand Up @@ -722,7 +723,7 @@ func TestRPCBeaconBlocksByRange_EnforceResponseInvariants(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 448,
Expand Down Expand Up @@ -891,7 +892,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Expand Down Expand Up @@ -923,7 +924,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Expand Down Expand Up @@ -958,7 +959,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Expand Down Expand Up @@ -994,7 +995,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Expand Down Expand Up @@ -1035,7 +1036,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, availableBlocker: mockBlocker{avail: true}, rateLimiter: newRateLimiter(p1)}
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
req := &ethpb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Expand Down Expand Up @@ -1105,3 +1106,11 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) {
// pointer should reference a new root.
require.NotEqual(t, cf.prevRoot, [32]byte{})
}

type mockBlocker struct {
avail bool
}

func (m mockBlocker) AvailableBlock(_ primitives.Slot) bool {
return m.avail
}
2 changes: 2 additions & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill/coverage"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
lruwrpr "github.com/prysmaticlabs/prysm/v4/cache/lru"
"github.com/prysmaticlabs/prysm/v4/config/params"
Expand Down Expand Up @@ -155,6 +156,7 @@ type Service struct {
initialSyncComplete chan struct{}
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
availableBlocker coverage.AvailableBlocker
}

// NewService initializes new regular sync service.
Expand Down
Loading