Skip to content

Commit

Permalink
Complete attestation pool for run time (#4286)
Browse files Browse the repository at this point in the history
* Added subscribers

* Fixed conflict

* Delete atts in pool in validate pipeline

* Moved it to subscriber

* Test

* Fixed test

* New curl for forkchoice attestations

* Starting att pool service for fork choice

* Update pool interface

* Update pool interface

* Update sync and node

* Lint

* Gazelle

* Updated servers, filled in missing functionalities

* RPC working with 1 beacon node 64 validators

* Started writing tests. Yay

* Test to aggregate and save multiple fork choice atts

* Tests for BatchAttestations for fork choice

* Fixed exisiting tests

* Minor fixes

* Fmt

* Added batch saves

* Lint

* Mo tests yay

* Delete test

* Fmt

* Update interval

* Fixed aggregation broadcast

* Clean up based on design review comment

* Fixed setupBeaconChain

* Raul's feedback. s/error/err
  • Loading branch information
terencechain authored and rauljordan committed Dec 20, 2019
1 parent f181362 commit 718bf3d
Show file tree
Hide file tree
Showing 35 changed files with 1,007 additions and 56 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func (s *Service) processAttestation() {
period := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ctx := context.Background()
runutil.RunEvery(s.ctx, period, func() {
atts, err := s.opsPoolService.AttestationPoolForForkchoice(ctx)
if err != nil {
log.WithError(err).Error("Could not retrieve attestation from pool")
return
}
atts := s.attPool.ForkchoiceAttestations()

for _, a := range atts {
if err := s.attPool.DeleteForkchoiceAttestation(a); err != nil {
log.WithError(err).Error("Could not delete fork choice attestation in pool")
}

if err := s.ReceiveAttestationNoPubsub(ctx, a); err != nil {
log.WithFields(logrus.Fields{
"targetRoot": fmt.Sprintf("%#x", a.Data.Target.Root),
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.BeaconB
},
})

// Add attestations from the block to the pool for fork choice.
if err := s.attPool.SaveBlockAttestations(block.Body.Attestations); err != nil {
log.Errorf("Could not save attestation for fork choice: %v", err)
return nil
}

// Reports on block and fork choice metrics.
s.reportSlotMetrics(blockCopy.Slot)

Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand All @@ -40,6 +41,7 @@ type Service struct {
depositCache *depositcache.DepositCache
chainStartFetcher powchain.ChainStartFetcher
opsPoolService operations.OperationFeeds
attPool attestations.Pool
forkChoiceStore forkchoice.ForkChoicer
genesisTime time.Time
p2p p2p.Broadcaster
Expand All @@ -59,6 +61,7 @@ type Config struct {
BeaconDB db.Database
DepositCache *depositcache.DepositCache
OpsPoolService operations.OperationFeeds
AttPool attestations.Pool
P2p p2p.Broadcaster
MaxRoutines int64
StateNotifier statefeed.Notifier
Expand All @@ -76,6 +79,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
depositCache: cfg.DepositCache,
chainStartFetcher: cfg.ChainStartFetcher,
opsPoolService: cfg.OpsPoolService,
attPool: cfg.AttPool,
forkChoiceStore: store,
p2p: cfg.P2p,
canonicalRoots: make(map[uint64][]byte),
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"io/ioutil"
"reflect"
"testing"
Expand Down Expand Up @@ -121,6 +122,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
OpsPoolService: &ops.Operations{},
P2p: &mockBroadcaster{},
StateNotifier: &mockBeaconNode{},
AttPool: attestations.NewPool(),
}
if err != nil {
t.Fatalf("could not register blockchain service: %v", err)
Expand Down
33 changes: 24 additions & 9 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
flags.ConfigureGlobalFlags(ctx)
registry := shared.NewServiceRegistry()

beacon := &BeaconNode{
ctx: ctx,
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
}

// Use custom config values if the --no-custom-config flag is not set.
if !ctx.GlobalBool(flags.NoCustomConfigFlag.Name) {
if featureconfig.Get().MinimalConfig {
Expand All @@ -104,6 +95,15 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
}
}

beacon := &BeaconNode{
ctx: ctx,
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
}

if err := beacon.startDB(ctx); err != nil {
return nil, err
}
Expand All @@ -120,6 +120,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}

if err := beacon.registerAttestationPool(ctx); err != nil {
return nil, err
}

if err := beacon.registerInteropServices(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -294,6 +298,7 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
DepositCache: b.depositCache,
ChainStartFetcher: web3Service,
OpsPoolService: opsService,
AttPool: b.attestationPool,
P2p: b.fetchP2P(ctx),
MaxRoutines: maxRoutines,
StateNotifier: b,
Expand All @@ -312,6 +317,16 @@ func (b *BeaconNode) registerOperationService(ctx *cli.Context) error {
return b.services.RegisterService(operationService)
}

func (b *BeaconNode) registerAttestationPool(ctx *cli.Context) error {
attPoolService, err := attestations.NewService(context.Background(), &attestations.Config{
Pool: b.attestationPool,
})
if err != nil {
return err
}
return b.services.RegisterService(attPoolService)
}

func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error {
if cliCtx.GlobalBool(testSkipPowFlag) {
return b.services.RegisterService(&powchain.Service{})
Expand Down
28 changes: 25 additions & 3 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,40 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["pool.go"],
srcs = [
"log.go",
"pool.go",
"prepare_forkchoice.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/operations/attestations/kv:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["pool_test.go"],
srcs = [
"pool_test.go",
"prepare_forkchoice_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = ["//beacon-chain/operations/attestations/kv:go_default_library"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/operations/attestations/kv:go_default_library",
"//shared/bls:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)
4 changes: 4 additions & 0 deletions beacon-chain/operations/attestations/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go_library(
name = "go_default_library",
srcs = [
"aggregated.go",
"block.go",
"forkchoice.go",
"kv.go",
"unaggregated.go",
],
Expand All @@ -23,6 +25,8 @@ go_test(
name = "go_default_test",
srcs = [
"aggregated_test.go",
"block_test.go",
"forkchoice_test.go",
"unaggregated_test.go",
],
embed = [":go_default_library"],
Expand Down
14 changes: 12 additions & 2 deletions beacon-chain/operations/attestations/kv/aggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,18 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error {
return nil
}

// AggregatedAttestation returns the aggregated attestations in cache.
func (p *AttCaches) AggregatedAttestation() []*ethpb.Attestation {
// SaveAggregatedAttestations saves a list of aggregated attestations in cache.
func (p *AttCaches) SaveAggregatedAttestations(atts []*ethpb.Attestation) error {
for _, att := range atts {
if err := p.SaveAggregatedAttestation(att); err != nil {
return err
}
}
return nil
}

// AggregatedAttestations returns the aggregated attestations in cache.
func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation {
atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount())
for s, i := range p.aggregatedAtt.Items() {
// Type assertion for the worst case. This shouldn't happen.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestKV_Aggregated_CanSaveRetrieve(t *testing.T) {
}
}

returned := cache.AggregatedAttestation()
returned := cache.AggregatedAttestations()

sort.Slice(returned, func(i, j int) bool {
return returned[i].Data.Slot < returned[j].Data.Slot
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestKV_Aggregated_CanDelete(t *testing.T) {
t.Fatal(err)
}

returned := cache.AggregatedAttestation()
returned := cache.AggregatedAttestations()
wanted := []*ethpb.Attestation{att2}

if !reflect.DeepEqual(wanted, returned) {
Expand Down
60 changes: 60 additions & 0 deletions beacon-chain/operations/attestations/kv/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package kv

import (
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
)

// SaveBlockAttestation saves an block attestation in cache.
func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error {
r, err := ssz.HashTreeRoot(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}

// DefaultExpiration is set to what was given to New(). In this case
// it's one epoch.
p.blockAtt.Set(string(r[:]), att, cache.DefaultExpiration)

return nil
}

// SaveBlockAttestations saves a list of block attestations in cache.
func (p *AttCaches) SaveBlockAttestations(atts []*ethpb.Attestation) error {
for _, att := range atts {
if err := p.SaveBlockAttestation(att); err != nil {
return err
}
}

return nil
}

// BlockAttestations returns the block attestations in cache.
func (p *AttCaches) BlockAttestations() []*ethpb.Attestation {
atts := make([]*ethpb.Attestation, 0, p.blockAtt.ItemCount())
for s, i := range p.blockAtt.Items() {
// Type assertion for the worst case. This shouldn't happen.
att, ok := i.Object.(*ethpb.Attestation)
if !ok {
p.blockAtt.Delete(s)
}
atts = append(atts, att)
}

return atts
}

// DeleteBlockAttestation deletes a block attestation in cache.
func (p *AttCaches) DeleteBlockAttestation(att *ethpb.Attestation) error {
r, err := ssz.HashTreeRoot(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}

p.blockAtt.Delete(string(r[:]))

return nil
}
Loading

0 comments on commit 718bf3d

Please sign in to comment.