Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tests to sentinel (FINALLY!) #9066

Merged
merged 7 commits into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions cl/antiquary/state_antiquary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package antiquary
import (
"context"
_ "embed"
"fmt"
"testing"

"github.com/ledgerwatch/erigon-lib/common/datadir"
Expand All @@ -20,7 +19,7 @@ import (

func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postState *state.CachingBeaconState) {
db := memdb.NewTestDB(t)
reader := tests.LoadChain(blocks, db)
reader, _ := tests.LoadChain(blocks, db)

ctx := context.Background()
vt := state_accessors.NewStaticValidatorTable()
Expand All @@ -31,21 +30,16 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt
}

func TestStateAntiquaryCapella(t *testing.T) {
t.Skip()
t.Skip()
blocks, preState, postState := tests.GetCapellaRandom()
runTest(t, blocks, preState, postState)
}

func TestStateAntiquaryBellatrix(t *testing.T) {
t.Skip()
blocks, preState, postState := tests.GetBellatrixRandom()
fmt.Println(len(blocks))
runTest(t, blocks, preState, postState)
}

func TestStateAntiquaryPhase0(t *testing.T) {
t.Skip()
blocks, preState, postState := tests.GetPhase0Random()
runTest(t, blocks, preState, postState)
}
13 changes: 9 additions & 4 deletions cl/antiquary/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/spf13/afero"
)

//go:embed test_data/capella/blocks_0.ssz_snappy
Expand Down Expand Up @@ -67,18 +69,21 @@ func (m *MockBlockReader) FrozenSlots() uint64 {
panic("implement me")
}

func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB) *MockBlockReader {
func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB) (*MockBlockReader, afero.Fs) {
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
tx, err := db.BeginRw(context.Background())
if err != nil {
panic(err)
}
defer tx.Rollback()
fs := afero.NewMemMapFs()
bs := persistence.NewAferoRawBlockSaver(fs, &clparams.MainnetBeaconConfig)
source := persistence.NewBeaconChainDatabaseFilesystem(bs, nil, &clparams.MainnetBeaconConfig)

m := NewMockBlockReader()
for _, block := range blocks {
m.u[block.Block.Slot] = block
h := block.SignedBeaconBlockHeader()
if err := beacon_indicies.WriteBeaconBlockHeaderAndIndicies(context.Background(), tx, h, true); err != nil {

if err := source.WriteBlock(context.Background(), tx, block, true); err != nil {
panic(err)
}
if err := beacon_indicies.WriteHighestFinalized(tx, block.Block.Slot+64); err != nil {
Expand All @@ -88,7 +93,7 @@ func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB) *MockBlockReader
if err := tx.Commit(); err != nil {
panic(err)
}
return m
return m, fs
}

func GetCapellaRandom() ([]*cltypes.SignedBeaconBlock, *state.CachingBeaconState, *state.CachingBeaconState) {
Expand Down
1 change: 1 addition & 0 deletions cl/clparams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ func toBytes4(in []byte) (ret [4]byte) {
func configForkSchedule(b *BeaconChainConfig) map[libcommon.Bytes4]uint64 {
fvs := map[libcommon.Bytes4]uint64{}
fvs[utils.Uint32ToBytes4(b.GenesisForkVersion)] = 0
fmt.Println(b.AltairForkEpoch)
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
fvs[utils.Uint32ToBytes4(b.AltairForkVersion)] = b.AltairForkEpoch
fvs[utils.Uint32ToBytes4(b.BellatrixForkVersion)] = b.BellatrixForkEpoch
fvs[utils.Uint32ToBytes4(b.CapellaForkVersion)] = b.CapellaForkEpoch
Expand Down
15 changes: 10 additions & 5 deletions cl/persistence/beacon_indicies/indicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,16 @@ func PruneBlockRoots(ctx context.Context, tx kv.RwTx, fromSlot, toSlot uint64) e
func ReadBeaconBlockRootsInSlotRange(ctx context.Context, tx kv.Tx, fromSlot, count uint64) ([]libcommon.Hash, []uint64, error) {
blockRoots := make([]libcommon.Hash, 0, count)
slots := make([]uint64, 0, count)
err := RangeBlockRoots(ctx, tx, fromSlot, fromSlot+count, func(slot uint64, beaconBlockRoot libcommon.Hash) bool {
blockRoots = append(blockRoots, beaconBlockRoot)
slots = append(slots, slot)
return true
})
cursor, err := tx.Cursor(kv.CanonicalBlockRoots)
if err != nil {
return nil, nil, err
}
currentCount := uint64(0)
for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(fromSlot)); err == nil && k != nil && currentCount != count; k, v, err = cursor.Next() {
currentCount++
blockRoots = append(blockRoots, libcommon.BytesToHash(v))
slots = append(slots, base_encoding.Decode64FromBytes4(k))
}
return blockRoots, slots, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postState *state.CachingBeaconState) {
db := memdb.NewTestDB(t)
reader := tests.LoadChain(blocks, db)
reader, _ := tests.LoadChain(blocks, db)

ctx := context.Background()
vt := state_accessors.NewStaticValidatorTable()
Expand Down
2 changes: 2 additions & 0 deletions cl/sentinel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type SentinelConfig struct {
NoDiscovery bool
TmpDir string
LocalDiscovery bool

EnableBlocks bool
}

func convertToCryptoPrivkey(privkey *ecdsa.PrivateKey) (crypto.PrivKey, error) {
Expand Down
47 changes: 46 additions & 1 deletion cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,55 @@ func (s *GossipManager) unsubscribe(topic string) {
if _, ok := s.subscriptions[topic]; !ok {
return
}
s.subscriptions[topic].Close()
sub := s.subscriptions[topic]
go func() {
timer := time.NewTimer(time.Hour)
ctx := sub.ctx
select {
case <-ctx.Done():
sub.Close()
case <-timer.C:
sub.Close()
}
}()
delete(s.subscriptions, topic)
}

func (s *Sentinel) forkWatcher() {
prevDigest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
return
}
fmt.Println("A")
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
iterationInterval := time.NewTicker(30 * time.Millisecond)
for {
select {
case <-s.ctx.Done():
return
case <-iterationInterval.C:
digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
log.Error("[Gossip] Failed to calculate fork choice", "err", err)
return
}
if prevDigest != digest {
subs := s.subManager.subscriptions
for path, sub := range subs {
s.subManager.unsubscribe(path)
newSub, err := s.SubscribeGossip(sub.gossip_topic)
if err != nil {
log.Error("[Gossip] Failed to resubscribe to topic", "err", err)
return
}
newSub.Listen()
}
prevDigest = digest
}
}
}
}

func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) {
digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig)
if err != nil {
Expand Down
70 changes: 28 additions & 42 deletions cl/sentinel/handlers/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package handlers

import (
"errors"
"io"

libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -42,50 +41,44 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
return err
}

if req.Step != 1 {
return errors.New("step must be 1")
}

tx, err := c.indiciesDB.BeginRo(c.ctx)
if err != nil {
return err
}
defer tx.Rollback()

// Limit the number of blocks to the count specified in the request.
if int(req.Count) > MAX_REQUEST_BLOCKS {
req.Count = MAX_REQUEST_BLOCKS
}

beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count-1)
beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count)
if err != nil {
return err
}

if len(beaconBlockRooots) == 0 || len(slots) == 0 {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
}
// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

resourceAvaiable := false
for i, slot := range slots {
r, err := c.beaconDB.BlockReader(c.ctx, slot, beaconBlockRooots[i])
if err != nil {
return err
}
defer r.Close()

if !resourceAvaiable {
if _, err := s.Write([]byte{1}); err != nil {
return err
}
resourceAvaiable = true
version := c.beaconConfig.GetCurrentStateVersion(slot / c.beaconConfig.SlotsPerEpoch)
// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GetForkVersionByVersion(version)),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

if _, err := s.Write([]byte{0}); err != nil {
return err
}

if _, err := s.Write(forkDigest[:]); err != nil {
Expand All @@ -96,9 +89,7 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
return err
}
}
if !resourceAvaiable {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
}

return nil
}

Expand Down Expand Up @@ -132,16 +123,6 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
}
defer tx.Rollback()

// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

resourceAvaiable := false
for i, blockRoot := range blockRoots {
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot)
if slot == nil {
Expand All @@ -157,11 +138,18 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
}
defer r.Close()

if !resourceAvaiable {
if _, err := s.Write([]byte{1}); err != nil {
return err
}
resourceAvaiable = true
if _, err := s.Write([]byte{0}); err != nil {
return err
}

version := c.beaconConfig.GetCurrentStateVersion(*slot / c.beaconConfig.SlotsPerEpoch)
// Read the fork digest
forkDigest, err := fork.ComputeForkDigestForVersion(
utils.Uint32ToBytes4(c.beaconConfig.GetForkVersionByVersion(version)),
c.genesisConfig.GenesisValidatorRoot,
)
if err != nil {
return err
}

if _, err := s.Write(forkDigest[:]); err != nil {
Expand All @@ -178,9 +166,7 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
return err
}
}
if !resourceAvaiable {
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
}

return nil
}

