diff --git a/cl/antiquary/state_antiquary_test.go b/cl/antiquary/state_antiquary_test.go index 8f73fbdc663..352deae3fd7 100644 --- a/cl/antiquary/state_antiquary_test.go +++ b/cl/antiquary/state_antiquary_test.go @@ -3,7 +3,6 @@ package antiquary import ( "context" _ "embed" - "fmt" "testing" "github.com/ledgerwatch/erigon-lib/common/datadir" @@ -20,7 +19,7 @@ import ( func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postState *state.CachingBeaconState) { db := memdb.NewTestDB(t) - reader := tests.LoadChain(blocks, db) + reader, _ := tests.LoadChain(blocks, db, t) ctx := context.Background() vt := state_accessors.NewStaticValidatorTable() @@ -31,21 +30,16 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt } func TestStateAntiquaryCapella(t *testing.T) { - t.Skip() - t.Skip() blocks, preState, postState := tests.GetCapellaRandom() runTest(t, blocks, preState, postState) } func TestStateAntiquaryBellatrix(t *testing.T) { - t.Skip() blocks, preState, postState := tests.GetBellatrixRandom() - fmt.Println(len(blocks)) runTest(t, blocks, preState, postState) } func TestStateAntiquaryPhase0(t *testing.T) { - t.Skip() blocks, preState, postState := tests.GetPhase0Random() runTest(t, blocks, preState, postState) } diff --git a/cl/antiquary/tests/tests.go b/cl/antiquary/tests/tests.go index 1596e16d05b..47bb2848dc6 100644 --- a/cl/antiquary/tests/tests.go +++ b/cl/antiquary/tests/tests.go @@ -5,14 +5,18 @@ import ( "embed" _ "embed" "strconv" + "testing" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/utils" + "github.com/spf13/afero" + "github.com/stretchr/testify/require" ) //go:embed test_data/capella/blocks_0.ssz_snappy @@ -67,28 +71,24 @@ func (m *MockBlockReader) FrozenSlots() uint64 { panic("implement me") } -func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB) *MockBlockReader { +func LoadChain(blocks []*cltypes.SignedBeaconBlock, db kv.RwDB, t *testing.T) (*MockBlockReader, afero.Fs) { tx, err := db.BeginRw(context.Background()) - if err != nil { - panic(err) - } + require.NoError(t, err) defer tx.Rollback() + fs := afero.NewMemMapFs() + bs := persistence.NewAferoRawBlockSaver(fs, &clparams.MainnetBeaconConfig) + source := persistence.NewBeaconChainDatabaseFilesystem(bs, nil, &clparams.MainnetBeaconConfig) m := NewMockBlockReader() for _, block := range blocks { m.u[block.Block.Slot] = block - h := block.SignedBeaconBlockHeader() - if err := beacon_indicies.WriteBeaconBlockHeaderAndIndicies(context.Background(), tx, h, true); err != nil { - panic(err) - } - if err := beacon_indicies.WriteHighestFinalized(tx, block.Block.Slot+64); err != nil { - panic(err) - } - } - if err := tx.Commit(); err != nil { - panic(err) + + require.NoError(t, source.WriteBlock(context.Background(), tx, block, true)) + require.NoError(t, beacon_indicies.WriteHighestFinalized(tx, block.Block.Slot+64)) } - return m + + require.NoError(t, tx.Commit()) + return m, fs } func GetCapellaRandom() ([]*cltypes.SignedBeaconBlock, *state.CachingBeaconState, *state.CachingBeaconState) { diff --git a/cl/persistence/beacon_indicies/indicies.go b/cl/persistence/beacon_indicies/indicies.go index a58b0de8c96..d485b121b70 100644 --- a/cl/persistence/beacon_indicies/indicies.go +++ b/cl/persistence/beacon_indicies/indicies.go @@ -247,11 +247,16 @@ func PruneBlockRoots(ctx context.Context, tx kv.RwTx, fromSlot, toSlot uint64) e func ReadBeaconBlockRootsInSlotRange(ctx context.Context, tx kv.Tx, fromSlot, count uint64) ([]libcommon.Hash, []uint64, error) { blockRoots := make([]libcommon.Hash, 0, count) slots := make([]uint64, 0, count) - err := RangeBlockRoots(ctx, tx, fromSlot, fromSlot+count, func(slot uint64, beaconBlockRoot libcommon.Hash) bool { - blockRoots = append(blockRoots, beaconBlockRoot) - slots = append(slots, slot) - return true - }) + cursor, err := tx.Cursor(kv.CanonicalBlockRoots) + if err != nil { + return nil, nil, err + } + currentCount := uint64(0) + for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(fromSlot)); err == nil && k != nil && currentCount != count; k, v, err = cursor.Next() { + currentCount++ + blockRoots = append(blockRoots, libcommon.BytesToHash(v)) + slots = append(slots, base_encoding.Decode64FromBytes4(k)) + } return blockRoots, slots, err } diff --git a/cl/persistence/format/snapshot_format/getters/execution_snapshot.go b/cl/persistence/format/snapshot_format/getters/execution_snapshot.go index e66078d53e3..60d6e92179d 100644 --- a/cl/persistence/format/snapshot_format/getters/execution_snapshot.go +++ b/cl/persistence/format/snapshot_format/getters/execution_snapshot.go @@ -96,9 +96,6 @@ func (r *ExecutionSnapshotReader) Withdrawals(number uint64, hash libcommon.Hash } ret := solid.NewStaticListSSZ[*cltypes.Withdrawal](int(r.beaconCfg.MaxWithdrawalsPerPayload), 44) for _, w := range body.Withdrawals { - if w.Index == 0 { - return nil, fmt.Errorf("withdrawal index is zero") - } ret.Append(&cltypes.Withdrawal{ Index: w.Index, Validator: w.Validator, diff --git a/cl/persistence/state/historical_states_reader/historical_states_reader_test.go b/cl/persistence/state/historical_states_reader/historical_states_reader_test.go index aa605c29bc7..c3e64b3ed11 100644 --- a/cl/persistence/state/historical_states_reader/historical_states_reader_test.go +++ b/cl/persistence/state/historical_states_reader/historical_states_reader_test.go @@ -21,7 +21,7 @@ import ( func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postState *state.CachingBeaconState) { db := memdb.NewTestDB(t) - reader := tests.LoadChain(blocks, db) + reader, _ := tests.LoadChain(blocks, db, t) ctx := context.Background() vt := state_accessors.NewStaticValidatorTable() diff --git a/cl/sentinel/config.go b/cl/sentinel/config.go index c27e8b4e220..3157e1a96c3 100644 --- a/cl/sentinel/config.go +++ b/cl/sentinel/config.go @@ -44,6 +44,8 @@ type SentinelConfig struct { NoDiscovery bool TmpDir string LocalDiscovery bool + + EnableBlocks bool } func convertToCryptoPrivkey(privkey *ecdsa.PrivateKey) (crypto.PrivKey, error) { diff --git a/cl/sentinel/gossip.go b/cl/sentinel/gossip.go index 38af103b890..cf1602e2d71 100644 --- a/cl/sentinel/gossip.go +++ b/cl/sentinel/gossip.go @@ -137,10 +137,54 @@ func (s *GossipManager) unsubscribe(topic string) { if _, ok := s.subscriptions[topic]; !ok { return } - s.subscriptions[topic].Close() + sub := s.subscriptions[topic] + go func() { + timer := time.NewTimer(time.Hour) + ctx := sub.ctx + select { + case <-ctx.Done(): + sub.Close() + case <-timer.C: + sub.Close() + } + }() delete(s.subscriptions, topic) } +func (s *Sentinel) forkWatcher() { + prevDigest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig) + if err != nil { + log.Error("[Gossip] Failed to calculate fork choice", "err", err) + return + } + iterationInterval := time.NewTicker(30 * time.Millisecond) + for { + select { + case <-s.ctx.Done(): + return + case <-iterationInterval.C: + digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig) + if err != nil { + log.Error("[Gossip] Failed to calculate fork choice", "err", err) + return + } + if prevDigest != digest { + subs := s.subManager.subscriptions + for path, sub := range subs { + s.subManager.unsubscribe(path) + newSub, err := s.SubscribeGossip(sub.gossip_topic) + if err != nil { + log.Error("[Gossip] Failed to resubscribe to topic", "err", err) + return + } + newSub.Listen() + } + prevDigest = digest + } + } + } +} + func (s *Sentinel) SubscribeGossip(topic GossipTopic, opts ...pubsub.TopicOpt) (sub *GossipSubscription, err error) { digest, err := fork.ComputeForkDigest(s.cfg.BeaconConfig, s.cfg.GenesisConfig) if err != nil { diff --git a/cl/sentinel/handlers/blocks.go b/cl/sentinel/handlers/blocks.go index c202a7eceec..d3b09c28c79 100644 --- a/cl/sentinel/handlers/blocks.go +++ b/cl/sentinel/handlers/blocks.go @@ -14,7 +14,6 @@ package handlers import ( - "errors" "io" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -42,38 +41,25 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error { return err } - if req.Step != 1 { - return errors.New("step must be 1") - } - tx, err := c.indiciesDB.BeginRo(c.ctx) if err != nil { return err } defer tx.Rollback() - // Limit the number of blocks to the count specified in the request. if int(req.Count) > MAX_REQUEST_BLOCKS { req.Count = MAX_REQUEST_BLOCKS } - beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count-1) + beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count) if err != nil { return err } + if len(beaconBlockRooots) == 0 || len(slots) == 0 { return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) } - // Read the fork digest - forkDigest, err := fork.ComputeForkDigestForVersion( - utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion), - c.genesisConfig.GenesisValidatorRoot, - ) - if err != nil { - return err - } - resourceAvaiable := false for i, slot := range slots { r, err := c.beaconDB.BlockReader(c.ctx, slot, beaconBlockRooots[i]) if err != nil { @@ -81,11 +67,18 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error { } defer r.Close() - if !resourceAvaiable { - if _, err := s.Write([]byte{1}); err != nil { - return err - } - resourceAvaiable = true + version := c.beaconConfig.GetCurrentStateVersion(slot / c.beaconConfig.SlotsPerEpoch) + // Read the fork digest + forkDigest, err := fork.ComputeForkDigestForVersion( + utils.Uint32ToBytes4(c.beaconConfig.GetForkVersionByVersion(version)), + c.genesisConfig.GenesisValidatorRoot, + ) + if err != nil { + return err + } + + if _, err := s.Write([]byte{0}); err != nil { + return err } if _, err := s.Write(forkDigest[:]); err != nil { @@ -96,9 +89,7 @@ func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error { return err } } - if !resourceAvaiable { - return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) - } + return nil } @@ -132,16 +123,6 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error { } defer tx.Rollback() - // Read the fork digest - forkDigest, err := fork.ComputeForkDigestForVersion( - utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion), - c.genesisConfig.GenesisValidatorRoot, - ) - if err != nil { - return err - } - - resourceAvaiable := false for i, blockRoot := range blockRoots { slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot) if slot == nil { @@ -157,11 +138,18 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error { } defer r.Close() - if !resourceAvaiable { - if _, err := s.Write([]byte{1}); err != nil { - return err - } - resourceAvaiable = true + if _, err := s.Write([]byte{0}); err != nil { + return err + } + + version := c.beaconConfig.GetCurrentStateVersion(*slot / c.beaconConfig.SlotsPerEpoch) + // Read the fork digest + forkDigest, err := fork.ComputeForkDigestForVersion( + utils.Uint32ToBytes4(c.beaconConfig.GetForkVersionByVersion(version)), + c.genesisConfig.GenesisValidatorRoot, + ) + if err != nil { + return err } if _, err := s.Write(forkDigest[:]); err != nil { @@ -178,9 +166,7 @@ func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error { return err } } - if !resourceAvaiable { - return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) - } + return nil } diff --git a/cl/sentinel/handlers/blocks_by_range_test.go b/cl/sentinel/handlers/blocks_by_range_test.go index 148c0052c27..84500364c21 100644 --- a/cl/sentinel/handlers/blocks_by_range_test.go +++ b/cl/sentinel/handlers/blocks_by_range_test.go @@ -77,7 +77,7 @@ func TestBlocksByRootHandler(t *testing.T) { } reqData := libcommon.CopyBytes(reqBuf.Bytes()) - stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV1)) + stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV2)) require.NoError(t, err) _, err = stream.Write(reqData) @@ -86,7 +86,7 @@ func TestBlocksByRootHandler(t *testing.T) { firstByte := make([]byte, 1) _, err = stream.Read(firstByte) require.NoError(t, err) - require.Equal(t, firstByte[0], byte(1)) + require.Equal(t, firstByte[0], byte(0)) for i := 0; i < int(count); i++ { forkDigest := make([]byte, 4) @@ -133,6 +133,7 @@ func TestBlocksByRootHandler(t *testing.T) { require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot) require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex) require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber) + stream.Read(make([]byte, 1)) } _, err = stream.Read(make([]byte, 1)) diff --git a/cl/sentinel/handlers/blocks_by_root_test.go b/cl/sentinel/handlers/blocks_by_root_test.go index 7c2c3627e53..b917644b145 100644 --- a/cl/sentinel/handlers/blocks_by_root_test.go +++ b/cl/sentinel/handlers/blocks_by_root_test.go @@ -80,7 +80,7 @@ func TestBlocksByRangeHandler(t *testing.T) { } reqData := libcommon.CopyBytes(reqBuf.Bytes()) - stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV1)) + stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV2)) require.NoError(t, err) _, err = stream.Write(reqData) @@ -89,7 +89,7 @@ func TestBlocksByRangeHandler(t *testing.T) { firstByte := make([]byte, 1) _, err = stream.Read(firstByte) require.NoError(t, err) - require.Equal(t, firstByte[0], byte(1)) + require.Equal(t, firstByte[0], byte(0)) for i := 0; i < len(blockRoots); i++ { forkDigest := make([]byte, 4) @@ -132,6 +132,7 @@ func TestBlocksByRangeHandler(t *testing.T) { require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot) require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex) require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber) + stream.Read(make([]byte, 1)) } _, err = stream.Read(make([]byte, 1)) diff --git a/cl/sentinel/handlers/handlers.go b/cl/sentinel/handlers/handlers.go index fe7f1868379..87be5480c40 100644 --- a/cl/sentinel/handlers/handlers.go +++ b/cl/sentinel/handlers/handlers.go @@ -16,6 +16,7 @@ package handlers import ( "context" "errors" + "fmt" "strings" "sync" "time" @@ -76,8 +77,8 @@ type ConsensusHandlers struct { const ( SuccessfulResponsePrefix = 0x00 - RateLimitedPrefix = 0x02 - ResourceUnavaiablePrefix = 0x03 + RateLimitedPrefix = 0x01 + ResourceUnavaiablePrefix = 0x02 ) func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, host host.Host, @@ -104,8 +105,8 @@ func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChai } if c.enableBlocks { - hm[communication.BeaconBlocksByRangeProtocolV1] = c.beaconBlocksByRangeHandler - hm[communication.BeaconBlocksByRootProtocolV1] = c.beaconBlocksByRootHandler + hm[communication.BeaconBlocksByRangeProtocolV2] = c.beaconBlocksByRangeHandler + hm[communication.BeaconBlocksByRootProtocolV2] = c.beaconBlocksByRootHandler } c.handlers = map[protocol.ID]network.StreamHandler{} @@ -162,6 +163,7 @@ func (c *ConsensusHandlers) wrapStreamHandler(name string, fn func(s network.Str err = fn(s) if err != nil { l["err"] = err + fmt.Println("err", err) log.Trace("[pubsubhandler] stream handler", l) // TODO: maybe we should log this _ = s.Reset() diff --git a/cl/sentinel/handlers/heartbeats.go b/cl/sentinel/handlers/heartbeats.go index e461da37fdc..b06774a77fe 100644 --- a/cl/sentinel/handlers/heartbeats.go +++ b/cl/sentinel/handlers/heartbeats.go @@ -59,6 +59,7 @@ func (c *ConsensusHandlers) metadataV1Handler(s network.Stream) error { func (c *ConsensusHandlers) metadataV2Handler(s network.Stream) error { peerId := s.Conn().RemotePeer().String() + if err := c.checkRateLimit(peerId, "metadataV2", rateLimits.metadataV2Limit); err != nil { ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix) return err diff --git a/cl/sentinel/sentinel.go b/cl/sentinel/sentinel.go index b829ee45fbf..ad2cbbf6db8 100644 --- a/cl/sentinel/sentinel.go +++ b/cl/sentinel/sentinel.go @@ -168,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) { } // Start stream handlers - handlers.NewConsensusHandlers(s.ctx, s.db, s.indiciesDB, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2, false).Start() + handlers.NewConsensusHandlers(s.ctx, s.db, s.indiciesDB, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2, s.cfg.EnableBlocks).Start() net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg) if err != nil { @@ -290,6 +290,7 @@ func (s *Sentinel) Start() error { s.subManager = NewGossipManager(s.ctx) go s.listenForPeers() + go s.forkWatcher() return nil } diff --git a/cl/sentinel/sentinel_gossip_test.go b/cl/sentinel/sentinel_gossip_test.go new file mode 100644 index 00000000000..db39617ad5c --- /dev/null +++ b/cl/sentinel/sentinel_gossip_test.go @@ -0,0 +1,180 @@ +package sentinel + +import ( + "context" + "math" + "testing" + "time" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/log/v3" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestSentinelGossipAverage(t *testing.T) { + listenAddrHost := "127.0.0.1" + + ctx := context.Background() + db, _, f, _, _ := loadChain(t) + raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig) + genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork) + sentinel1, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: beaconConfig, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7070, + EnableBlocks: true, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel1.Stop() + + require.NoError(t, sentinel1.Start()) + h := sentinel1.host + + sentinel2, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: beaconConfig, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7077, + EnableBlocks: true, + TCPPort: 9123, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel2.Stop() + + require.NoError(t, sentinel2.Start()) + h2 := sentinel2.host + + sub1, err := sentinel1.SubscribeGossip(BeaconBlockSsz) + require.NoError(t, err) + defer sub1.Close() + + require.NoError(t, sub1.Listen()) + + sub2, err := sentinel2.SubscribeGossip(BeaconBlockSsz) + require.NoError(t, err) + defer sub2.Close() + require.NoError(t, sub2.Listen()) + + err = h.Connect(ctx, peer.AddrInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + }) + require.NoError(t, err) + + ch := sentinel2.RecvGossip() + msg := []byte("hello") + go func() { + // delay to make sure that the connection is established + time.Sleep(5 * time.Second) + sub1.Publish(msg) + }() + + select { + case ans := <-ch: + require.Equal(t, len(msg), len(ans.Message.Data)) + require.Equal(t, msg, ans.Data) + case <-ctx.Done(): + t.Fatal("timeout") + } +} + +func TestSentinelGossipOnHardFork(t *testing.T) { + listenAddrHost := "127.0.0.1" + + ctx := context.Background() + db, _, f, _, _ := loadChain(t) + raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig) + genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork) + bcfg := *beaconConfig + + bcfg.AltairForkEpoch = math.MaxUint64 + bcfg.BellatrixForkEpoch = math.MaxUint64 + bcfg.CapellaForkEpoch = math.MaxUint64 + bcfg.DenebForkEpoch = math.MaxUint64 + bcfg.InitializeForkSchedule() + + sentinel1, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: &bcfg, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7070, + EnableBlocks: true, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel1.Stop() + + require.NoError(t, sentinel1.Start()) + h := sentinel1.host + + sentinel2, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: &bcfg, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7077, + EnableBlocks: true, + TCPPort: 9123, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel2.Stop() + + require.NoError(t, sentinel2.Start()) + h2 := sentinel2.host + + sub1, err := sentinel1.SubscribeGossip(BeaconBlockSsz) + require.NoError(t, err) + defer sub1.Close() + + require.NoError(t, sub1.Listen()) + + sub2, err := sentinel2.SubscribeGossip(BeaconBlockSsz) + require.NoError(t, err) + defer sub2.Close() + require.NoError(t, sub2.Listen()) + + err = h.Connect(ctx, peer.AddrInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + }) + require.NoError(t, err) + + ch := sentinel2.RecvGossip() + msg := []byte("hello") + go func() { + // delay to make sure that the connection is established + time.Sleep(time.Second) + sub1.Publish(msg) + }() + previousTopic := "" + + select { + case ans := <-ch: + require.Equal(t, len(msg), len(ans.Message.Data)) + previousTopic = *ans.Topic + case <-ctx.Done(): + t.Fatal("timeout") + } + + bcfg.AltairForkEpoch = clparams.MainnetBeaconConfig.AltairForkEpoch + bcfg.InitializeForkSchedule() + msg = []byte("hello1") + go func() { + // delay to make sure that the connection is established + time.Sleep(time.Second) + sub1 = sentinel1.subManager.GetMatchingSubscription(string(BeaconBlockSsz.Name)) + sub1.Publish(msg) + }() + + select { + case ans := <-ch: + require.NotEqual(t, previousTopic, *ans.Topic) + case <-ctx.Done(): + t.Fatal("timeout") + } +} diff --git a/cl/sentinel/sentinel_requests_test.go b/cl/sentinel/sentinel_requests_test.go new file mode 100644 index 00000000000..34a58ab917a --- /dev/null +++ b/cl/sentinel/sentinel_requests_test.go @@ -0,0 +1,315 @@ +package sentinel + +import ( + "bytes" + "context" + "encoding/binary" + "io" + "testing" + + "github.com/golang/snappy" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/cl/antiquary" + "github.com/ledgerwatch/erigon/cl/antiquary/tests" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/fork" + "github.com/ledgerwatch/erigon/cl/persistence" + state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state" + "github.com/ledgerwatch/erigon/cl/phase1/core/state" + "github.com/ledgerwatch/erigon/cl/sentinel/communication" + "github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy" + "github.com/ledgerwatch/erigon/cl/utils" + "github.com/ledgerwatch/log/v3" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/spf13/afero" + "github.com/stretchr/testify/require" +) + +func loadChain(t *testing.T) (db kv.RwDB, blocks []*cltypes.SignedBeaconBlock, f afero.Fs, preState, postState *state.CachingBeaconState) { + blocks, preState, postState = tests.GetPhase0Random() + db = memdb.NewTestDB(t) + var reader *tests.MockBlockReader + reader, f = tests.LoadChain(blocks, db, t) + + ctx := context.Background() + vt := state_accessors.NewStaticValidatorTable() + a := antiquary.NewAntiquary(ctx, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, nil, log.New(), true, f) + require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33)) + return +} + +func TestSentinelBlocksByRange(t *testing.T) { + listenAddrHost := "127.0.0.1" + + ctx := context.Background() + db, blocks, f, _, _ := loadChain(t) + raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig) + genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork) + sentinel, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: beaconConfig, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7070, + EnableBlocks: true, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel.Stop() + + require.NoError(t, sentinel.Start()) + h := sentinel.host + + listenAddrHost1 := "/ip4/127.0.0.1/tcp/3202" + host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1)) + require.NoError(t, err) + + err = h.Connect(ctx, peer.AddrInfo{ + ID: host1.ID(), + Addrs: host1.Addrs(), + }) + require.NoError(t, err) + + stream, err := host1.NewStream(ctx, h.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV2)) + require.NoError(t, err) + + req := &cltypes.BeaconBlocksByRangeRequest{ + StartSlot: blocks[0].Block.Slot, + Count: 6, + } + + if err := ssz_snappy.EncodeAndWrite(stream, req); err != nil { + return + } + + code := make([]byte, 1) + _, err = stream.Read(code) + require.NoError(t, err) + require.Equal(t, code[0], uint8(0)) + + var w bytes.Buffer + _, err = io.Copy(&w, stream) + require.NoError(t, err) + + responsePacket := make([]*cltypes.SignedBeaconBlock, 0) + + r := bytes.NewReader(w.Bytes()) + for i := 0; i < len(blocks); i++ { + forkDigest := make([]byte, 4) + if _, err := r.Read(forkDigest); err != nil { + if err == io.EOF { + break + } + require.NoError(t, err) + } + + // Read varint for length of message. + encodedLn, _, err := ssz_snappy.ReadUvarint(r) + require.NoError(t, err) + + // Read bytes using snappy into a new raw buffer of side encodedLn. + raw := make([]byte, encodedLn) + sr := snappy.NewReader(r) + bytesRead := 0 + for bytesRead < int(encodedLn) { + n, err := sr.Read(raw[bytesRead:]) + require.NoError(t, err) + bytesRead += n + } + // Fork digests + respForkDigest := binary.BigEndian.Uint32(forkDigest) + require.NoError(t, err) + + version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconConfig, genesisConfig.GenesisValidatorRoot) + require.NoError(t, err) + + responseChunk := cltypes.NewSignedBeaconBlock(beaconConfig) + + require.NoError(t, responseChunk.DecodeSSZ(raw, int(version))) + + responsePacket = append(responsePacket, responseChunk) + // TODO(issues/5884): figure out why there is this extra byte. + r.ReadByte() + } + require.Equal(t, len(responsePacket), len(blocks)) + for i := 0; i < len(blocks); i++ { + root1, err := responsePacket[i].HashSSZ() + require.NoError(t, err) + + root2, err := blocks[i].HashSSZ() + require.NoError(t, err) + + require.Equal(t, root1, root2) + } + +} + +func TestSentinelBlocksByRoots(t *testing.T) { + listenAddrHost := "127.0.0.1" + + ctx := context.Background() + db, blocks, f, _, _ := loadChain(t) + raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig) + genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork) + sentinel, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: beaconConfig, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7070, + EnableBlocks: true, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel.Stop() + + require.NoError(t, sentinel.Start()) + h := sentinel.host + + listenAddrHost1 := "/ip4/127.0.0.1/tcp/5021" + host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1)) + require.NoError(t, err) + + err = h.Connect(ctx, peer.AddrInfo{ + ID: host1.ID(), + Addrs: host1.Addrs(), + }) + require.NoError(t, err) + + stream, err := host1.NewStream(ctx, h.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV2)) + require.NoError(t, err) + + req := solid.NewHashList(1232) + rt, err := blocks[0].Block.HashSSZ() + require.NoError(t, err) + + req.Append(rt) + rt, err = blocks[1].Block.HashSSZ() + require.NoError(t, err) + req.Append(rt) + + if err := ssz_snappy.EncodeAndWrite(stream, req); err != nil { + return + } + + code := make([]byte, 1) + _, err = stream.Read(code) + require.NoError(t, err) + require.Equal(t, code[0], uint8(0)) + + var w bytes.Buffer + _, err = io.Copy(&w, stream) + require.NoError(t, err) + + responsePacket := make([]*cltypes.SignedBeaconBlock, 0) + + r := bytes.NewReader(w.Bytes()) + for i := 0; i < len(blocks); i++ { + forkDigest := make([]byte, 4) + if _, err := r.Read(forkDigest); err != nil { + if err == io.EOF { + break + } + require.NoError(t, err) + } + + // Read varint for length of message. + encodedLn, _, err := ssz_snappy.ReadUvarint(r) + require.NoError(t, err) + + // Read bytes using snappy into a new raw buffer of side encodedLn. + raw := make([]byte, encodedLn) + sr := snappy.NewReader(r) + bytesRead := 0 + for bytesRead < int(encodedLn) { + n, err := sr.Read(raw[bytesRead:]) + require.NoError(t, err) + bytesRead += n + } + // Fork digests + respForkDigest := binary.BigEndian.Uint32(forkDigest) + require.NoError(t, err) + + version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconConfig, genesisConfig.GenesisValidatorRoot) + require.NoError(t, err) + + responseChunk := cltypes.NewSignedBeaconBlock(beaconConfig) + + require.NoError(t, responseChunk.DecodeSSZ(raw, int(version))) + + responsePacket = append(responsePacket, responseChunk) + // TODO(issues/5884): figure out why there is this extra byte. + r.ReadByte() + } + + require.Equal(t, len(responsePacket), len(blocks)) + for i := 0; i < len(responsePacket); i++ { + root1, err := responsePacket[i].HashSSZ() + require.NoError(t, err) + + root2, err := blocks[i].HashSSZ() + require.NoError(t, err) + + require.Equal(t, root1, root2) + } +} + +func TestSentinelStatusRequest(t *testing.T) { + listenAddrHost := "127.0.0.1" + + ctx := context.Background() + db, blocks, f, _, _ := loadChain(t) + raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig) + genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork) + sentinel, err := New(ctx, &SentinelConfig{ + NetworkConfig: networkConfig, + BeaconConfig: beaconConfig, + GenesisConfig: genesisConfig, + IpAddr: listenAddrHost, + Port: 7070, + EnableBlocks: true, + }, raw, db, log.New()) + require.NoError(t, err) + defer sentinel.Stop() + + require.NoError(t, sentinel.Start()) + h := sentinel.host + + listenAddrHost1 := "/ip4/127.0.0.1/tcp/5001" + host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1)) + require.NoError(t, err) + + err = h.Connect(ctx, peer.AddrInfo{ + ID: host1.ID(), + Addrs: host1.Addrs(), + }) + require.NoError(t, err) + req := &cltypes.Status{ + HeadRoot: blocks[0].Block.ParentRoot, + HeadSlot: 1234, + } + sentinel.SetStatus(req) + stream, err := host1.NewStream(ctx, h.ID(), protocol.ID(communication.StatusProtocolV1)) + require.NoError(t, err) + + if err := ssz_snappy.EncodeAndWrite(stream, req); err != nil { + return + } + + code := make([]byte, 1) + _, err = stream.Read(code) + require.NoError(t, err) + require.Equal(t, code[0], uint8(0)) + + resp := &cltypes.Status{} + if err := ssz_snappy.DecodeAndReadNoForkDigest(stream, resp, 0); err != nil { + return + } + require.NoError(t, err) + + require.Equal(t, resp, req) +}