Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete attestation pool for run time #4286

Merged
merged 35 commits into from
Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
638c894
Added subscribers
terencechain Dec 10, 2019
e039d78
Clean up
terencechain Dec 10, 2019
10d85a2
Fixed conflict
terencechain Dec 10, 2019
15e6f42
Delete atts in pool in validate pipeline
terencechain Dec 10, 2019
fe88abe
Moved it to subscriber
terencechain Dec 10, 2019
e63404b
Merge branch 'v0.9.2' of https://github.com/prysmaticlabs/prysm into …
terencechain Dec 13, 2019
e524315
Test
terencechain Dec 13, 2019
5d4acff
Fixed test
terencechain Dec 13, 2019
fc6a09c
New curl for forkchoice attestations
terencechain Dec 13, 2019
d7ebf55
Starting att pool service for fork choice
terencechain Dec 13, 2019
92b990a
Update pool interface
terencechain Dec 13, 2019
888be41
Update pool interface
terencechain Dec 13, 2019
9ef6ff7
Update sync and node
terencechain Dec 13, 2019
0720dfc
Merged with master
terencechain Dec 13, 2019
f0ec1f6
Lint
terencechain Dec 15, 2019
d165445
Gazelle
terencechain Dec 15, 2019
cf01350
Updated servers, filled in missing functionalities
terencechain Dec 15, 2019
c6c2874
RPC working with 1 beacon node 64 validators
terencechain Dec 15, 2019
b35d087
Started writing tests. Yay
terencechain Dec 16, 2019
7a1ce3e
Test to aggregate and save multiple fork choice atts
terencechain Dec 17, 2019
da50ad3
Tests for BatchAttestations for fork choice
terencechain Dec 17, 2019
a43a2d8
Fixed exisiting tests
terencechain Dec 17, 2019
d6dc2c1
Minor fixes
terencechain Dec 17, 2019
3e5dd8c
Fmt
terencechain Dec 17, 2019
b30bdb6
Added batch saves
terencechain Dec 17, 2019
3cf5cfb
Lint
terencechain Dec 17, 2019
477a9d5
Mo tests yay
terencechain Dec 17, 2019
b8d7d05
Delete test
terencechain Dec 17, 2019
1eb0aa0
Fmt
terencechain Dec 17, 2019
daeac54
Update interval
terencechain Dec 18, 2019
a7fc53f
Fixed aggregation broadcast
terencechain Dec 18, 2019
1abdbe9
Clean up based on design review comment
terencechain Dec 18, 2019
13e2541
Fixed setupBeaconChain
terencechain Dec 18, 2019
3547207
Conflict
terencechain Dec 20, 2019
ba217b5
Raul's feedback. s/error/err
terencechain Dec 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func (s *Service) processAttestation() {
period := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ctx := context.Background()
runutil.RunEvery(s.ctx, period, func() {
atts, err := s.opsPoolService.AttestationPoolForForkchoice(ctx)
if err != nil {
log.WithError(err).Error("Could not retrieve attestation from pool")
return
}
atts := s.attPool.ForkchoiceAttestations()

for _, a := range atts {
if err := s.attPool.DeleteForkchoiceAttestation(a); err != nil {
log.WithError(err).Error("Could not delete fork choice attestation in pool")
}

if err := s.ReceiveAttestationNoPubsub(ctx, a); err != nil {
log.WithFields(logrus.Fields{
"targetRoot": fmt.Sprintf("%#x", a.Data.Target.Root),
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.BeaconB
},
})

// Add attestations from the block to the pool for fork choice.
if err := s.attPool.SaveBlockAttestations(block.Body.Attestations); err != nil {
log.Errorf("Could not save attestation for fork choice: %v", err)
return nil
}

// Reports on block and fork choice metrics.
s.reportSlotMetrics(blockCopy.Slot)

Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand All @@ -40,6 +41,7 @@ type Service struct {
depositCache *depositcache.DepositCache
chainStartFetcher powchain.ChainStartFetcher
opsPoolService operations.OperationFeeds
attPool attestations.Pool
forkChoiceStore forkchoice.ForkChoicer
genesisTime time.Time
p2p p2p.Broadcaster
Expand All @@ -59,6 +61,7 @@ type Config struct {
BeaconDB db.Database
DepositCache *depositcache.DepositCache
OpsPoolService operations.OperationFeeds
AttPool attestations.Pool
P2p p2p.Broadcaster
MaxRoutines int64
StateNotifier statefeed.Notifier
Expand All @@ -76,6 +79,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
depositCache: cfg.DepositCache,
chainStartFetcher: cfg.ChainStartFetcher,
opsPoolService: cfg.OpsPoolService,
attPool: cfg.AttPool,
forkChoiceStore: store,
p2p: cfg.P2p,
canonicalRoots: make(map[uint64][]byte),
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"io/ioutil"
"reflect"
"testing"
Expand Down Expand Up @@ -121,6 +122,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {
OpsPoolService: &ops.Operations{},
P2p: &mockBroadcaster{},
StateNotifier: &mockBeaconNode{},
AttPool: attestations.NewPool(),
}
if err != nil {
t.Fatalf("could not register blockchain service: %v", err)
Expand Down
33 changes: 24 additions & 9 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
flags.ConfigureGlobalFlags(ctx)
registry := shared.NewServiceRegistry()

beacon := &BeaconNode{
ctx: ctx,
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
}

// Use custom config values if the --no-custom-config flag is not set.
if !ctx.GlobalBool(flags.NoCustomConfigFlag.Name) {
if featureconfig.Get().MinimalConfig {
Expand All @@ -104,6 +95,15 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
}
}

beacon := &BeaconNode{
ctx: ctx,
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
}

if err := beacon.startDB(ctx); err != nil {
return nil, err
}
Expand All @@ -120,6 +120,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}

if err := beacon.registerAttestationPool(ctx); err != nil {
return nil, err
}

if err := beacon.registerInteropServices(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -294,6 +298,7 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
DepositCache: b.depositCache,
ChainStartFetcher: web3Service,
OpsPoolService: opsService,
AttPool: b.attestationPool,
P2p: b.fetchP2P(ctx),
MaxRoutines: maxRoutines,
StateNotifier: b,
Expand All @@ -312,6 +317,16 @@ func (b *BeaconNode) registerOperationService(ctx *cli.Context) error {
return b.services.RegisterService(operationService)
}

func (b *BeaconNode) registerAttestationPool(ctx *cli.Context) error {
attPoolService, err := attestations.NewService(context.Background(), &attestations.Config{
Pool: b.attestationPool,
})
if err != nil {
return err
}
return b.services.RegisterService(attPoolService)
}

func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error {
if cliCtx.GlobalBool(testSkipPowFlag) {
return b.services.RegisterService(&powchain.Service{})
Expand Down
28 changes: 25 additions & 3 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,40 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["pool.go"],
srcs = [
"log.go",
"pool.go",
"prepare_forkchoice.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/operations/attestations/kv:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["pool_test.go"],
srcs = [
"pool_test.go",
"prepare_forkchoice_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = ["//beacon-chain/operations/attestations/kv:go_default_library"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/operations/attestations/kv:go_default_library",
"//shared/bls:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
)
4 changes: 4 additions & 0 deletions beacon-chain/operations/attestations/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go_library(
name = "go_default_library",
srcs = [
"aggregated.go",
"block.go",
"forkchoice.go",
"kv.go",
"unaggregated.go",
],
Expand All @@ -23,6 +25,8 @@ go_test(
name = "go_default_test",
srcs = [
"aggregated_test.go",
"block_test.go",
"forkchoice_test.go",
"unaggregated_test.go",
],
embed = [":go_default_library"],
Expand Down
14 changes: 12 additions & 2 deletions beacon-chain/operations/attestations/kv/aggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,18 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error {
return nil
}

// AggregatedAttestation returns the aggregated attestations in cache.
func (p *AttCaches) AggregatedAttestation() []*ethpb.Attestation {
// SaveAggregatedAttestations saves a list of aggregated attestations in cache.
func (p *AttCaches) SaveAggregatedAttestations(atts []*ethpb.Attestation) error {
for _, att := range atts {
if err := p.SaveAggregatedAttestation(att); err != nil {
return err
}
}
return nil
}

// AggregatedAttestations returns the aggregated attestations in cache.
func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation {
atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount())
for s, i := range p.aggregatedAtt.Items() {
// Type assertion for the worst case. This shouldn't happen.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestKV_Aggregated_CanSaveRetrieve(t *testing.T) {
}
}

returned := cache.AggregatedAttestation()
returned := cache.AggregatedAttestations()

sort.Slice(returned, func(i, j int) bool {
return returned[i].Data.Slot < returned[j].Data.Slot
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestKV_Aggregated_CanDelete(t *testing.T) {
t.Fatal(err)
}

returned := cache.AggregatedAttestation()
returned := cache.AggregatedAttestations()
wanted := []*ethpb.Attestation{att2}

if !reflect.DeepEqual(wanted, returned) {
Expand Down
60 changes: 60 additions & 0 deletions beacon-chain/operations/attestations/kv/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package kv

import (
"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
)

// SaveBlockAttestation saves an block attestation in cache.
func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error {
r, err := ssz.HashTreeRoot(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}

// DefaultExpiration is set to what was given to New(). In this case
// it's one epoch.
p.blockAtt.Set(string(r[:]), att, cache.DefaultExpiration)

return nil
}

// SaveBlockAttestations saves a list of block attestations in cache.
func (p *AttCaches) SaveBlockAttestations(atts []*ethpb.Attestation) error {
for _, att := range atts {
if err := p.SaveBlockAttestation(att); err != nil {
return err
}
}

return nil
}

// BlockAttestations returns the block attestations in cache.
func (p *AttCaches) BlockAttestations() []*ethpb.Attestation {
atts := make([]*ethpb.Attestation, 0, p.blockAtt.ItemCount())
for s, i := range p.blockAtt.Items() {
// Type assertion for the worst case. This shouldn't happen.
att, ok := i.Object.(*ethpb.Attestation)
if !ok {
p.blockAtt.Delete(s)
}
atts = append(atts, att)
}

return atts
}

// DeleteBlockAttestation deletes a block attestation in cache.
func (p *AttCaches) DeleteBlockAttestation(att *ethpb.Attestation) error {
r, err := ssz.HashTreeRoot(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}

p.blockAtt.Delete(string(r[:]))

return nil
}
Loading