Skip to content

Commit

Permalink
Rate limiter for rpc beacon blocks (prysmaticlabs#4549)
Browse files Browse the repository at this point in the history
* use rate limiter for rpc beacon blocks

* gofmt

* don't delete empty buckets

* disconnect bad peers

* tell peer they are being rate limited

* defer disconnect

* fix tests

* set burst to x10

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and cryptomental committed Feb 24, 2020
1 parent 6a14bd4 commit 44dd24c
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 2 deletions.
7 changes: 7 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -1483,3 +1483,10 @@ go_repository(
commit = "d7df74196a9e781ede915320c11c378c1b2f3a1f",
importpath = "github.com/cespare/xxhash",
)

go_repository(
name = "com_github_kevinms_leakybucket_go",
importpath = "github.com/kevinms/leakybucket-go",
sum = "h1:oq6BiN7v0MfWCRcJAxSV+hesVMAAV8COrQbTjYNnso4=",
version = "v0.0.0-20190611015032-8a3d0352aa79",
)
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
Expand Down Expand Up @@ -115,6 +116,7 @@ go_test(
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

const genericError = "internal service error"
const rateLimitedError = "rate limited"

var errWrongForkVersion = errors.New("wrong fork version")
var errInvalidEpoch = errors.New("invalid epoch")
Expand Down
21 changes: 21 additions & 0 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,36 @@ func (r *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa

startSlot := m.StartSlot
endSlot := startSlot + (m.Step * (m.Count - 1))
remainingBucketCapacity := r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String())

span.AddAttributes(
trace.Int64Attribute("start", int64(startSlot)),
trace.Int64Attribute("end", int64(endSlot)),
trace.Int64Attribute("step", int64(m.Step)),
trace.Int64Attribute("count", int64(m.Count)),
trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
)

if m.Count > uint64(remainingBucketCapacity) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer r.p2p.Disconnect(stream.Conn().RemotePeer())
}
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
}
return errors.New(rateLimitedError)
}

r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(m.Count))

// TODO(3147): Update this with reasonable constraints.
if endSlot-startSlot > 1000 || m.Step == 0 {
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, "invalid range or step")
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -38,7 +39,7 @@ func TestBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
}
}

r := &Service{p2p: p1, db: d}
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)}
pcl := protocol.ID("/testing")

var wg sync.WaitGroup
Expand Down
19 changes: 19 additions & 0 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ func (r *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return errors.New("no block roots provided")
}

if int64(len(blockRoots)) > r.blocksRateLimiter.Remaining(stream.Conn().RemotePeer().String()) {
r.p2p.Peers().IncrementBadResponses(stream.Conn().RemotePeer())
if r.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
log.Debug("Disconnecting bad peer")
defer r.p2p.Disconnect(stream.Conn().RemotePeer())
}
resp, err := r.generateErrorResponse(responseCodeInvalidRequest, rateLimitedError)
if err != nil {
log.WithError(err).Error("Failed to generate a response error")
} else {
if _, err := stream.Write(resp); err != nil {
log.WithError(err).Errorf("Failed to write to stream")
}
}
return errors.New(rateLimitedError)
}

r.blocksRateLimiter.Add(stream.Conn().RemotePeer().String(), int64(len(blockRoots)))

for _, root := range blockRoots {
blk, err := r.db.Block(ctx, root)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -45,7 +46,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
blkRoots = append(blkRoots, root)
}

r := &Service{p2p: p1, db: d}
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false)}
pcl := protocol.ID("/testing")

var wg sync.WaitGroup
Expand Down Expand Up @@ -118,6 +119,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
ctx: context.Background(),
blocksRateLimiter: leakybucket.NewCollector(10000, 10000, false),
}

// Setup streams
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/kevinms/leakybucket-go"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
Expand All @@ -16,6 +17,9 @@ import (

var _ = shared.Service(&Service{})

const allowedBlocksPerSecond = 32.0
const allowedBlocksBurst = 10 * allowedBlocksPerSecond

// Config to set up the regular sync service.
type Config struct {
P2P p2p.P2P
Expand Down Expand Up @@ -50,6 +54,7 @@ func NewRegularSync(cfg *Config) *Service {
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
stateNotifier: cfg.StateNotifier,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}

r.registerRPCHandlers()
Expand All @@ -74,6 +79,7 @@ type Service struct {
initialSync Checker
validateBlockLock sync.RWMutex
stateNotifier statefeed.Notifier
blocksRateLimiter *leakybucket.Collector
}

// Start the regular sync service.
Expand Down

0 comments on commit 44dd24c

Please sign in to comment.