Expand Down
5 changes: 3 additions & 2 deletions cl/sentinel/handlers/blocks_by_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestBlocksByRootHandler(t *testing.T) {
}

reqData := libcommon.CopyBytes(reqBuf.Bytes())
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV1))
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV2))
require.NoError(t, err)

_, err = stream.Write(reqData)
Expand All @@ -86,7 +86,7 @@ func TestBlocksByRootHandler(t *testing.T) {
firstByte := make([]byte, 1)
_, err = stream.Read(firstByte)
require.NoError(t, err)
require.Equal(t, firstByte[0], byte(1))
require.Equal(t, firstByte[0], byte(0))

for i := 0; i < int(count); i++ {
forkDigest := make([]byte, 4)
Expand Down Expand Up @@ -133,6 +133,7 @@ func TestBlocksByRootHandler(t *testing.T) {
require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot)
require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex)
require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber)
stream.Read(make([]byte, 1))
}

_, err = stream.Read(make([]byte, 1))
Expand Down
5 changes: 3 additions & 2 deletions cl/sentinel/handlers/blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestBlocksByRangeHandler(t *testing.T) {
}

reqData := libcommon.CopyBytes(reqBuf.Bytes())
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV1))
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV2))
require.NoError(t, err)

_, err = stream.Write(reqData)
Expand All @@ -89,7 +89,7 @@ func TestBlocksByRangeHandler(t *testing.T) {
firstByte := make([]byte, 1)
_, err = stream.Read(firstByte)
require.NoError(t, err)
require.Equal(t, firstByte[0], byte(1))
require.Equal(t, firstByte[0], byte(0))

for i := 0; i < len(blockRoots); i++ {
forkDigest := make([]byte, 4)
Expand Down Expand Up @@ -132,6 +132,7 @@ func TestBlocksByRangeHandler(t *testing.T) {
require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot)
require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex)
require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber)
stream.Read(make([]byte, 1))
}

_, err = stream.Read(make([]byte, 1))
Expand Down
Loading
Loading