Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added GET /eth/v1/beacon/rewards/blocks/{block_id} and POST /eth/v1/beacon/rewards/sync_committee/{block_id} #9102

Merged
merged 31 commits into from
Dec 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cl/abstract/beacon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type BeaconStateUpgradable interface {
}

type BeaconStateExtension interface {
SlashValidator(slashedInd uint64, whistleblowerInd *uint64) error
SlashValidator(slashedInd uint64, whistleblowerInd *uint64) (uint64, error)
InitiateValidatorExit(index uint64) error
GetActiveValidatorsIndices(epoch uint64) (indicies []uint64)
GetTotalActiveBalance() uint64
Expand Down
2 changes: 1 addition & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func (a *Antiquary) Loop() error {
return err
}
// Here we need to start mdbx transaction and lock the thread
log.Info("[Antiquary]: Stopping Caplin to process historical indicies")
tx, err := a.mainDB.BeginRw(a.ctx)
if err != nil {
return err
Expand All @@ -111,6 +110,7 @@ func (a *Antiquary) Loop() error {
return err
}
defer logInterval.Stop()
log.Info("[Antiquary]: Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable())

// Now write the snapshots as indicies
for i := from; i < a.sn.BlocksAvailable(); i++ {
Expand Down
71 changes: 41 additions & 30 deletions cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ledgerwatch/erigon/cl/phase1/core/state/raw"
"github.com/ledgerwatch/erigon/cl/phase1/core/state/shuffling"
"github.com/ledgerwatch/erigon/cl/transition"
"github.com/ledgerwatch/erigon/cl/transition/impl/eth2"
"github.com/ledgerwatch/log/v3"
)

Expand Down Expand Up @@ -165,12 +166,12 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
defer blockRoots.Close()
stateRoots := etl.NewCollector(kv.StateRoot, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer stateRoots.Close()
minimalBeaconStates := etl.NewCollector(kv.MinimalBeaconState, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer minimalBeaconStates.Close()
slotData := etl.NewCollector(kv.SlotData, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer slotData.Close()
epochData := etl.NewCollector(kv.EpochData, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer epochData.Close()
inactivityScoresC := etl.NewCollector(kv.InactivityScores, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer inactivityScoresC.Close()
checkpoints := etl.NewCollector(kv.Checkpoints, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer checkpoints.Close()
nextSyncCommittee := etl.NewCollector(kv.NextSyncCommittee, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
defer nextSyncCommittee.Close()
currentSyncCommittee := etl.NewCollector(kv.CurrentSyncCommittee, s.dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), s.logger)
Expand Down Expand Up @@ -216,7 +217,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
// Collect genesis state if we are at genesis
if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, currentSyncCommittee, nextSyncCommittee, slashings, checkpoints, inactivityScoresC, proposers, minimalBeaconStates, stateEvents, changedValidators); err != nil {
if err := s.collectGenesisState(ctx, compressedWriter, s.currentState, currentSyncCommittee, nextSyncCommittee, slashings, epochData, inactivityScoresC, proposers, slotData, stateEvents, changedValidators); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -300,15 +301,13 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return s.validatorsTable.AddWithdrawalCredentials(uint64(index), slot, libcommon.BytesToHash(wc))
},
OnEpochBoundary: func(epoch uint64) error {
k := base_encoding.Encode64ToBytes4(s.cfg.RoundSlotToEpoch(slot))
v := make([]byte, solid.CheckpointSize*3)
copy(v, s.currentState.CurrentJustifiedCheckpoint())
copy(v[solid.CheckpointSize:], s.currentState.PreviousJustifiedCheckpoint())
copy(v[solid.CheckpointSize*2:], s.currentState.FinalizedCheckpoint())
if err := checkpoints.Collect(k, v); err != nil {
if err := s.storeEpochData(commonBuffer, s.currentState, epochData); err != nil {
return err
}
prevEpoch := epoch - 1
var prevEpoch uint64
if epoch > 0 {
prevEpoch = epoch - 1
}
mix := s.currentState.GetRandaoMixes(prevEpoch)
if err := randaoMixes.Collect(base_encoding.Encode64ToBytes4(prevEpoch*s.cfg.SlotsPerEpoch), mix[:]); err != nil {
return err
Expand All @@ -319,15 +318,15 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
if err := base_encoding.WriteRabbits(actives, commonBuffer); err != nil {
return err
}
if err := activeValidatorIndicies.Collect(base_encoding.Encode64ToBytes4(prevEpoch), libcommon.Copy(commonBuffer.Bytes())); err != nil {
if err := activeValidatorIndicies.Collect(base_encoding.Encode64ToBytes4(prevEpoch*s.cfg.SlotsPerEpoch), libcommon.Copy(commonBuffer.Bytes())); err != nil {
return err
}
actives = s.currentState.GetActiveValidatorsIndices(epoch)
commonBuffer.Reset()
if err := base_encoding.WriteRabbits(actives, commonBuffer); err != nil {
return err
}
if err := activeValidatorIndicies.Collect(base_encoding.Encode64ToBytes4(epoch), libcommon.Copy(commonBuffer.Bytes())); err != nil {
if err := activeValidatorIndicies.Collect(base_encoding.Encode64ToBytes4(epoch*s.cfg.SlotsPerEpoch), libcommon.Copy(commonBuffer.Bytes())); err != nil {
return err
}
// truncate the file
Expand Down Expand Up @@ -420,8 +419,9 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
prevValSet = append(prevValSet, s.currentState.RawValidatorSet()...)

fullValidation := slot%100_000 == 0 || first
blockRewardsCollector := &eth2.BlockRewardsCollector{}
// We sanity check the state every 100k slots or when we start.
if err := transition.TransitionState(s.currentState, block, fullValidation); err != nil {
if err := transition.TransitionState(s.currentState, block, blockRewardsCollector, fullValidation); err != nil {
return err
}

Expand All @@ -434,7 +434,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
}
}

if err := s.storeMinimalState(commonBuffer, s.currentState, minimalBeaconStates); err != nil {
if err := s.storeSlotData(commonBuffer, s.currentState, blockRewardsCollector, slotData); err != nil {
return err
}
if err := stateEvents.Collect(base_encoding.Encode64ToBytes4(slot), events.CopyBytes()); err != nil {
Expand Down Expand Up @@ -527,7 +527,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}

if err := minimalBeaconStates.Load(rwTx, kv.MinimalBeaconState, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
if err := slotData.Load(rwTx, kv.SlotData, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}

Expand All @@ -539,7 +539,7 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}

if err := checkpoints.Load(rwTx, kv.Checkpoints, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
if err := epochData.Load(rwTx, kv.EpochData, loadfunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil {
return err
}

Expand Down Expand Up @@ -710,7 +710,7 @@ func getProposerDutiesValue(s *state.CachingBeaconState) []byte {
return list
}

func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.Encoder, state *state.CachingBeaconState, currentSyncCommittee, nextSyncCommittee, slashings, checkpoints, inactivities, proposersCollector, minimalBeaconStateCollector, stateEvents *etl.Collector, changedValidators map[uint64]struct{}) error {
func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.Encoder, state *state.CachingBeaconState, currentSyncCommittee, nextSyncCommittee, slashings, epochData, inactivities, proposersCollector, slotDataCollector, stateEvents *etl.Collector, changedValidators map[uint64]struct{}) error {
var err error
slot := state.Slot()
epoch := slot / s.cfg.SlotsPerEpoch
Expand Down Expand Up @@ -744,12 +744,7 @@ func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.En
return err
}

k := base_encoding.Encode64ToBytes4(s.cfg.RoundSlotToEpoch(slot))
v := make([]byte, solid.CheckpointSize*3)
copy(v, state.CurrentJustifiedCheckpoint())
copy(v[solid.CheckpointSize:], state.PreviousJustifiedCheckpoint())
copy(v[solid.CheckpointSize*2:], state.FinalizedCheckpoint())
if err := checkpoints.Collect(k, v); err != nil {
if err := s.storeEpochData(&commonBuffer, state, epochData); err != nil {
return err
}

Expand All @@ -771,21 +766,37 @@ func (s *Antiquary) collectGenesisState(ctx context.Context, compressor *zstd.En
}

var b bytes.Buffer
if err := s.storeMinimalState(&b, state, minimalBeaconStateCollector); err != nil {
if err := s.storeSlotData(&b, state, nil, slotDataCollector); err != nil {
return err
}

return stateEvents.Collect(base_encoding.Encode64ToBytes4(slot), events.CopyBytes())
}

func (s *Antiquary) storeMinimalState(buffer *bytes.Buffer, st *state.CachingBeaconState, collector *etl.Collector) error {
func (s *Antiquary) storeSlotData(buffer *bytes.Buffer, st *state.CachingBeaconState, rewardsCollector *eth2.BlockRewardsCollector, collector *etl.Collector) error {
buffer.Reset()
slotData := state_accessors.SlotDataFromBeaconState(st)
if rewardsCollector != nil {
slotData.AttestationsRewards = rewardsCollector.Attestations
slotData.SyncAggregateRewards = rewardsCollector.SyncAggregate
slotData.AttesterSlashings = rewardsCollector.AttesterSlashings
slotData.ProposerSlashings = rewardsCollector.ProposerSlashings
}
if err := slotData.WriteTo(buffer); err != nil {
return err
}
return collector.Collect(base_encoding.Encode64ToBytes4(st.Slot()), libcommon.Copy(buffer.Bytes()))
}

func (s *Antiquary) storeEpochData(buffer *bytes.Buffer, st *state.CachingBeaconState, collector *etl.Collector) error {
buffer.Reset()
minimalBeaconState := state_accessors.MinimalBeaconStateFromBeaconState(st)
epochData := state_accessors.EpochDataFromBeaconState(st)

if err := minimalBeaconState.WriteTo(buffer); err != nil {
if err := epochData.WriteTo(buffer); err != nil {
return err
}
return collector.Collect(base_encoding.Encode64ToBytes4(st.Slot()), buffer.Bytes())
roundedSlot := s.cfg.RoundSlotToEpoch(st.Slot())
return collector.Collect(base_encoding.Encode64ToBytes4(roundedSlot), libcommon.Copy(buffer.Bytes()))
}

func (s *Antiquary) dumpPayload(k []byte, v []byte, c *etl.Collector, b *bytes.Buffer, compressor *zstd.Encoder) error {
Expand Down
3 changes: 3 additions & 0 deletions cl/beacon/beaconhttp/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"reflect"
"strings"
"time"

"github.com/ledgerwatch/erigon-lib/types/ssz"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph"
Expand Down Expand Up @@ -68,7 +69,9 @@ func HandleEndpointFunc[T any](h EndpointHandlerFunc[T]) http.HandlerFunc {

func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
ans, err := h.Handle(r)
log.Debug("beacon api request", "endpoint", r.URL.Path, "duration", time.Since(start))
if err != nil {
log.Error("beacon api request error", "err", err)
var endpointError *EndpointError
Expand Down
167 changes: 167 additions & 0 deletions cl/beacon/handler/commitees_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package handler

import (
"io"
"math"
"net/http"
"net/http/httptest"
"strconv"
"testing"

"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/common"
"github.com/stretchr/testify/require"
)

func TestGetCommitteesAntiquated(t *testing.T) {

// setupTestingHandler(t, clparams.Phase0Version)
_, blocks, _, _, postState, handler, _, _, fcu := setupTestingHandler(t, clparams.Phase0Version)

postRoot, err := postState.HashSSZ()
require.NoError(t, err)

fcu.HeadVal, err = blocks[len(blocks)-1].Block.HashSSZ()
require.NoError(t, err)

fcu.HeadSlotVal = blocks[len(blocks)-1].Block.Slot

fcu.FinalizedCheckpointVal = solid.NewCheckpointFromParameters(fcu.HeadVal, fcu.HeadSlotVal/32)
fcu.FinalizedSlotVal = math.MaxUint64

fcu.StateAtBlockRootVal[fcu.HeadVal] = postState

cases := []struct {
name string
blockID string
code int
query string
expected string
}{
{
name: "slot",
blockID: "0x" + common.Bytes2Hex(postRoot[:]),
code: http.StatusOK,
query: "?slot=" + strconv.FormatUint(fcu.HeadSlotVal, 10),
expected: `{"data":[{"index":"0","slot":"8322","validators":["0","104","491","501","379","318","275","504","75","280","105","399","35","401"]}],"finalized":true,"execution_optimistic":false}` + "\n",
},
{
name: "empty-index",
blockID: "0x" + common.Bytes2Hex(postRoot[:]),
code: http.StatusOK,
query: "?index=1",
expected: `{"data":[],"finalized":true,"execution_optimistic":false}` + "\n",
},
{
name: "all-queries",
blockID: "0x" + common.Bytes2Hex(postRoot[:]),
code: http.StatusOK,
query: "?index=0&slot=" + strconv.FormatUint(fcu.HeadSlotVal-32, 10) + "&epoch=" + strconv.FormatUint((fcu.HeadSlotVal/32)-1, 10),
expected: `{"data":[{"index":"0","slot":"8290","validators":["127","377","274","85","309","420","423","398","153","480","273","429","374","260"]}],"finalized":true,"execution_optimistic":false}` + "\n",
},
{
blockID: "0x" + common.Bytes2Hex(make([]byte, 32)),
code: http.StatusNotFound,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
server := httptest.NewServer(handler.mux)
defer server.Close()
// Query the block in the handler with /eth/v2/beacon/states/{block_id} with content-type octet-stream
req, err := http.NewRequest("GET", server.URL+"/eth/v1/beacon/states/"+c.blockID+"/committees"+c.query, nil)
require.NoError(t, err)

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)

defer resp.Body.Close()
require.Equal(t, c.code, resp.StatusCode)
if resp.StatusCode != http.StatusOK {
return
}
// read the all of the octect
out, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, c.expected, string(out))
})
}
}

func TestGetCommitteesNonAntiquated(t *testing.T) {

// setupTestingHandler(t, clparams.Phase0Version)
_, blocks, _, _, postState, handler, _, sm, fcu := setupTestingHandler(t, clparams.Phase0Version)

postRoot, err := postState.HashSSZ()
require.NoError(t, err)

fcu.HeadVal, err = blocks[len(blocks)-1].Block.HashSSZ()
require.NoError(t, err)

fcu.HeadSlotVal = blocks[len(blocks)-1].Block.Slot

fcu.FinalizedCheckpointVal = solid.NewCheckpointFromParameters(fcu.HeadVal, fcu.HeadSlotVal/32)
fcu.FinalizedSlotVal = 0

fcu.StateAtBlockRootVal[fcu.HeadVal] = postState
require.NoError(t, sm.OnHeadState(postState))
cases := []struct {
name string
blockID string
code int
query string
expected string
}{
{
name: "slot",
blockID: "0x" + common.Bytes2Hex(postRoot[:]),
code: http.StatusOK,
query: "?slot=" + strconv.FormatUint(fcu.HeadSlotVal, 10),
expected: `{"data":[{"index":"0","slot":"8322","validators":["0","104","491","501","379","318","275","504","75","280","105","399","35","401"]}],"finalized":false,"execution_optimistic":false}` + "\n",
},
{
name: "empty-index",
blockID: "0x" + common.Bytes2Hex(postRoot[:]),
code: http.StatusOK,
query: "?index=1",
expected: `{"data":[],"finalized":false,"execution_optimistic":false}` + "\n",
},
{
name: "all-queries",
blockID: "0x" + common.Bytes2Hex(postRoot[:]),
code: http.StatusOK,
query: "?index=0&slot=" + strconv.FormatUint(fcu.HeadSlotVal-32, 10) + "&epoch=" + strconv.FormatUint((fcu.HeadSlotVal/32)-1, 10),
expected: `{"data":[{"index":"0","slot":"8290","validators":["127","377","274","85","309","420","423","398","153","480","273","429","374","260"]}],"finalized":false,"execution_optimistic":false}` + "\n",
},
{
blockID: "0x" + common.Bytes2Hex(make([]byte, 32)),
code: http.StatusNotFound,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
server := httptest.NewServer(handler.mux)
defer server.Close()
// Query the block in the handler with /eth/v2/beacon/states/{block_id} with content-type octet-stream
req, err := http.NewRequest("GET", server.URL+"/eth/v1/beacon/states/"+c.blockID+"/committees"+c.query, nil)
require.NoError(t, err)

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)

defer resp.Body.Close()
require.Equal(t, c.code, resp.StatusCode)
if resp.StatusCode != http.StatusOK {
return
}
// read the all of the octect
out, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, c.expected, string(out))
})
}
}
Loading
Loading