From 638c894d3e75edbf0fd9b4c5b16798f3a2747ac4 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Tue, 10 Dec 2019 08:43:59 -0800 Subject: [PATCH 01/31] Added subscribers --- beacon-chain/node/node.go | 2 ++ beacon-chain/sync/service.go | 4 ++++ beacon-chain/sync/subscriber.go | 5 +++++ .../sync/subscriber_beacon_aggregate_proof.go | 14 ++++++++++++++ 4 files changed, 25 insertions(+) create mode 100644 beacon-chain/sync/subscriber_beacon_aggregate_proof.go diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index bfbd788ee788..e5ded480fe1a 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -24,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/gateway" interopcoldstart "github.com/prysmaticlabs/prysm/beacon-chain/interop-cold-start" "github.com/prysmaticlabs/prysm/beacon-chain/operations" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" "github.com/prysmaticlabs/prysm/beacon-chain/rpc" @@ -57,6 +58,7 @@ type BeaconNode struct { lock sync.RWMutex stop chan struct{} // Channel to wait for termination notifications. db db.Database + attPool attestations.AttestationPool depositCache *depositcache.DepositCache stateFeed *event.Feed } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 99ebae3bedc9..019c82e6c8f4 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -11,6 +11,7 @@ import ( statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/operations" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared" @@ -23,6 +24,7 @@ type Config struct { P2P p2p.P2P DB db.Database Operations *operations.Service + AttPool attestations.AttestationPool Chain blockchainService InitialSync Checker StateNotifier statefeed.Notifier @@ -45,6 +47,7 @@ func NewRegularSync(cfg *Config) *RegularSync { db: cfg.DB, p2p: cfg.P2P, operations: cfg.Operations, + attPool: cfg.AttPool, chain: cfg.Chain, initialSync: cfg.InitialSync, slotToPendingBlocks: make(map[uint64]*ethpb.BeaconBlock), @@ -65,6 +68,7 @@ type RegularSync struct { p2p p2p.P2P db db.Database operations *operations.Service + attPool attestations.AttestationPool chain blockchainService slotToPendingBlocks map[uint64]*ethpb.BeaconBlock seenPendingBlocks map[[32]byte]bool diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index f3662003219d..b679f08f8c31 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -72,6 +72,11 @@ func (r *RegularSync) registerSubscribers() { r.validateBeaconAttestation, r.beaconAttestationSubscriber, ) + r.subscribe( + "/eth2/beacon_aggregate_and_proof", + r.validateAggregateAndProof, + r.beaconAggregateProofSubscriber, + ) r.subscribe( "/eth2/voluntary_exit", r.validateVoluntaryExit, diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go new file mode 100644 index 000000000000..09daf6151521 --- /dev/null +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -0,0 +1,14 @@ +package sync + +import ( + "context" + + "github.com/gogo/protobuf/proto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" +) + +// beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the +// attestation pool for processing. +func (r *RegularSync) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { + return r.attPool.SaveAggregatedAttestation(msg.(*ethpb.Attestation)) +} From 10d85a2993fde4cbd6dea06689327a3da2d875f1 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Tue, 10 Dec 2019 09:00:08 -0800 Subject: [PATCH 02/31] Fixed conflict --- beacon-chain/sync/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 019c82e6c8f4..8cf5a978200f 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -24,7 +24,7 @@ type Config struct { P2P p2p.P2P DB db.Database Operations *operations.Service - AttPool attestations.AttestationPool + AttPool attestations.Pool Chain blockchainService InitialSync Checker StateNotifier statefeed.Notifier @@ -68,7 +68,7 @@ type RegularSync struct { p2p p2p.P2P db db.Database operations *operations.Service - attPool attestations.AttestationPool + attPool attestations.Pool chain blockchainService slotToPendingBlocks map[uint64]*ethpb.BeaconBlock seenPendingBlocks map[[32]byte]bool From 15e6f4205c15e4174d945fd82259df4bdefc9e54 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Tue, 10 Dec 2019 10:21:14 -0800 Subject: [PATCH 03/31] Delete atts in pool in validate pipeline --- beacon-chain/sync/validate_beacon_blocks.go | 23 +++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index eaec35dc566b..8e45af94f113 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -51,6 +51,11 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M } r.pendingQueueLock.RUnlock() + // Delete the same attestations from the block in the pool to avoid inclusion in future block. + if err := r.deleteAttsInPool(m.Body.Attestations); err != nil { + return false, err + } + if _, ok := recentlySeenRoots.Get(string(blockRoot[:])); ok || r.db.HasBlock(ctx, blockRoot) { return false, nil } @@ -82,3 +87,21 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M return err == nil, err } + +// The input attestations are seen by the network, this deletes them from pool +// so proposers don't include them in a block for the future. +func (r *RegularSync) deleteAttsInPool(atts []*ethpb.Attestation) error { + for _, att := range atts { + if helpers.IsAggregated(att) { + if err := r.attPool.DeleteAggregatedAttestation(att); err != nil { + return err + } + } else { + // Ideally there's shouldn't be any unaggregated attestation in the block. + if err := r.attPool.DeleteUnaggregatedAttestation(att); err != nil { + return err + } + } + } + return nil +} From fe88abe613a0070df2f6541829d53676f9cd4244 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Tue, 10 Dec 2019 10:29:58 -0800 Subject: [PATCH 04/31] Moved it to subscriber --- beacon-chain/sync/subscriber_beacon_blocks.go | 25 +++++++++++++++++++ beacon-chain/sync/validate_beacon_blocks.go | 23 ----------------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index a3e91dc92f79..3450616e609e 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -45,5 +45,30 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa if err != nil { interop.WriteBlockToDisk(block, true /*failed*/) } + + // Delete the same attestations from the block in the pool to avoid inclusion in future block. + if err := r.deleteAttsInPool(block.Body.Attestations); err != nil { + log.Errorf("Could not delete attestations in pool: %v", err) + return nil + } + return err } + +// The input attestations are seen by the network, this deletes them from pool +// so proposers don't include them in a block for the future. +func (r *RegularSync) deleteAttsInPool(atts []*ethpb.Attestation) error { + for _, att := range atts { + if helpers.IsAggregated(att) { + if err := r.attPool.DeleteAggregatedAttestation(att); err != nil { + return err + } + } else { + // Ideally there's shouldn't be any unaggregated attestation in the block. + if err := r.attPool.DeleteUnaggregatedAttestation(att); err != nil { + return err + } + } + } + return nil +} diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 8e45af94f113..eaec35dc566b 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -51,11 +51,6 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M } r.pendingQueueLock.RUnlock() - // Delete the same attestations from the block in the pool to avoid inclusion in future block. - if err := r.deleteAttsInPool(m.Body.Attestations); err != nil { - return false, err - } - if _, ok := recentlySeenRoots.Get(string(blockRoot[:])); ok || r.db.HasBlock(ctx, blockRoot) { return false, nil } @@ -87,21 +82,3 @@ func (r *RegularSync) validateBeaconBlockPubSub(ctx context.Context, msg proto.M return err == nil, err } - -// The input attestations are seen by the network, this deletes them from pool -// so proposers don't include them in a block for the future. -func (r *RegularSync) deleteAttsInPool(atts []*ethpb.Attestation) error { - for _, att := range atts { - if helpers.IsAggregated(att) { - if err := r.attPool.DeleteAggregatedAttestation(att); err != nil { - return err - } - } else { - // Ideally there's shouldn't be any unaggregated attestation in the block. - if err := r.attPool.DeleteUnaggregatedAttestation(att); err != nil { - return err - } - } - } - return nil -} From e524315bc4fb297cb5e657d6ca203643c8c3be4d Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 09:29:29 -0800 Subject: [PATCH 05/31] Test --- beacon-chain/sync/BUILD.bazel | 1 + beacon-chain/sync/subscriber_beacon_blocks.go | 2 +- .../sync/subscriber_beacon_blocks_test.go | 35 +++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 168ce4ffde4e..6457a34d05bb 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -101,6 +101,7 @@ go_test( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/state:go_default_library", "//beacon-chain/db/testing:go_default_library", + "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/p2p:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 3450616e609e..843285e72ba9 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -46,7 +46,7 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa interop.WriteBlockToDisk(block, true /*failed*/) } - // Delete the same attestations from the block in the pool to avoid inclusion in future block. + // Delete attestations from the block in the pool to avoid inclusion in future block. if err := r.deleteAttsInPool(block.Body.Attestations); err != nil { log.Errorf("Could not delete attestations in pool: %v", err) return nil diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index f24c9a505b38..fe0d350387cf 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -3,12 +3,15 @@ package sync import ( "context" "fmt" + "reflect" "testing" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" @@ -49,3 +52,35 @@ func TestRegularSyncBeaconBlockSubscriber_FilterByFinalizedEpoch(t *testing.T) { } testutil.AssertLogsDoNotContain(t, hook, "Received a block older than finalized checkpoint") } + +func TestDeleteAttsInPool(t *testing.T) { + r := &RegularSync{ + attPool: attestations.NewPool(), + } + att1 := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b1110}} + att3 := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b1011}} + att4 := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b1001}} + if err := r.attPool.SaveAggregatedAttestation(att1); err != nil { + t.Fatal(err) + } + if err := r.attPool.SaveAggregatedAttestation(att2); err != nil { + t.Fatal(err) + } + if err := r.attPool.SaveAggregatedAttestation(att3); err != nil { + t.Fatal(err) + } + if err := r.attPool.SaveUnaggregatedAttestation(att4); err != nil { + t.Fatal(err) + } + + // Seen 1, 3 and 4 in block + if err := r.deleteAttsInPool([]*ethpb.Attestation{att1, att3, att4}); err != nil { + t.Fatal(err) + } + + // Only 2 should remain + if !reflect.DeepEqual(r.attPool.AggregatedAttestation(), []*ethpb.Attestation{att2}) { + t.Error("Did not get wanted attestation from pool") + } +} From 5d4acff336ee7ee7458b5a3bacc4de04970e49b2 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 09:43:38 -0800 Subject: [PATCH 06/31] Fixed test --- beacon-chain/sync/subscriber_beacon_blocks_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index fe0d350387cf..2e9346032886 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -37,9 +37,10 @@ func TestRegularSyncBeaconBlockSubscriber_FilterByFinalizedEpoch(t *testing.T) { r := &RegularSync{ db: db, chain: &mock.ChainService{State: s}, + attPool: attestations.NewPool(), } - b := ðpb.BeaconBlock{Slot: 1, ParentRoot: parentRoot[:]} + b := ðpb.BeaconBlock{Slot: 1, ParentRoot: parentRoot[:], Body: ðpb.BeaconBlockBody{}} if err := r.beaconBlockSubscriber(context.Background(), b); err != nil { t.Fatal(err) } From fc6a09ca73abdee69914f99678d63002e310ff8a Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 15:21:47 -0800 Subject: [PATCH 07/31] New curl for forkchoice attestations --- .../operations/attestations/kv/forkchoice.go | 50 ++++++++++ .../attestations/kv/forkchoice_test.go | 96 +++++++++++++++++++ beacon-chain/operations/attestations/kv/kv.go | 4 +- 3 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 beacon-chain/operations/attestations/kv/forkchoice.go create mode 100644 beacon-chain/operations/attestations/kv/forkchoice_test.go diff --git a/beacon-chain/operations/attestations/kv/forkchoice.go b/beacon-chain/operations/attestations/kv/forkchoice.go new file mode 100644 index 000000000000..d32b35e37995 --- /dev/null +++ b/beacon-chain/operations/attestations/kv/forkchoice.go @@ -0,0 +1,50 @@ +package kv + +import ( + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" +) + +// SaveAggregatedAttestation saves an forkchoice attestation in cache. +func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + // DefaultExpiration is set to what was given to New(). In this case + // it's one epoch. + p.forkchoiceAtt.Set(string(r[:]), att, cache.DefaultExpiration) + + return nil +} + +// ForkchoiceAttestation returns the forkchoice attestations in cache. +func (p *AttCaches) ForkchoiceAttestation() []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0, p.forkchoiceAtt.ItemCount()) + for s, i := range p.forkchoiceAtt.Items() { + // Type assertion for the worst case. This shouldn't happen. + att, ok := i.Object.(*ethpb.Attestation) + if !ok { + p.forkchoiceAtt.Delete(s) + } + atts = append(atts, att) + } + + return atts +} + +// DeleteForkchoiceAttestation deletes a forkchoice attestation in cache. +func (p *AttCaches) DeleteForkchoiceAttestation(att *ethpb.Attestation) error { + + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + p.forkchoiceAtt.Delete(string(r[:])) + + return nil +} diff --git a/beacon-chain/operations/attestations/kv/forkchoice_test.go b/beacon-chain/operations/attestations/kv/forkchoice_test.go new file mode 100644 index 000000000000..a5414ee7baba --- /dev/null +++ b/beacon-chain/operations/attestations/kv/forkchoice_test.go @@ -0,0 +1,96 @@ +package kv + +import ( + "math" + "reflect" + "sort" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/shared/params" +) + +func TestKV_Forkchoice_CanSaveRetrieve(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveForkchoiceAttestation(att); err != nil { + t.Fatal(err) + } + } + + returned := cache.ForkchoiceAttestation() + + sort.Slice(returned, func(i, j int) bool { + return returned[i].Data.Slot < returned[j].Data.Slot + }) + + if !reflect.DeepEqual(atts, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_Forkchoice_CanDelete(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveForkchoiceAttestation(att); err != nil { + t.Fatal(err) + } + } + + if err := cache.DeleteForkchoiceAttestation(att1); err != nil { + t.Fatal(err) + } + if err := cache.DeleteForkchoiceAttestation(att3); err != nil { + t.Fatal(err) + } + + returned := cache.ForkchoiceAttestation() + wanted := []*ethpb.Attestation{att2} + + if !reflect.DeepEqual(wanted, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_Forkchoice_CheckExpTime(t *testing.T) { + cache := NewAttCaches() + + att := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b111}} + r, _ := ssz.HashTreeRoot(att) + + if err := cache.SaveForkchoiceAttestation(att); err != nil { + t.Fatal(err) + } + + item, exp, exists := cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if !exists { + t.Error("Saved att does not exist") + } + + receivedAtt := item.(*ethpb.Attestation) + if !proto.Equal(att, receivedAtt) { + t.Error("Did not receive correct aggregated att") + } + + wanted := float64(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + if math.RoundToEven(exp.Sub(time.Now()).Seconds()) != wanted { + t.Errorf("Did not receive correct exp time. Wanted: %f, got: %f", wanted, + math.RoundToEven(exp.Sub(time.Now()).Seconds())) + } +} diff --git a/beacon-chain/operations/attestations/kv/kv.go b/beacon-chain/operations/attestations/kv/kv.go index 7cbbf5f00a45..c7976bb7817e 100644 --- a/beacon-chain/operations/attestations/kv/kv.go +++ b/beacon-chain/operations/attestations/kv/kv.go @@ -13,7 +13,7 @@ import ( type AttCaches struct { aggregatedAtt *cache.Cache unAggregatedAtt *cache.Cache - attInBlock *cache.Cache + forkchoiceAtt *cache.Cache } // NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for @@ -26,7 +26,7 @@ func NewAttCaches() *AttCaches { pool := &AttCaches{ unAggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), aggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), - attInBlock: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), + forkchoiceAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), } return pool From d7ebf553afbfe13e108ad132a988645458a3e46d Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 15:22:07 -0800 Subject: [PATCH 08/31] Starting att pool service for fork choice --- .../operations/attestations/service.go | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 beacon-chain/operations/attestations/service.go diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go new file mode 100644 index 000000000000..56ae68762c48 --- /dev/null +++ b/beacon-chain/operations/attestations/service.go @@ -0,0 +1,48 @@ +package attestations + +import ( + "context" +) + +// Service represents a service that handles the internal +// logic of attestation pool operations +type Service struct { + ctx context.Context + cancel context.CancelFunc + pool Pool + error error +} + +// Config options for the service. +type Config struct { + Pool Pool +} + +// NewService instantiates a new attestation pool service instance that will +// be registered into a running beacon node. +func NewService(ctx context.Context, cfg *Config) *Service { + ctx, cancel := context.WithCancel(ctx) + return &Service{ + ctx: ctx, + cancel: cancel, + } +} + +// Start an attestation pool service's main event loop. +func (s *Service) Start() { +} + +// Stop the beacon block attestation pool service's main event loop +// and associated goroutines. +func (s *Service) Stop() error { + defer s.cancel() + return nil +} + +// Status returns the current service error if there's any. +func (s *Service) Status() error { + if s.error != nil { + return s.error + } + return nil +} From 92b990a81254b7ad6183a51d57d24c5432524ead Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 15:28:58 -0800 Subject: [PATCH 09/31] Update pool interface --- beacon-chain/archiver/BUILD.bazel | 43 ------------------------------- 1 file changed, 43 deletions(-) delete mode 100644 beacon-chain/archiver/BUILD.bazel diff --git a/beacon-chain/archiver/BUILD.bazel b/beacon-chain/archiver/BUILD.bazel deleted file mode 100644 index 47304321161d..000000000000 --- a/beacon-chain/archiver/BUILD.bazel +++ /dev/null @@ -1,43 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = ["service.go"], - importpath = "github.com/prysmaticlabs/prysm/beacon-chain/archiver", - visibility = ["//beacon-chain:__subpackages__"], - deps = [ - "//beacon-chain/blockchain:go_default_library", - "//beacon-chain/core/epoch:go_default_library", - "//beacon-chain/core/feed:go_default_library", - "//beacon-chain/core/feed/state:go_default_library", - "//beacon-chain/core/helpers:go_default_library", - "//beacon-chain/core/validators:go_default_library", - "//beacon-chain/db:go_default_library", - "//proto/beacon/p2p/v1:go_default_library", - "//shared/params:go_default_library", - "@com_github_pkg_errors//:go_default_library", - "@com_github_sirupsen_logrus//:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["service_test.go"], - embed = [":go_default_library"], - deps = [ - "//beacon-chain/blockchain/testing:go_default_library", - "//beacon-chain/core/feed:go_default_library", - "//beacon-chain/core/feed/state:go_default_library", - "//beacon-chain/core/helpers:go_default_library", - "//beacon-chain/db:go_default_library", - "//beacon-chain/db/testing:go_default_library", - "//proto/beacon/p2p/v1:go_default_library", - "//shared/params:go_default_library", - "//shared/testutil:go_default_library", - "@com_github_gogo_protobuf//proto:go_default_library", - "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", - "@com_github_prysmaticlabs_go_bitfield//:go_default_library", - "@com_github_sirupsen_logrus//:go_default_library", - "@com_github_sirupsen_logrus//hooks/test:go_default_library", - ], -) From 888be4174906a86e8afb681b3670eff70c7df136 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 15:28:58 -0800 Subject: [PATCH 10/31] Update pool interface --- beacon-chain/operations/attestations/pool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index 403f16ee68d8..1ea28d7bea87 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -19,6 +19,9 @@ type Pool interface { UnaggregatedAttestations(slot uint64, committeeIndex uint64) []*ethpb.Attestation DeleteUnaggregatedAttestation(att *ethpb.Attestation) error // For forkchoice attestations + SaveForkchoiceAttestation(att *ethpb.Attestation) error + ForkchoiceAttestations() []*ethpb.Attestation + DeleteForkchoiceAttestation(att *ethpb.Attestation) error } // NewPool initializes a new attestation pool. From 9ef6ff791e4b239429b10ca3599dd04e355a9d31 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Fri, 13 Dec 2019 15:29:23 -0800 Subject: [PATCH 11/31] Update sync and node --- beacon-chain/archiver/BUILD.bazel | 43 +++++++++++++++++++ beacon-chain/node/node.go | 11 +++++ beacon-chain/sync/subscriber_beacon_blocks.go | 8 ++++ .../sync/subscriber_beacon_blocks_test.go | 4 +- 4 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 beacon-chain/archiver/BUILD.bazel diff --git a/beacon-chain/archiver/BUILD.bazel b/beacon-chain/archiver/BUILD.bazel new file mode 100644 index 000000000000..47304321161d --- /dev/null +++ b/beacon-chain/archiver/BUILD.bazel @@ -0,0 +1,43 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["service.go"], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/archiver", + visibility = ["//beacon-chain:__subpackages__"], + deps = [ + "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/core/epoch:go_default_library", + "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/state:go_default_library", + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/validators:go_default_library", + "//beacon-chain/db:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", + "//shared/params:go_default_library", + "@com_github_pkg_errors//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["service_test.go"], + embed = [":go_default_library"], + deps = [ + "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/state:go_default_library", + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/db:go_default_library", + "//beacon-chain/db/testing:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", + "//shared/params:go_default_library", + "//shared/testutil:go_default_library", + "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_bitfield//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + ], +) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e977ed421558..4ff7c7bdc0a1 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -118,6 +118,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { return nil, err } + if err := beacon.registerAttestationPool(ctx); err != nil { + return nil, err + } + if err := beacon.registerInteropServices(ctx); err != nil { return nil, err } @@ -305,6 +309,13 @@ func (b *BeaconNode) registerOperationService(ctx *cli.Context) error { return b.services.RegisterService(operationService) } +func (b *BeaconNode) registerAttestationPool(ctx *cli.Context) error { + attPoolService := attestations.NewService(context.Background(), &attestations.Config{ + Pool: b.attestationPool, + }) + return b.services.RegisterService(attPoolService) +} + func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error { if cliCtx.GlobalBool(testSkipPowFlag) { return b.services.RegisterService(&powchain.Service{}) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 843285e72ba9..4a6711a9df82 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -52,6 +52,14 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa return nil } + // Add attestations from the block to the fork choice pool. + for _, att := range block.Body.Attestations { + if err := r.attPool.SaveForkchoiceAttestation(att); err != nil { + log.Errorf("Could not save attestation for fork choice: %v", err) + return nil + } + } + return err } diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index 2e9346032886..6ee9674bf8be 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -35,8 +35,8 @@ func TestRegularSyncBeaconBlockSubscriber_FilterByFinalizedEpoch(t *testing.T) { } parentRoot, _ := ssz.SigningRoot(parent) r := &RegularSync{ - db: db, - chain: &mock.ChainService{State: s}, + db: db, + chain: &mock.ChainService{State: s}, attPool: attestations.NewPool(), } From f0ec1f650c642999970ecc9f59ab4eb658d8f33d Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Sat, 14 Dec 2019 17:45:36 -0800 Subject: [PATCH 12/31] Lint --- beacon-chain/operations/attestations/kv/forkchoice.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon-chain/operations/attestations/kv/forkchoice.go b/beacon-chain/operations/attestations/kv/forkchoice.go index 6d143683c901..725338bfd1a1 100644 --- a/beacon-chain/operations/attestations/kv/forkchoice.go +++ b/beacon-chain/operations/attestations/kv/forkchoice.go @@ -7,7 +7,7 @@ import ( "github.com/prysmaticlabs/go-ssz" ) -// SaveAggregatedAttestation saves an forkchoice attestation in cache. +// SaveForkchoiceAttestation saves an forkchoice attestation in cache. func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { r, err := ssz.HashTreeRoot(att) if err != nil { @@ -21,7 +21,7 @@ func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { return nil } -// ForkchoiceAttestation returns the forkchoice attestations in cache. +// ForkchoiceAttestations returns the forkchoice attestations in cache. func (p *AttCaches) ForkchoiceAttestations() []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.forkchoiceAtt.ItemCount()) for s, i := range p.forkchoiceAtt.Items() { From d16544542fcc6e46b13b2adcb2d83c8a93c80a68 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Sat, 14 Dec 2019 17:46:57 -0800 Subject: [PATCH 13/31] Gazelle --- beacon-chain/operations/attestations/BUILD.bazel | 5 ++++- beacon-chain/operations/attestations/kv/BUILD.bazel | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 413817a09719..65622a87ad8b 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["pool.go"], + srcs = [ + "pool.go", + "service.go", + ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations", visibility = ["//beacon-chain:__subpackages__"], deps = [ diff --git a/beacon-chain/operations/attestations/kv/BUILD.bazel b/beacon-chain/operations/attestations/kv/BUILD.bazel index 8b5725b877c0..8dcada254168 100644 --- a/beacon-chain/operations/attestations/kv/BUILD.bazel +++ b/beacon-chain/operations/attestations/kv/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "aggregated.go", + "forkchoice.go", "kv.go", "unaggregated.go", ], @@ -23,6 +24,7 @@ go_test( name = "go_default_test", srcs = [ "aggregated_test.go", + "forkchoice_test.go", "unaggregated_test.go", ], embed = [":go_default_library"], From cf01350f405d816df1f56dc6a8c53802eb1554b5 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Sun, 15 Dec 2019 08:43:31 -0800 Subject: [PATCH 14/31] Updated servers, filled in missing functionalities --- .../operations/attestations/BUILD.bazel | 4 + .../operations/attestations/kv/BUILD.bazel | 2 + .../operations/attestations/kv/aggregated.go | 4 +- .../attestations/kv/aggregated_test.go | 4 +- .../operations/attestations/kv/block.go | 49 ++++++++++ .../operations/attestations/kv/block_test.go | 96 +++++++++++++++++++ .../attestations/kv/forkchoice_test.go | 6 +- beacon-chain/operations/attestations/kv/kv.go | 2 + .../attestations/kv/unaggregated.go | 21 +++- .../attestations/kv/unaggregated_test.go | 4 +- beacon-chain/operations/attestations/pool.go | 11 ++- .../operations/attestations/service.go | 80 ++++++++++++++++ beacon-chain/rpc/aggregator/server.go | 2 +- beacon-chain/rpc/aggregator/server_test.go | 4 +- beacon-chain/rpc/beacon/attestations.go | 2 +- beacon-chain/rpc/validator/proposer.go | 2 +- beacon-chain/sync/subscriber_beacon_blocks.go | 1 - .../sync/subscriber_beacon_blocks_test.go | 2 +- 18 files changed, 275 insertions(+), 21 deletions(-) create mode 100644 beacon-chain/operations/attestations/kv/block.go create mode 100644 beacon-chain/operations/attestations/kv/block_test.go diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 65622a87ad8b..32814dbb4242 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -9,8 +9,12 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/operations/attestations/kv:go_default_library", + "//shared/hashutil:go_default_library", + "@com_github_dgraph_io_ristretto//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_ssz//:go_default_library", ], ) diff --git a/beacon-chain/operations/attestations/kv/BUILD.bazel b/beacon-chain/operations/attestations/kv/BUILD.bazel index 8dcada254168..e167898ae41e 100644 --- a/beacon-chain/operations/attestations/kv/BUILD.bazel +++ b/beacon-chain/operations/attestations/kv/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "aggregated.go", + "block.go", "forkchoice.go", "kv.go", "unaggregated.go", @@ -24,6 +25,7 @@ go_test( name = "go_default_test", srcs = [ "aggregated_test.go", + "block_test.go", "forkchoice_test.go", "unaggregated_test.go", ], diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index e104090a6d59..bf790aa1aa95 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -26,8 +26,8 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { return nil } -// AggregatedAttestation returns the aggregated attestations in cache. -func (p *AttCaches) AggregatedAttestation() []*ethpb.Attestation { +// AggregatedAttestations returns the aggregated attestations in cache. +func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount()) for s, i := range p.aggregatedAtt.Items() { // Type assertion for the worst case. This shouldn't happen. diff --git a/beacon-chain/operations/attestations/kv/aggregated_test.go b/beacon-chain/operations/attestations/kv/aggregated_test.go index 589b562ef5cd..bfa9fb91bff3 100644 --- a/beacon-chain/operations/attestations/kv/aggregated_test.go +++ b/beacon-chain/operations/attestations/kv/aggregated_test.go @@ -40,7 +40,7 @@ func TestKV_Aggregated_CanSaveRetrieve(t *testing.T) { } } - returned := cache.AggregatedAttestation() + returned := cache.AggregatedAttestations() sort.Slice(returned, func(i, j int) bool { return returned[i].Data.Slot < returned[j].Data.Slot @@ -72,7 +72,7 @@ func TestKV_Aggregated_CanDelete(t *testing.T) { t.Fatal(err) } - returned := cache.AggregatedAttestation() + returned := cache.AggregatedAttestations() wanted := []*ethpb.Attestation{att2} if !reflect.DeepEqual(wanted, returned) { diff --git a/beacon-chain/operations/attestations/kv/block.go b/beacon-chain/operations/attestations/kv/block.go new file mode 100644 index 000000000000..9b72800c8752 --- /dev/null +++ b/beacon-chain/operations/attestations/kv/block.go @@ -0,0 +1,49 @@ +package kv + +import ( + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" +) + +// SaveBlockAttestation saves an block attestation in cache. +func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + // DefaultExpiration is set to what was given to New(). In this case + // it's one epoch. + p.blockAtt.Set(string(r[:]), att, cache.DefaultExpiration) + + return nil +} + +// BlockAttestations returns the block attestations in cache. +func (p *AttCaches) BlockAttestations() []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0, p.blockAtt.ItemCount()) + for s, i := range p.blockAtt.Items() { + // Type assertion for the worst case. This shouldn't happen. + att, ok := i.Object.(*ethpb.Attestation) + if !ok { + p.blockAtt.Delete(s) + } + atts = append(atts, att) + } + + return atts +} + +// DeleteBlockAttestation deletes a block attestation in cache. +func (p *AttCaches) DeleteBlockAttestation(att *ethpb.Attestation) error { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + p.blockAtt.Delete(string(r[:])) + + return nil +} diff --git a/beacon-chain/operations/attestations/kv/block_test.go b/beacon-chain/operations/attestations/kv/block_test.go new file mode 100644 index 000000000000..32cccc50a991 --- /dev/null +++ b/beacon-chain/operations/attestations/kv/block_test.go @@ -0,0 +1,96 @@ +package kv + +import ( + "math" + "reflect" + "sort" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/shared/params" +) + +func TestKV_BlockAttestation_CanSaveRetrieve(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + } + + returned := cache.BlockAttestations() + + sort.Slice(returned, func(i, j int) bool { + return returned[i].Data.Slot < returned[j].Data.Slot + }) + + if !reflect.DeepEqual(atts, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_BlockAttestation_CanDelete(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + } + + if err := cache.DeleteBlockAttestation(att1); err != nil { + t.Fatal(err) + } + if err := cache.DeleteBlockAttestation(att3); err != nil { + t.Fatal(err) + } + + returned := cache.BlockAttestations() + wanted := []*ethpb.Attestation{att2} + + if !reflect.DeepEqual(wanted, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_BlockAttestation_CheckExpTime(t *testing.T) { + cache := NewAttCaches() + + att := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b111}} + r, _ := ssz.HashTreeRoot(att) + + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + + item, exp, exists := cache.blockAtt.GetWithExpiration(string(r[:])) + if !exists { + t.Error("Saved att does not exist") + } + + receivedAtt := item.(*ethpb.Attestation) + if !proto.Equal(att, receivedAtt) { + t.Error("Did not receive correct aggregated att") + } + + wanted := float64(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + if math.RoundToEven(exp.Sub(time.Now()).Seconds()) != wanted { + t.Errorf("Did not receive correct exp time. Wanted: %f, got: %f", wanted, + math.RoundToEven(exp.Sub(time.Now()).Seconds())) + } +} diff --git a/beacon-chain/operations/attestations/kv/forkchoice_test.go b/beacon-chain/operations/attestations/kv/forkchoice_test.go index a5414ee7baba..caba719c4516 100644 --- a/beacon-chain/operations/attestations/kv/forkchoice_test.go +++ b/beacon-chain/operations/attestations/kv/forkchoice_test.go @@ -28,7 +28,7 @@ func TestKV_Forkchoice_CanSaveRetrieve(t *testing.T) { } } - returned := cache.ForkchoiceAttestation() + returned := cache.ForkchoiceAttestations() sort.Slice(returned, func(i, j int) bool { return returned[i].Data.Slot < returned[j].Data.Slot @@ -60,7 +60,7 @@ func TestKV_Forkchoice_CanDelete(t *testing.T) { t.Fatal(err) } - returned := cache.ForkchoiceAttestation() + returned := cache.ForkchoiceAttestations() wanted := []*ethpb.Attestation{att2} if !reflect.DeepEqual(wanted, returned) { @@ -78,7 +78,7 @@ func TestKV_Forkchoice_CheckExpTime(t *testing.T) { t.Fatal(err) } - item, exp, exists := cache.aggregatedAtt.GetWithExpiration(string(r[:])) + item, exp, exists := cache.forkchoiceAtt.GetWithExpiration(string(r[:])) if !exists { t.Error("Saved att does not exist") } diff --git a/beacon-chain/operations/attestations/kv/kv.go b/beacon-chain/operations/attestations/kv/kv.go index c7976bb7817e..362738c61e28 100644 --- a/beacon-chain/operations/attestations/kv/kv.go +++ b/beacon-chain/operations/attestations/kv/kv.go @@ -14,6 +14,7 @@ type AttCaches struct { aggregatedAtt *cache.Cache unAggregatedAtt *cache.Cache forkchoiceAtt *cache.Cache + blockAtt *cache.Cache } // NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for @@ -27,6 +28,7 @@ func NewAttCaches() *AttCaches { unAggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), aggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), forkchoiceAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), + blockAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), } return pool diff --git a/beacon-chain/operations/attestations/kv/unaggregated.go b/beacon-chain/operations/attestations/kv/unaggregated.go index d585e77367e2..d1b5d10da9b0 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated.go +++ b/beacon-chain/operations/attestations/kv/unaggregated.go @@ -26,9 +26,9 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error { return nil } -// UnaggregatedAttestations returns the aggregated attestations in cache, +// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache, // filtered by committee index and slot. -func (p *AttCaches) UnaggregatedAttestations(slot uint64, committeeIndex uint64) []*ethpb.Attestation { +func (p *AttCaches) UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.unAggregatedAtt.ItemCount()) for s, i := range p.unAggregatedAtt.Items() { @@ -46,6 +46,23 @@ func (p *AttCaches) UnaggregatedAttestations(slot uint64, committeeIndex uint64) return atts } +// UnaggregatedAttestations returns all the unaggregated attestations in cache. +func (p *AttCaches) UnaggregatedAttestations() []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0, p.unAggregatedAtt.ItemCount()) + for s, i := range p.unAggregatedAtt.Items() { + + // Type assertion for the worst case. This shouldn't happen. + att, ok := i.Object.(*ethpb.Attestation) + if !ok { + p.unAggregatedAtt.Delete(s) + } + + atts = append(atts, att) + } + + return atts +} + // DeleteUnaggregatedAttestation deletes the unaggregated attestations in cache. func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error { if helpers.IsAggregated(att) { diff --git a/beacon-chain/operations/attestations/kv/unaggregated_test.go b/beacon-chain/operations/attestations/kv/unaggregated_test.go index 8fddf2a575d3..cecdaa4f341d 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated_test.go +++ b/beacon-chain/operations/attestations/kv/unaggregated_test.go @@ -40,7 +40,7 @@ func TestKV_Unaggregated_CanSaveRetrieve(t *testing.T) { } } - returned := cache.UnaggregatedAttestations(data.Slot, data.CommitteeIndex) + returned := cache.UnaggregatedAttestationsBySlotIndex(data.Slot, data.CommitteeIndex) wanted := []*ethpb.Attestation{att2, att3} if !reflect.DeepEqual(len(wanted), len(returned)) { @@ -69,7 +69,7 @@ func TestKV_Unaggregated_CanDelete(t *testing.T) { t.Fatal(err) } - returned := cache.UnaggregatedAttestations(2, 0) + returned := cache.UnaggregatedAttestationsBySlotIndex(2, 0) if !reflect.DeepEqual([]*ethpb.Attestation{}, returned) { t.Error("Did not receive correct aggregated atts") diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index 1ea28d7bea87..23923dfa078d 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -12,13 +12,18 @@ import ( type Pool interface { // For Aggregated attestations SaveAggregatedAttestation(att *ethpb.Attestation) error - AggregatedAttestation() []*ethpb.Attestation + AggregatedAttestations() []*ethpb.Attestation DeleteAggregatedAttestation(att *ethpb.Attestation) error // For unaggregated attestations SaveUnaggregatedAttestation(att *ethpb.Attestation) error - UnaggregatedAttestations(slot uint64, committeeIndex uint64) []*ethpb.Attestation + UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation + UnaggregatedAttestations() []*ethpb.Attestation DeleteUnaggregatedAttestation(att *ethpb.Attestation) error - // For forkchoice attestations + // For attestations that were included in the block + SaveBlockAttestation(att *ethpb.Attestation) error + BlockAttestations() []*ethpb.Attestation + DeleteBlockAttestation(att *ethpb.Attestation) error + // For attestations to be passed to fork choice SaveForkchoiceAttestation(att *ethpb.Attestation) error ForkchoiceAttestations() []*ethpb.Attestation DeleteForkchoiceAttestation(att *ethpb.Attestation) error diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index 56ae68762c48..27dbd01be94d 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -2,8 +2,23 @@ package attestations import ( "context" + + "github.com/dgraph-io/ristretto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/shared/hashutil" ) +var forkchoiceProcessedRootsSize = int64(1 << 16) + +// forkchoiceProcessedAttRoots cache with max size of ~2Mib ( including keys) +var forkchoiceProcessedRoots, _ = ristretto.NewCache(&ristretto.Config{ + NumCounters: forkchoiceProcessedRootsSize, + MaxCost: forkchoiceProcessedRootsSize, + BufferItems: 64, +}) + // Service represents a service that handles the internal // logic of attestation pool operations type Service struct { @@ -25,6 +40,7 @@ func NewService(ctx context.Context, cfg *Config) *Service { return &Service{ ctx: ctx, cancel: cancel, + pool: cfg.Pool, } } @@ -46,3 +62,67 @@ func (s *Service) Status() error { } return nil } + +// PrepareAttsForForkchoice gets the attestations from the unaggregated, aggregated and block +// pool. Find the common data and aggregate them for fork choice. The resulting attestations +// are saved in the fork choice pool. +func (s *Service) PrepareAttsForForkchoice() error { + attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) + + atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) + atts = append(atts, s.pool.BlockAttestations()...) + + for _, att := range atts { + seen, err := seen(att) + if err != nil { + return nil + } + if seen { + continue + } + + attDataRoot, err := ssz.HashTreeRoot(att.Data) + if err != nil { + return err + } + attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att) + } + + for _, atts := range attsByDataRoot { + if err := s.aggregateAndSaveForkchoiceAtts(atts); err != nil { + return err + } + } + + return nil +} + +// This aggregates a list of attestations using the aggregation algorithm defined in AggregateAttestations +// and saves the attestations for fork choice. +func (s *Service) aggregateAndSaveForkchoiceAtts(atts []*ethpb.Attestation) error { + aggregatedAtts, err := helpers.AggregateAttestations(atts) + if err != nil { + return err + } + for _, att := range aggregatedAtts { + if err := s.pool.SaveForkchoiceAttestation(att); err != nil { + return err + } + } + return nil +} + +// This checks if the attestation has previously been aggregated for fork choice +// return true if yes, false if no. +func seen(att *ethpb.Attestation) (bool, error) { + attRoot, err := hashutil.HashProto(att) + if err != nil { + return false, err + } + if _, ok := forkchoiceProcessedRoots.Get(string(attRoot[:])); ok { + return true, nil + } + forkchoiceProcessedRoots.Set(string(attRoot[:]), true /*value*/, 1 /*cost*/) + + return false, nil +} diff --git a/beacon-chain/rpc/aggregator/server.go b/beacon-chain/rpc/aggregator/server.go index 9d6e6f531df6..ade4f8d5e037 100644 --- a/beacon-chain/rpc/aggregator/server.go +++ b/beacon-chain/rpc/aggregator/server.go @@ -80,7 +80,7 @@ func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *pb.Aggregati } // Retrieve the unaggregated attestation from pool - atts := as.AttPool.UnaggregatedAttestations(req.Slot, req.CommitteeIndex) + atts := as.AttPool.UnaggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) // Verify attestations are valid before aggregating and broadcasting them out. validAtts := make([]*ethpb.Attestation, 0, len(atts)) diff --git a/beacon-chain/rpc/aggregator/server_test.go b/beacon-chain/rpc/aggregator/server_test.go index daf5424912b2..ef50297cbf47 100644 --- a/beacon-chain/rpc/aggregator/server_test.go +++ b/beacon-chain/rpc/aggregator/server_test.go @@ -144,7 +144,7 @@ func TestSubmitAggregateAndProof_AggregateOk(t *testing.T) { t.Fatal(err) } - aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestation() + aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestations() wanted, err := helpers.AggregateAttestation(att0, att1) if err != nil { t.Fatal(err) @@ -194,7 +194,7 @@ func TestSubmitAggregateAndProof_AggregateNotOk(t *testing.T) { t.Fatal(err) } - aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestation() + aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestations() if len(aggregatedAtts) != 0 { t.Errorf("Wanted aggregated attestation 0, got %d", len(aggregatedAtts)) } diff --git a/beacon-chain/rpc/beacon/attestations.go b/beacon-chain/rpc/beacon/attestations.go index f671cb404f0d..55cc4ca5fd34 100644 --- a/beacon-chain/rpc/beacon/attestations.go +++ b/beacon-chain/rpc/beacon/attestations.go @@ -133,7 +133,7 @@ func (bs *Server) StreamAttestations( func (bs *Server) AttestationPool( ctx context.Context, _ *ptypes.Empty, ) (*ethpb.AttestationPoolResponse, error) { - atts := bs.Pool.AggregatedAttestation() + atts := bs.Pool.AggregatedAttestations() return ðpb.AttestationPoolResponse{ Attestations: atts, }, nil diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index fa8fcf9490dc..a9ab8be331ec 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -55,7 +55,7 @@ func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb } // Pack aggregated attestations which have not been included in the beacon chain. - atts := vs.AttPool.AggregatedAttestation() + atts := vs.AttPool.AggregatedAttestations() atts, err = vs.filterAttestationsForBlockInclusion(ctx, req.Slot, atts) if err != nil { return nil, status.Errorf(codes.Internal, "Could not filter attestations: %v", err) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index a612278f765e..4a6711a9df82 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -52,7 +52,6 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa return nil } - // Add attestations from the block to the fork choice pool. for _, att := range block.Body.Attestations { if err := r.attPool.SaveForkchoiceAttestation(att); err != nil { diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index 6ee9674bf8be..1b7d434ff0d9 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -81,7 +81,7 @@ func TestDeleteAttsInPool(t *testing.T) { } // Only 2 should remain - if !reflect.DeepEqual(r.attPool.AggregatedAttestation(), []*ethpb.Attestation{att2}) { + if !reflect.DeepEqual(r.attPool.AggregatedAttestations(), []*ethpb.Attestation{att2}) { t.Error("Did not get wanted attestation from pool") } } From c6c28747f86b2b00a18b088005ad51bf15b1e5e0 Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Sun, 15 Dec 2019 11:19:24 -0800 Subject: [PATCH 15/31] RPC working with 1 beacon node 64 validators --- beacon-chain/blockchain/BUILD.bazel | 1 + .../blockchain/receive_attestation.go | 8 ++-- beacon-chain/blockchain/service.go | 4 ++ beacon-chain/node/node.go | 1 + .../operations/attestations/BUILD.bazel | 4 ++ beacon-chain/operations/attestations/log.go | 7 ++++ .../operations/attestations/service.go | 40 ++++++++++++++++--- beacon-chain/rpc/service.go | 2 + beacon-chain/rpc/validator/proposer.go | 23 ++++++++++- beacon-chain/rpc/validator/server.go | 2 +- 10 files changed, 80 insertions(+), 12 deletions(-) create mode 100644 beacon-chain/operations/attestations/log.go diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 064b5c0d6d2e..aa462b021109 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//beacon-chain/core/state:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/operations:go_default_library", + "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/p2p:go_default_library", "//beacon-chain/powchain:go_default_library", "//proto/beacon/p2p/v1:go_default_library", diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 470e1fa90637..7cdfb7a1309e 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -61,10 +61,7 @@ func (s *Service) processAttestation() { ctx := context.Background() select { case <-ticker.C: - atts, err := s.opsPoolService.AttestationPoolForForkchoice(ctx) - if err != nil { - log.WithError(err).Error("Could not retrieve attestation from pool") - } + atts := s.attPool.ForkchoiceAttestations() for _, a := range atts { if err := s.ReceiveAttestationNoPubsub(ctx, a); err != nil { @@ -72,6 +69,9 @@ func (s *Service) processAttestation() { "targetRoot": fmt.Sprintf("%#x", a.Data.Target.Root), }).WithError(err).Error("Could not receive attestation in chain service") } + if err := s.attPool.DeleteForkchoiceAttestation(a); err != nil { + log.WithError(err).Error("Could not delete fork choice attestation in pool") + } } case <-s.ctx.Done(): log.Debug("Context closed, exiting routine") diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 849d80073be2..c99e547cb877 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -22,6 +22,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/operations" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -40,6 +41,7 @@ type Service struct { depositCache *depositcache.DepositCache chainStartFetcher powchain.ChainStartFetcher opsPoolService operations.OperationFeeds + attPool attestations.Pool forkChoiceStore forkchoice.ForkChoicer genesisTime time.Time p2p p2p.Broadcaster @@ -59,6 +61,7 @@ type Config struct { BeaconDB db.Database DepositCache *depositcache.DepositCache OpsPoolService operations.OperationFeeds + AttPool attestations.Pool P2p p2p.Broadcaster MaxRoutines int64 StateNotifier statefeed.Notifier @@ -76,6 +79,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { depositCache: cfg.DepositCache, chainStartFetcher: cfg.ChainStartFetcher, opsPoolService: cfg.OpsPoolService, + attPool: cfg.AttPool, forkChoiceStore: store, p2p: cfg.P2p, canonicalRoots: make(map[uint64][]byte), diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index a3867c198021..44538199242a 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -298,6 +298,7 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error { DepositCache: b.depositCache, ChainStartFetcher: web3Service, OpsPoolService: opsService, + AttPool: b.attestationPool, P2p: b.fetchP2P(ctx), MaxRoutines: maxRoutines, StateNotifier: b, diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 32814dbb4242..4e297fd75b5e 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "log.go", "pool.go", "service.go", ], @@ -12,9 +13,12 @@ go_library( "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/operations/attestations/kv:go_default_library", "//shared/hashutil:go_default_library", + "//shared/params:go_default_library", "@com_github_dgraph_io_ristretto//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@io_opencensus_go//trace:go_default_library", ], ) diff --git a/beacon-chain/operations/attestations/log.go b/beacon-chain/operations/attestations/log.go new file mode 100644 index 000000000000..9a30bf344e06 --- /dev/null +++ b/beacon-chain/operations/attestations/log.go @@ -0,0 +1,7 @@ +package attestations + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("prefix", "pool/attestations") diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index 27dbd01be94d..1610d580c9a9 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -2,12 +2,15 @@ package attestations import ( "context" + "time" "github.com/dgraph-io/ristretto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" + "go.opencensus.io/trace" ) var forkchoiceProcessedRootsSize = int64(1 << 16) @@ -19,6 +22,9 @@ var forkchoiceProcessedRoots, _ = ristretto.NewCache(&ristretto.Config{ BufferItems: 64, }) +// prepare attestations for fork choice for every half of the slot. +var prepareForkchoiceAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second + // Service represents a service that handles the internal // logic of attestation pool operations type Service struct { @@ -46,6 +52,7 @@ func NewService(ctx context.Context, cfg *Config) *Service { // Start an attestation pool service's main event loop. func (s *Service) Start() { + go s.forkChoiceAtts() } // Stop the beacon block attestation pool service's main event loop @@ -63,10 +70,13 @@ func (s *Service) Status() error { return nil } -// PrepareAttsForForkchoice gets the attestations from the unaggregated, aggregated and block -// pool. Find the common data and aggregate them for fork choice. The resulting attestations +// This gets the attestations from the unaggregated, aggregated and block +// pool. Then finds the common data and aggregate them for fork choice. The resulting attestations // are saved in the fork choice pool. -func (s *Service) PrepareAttsForForkchoice() error { +func (s *Service) prepareForkChoiceAtts(ctx context.Context) error { + _, span := trace.StartSpan(ctx, "Operations.attestations.prepareForkChoiceAtts") + defer span.End() + attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) @@ -75,7 +85,7 @@ func (s *Service) PrepareAttsForForkchoice() error { for _, att := range atts { seen, err := seen(att) if err != nil { - return nil + return err } if seen { continue @@ -89,7 +99,7 @@ func (s *Service) PrepareAttsForForkchoice() error { } for _, atts := range attsByDataRoot { - if err := s.aggregateAndSaveForkchoiceAtts(atts); err != nil { + if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { return err } } @@ -99,7 +109,7 @@ func (s *Service) PrepareAttsForForkchoice() error { // This aggregates a list of attestations using the aggregation algorithm defined in AggregateAttestations // and saves the attestations for fork choice. -func (s *Service) aggregateAndSaveForkchoiceAtts(atts []*ethpb.Attestation) error { +func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) error { aggregatedAtts, err := helpers.AggregateAttestations(atts) if err != nil { return err @@ -112,6 +122,24 @@ func (s *Service) aggregateAndSaveForkchoiceAtts(atts []*ethpb.Attestation) erro return nil } +// This prepares fork choice attestations by running prepareForkChoiceAtts +// every prepareForkchoiceAttsPeriod. +func (s *Service) forkChoiceAtts() { + ticker := time.NewTicker(prepareForkchoiceAttsPeriod) + for { + ctx := context.Background() + select { + case <-ticker.C: + if err := s.prepareForkChoiceAtts(ctx); err != nil { + log.WithError(err).Error("Could not save fork choice attestations") + } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting routine") + return + } + } +} + // This checks if the attestation has previously been aggregated for fork choice // return true if yes, false if no. func seen(att *ethpb.Attestation) (bool, error) { diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 90eef5f8f115..d6aa242c6ea0 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -217,6 +217,8 @@ func (s *Service) Start() { BeaconDB: s.beaconDB, HeadFetcher: s.headFetcher, SyncChecker: s.syncService, + AttPool: s.attestationsPool, + P2p: s.p2p, } pb.RegisterAggregatorServiceServer(s.grpcServer, aggregatorServer) ethpb.RegisterNodeServer(s.grpcServer, nodeServer) diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index a9ab8be331ec..f03e08c93f34 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -110,6 +110,10 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.BeaconBlock) (*et return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err) } + if err := vs.deleteAttsInPool(blk.Body.Attestations); err != nil { + return nil, status.Errorf(codes.Internal, "Could not delete attestations in pool: %v", err) + } + return ðpb.ProposeResponse{ BlockRoot: root[:], }, nil @@ -353,7 +357,7 @@ func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot break } - if err := blocks.VerifyAttestation(ctx, bState, att); err != nil { + if _, err := blocks.ProcessAttestation(ctx, bState, att); err != nil { if helpers.IsAggregated(att) { if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil { return nil, err @@ -371,6 +375,23 @@ func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot return validAtts, nil } +// The input attestations are processed and seen by the node, this deletes them from pool +// so proposers don't include them in a block for the future. +func (vs *Server) deleteAttsInPool(atts []*ethpb.Attestation) error { + for _, att := range atts { + if helpers.IsAggregated(att) { + if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil { + return err + } + } else { + if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil { + return err + } + } + } + return nil +} + func constructMerkleProof(trie *trieutil.MerkleTrie, index int, deposit *ethpb.Deposit) (*ethpb.Deposit, error) { proof, err := trie.MerkleProof(index) if err != nil { diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index aaa8d15b8954..a14e9cb31eff 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -41,7 +41,6 @@ type Server struct { Ctx context.Context BeaconDB db.Database AttestationCache *cache.AttestationCache - AttPool attestations.Pool HeadFetcher blockchain.HeadFetcher ForkFetcher blockchain.ForkFetcher CanonicalStateChan chan *pbp2p.BeaconState @@ -54,6 +53,7 @@ type Server struct { OperationsHandler operations.Handler P2P p2p.Broadcaster Pool operations.Pool + AttPool attestations.Pool BlockReceiver blockchain.BlockReceiver MockEth1Votes bool Eth1BlockFetcher powchain.POWBlockFetcher From b35d087fcb45d315aceead5321e3547eef4e37da Mon Sep 17 00:00:00 2001 From: Terence Tsao Date: Sun, 15 Dec 2019 16:25:46 -0800 Subject: [PATCH 16/31] Started writing tests. Yay --- beacon-chain/node/node.go | 5 +- .../operations/attestations/BUILD.bazel | 14 +- .../attestations/prepare_forkchoice.go | 100 ++++++++++++++ .../attestations/prepare_forkchoice_test.go | 90 +++++++++++++ .../operations/attestations/service.go | 122 +++--------------- 5 files changed, 221 insertions(+), 110 deletions(-) create mode 100644 beacon-chain/operations/attestations/prepare_forkchoice.go create mode 100644 beacon-chain/operations/attestations/prepare_forkchoice_test.go diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 44538199242a..92b77ddcaed1 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -318,9 +318,12 @@ func (b *BeaconNode) registerOperationService(ctx *cli.Context) error { } func (b *BeaconNode) registerAttestationPool(ctx *cli.Context) error { - attPoolService := attestations.NewService(context.Background(), &attestations.Config{ + attPoolService, err := attestations.NewService(context.Background(), &attestations.Config{ Pool: b.attestationPool, }) + if err != nil { + return err + } return b.services.RegisterService(attPoolService) } diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 4e297fd75b5e..0108c0166f84 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "log.go", "pool.go", + "prepare_forkchoice.go", "service.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations", @@ -24,7 +25,16 @@ go_library( go_test( name = "go_default_test", - srcs = ["pool_test.go"], + srcs = [ + "pool_test.go", + "prepare_forkchoice_test.go", + ], embed = [":go_default_library"], - deps = ["//beacon-chain/operations/attestations/kv:go_default_library"], + deps = [ + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/operations/attestations/kv:go_default_library", + "//shared/bls:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_bitfield//:go_default_library", + ], ) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go new file mode 100644 index 000000000000..5d8feb95e59f --- /dev/null +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -0,0 +1,100 @@ +package attestations + +import ( + "context" + "time" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" + "go.opencensus.io/trace" +) + +// prepare attestations for fork choice at every half of the slot. +var prepareForkChoiceAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second + +// This prepares fork choice attestations by running batchForkChoiceAtts +// every prepareForkChoiceAttsPeriod. +func (s *Service) prepareForkChoiceAtts() { + ticker := time.NewTicker(prepareForkChoiceAttsPeriod) + for { + ctx := context.Background() + select { + case <-ticker.C: + if err := s.batchForkChoiceAtts(ctx); err != nil { + log.WithError(err).Error("Could not prepare attestations for fork choice") + } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting routine") + return + } + } +} + +// This gets the attestations from the unaggregated, aggregated and block +// pool. Then finds the common data, aggregate and batch them for fork choice. +// The resulting attestations are saved in the fork choice pool. +func (s *Service) batchForkChoiceAtts(ctx context.Context) error { + _, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts") + defer span.End() + + attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) + + atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) + atts = append(atts, s.pool.BlockAttestations()...) + + for _, att := range atts { + seen, err := s.seen(att) + if err != nil { + return err + } + if seen { + continue + } + + attDataRoot, err := ssz.HashTreeRoot(att.Data) + if err != nil { + return err + } + attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att) + } + + for _, atts := range attsByDataRoot { + if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { + return err + } + } + + return nil +} + +// This aggregates a list of attestations using the aggregation algorithm defined in AggregateAttestations +// and saves the attestations for fork choice. +func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) error { + aggregatedAtts, err := helpers.AggregateAttestations(atts) + if err != nil { + return err + } + for _, att := range aggregatedAtts { + if err := s.pool.SaveForkchoiceAttestation(att); err != nil { + return err + } + } + return nil +} + +// This checks if the attestation has previously been aggregated for fork choice +// return true if yes, false if no. +func (s *Service) seen(att *ethpb.Attestation) (bool, error) { + attRoot, err := hashutil.HashProto(att) + if err != nil { + return false, err + } + if _, ok := s.forkChoiceProcessedRoots.Get(string(attRoot[:])); ok { + return true, nil + } + s.forkChoiceProcessedRoots.Set(string(attRoot[:]), true /*value*/, 1 /*cost*/) + return false, nil +} diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go new file mode 100644 index 000000000000..d49dffb5da41 --- /dev/null +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -0,0 +1,90 @@ +package attestations + +import ( + "context" + "reflect" + "testing" + "time" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/shared/bls" +) + +func Test_AggregateAndSaveForkChoiceAtts_Single(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool:NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + atts := []*ethpb.Attestation{ + {AggregationBits:bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {AggregationBits:bitfield.Bitlist{0b110}, Signature: sig.Marshal()}} + if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations(atts) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { + t.Error("Did not aggregation and save") + } + } + +func Test_AggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool:NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + atts := []*ethpb.Attestation{ + {AggregationBits:bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {AggregationBits:bitfield.Bitlist{0b110}, Signature: sig.Marshal()}} + if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations(atts) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { + t.Error("Did not aggregation and save") + } +} + +func Test_SeenAttestations_PresentInCache(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool:NewPool()}) + if err != nil { + t.Fatal(err) + } + + att1 := ðpb.Attestation{Signature: []byte{'A'}} + got, err := s.seen(att1) + if err != nil { + t.Fatal(err) + } + if got { + t.Error("Wanted false, got true") + } + + time.Sleep(100 * time.Millisecond) + got, err = s.seen(att1) + if err != nil { + t.Fatal(err) + } + if !got { + t.Error("Wanted true, got false") + } +} diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index 1610d580c9a9..cd578f769f31 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -2,36 +2,19 @@ package attestations import ( "context" - "time" "github.com/dgraph-io/ristretto" - ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/go-ssz" - "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" - "github.com/prysmaticlabs/prysm/shared/hashutil" - "github.com/prysmaticlabs/prysm/shared/params" - "go.opencensus.io/trace" ) -var forkchoiceProcessedRootsSize = int64(1 << 16) +var forkChoiceProcessedRootsSize = int64(1 << 16) -// forkchoiceProcessedAttRoots cache with max size of ~2Mib ( including keys) -var forkchoiceProcessedRoots, _ = ristretto.NewCache(&ristretto.Config{ - NumCounters: forkchoiceProcessedRootsSize, - MaxCost: forkchoiceProcessedRootsSize, - BufferItems: 64, -}) - -// prepare attestations for fork choice for every half of the slot. -var prepareForkchoiceAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second - -// Service represents a service that handles the internal // logic of attestation pool operations type Service struct { ctx context.Context cancel context.CancelFunc pool Pool error error + forkChoiceProcessedRoots *ristretto.Cache } // Config options for the service. @@ -41,18 +24,28 @@ type Config struct { // NewService instantiates a new attestation pool service instance that will // be registered into a running beacon node. -func NewService(ctx context.Context, cfg *Config) *Service { +func NewService(ctx context.Context, cfg *Config) (*Service, error) { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: forkChoiceProcessedRootsSize, + MaxCost: forkChoiceProcessedRootsSize, + BufferItems: 64, + }) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(ctx) return &Service{ ctx: ctx, cancel: cancel, pool: cfg.Pool, - } + forkChoiceProcessedRoots: cache, + }, nil } // Start an attestation pool service's main event loop. func (s *Service) Start() { - go s.forkChoiceAtts() + go s.prepareForkChoiceAtts() } // Stop the beacon block attestation pool service's main event loop @@ -69,88 +62,3 @@ func (s *Service) Status() error { } return nil } - -// This gets the attestations from the unaggregated, aggregated and block -// pool. Then finds the common data and aggregate them for fork choice. The resulting attestations -// are saved in the fork choice pool. -func (s *Service) prepareForkChoiceAtts(ctx context.Context) error { - _, span := trace.StartSpan(ctx, "Operations.attestations.prepareForkChoiceAtts") - defer span.End() - - attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) - - atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) - atts = append(atts, s.pool.BlockAttestations()...) - - for _, att := range atts { - seen, err := seen(att) - if err != nil { - return err - } - if seen { - continue - } - - attDataRoot, err := ssz.HashTreeRoot(att.Data) - if err != nil { - return err - } - attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att) - } - - for _, atts := range attsByDataRoot { - if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { - return err - } - } - - return nil -} - -// This aggregates a list of attestations using the aggregation algorithm defined in AggregateAttestations -// and saves the attestations for fork choice. -func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) error { - aggregatedAtts, err := helpers.AggregateAttestations(atts) - if err != nil { - return err - } - for _, att := range aggregatedAtts { - if err := s.pool.SaveForkchoiceAttestation(att); err != nil { - return err - } - } - return nil -} - -// This prepares fork choice attestations by running prepareForkChoiceAtts -// every prepareForkchoiceAttsPeriod. -func (s *Service) forkChoiceAtts() { - ticker := time.NewTicker(prepareForkchoiceAttsPeriod) - for { - ctx := context.Background() - select { - case <-ticker.C: - if err := s.prepareForkChoiceAtts(ctx); err != nil { - log.WithError(err).Error("Could not save fork choice attestations") - } - case <-s.ctx.Done(): - log.Debug("Context closed, exiting routine") - return - } - } -} - -// This checks if the attestation has previously been aggregated for fork choice -// return true if yes, false if no. -func seen(att *ethpb.Attestation) (bool, error) { - attRoot, err := hashutil.HashProto(att) - if err != nil { - return false, err - } - if _, ok := forkchoiceProcessedRoots.Get(string(attRoot[:])); ok { - return true, nil - } - forkchoiceProcessedRoots.Set(string(attRoot[:]), true /*value*/, 1 /*cost*/) - - return false, nil -} From 7a1ce3e48802e54a619a0f593b20ea5e5aee5c50 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Mon, 16 Dec 2019 16:56:06 -0800 Subject: [PATCH 17/31] Test to aggregate and save multiple fork choice atts --- .../attestations/prepare_forkchoice_test.go | 45 ++++++++++++++----- .../operations/attestations/service.go | 14 +++--- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go index d49dffb5da41..5e59e4658f74 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice_test.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -13,7 +13,7 @@ import ( ) func Test_AggregateAndSaveForkChoiceAtts_Single(t *testing.T) { - s, err := NewService(context.Background(), &Config{Pool:NewPool()}) + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) if err != nil { t.Fatal(err) } @@ -22,8 +22,8 @@ func Test_AggregateAndSaveForkChoiceAtts_Single(t *testing.T) { sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) atts := []*ethpb.Attestation{ - {AggregationBits:bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, - {AggregationBits:bitfield.Bitlist{0b110}, Signature: sig.Marshal()}} + {AggregationBits: bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110}, Signature: sig.Marshal()}} if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { t.Fatal(err) } @@ -36,10 +36,10 @@ func Test_AggregateAndSaveForkChoiceAtts_Single(t *testing.T) { if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { t.Error("Did not aggregation and save") } - } +} func Test_AggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { - s, err := NewService(context.Background(), &Config{Pool:NewPool()}) + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) if err != nil { t.Fatal(err) } @@ -47,25 +47,46 @@ func Test_AggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { sk := bls.RandKey() sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) - atts := []*ethpb.Attestation{ - {AggregationBits:bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, - {AggregationBits:bitfield.Bitlist{0b110}, Signature: sig.Marshal()}} - if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { + atts1 := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110}, Signature: sig.Marshal()}, + } + if err := s.aggregateAndSaveForkChoiceAtts(atts1); err != nil { + t.Fatal(err) + } + atts2 := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b10110}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b11100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b11000}, Signature: sig.Marshal()}, + } + if err := s.aggregateAndSaveForkChoiceAtts(atts2); err != nil { + t.Fatal(err) + } + att3 := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1100}, Signature: sig.Marshal()}, + } + if err := s.aggregateAndSaveForkChoiceAtts(att3); err != nil { t.Fatal(err) } - wanted, err := helpers.AggregateAttestations(atts) + wanted1, err := helpers.AggregateAttestations(atts1) + if err != nil { + t.Fatal(err) + } + wanted2, err := helpers.AggregateAttestations(atts2) if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { + wanted1 = append(wanted1, wanted2...) + wanted1 = append(wanted1, att3...) + if !reflect.DeepEqual(wanted1, s.pool.ForkchoiceAttestations()) { t.Error("Did not aggregation and save") } } func Test_SeenAttestations_PresentInCache(t *testing.T) { - s, err := NewService(context.Background(), &Config{Pool:NewPool()}) + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) if err != nil { t.Fatal(err) } diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index cd578f769f31..4666f4d1def4 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -10,10 +10,10 @@ var forkChoiceProcessedRootsSize = int64(1 << 16) // logic of attestation pool operations type Service struct { - ctx context.Context - cancel context.CancelFunc - pool Pool - error error + ctx context.Context + cancel context.CancelFunc + pool Pool + error error forkChoiceProcessedRoots *ristretto.Cache } @@ -36,9 +36,9 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { ctx, cancel := context.WithCancel(ctx) return &Service{ - ctx: ctx, - cancel: cancel, - pool: cfg.Pool, + ctx: ctx, + cancel: cancel, + pool: cfg.Pool, forkChoiceProcessedRoots: cache, }, nil } From da50ad31c455cb81194fd7ebeb70196ff89bc4de Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 10:05:20 -0800 Subject: [PATCH 18/31] Tests for BatchAttestations for fork choice --- .../attestations/prepare_forkchoice_test.go | 136 +++++++++++++++++- .../sync/subscriber_beacon_aggregate_proof.go | 2 +- 2 files changed, 134 insertions(+), 4 deletions(-) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go index 5e59e4658f74..3c5bab05c04b 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice_test.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -3,6 +3,7 @@ package attestations import ( "context" "reflect" + "sort" "testing" "time" @@ -12,7 +13,136 @@ import ( "github.com/prysmaticlabs/prysm/shared/bls" ) -func Test_AggregateAndSaveForkChoiceAtts_Single(t *testing.T) { +func TestBatchAttestations_Multiple(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + unaggregatedAtts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b101000}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, + } + aggregatedAtts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b111000}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, + } + blockAtts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b100101}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b111000}, Signature: sig.Marshal()}, // Duplicated + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, // Duplicated + } + for _, att := range unaggregatedAtts { + if err := s.pool.SaveUnaggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + for _, att := range aggregatedAtts { + if err := s.pool.SaveAggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + for _, att := range blockAtts { + if err := s.pool.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + } + + if err := s.batchForkChoiceAtts(context.Background()); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[0], aggregatedAtts[0], blockAtts[0]}) + if err != nil { + t.Fatal(err) + } + aggregated, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[1], aggregatedAtts[1], blockAtts[1]}) + if err != nil { + t.Fatal(err) + } + wanted = append(wanted, aggregated...) + aggregated, err = helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[2], aggregatedAtts[2], blockAtts[2]}) + if err != nil { + t.Fatal(err) + } + wanted = append(wanted, aggregated...) + received := s.pool.ForkchoiceAttestations() + + sort.Slice(received, func(i, j int) bool { + return received[i].Data.Slot < received[j].Data.Slot + }) + sort.Slice(wanted, func(i, j int) bool { + return wanted[i].Data.Slot < wanted[j].Data.Slot + }) + + if !reflect.DeepEqual(wanted, received) { + t.Error("Did not aggregation and save for batch") + }} + +func TestBatchAttestations_Single(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + unaggregatedAtts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b101000}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + } + aggregatedAtts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b101100}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, + } + blockAtts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, // Duplicated + } + for _, att := range unaggregatedAtts { + if err := s.pool.SaveUnaggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + for _, att := range aggregatedAtts { + if err := s.pool.SaveAggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + for _, att := range blockAtts { + if err := s.pool.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + } + + if err := s.batchForkChoiceAtts(context.Background()); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations(append(unaggregatedAtts,aggregatedAtts...)) + if err != nil { + t.Fatal(err) + } + + wanted, err = helpers.AggregateAttestations(append(wanted,blockAtts...)) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { + t.Error("Did not aggregation and save for batch") + }} + +func TestAggregateAndSaveForkChoiceAtts_Single(t *testing.T) { s, err := NewService(context.Background(), &Config{Pool: NewPool()}) if err != nil { t.Fatal(err) @@ -38,7 +168,7 @@ func Test_AggregateAndSaveForkChoiceAtts_Single(t *testing.T) { } } -func Test_AggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { +func TestAggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { s, err := NewService(context.Background(), &Config{Pool: NewPool()}) if err != nil { t.Fatal(err) @@ -85,7 +215,7 @@ func Test_AggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { } } -func Test_SeenAttestations_PresentInCache(t *testing.T) { +func TestSeenAttestations_PresentInCache(t *testing.T) { s, err := NewService(context.Background(), &Config{Pool: NewPool()}) if err != nil { t.Fatal(err) diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 09daf6151521..948e0332cc5e 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -10,5 +10,5 @@ import ( // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the // attestation pool for processing. func (r *RegularSync) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { - return r.attPool.SaveAggregatedAttestation(msg.(*ethpb.Attestation)) + return r.attPool.SaveUnaggregatedAttestation(msg.(*ethpb.Attestation)) } From a43a2d8e6bf75f1f1d8296246bf8e27957e15c5b Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 10:34:21 -0800 Subject: [PATCH 19/31] Fixed exisiting tests --- .../attestations/prepare_forkchoice_test.go | 19 ++++++++++++------- .../operations/attestations/service.go | 2 +- beacon-chain/rpc/validator/proposer_test.go | 3 ++- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go index 3c5bab05c04b..8fc49056074d 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice_test.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -178,8 +178,8 @@ func TestAggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) atts1 := []*ethpb.Attestation{ - {AggregationBits: bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, - {AggregationBits: bitfield.Bitlist{0b110}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0b110}, Signature: sig.Marshal()}, } if err := s.aggregateAndSaveForkChoiceAtts(atts1); err != nil { t.Fatal(err) @@ -199,18 +199,23 @@ func TestAggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { t.Fatal(err) } - wanted1, err := helpers.AggregateAttestations(atts1) + wanted, err := helpers.AggregateAttestations(atts1) if err != nil { t.Fatal(err) } - wanted2, err := helpers.AggregateAttestations(atts2) + aggregated, err := helpers.AggregateAttestations(atts2) if err != nil { t.Fatal(err) } - wanted1 = append(wanted1, wanted2...) - wanted1 = append(wanted1, att3...) - if !reflect.DeepEqual(wanted1, s.pool.ForkchoiceAttestations()) { + wanted = append(wanted, aggregated...) + wanted = append(wanted, att3...) + + received := s.pool.ForkchoiceAttestations() + sort.Slice(received, func(i, j int) bool { + return received[i].Data.Slot < received[j].Data.Slot + }) + if !reflect.DeepEqual(wanted, received) { t.Error("Did not aggregation and save") } } diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index 4666f4d1def4..b382ed8be46a 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -8,7 +8,7 @@ import ( var forkChoiceProcessedRootsSize = int64(1 << 16) -// logic of attestation pool operations +// Service of attestation pool operations type Service struct { ctx context.Context cancel context.CancelFunc diff --git a/beacon-chain/rpc/validator/proposer_test.go b/beacon-chain/rpc/validator/proposer_test.go index e812ce1b88da..e05fd3a32cb0 100644 --- a/beacon-chain/rpc/validator/proposer_test.go +++ b/beacon-chain/rpc/validator/proposer_test.go @@ -1065,7 +1065,8 @@ func TestFilterAttestation_OK(t *testing.T) { aggBits.SetBitAt(0, true) atts[i] = ðpb.Attestation{Data: ðpb.AttestationData{ CommitteeIndex: uint64(i), - Target: ðpb.Checkpoint{}}, + Target: ðpb.Checkpoint{}, + Source: ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}}, AggregationBits: aggBits, } attestingIndices, _ := helpers.AttestingIndices(state, atts[i].Data, atts[i].AggregationBits) From d6dc2c1c09f6ac9b590d6bfb885de84f30fa835b Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 10:41:08 -0800 Subject: [PATCH 20/31] Minor fixes --- beacon-chain/sync/subscriber_beacon_aggregate_proof.go | 8 +++++++- beacon-chain/sync/subscriber_beacon_blocks.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 948e0332cc5e..15d53218d972 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -2,6 +2,7 @@ package sync import ( "context" + "fmt" "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -10,5 +11,10 @@ import ( // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the // attestation pool for processing. func (r *RegularSync) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { - return r.attPool.SaveUnaggregatedAttestation(msg.(*ethpb.Attestation)) + a, ok := msg.(*ethpb.AggregateAttestationAndProof) + if !ok { + return fmt.Errorf("message was not type *eth.AggregateAttestationAndProof, type=%T", msg) + } + + return r.attPool.SaveAggregatedAttestation(a.Aggregate) } diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 4a6711a9df82..741a3bf91044 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -54,7 +54,7 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa // Add attestations from the block to the fork choice pool. for _, att := range block.Body.Attestations { - if err := r.attPool.SaveForkchoiceAttestation(att); err != nil { + if err := r.attPool.SaveBlockAttestation(att); err != nil { log.Errorf("Could not save attestation for fork choice: %v", err) return nil } From 3e5dd8cb9422d6169e94c7da6b768660afd273b7 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 10:48:45 -0800 Subject: [PATCH 21/31] Fmt --- .../operations/attestations/prepare_forkchoice_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go index 8fc49056074d..7a8ef103e97b 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice_test.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -84,7 +84,8 @@ func TestBatchAttestations_Multiple(t *testing.T) { if !reflect.DeepEqual(wanted, received) { t.Error("Did not aggregation and save for batch") - }} + } +} func TestBatchAttestations_Single(t *testing.T) { s, err := NewService(context.Background(), &Config{Pool: NewPool()}) @@ -128,19 +129,20 @@ func TestBatchAttestations_Single(t *testing.T) { t.Fatal(err) } - wanted, err := helpers.AggregateAttestations(append(unaggregatedAtts,aggregatedAtts...)) + wanted, err := helpers.AggregateAttestations(append(unaggregatedAtts, aggregatedAtts...)) if err != nil { t.Fatal(err) } - wanted, err = helpers.AggregateAttestations(append(wanted,blockAtts...)) + wanted, err = helpers.AggregateAttestations(append(wanted, blockAtts...)) if err != nil { t.Fatal(err) } if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { t.Error("Did not aggregation and save for batch") - }} + } +} func TestAggregateAndSaveForkChoiceAtts_Single(t *testing.T) { s, err := NewService(context.Background(), &Config{Pool: NewPool()}) From b30bdb6ff126a30cb8e325f860b3babe4c8db8e5 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 14:01:31 -0800 Subject: [PATCH 22/31] Added batch saves --- .../operations/attestations/kv/aggregated.go | 11 +++++ .../operations/attestations/kv/block.go | 11 +++++ .../operations/attestations/kv/forkchoice.go | 11 +++++ .../attestations/kv/unaggregated.go | 11 +++++ beacon-chain/operations/attestations/pool.go | 4 ++ .../attestations/prepare_forkchoice.go | 8 ++-- .../attestations/prepare_forkchoice_test.go | 45 ++++++++----------- beacon-chain/sync/subscriber_beacon_blocks.go | 8 ++-- 8 files changed, 73 insertions(+), 36 deletions(-) diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index bf790aa1aa95..ddd82ae8a0dd 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -26,6 +26,17 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { return nil } +// SaveAggregatedAttestations saves a list of aggregated attestation in cache. +func (p *AttCaches) SaveAggregatedAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveAggregatedAttestation(att); err != nil { + return err + } + } + + return nil +} + // AggregatedAttestations returns the aggregated attestations in cache. func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount()) diff --git a/beacon-chain/operations/attestations/kv/block.go b/beacon-chain/operations/attestations/kv/block.go index 9b72800c8752..dd5638de9d90 100644 --- a/beacon-chain/operations/attestations/kv/block.go +++ b/beacon-chain/operations/attestations/kv/block.go @@ -21,6 +21,17 @@ func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error { return nil } +// SaveBlockAttestations saves a list of block attestation in cache. +func (p *AttCaches) SaveBlockAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveBlockAttestation(att); err != nil { + return err + } + } + + return nil +} + // BlockAttestations returns the block attestations in cache. func (p *AttCaches) BlockAttestations() []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.blockAtt.ItemCount()) diff --git a/beacon-chain/operations/attestations/kv/forkchoice.go b/beacon-chain/operations/attestations/kv/forkchoice.go index 725338bfd1a1..a2c93e73984d 100644 --- a/beacon-chain/operations/attestations/kv/forkchoice.go +++ b/beacon-chain/operations/attestations/kv/forkchoice.go @@ -21,6 +21,17 @@ func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { return nil } +// SaveForkchoiceAttestations saves a list of forkchoice attestation in cache. +func (p *AttCaches) SaveForkchoiceAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveForkchoiceAttestation(att); err != nil { + return err + } + } + + return nil +} + // ForkchoiceAttestations returns the forkchoice attestations in cache. func (p *AttCaches) ForkchoiceAttestations() []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.forkchoiceAtt.ItemCount()) diff --git a/beacon-chain/operations/attestations/kv/unaggregated.go b/beacon-chain/operations/attestations/kv/unaggregated.go index d1b5d10da9b0..ff0d29df9783 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated.go +++ b/beacon-chain/operations/attestations/kv/unaggregated.go @@ -26,6 +26,17 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error { return nil } +// SaveUnaggregatedAttestations saves a list of unaggregated attestation in cache. +func (p *AttCaches) SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveUnaggregatedAttestation(att); err != nil { + return err + } + } + + return nil +} + // UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache, // filtered by committee index and slot. func (p *AttCaches) UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation { diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index 23923dfa078d..e7118c959950 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -12,19 +12,23 @@ import ( type Pool interface { // For Aggregated attestations SaveAggregatedAttestation(att *ethpb.Attestation) error + SaveAggregatedAttestations(atts []*ethpb.Attestation) error AggregatedAttestations() []*ethpb.Attestation DeleteAggregatedAttestation(att *ethpb.Attestation) error // For unaggregated attestations SaveUnaggregatedAttestation(att *ethpb.Attestation) error + SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation UnaggregatedAttestations() []*ethpb.Attestation DeleteUnaggregatedAttestation(att *ethpb.Attestation) error // For attestations that were included in the block SaveBlockAttestation(att *ethpb.Attestation) error + SaveBlockAttestations(atts []*ethpb.Attestation) error BlockAttestations() []*ethpb.Attestation DeleteBlockAttestation(att *ethpb.Attestation) error // For attestations to be passed to fork choice SaveForkchoiceAttestation(att *ethpb.Attestation) error + SaveForkchoiceAttestations(atts []*ethpb.Attestation) error ForkchoiceAttestations() []*ethpb.Attestation DeleteForkchoiceAttestation(att *ethpb.Attestation) error } diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 5d8feb95e59f..cc43e8fd4b2e 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -77,11 +77,11 @@ func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) erro if err != nil { return err } - for _, att := range aggregatedAtts { - if err := s.pool.SaveForkchoiceAttestation(att); err != nil { - return err - } + + if err := s.pool.SaveForkchoiceAttestations(aggregatedAtts); err != nil { + return err } + return nil } diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go index 7a8ef103e97b..55a2c4996b9d 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice_test.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -30,29 +30,23 @@ func TestBatchAttestations_Multiple(t *testing.T) { aggregatedAtts := []*ethpb.Attestation{ {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b111000}, Signature: sig.Marshal()}, {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, - {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b110001}, Signature: sig.Marshal()}, } blockAtts := []*ethpb.Attestation{ {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, - {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, - {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b100101}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b111000}, Signature: sig.Marshal()}, // Duplicated {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, // Duplicated } - for _, att := range unaggregatedAtts { - if err := s.pool.SaveUnaggregatedAttestation(att); err != nil { - t.Fatal(err) - } + if err := s.pool.SaveUnaggregatedAttestations(unaggregatedAtts); err != nil { + t.Fatal(err) } - for _, att := range aggregatedAtts { - if err := s.pool.SaveAggregatedAttestation(att); err != nil { - t.Fatal(err) - } + if err := s.pool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + t.Fatal(err) } - for _, att := range blockAtts { - if err := s.pool.SaveBlockAttestation(att); err != nil { - t.Fatal(err) - } + if err := s.pool.SaveBlockAttestations(blockAtts); err != nil { + t.Fatal(err) } if err := s.batchForkChoiceAtts(context.Background()); err != nil { @@ -72,6 +66,7 @@ func TestBatchAttestations_Multiple(t *testing.T) { if err != nil { t.Fatal(err) } + wanted = append(wanted, aggregated...) received := s.pool.ForkchoiceAttestations() @@ -109,20 +104,16 @@ func TestBatchAttestations_Single(t *testing.T) { {AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, // Duplicated } - for _, att := range unaggregatedAtts { - if err := s.pool.SaveUnaggregatedAttestation(att); err != nil { - t.Fatal(err) - } + if err := s.pool.SaveUnaggregatedAttestations(unaggregatedAtts); err != nil { + t.Fatal(err) } - for _, att := range aggregatedAtts { - if err := s.pool.SaveAggregatedAttestation(att); err != nil { - t.Fatal(err) - } + + if err := s.pool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + t.Fatal(err) } - for _, att := range blockAtts { - if err := s.pool.SaveBlockAttestation(att); err != nil { - t.Fatal(err) - } + + if err := s.pool.SaveBlockAttestations(blockAtts); err != nil { + t.Fatal(err) } if err := s.batchForkChoiceAtts(context.Background()); err != nil { diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 741a3bf91044..bf87dda10d51 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -53,11 +53,9 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa } // Add attestations from the block to the fork choice pool. - for _, att := range block.Body.Attestations { - if err := r.attPool.SaveBlockAttestation(att); err != nil { - log.Errorf("Could not save attestation for fork choice: %v", err) - return nil - } + if err := r.attPool.SaveBlockAttestations(block.Body.Attestations); err != nil { + log.Errorf("Could not save attestation for fork choice: %v", err) + return nil } return err From 3cf5cfbbd3adcab96fa197428300cc42a258fa8f Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 14:03:14 -0800 Subject: [PATCH 23/31] Lint --- beacon-chain/operations/attestations/kv/aggregated.go | 3 +-- beacon-chain/operations/attestations/kv/block.go | 2 +- beacon-chain/operations/attestations/kv/forkchoice.go | 2 +- beacon-chain/operations/attestations/kv/unaggregated.go | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index ddd82ae8a0dd..09443f8d32fd 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -26,14 +26,13 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { return nil } -// SaveAggregatedAttestations saves a list of aggregated attestation in cache. +// SaveAggregatedAttestations saves a list of aggregated attestations in cache. func (p *AttCaches) SaveAggregatedAttestations(atts []*ethpb.Attestation) error { for _, att := range atts { if err := p.SaveAggregatedAttestation(att); err != nil { return err } } - return nil } diff --git a/beacon-chain/operations/attestations/kv/block.go b/beacon-chain/operations/attestations/kv/block.go index dd5638de9d90..80d15078edcb 100644 --- a/beacon-chain/operations/attestations/kv/block.go +++ b/beacon-chain/operations/attestations/kv/block.go @@ -21,7 +21,7 @@ func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error { return nil } -// SaveBlockAttestations saves a list of block attestation in cache. +// SaveBlockAttestations saves a list of block attestations in cache. func (p *AttCaches) SaveBlockAttestations(atts []*ethpb.Attestation) error { for _, att := range atts { if err := p.SaveBlockAttestation(att); err != nil { diff --git a/beacon-chain/operations/attestations/kv/forkchoice.go b/beacon-chain/operations/attestations/kv/forkchoice.go index a2c93e73984d..03ccb5acafa9 100644 --- a/beacon-chain/operations/attestations/kv/forkchoice.go +++ b/beacon-chain/operations/attestations/kv/forkchoice.go @@ -21,7 +21,7 @@ func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { return nil } -// SaveForkchoiceAttestations saves a list of forkchoice attestation in cache. +// SaveForkchoiceAttestations saves a list of forkchoice attestations in cache. func (p *AttCaches) SaveForkchoiceAttestations(atts []*ethpb.Attestation) error { for _, att := range atts { if err := p.SaveForkchoiceAttestation(att); err != nil { diff --git a/beacon-chain/operations/attestations/kv/unaggregated.go b/beacon-chain/operations/attestations/kv/unaggregated.go index ff0d29df9783..b796f1a0f249 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated.go +++ b/beacon-chain/operations/attestations/kv/unaggregated.go @@ -26,7 +26,7 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error { return nil } -// SaveUnaggregatedAttestations saves a list of unaggregated attestation in cache. +// SaveUnaggregatedAttestations saves a list of unaggregated attestations in cache. func (p *AttCaches) SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error { for _, att := range atts { if err := p.SaveUnaggregatedAttestation(att); err != nil { From 477a9d5c9f1a2c508b2e4605eb55e1fd434b3dc8 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 15:21:11 -0800 Subject: [PATCH 24/31] Mo tests yay --- .../operations/attestations/BUILD.bazel | 1 + .../operations/attestations/service_test.go | 31 +++++++++++++++++++ beacon-chain/rpc/validator/proposer_test.go | 26 ++++++++++++++++ beacon-chain/sync/BUILD.bazel | 2 ++ .../subscriber_beacon_aggregate_proof.test.go | 1 + .../subscriber_beacon_aggregate_proof_test.go | 25 +++++++++++++++ 6 files changed, 86 insertions(+) create mode 100644 beacon-chain/operations/attestations/service_test.go create mode 100644 beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go create mode 100644 beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 0108c0166f84..78f28b8a71aa 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -28,6 +28,7 @@ go_test( srcs = [ "pool_test.go", "prepare_forkchoice_test.go", + "service_test.go", ], embed = [":go_default_library"], deps = [ diff --git a/beacon-chain/operations/attestations/service_test.go b/beacon-chain/operations/attestations/service_test.go new file mode 100644 index 000000000000..7a713a76599b --- /dev/null +++ b/beacon-chain/operations/attestations/service_test.go @@ -0,0 +1,31 @@ +package attestations + +import ( + "context" + "errors" + "testing" +) + +func TestStop_OK(t *testing.T) { + s, err := NewService(context.Background(), &Config{}) + if err != nil { + t.Fatal(err) + } + + if err := s.Stop(); err != nil { + t.Fatalf("Unable to stop attestation pool service: %v", err) + } + + if s.ctx.Err() != context.Canceled { + t.Error("context was not canceled") + } +} + +func TestStatus_Error(t *testing.T) { + err := errors.New("bad bad bad") + s := &Service{error: err} + + if err := s.Status(); err != s.error { + t.Errorf("Wanted: %v, got: %v", s.error, s.Status()) + } +} diff --git a/beacon-chain/rpc/validator/proposer_test.go b/beacon-chain/rpc/validator/proposer_test.go index e05fd3a32cb0..ef7b0f1b7dc9 100644 --- a/beacon-chain/rpc/validator/proposer_test.go +++ b/beacon-chain/rpc/validator/proposer_test.go @@ -1282,3 +1282,29 @@ func TestDeposits_ReturnsEmptyList_IfLatestEth1DataEqGenesisEth1Block(t *testing ) } } + +func TestDeleteAttsInPool_Aggregated(t *testing.T) { + s := &Server{ + AttPool: attestations.NewPool(), + } + + aggregatedAtts := []*ethpb.Attestation{{AggregationBits: bitfield.Bitlist{0b01101}}, {AggregationBits: bitfield.Bitlist{0b0111}}} + unaggregatedAtts := []*ethpb.Attestation{{AggregationBits: bitfield.Bitlist{0b001}}, {AggregationBits: bitfield.Bitlist{0b0001}}} + + if err := s.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + t.Fatal(err) + } + if err := s.AttPool.SaveUnaggregatedAttestations(unaggregatedAtts); err != nil { + t.Fatal(err) + } + + if err := s.deleteAttsInPool(append(aggregatedAtts, unaggregatedAtts...)); err != nil { + t.Fatal(err) + } + if len(s.AttPool.AggregatedAttestations()) != 0 { + t.Error("Did not delete aggregated attestation") + } + if len(s.AttPool.UnaggregatedAttestations()) != 0 { + t.Error("Did not delete unaggregated attestation") + } +} diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 6457a34d05bb..6efb66c4b5b6 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "service.go", "subscriber.go", "subscriber_beacon_aggregate_proof.go", + "subscriber_beacon_aggregate_proof.test.go", "subscriber_beacon_attestation.go", "subscriber_beacon_blocks.go", "subscriber_committee_index_beacon_attestation.go", @@ -83,6 +84,7 @@ go_test( "rpc_goodbye_test.go", "rpc_status_test.go", "rpc_test.go", + "subscriber_beacon_aggregate_proof_test.go", "subscriber_beacon_blocks_test.go", "subscriber_test.go", "validate_aggregate_proof_test.go", diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go new file mode 100644 index 000000000000..1ca2a85edd97 --- /dev/null +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go @@ -0,0 +1 @@ +package sync diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go new file mode 100644 index 000000000000..6e0fbe3b9d35 --- /dev/null +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go @@ -0,0 +1,25 @@ +package sync + +import ( + "context" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" + "reflect" + "testing" +) + +func TestBeaconAggregateProofSubscriber_CanSave(t *testing.T) { + r := &RegularSync{ + attPool: attestations.NewPool(), + } + + a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100} + if err := r.beaconAggregateProofSubscriber(context.Background(), a); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(r.attPool.AggregatedAttestations(), []*ethpb.Attestation{a.Aggregate}) { + t.Error("Did not save aggregated attestation") + } +} From b8d7d05f6ed9cd7866569526d2819bf9de6f4d14 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 15:21:35 -0800 Subject: [PATCH 25/31] Delete test --- beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go | 1 - 1 file changed, 1 deletion(-) delete mode 100644 beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go deleted file mode 100644 index 1ca2a85edd97..000000000000 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.test.go +++ /dev/null @@ -1 +0,0 @@ -package sync From 1eb0aa0ca911fac209d544b2be4e9430104f3d40 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 15:26:31 -0800 Subject: [PATCH 26/31] Fmt --- beacon-chain/sync/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 6efb66c4b5b6..0925b9f73734 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -19,7 +19,6 @@ go_library( "service.go", "subscriber.go", "subscriber_beacon_aggregate_proof.go", - "subscriber_beacon_aggregate_proof.test.go", "subscriber_beacon_attestation.go", "subscriber_beacon_blocks.go", "subscriber_committee_index_beacon_attestation.go", From daeac54bd20ec3cd7c64409df0d912fd7107e724 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Tue, 17 Dec 2019 16:38:52 -0800 Subject: [PATCH 27/31] Update interval --- beacon-chain/operations/attestations/prepare_forkchoice.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index cc43e8fd4b2e..72203d885e8b 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -13,7 +13,7 @@ import ( ) // prepare attestations for fork choice at every half of the slot. -var prepareForkChoiceAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second +var prepareForkChoiceAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second // This prepares fork choice attestations by running batchForkChoiceAtts // every prepareForkChoiceAttsPeriod. From a7fc53fe6719822e3acf29f6d8dffa23a2c5f9b1 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Wed, 18 Dec 2019 08:17:53 -0800 Subject: [PATCH 28/31] Fixed aggregation broadcast --- beacon-chain/rpc/aggregator/server.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/beacon-chain/rpc/aggregator/server.go b/beacon-chain/rpc/aggregator/server.go index ade4f8d5e037..e389b593dab1 100644 --- a/beacon-chain/rpc/aggregator/server.go +++ b/beacon-chain/rpc/aggregator/server.go @@ -99,12 +99,16 @@ func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *pb.Aggregati if err != nil { return nil, status.Errorf(codes.Internal, "Could not aggregate attestations: %v", err) } - for _, att := range aggregatedAtts { - if helpers.IsAggregated(att) { - if err := as.P2p.Broadcast(ctx, att); err != nil { + for _, aggregatedAtt := range aggregatedAtts { + if helpers.IsAggregated(aggregatedAtt) { + if err := as.P2p.Broadcast(ctx, ðpb.AggregateAttestationAndProof{ + AggregatorIndex: validatorIndex, + SelectionProof: req.SlotSignature, + Aggregate: aggregatedAtt, + }); err != nil { return nil, status.Errorf(codes.Internal, "Could not broadcast aggregated attestation: %v", err) } - if err := as.AttPool.SaveAggregatedAttestation(att); err != nil { + if err := as.AttPool.SaveAggregatedAttestation(aggregatedAtt); err != nil { return nil, status.Errorf(codes.Internal, "Could not save aggregated attestation: %v", err) } } From 1abdbe9e46d82b6db816cca91b451f87432db444 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Wed, 18 Dec 2019 12:41:09 -0800 Subject: [PATCH 29/31] Clean up based on design review comment --- beacon-chain/blockchain/receive_attestation.go | 7 ++++--- beacon-chain/blockchain/receive_block.go | 6 ++++++ beacon-chain/node/node.go | 18 +++++++++--------- .../attestations/prepare_forkchoice.go | 6 ++++++ beacon-chain/rpc/validator/proposer.go | 16 +++++++--------- beacon-chain/sync/subscriber_beacon_blocks.go | 6 ------ 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 7cdfb7a1309e..8a1e12746c5c 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -64,14 +64,15 @@ func (s *Service) processAttestation() { atts := s.attPool.ForkchoiceAttestations() for _, a := range atts { + if err := s.attPool.DeleteForkchoiceAttestation(a); err != nil { + log.WithError(err).Error("Could not delete fork choice attestation in pool") + } + if err := s.ReceiveAttestationNoPubsub(ctx, a); err != nil { log.WithFields(logrus.Fields{ "targetRoot": fmt.Sprintf("%#x", a.Data.Target.Root), }).WithError(err).Error("Could not receive attestation in chain service") } - if err := s.attPool.DeleteForkchoiceAttestation(a); err != nil { - log.WithError(err).Error("Could not delete fork choice attestation in pool") - } } case <-s.ctx.Done(): log.Debug("Context closed, exiting routine") diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 75d556ea468e..0d3b33f6f327 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -109,6 +109,12 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.BeaconB }, }) + // Add attestations from the block to the pool for fork choice. + if err := s.attPool.SaveBlockAttestations(block.Body.Attestations); err != nil { + log.Errorf("Could not save attestation for fork choice: %v", err) + return nil + } + // Reports on block and fork choice metrics. s.reportSlotMetrics(blockCopy.Slot) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 92b77ddcaed1..43c1a81f0f95 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -80,15 +80,6 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { flags.ConfigureGlobalFlags(ctx) registry := shared.NewServiceRegistry() - beacon := &BeaconNode{ - ctx: ctx, - services: registry, - stop: make(chan struct{}), - stateFeed: new(event.Feed), - opFeed: new(event.Feed), - attestationPool: attestations.NewPool(), - } - // Use custom config values if the --no-custom-config flag is not set. if !ctx.GlobalBool(flags.NoCustomConfigFlag.Name) { if featureconfig.Get().MinimalConfig { @@ -104,6 +95,15 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { } } + beacon := &BeaconNode{ + ctx: ctx, + services: registry, + stop: make(chan struct{}), + stateFeed: new(event.Feed), + opFeed: new(event.Feed), + attestationPool: attestations.NewPool(), + } + if err := beacon.startDB(ctx); err != nil { return nil, err } diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 72203d885e8b..0f7d3ac34dc7 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -67,6 +67,12 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error { } } + for _, a := range s.pool.BlockAttestations() { + if err := s.pool.DeleteBlockAttestation(a); err != nil { + return err + } + } + return nil } diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index f03e08c93f34..9dc908ecb4f0 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -338,6 +338,7 @@ func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot defer span.End() validAtts := make([]*ethpb.Attestation, 0, len(atts)) + inValidAtts := make([]*ethpb.Attestation, 0, len(atts)) bState, err := vs.BeaconDB.HeadState(ctx) if err != nil { @@ -358,20 +359,17 @@ func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot } if _, err := blocks.ProcessAttestation(ctx, bState, att); err != nil { - if helpers.IsAggregated(att) { - if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil { - return nil, err - } - } else { - if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil { - return nil, err - } - } + inValidAtts = append(inValidAtts, att) continue + } validAtts = append(validAtts, att) } + if err := vs.deleteAttsInPool(inValidAtts); err != nil { + return nil, err + } + return validAtts, nil } diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index bf87dda10d51..843285e72ba9 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -52,12 +52,6 @@ func (r *RegularSync) beaconBlockSubscriber(ctx context.Context, msg proto.Messa return nil } - // Add attestations from the block to the fork choice pool. - if err := r.attPool.SaveBlockAttestations(block.Body.Attestations); err != nil { - log.Errorf("Could not save attestation for fork choice: %v", err) - return nil - } - return err } From 13e25413086ece6c348ce4a6ad933b0bfb274adf Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Wed, 18 Dec 2019 13:23:57 -0800 Subject: [PATCH 30/31] Fixed setupBeaconChain --- beacon-chain/blockchain/service_test.go | 2 ++ beacon-chain/rpc/aggregator/server.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 639b257a66bf..931660a7aa42 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "io/ioutil" "reflect" "testing" @@ -121,6 +122,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { OpsPoolService: &ops.Operations{}, P2p: &mockBroadcaster{}, StateNotifier: &mockBeaconNode{}, + AttPool: attestations.NewPool(), } if err != nil { t.Fatalf("could not register blockchain service: %v", err) diff --git a/beacon-chain/rpc/aggregator/server.go b/beacon-chain/rpc/aggregator/server.go index e389b593dab1..85088ef13019 100644 --- a/beacon-chain/rpc/aggregator/server.go +++ b/beacon-chain/rpc/aggregator/server.go @@ -103,8 +103,8 @@ func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *pb.Aggregati if helpers.IsAggregated(aggregatedAtt) { if err := as.P2p.Broadcast(ctx, ðpb.AggregateAttestationAndProof{ AggregatorIndex: validatorIndex, - SelectionProof: req.SlotSignature, - Aggregate: aggregatedAtt, + SelectionProof: req.SlotSignature, + Aggregate: aggregatedAtt, }); err != nil { return nil, status.Errorf(codes.Internal, "Could not broadcast aggregated attestation: %v", err) } From ba217b5127a9e9d23640048264189626804860d3 Mon Sep 17 00:00:00 2001 From: TerenceTsao Date: Fri, 20 Dec 2019 08:54:37 -0800 Subject: [PATCH 31/31] Raul's feedback. s/error/err --- beacon-chain/operations/attestations/service.go | 8 ++++---- beacon-chain/operations/attestations/service_test.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index b382ed8be46a..08b7015c00b0 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -13,7 +13,7 @@ type Service struct { ctx context.Context cancel context.CancelFunc pool Pool - error error + err error forkChoiceProcessedRoots *ristretto.Cache } @@ -55,10 +55,10 @@ func (s *Service) Stop() error { return nil } -// Status returns the current service error if there's any. +// Status returns the current service err if there's any. func (s *Service) Status() error { - if s.error != nil { - return s.error + if s.err != nil { + return s.err } return nil } diff --git a/beacon-chain/operations/attestations/service_test.go b/beacon-chain/operations/attestations/service_test.go index 7a713a76599b..e2ffb263403f 100644 --- a/beacon-chain/operations/attestations/service_test.go +++ b/beacon-chain/operations/attestations/service_test.go @@ -23,9 +23,9 @@ func TestStop_OK(t *testing.T) { func TestStatus_Error(t *testing.T) { err := errors.New("bad bad bad") - s := &Service{error: err} + s := &Service{err: err} - if err := s.Status(); err != s.error { - t.Errorf("Wanted: %v, got: %v", s.error, s.Status()) + if err := s.Status(); err != s.err { + t.Errorf("Wanted: %v, got: %v", s.err, s.Status()) } }