Skip to content

Commit

Permalink
[beacon handler] framework (#8851)
Browse files Browse the repository at this point in the history
adds a two indexes to the validators cache

creates beaconhttp package with many utilities for beacon http endpoint
(future support for ssz is baked in)

started on some validator endpoints
  • Loading branch information
elee1766 authored Dec 4, 2023
1 parent 1492cc4 commit 47a6ac1
Show file tree
Hide file tree
Showing 24 changed files with 1,060 additions and 397 deletions.
105 changes: 105 additions & 0 deletions cl/beacon/beaconhttp/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package beaconhttp

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"

"github.com/ledgerwatch/erigon-lib/types/ssz"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph"
"github.com/ledgerwatch/log/v3"
)

var _ error = EndpointError{}
var _ error = (*EndpointError)(nil)

type EndpointError struct {
Code int `json:"code"`
Message string `json:"message"`
}

func WrapEndpointError(err error) *EndpointError {
e := &EndpointError{}
if errors.As(err, e) {
return e
}
if errors.Is(err, fork_graph.ErrStateNotFound) {
return NewEndpointError(http.StatusNotFound, "Could not find beacon state")
}
return NewEndpointError(http.StatusInternalServerError, err.Error())
}

func NewEndpointError(code int, message string) *EndpointError {
return &EndpointError{
Code: code,
Message: message,
}
}

func (e EndpointError) Error() string {
return fmt.Sprintf("Code %d: %s", e.Code, e.Message)
}

func (e *EndpointError) WriteTo(w http.ResponseWriter) {
w.WriteHeader(e.Code)
encErr := json.NewEncoder(w).Encode(e)
if encErr != nil {
log.Error("beaconapi failed to write json error", "err", encErr)
}
}

type EndpointHandler[T any] interface {
Handle(r *http.Request) (T, error)
}

type EndpointHandlerFunc[T any] func(r *http.Request) (T, error)

func (e EndpointHandlerFunc[T]) Handle(r *http.Request) (T, error) {
return e(r)
}

func HandleEndpointFunc[T any](h EndpointHandlerFunc[T]) http.HandlerFunc {
return HandleEndpoint[T](h)
}

func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ans, err := h.Handle(r)
if err != nil {
log.Error("beacon api request error", "err", err)
endpointError := WrapEndpointError(err)
endpointError.WriteTo(w)
return
}
// TODO: ssz handler
// TODO: potentially add a context option to buffer these
contentType := r.Header.Get("Accept")
switch contentType {
case "application/octet-stream":
sszMarshaler, ok := any(ans).(ssz.Marshaler)
if !ok {
NewEndpointError(http.StatusBadRequest, "This endpoint does not support SSZ response").WriteTo(w)
return
}
// TODO: we should probably figure out some way to stream this in the future :)
encoded, err := sszMarshaler.EncodeSSZ(nil)
if err != nil {
WrapEndpointError(err).WriteTo(w)
return
}
w.Write(encoded)
case "application/json", "":
w.Header().Add("content-type", "application/json")
err := json.NewEncoder(w).Encode(ans)
if err != nil {
// this error is fatal, log to console
log.Error("beaconapi failed to encode json", "type", reflect.TypeOf(ans), "err", err)
}
default:
http.Error(w, "content type must be application/json or application/octet-stream", http.StatusBadRequest)

}
})
}
81 changes: 35 additions & 46 deletions cl/beacon/handler/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/beacon/beaconhttp"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
)
Expand All @@ -22,12 +23,12 @@ type getHeadersRequest struct {
ParentRoot *libcommon.Hash `json:"root,omitempty"`
}

func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *segmentID) (root libcommon.Hash, httpStatusErr int, err error) {
func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *segmentID) (root libcommon.Hash, err error) {
switch {
case blockId.head():
root, _, err = a.forkchoiceStore.GetHead()
if err != nil {
return libcommon.Hash{}, http.StatusInternalServerError, err
return libcommon.Hash{}, err
}
case blockId.finalized():
root = a.forkchoiceStore.FinalizedCheckpoint().BlockRoot()
Expand All @@ -36,134 +37,122 @@ func (a *ApiHandler) rootFromBlockId(ctx context.Context, tx kv.Tx, blockId *seg
case blockId.genesis():
root, err = beacon_indicies.ReadCanonicalBlockRoot(tx, 0)
if err != nil {
return libcommon.Hash{}, http.StatusInternalServerError, err
return libcommon.Hash{}, err
}
if root == (libcommon.Hash{}) {
return libcommon.Hash{}, http.StatusNotFound, fmt.Errorf("genesis block not found")
return libcommon.Hash{}, beaconhttp.NewEndpointError(http.StatusNotFound, "genesis block not found")
}
case blockId.getSlot() != nil:
root, err = beacon_indicies.ReadCanonicalBlockRoot(tx, *blockId.getSlot())
if err != nil {
return libcommon.Hash{}, http.StatusInternalServerError, err
return libcommon.Hash{}, err
}
if root == (libcommon.Hash{}) {
return libcommon.Hash{}, http.StatusNotFound, fmt.Errorf("block not found %d", *blockId.getSlot())
return libcommon.Hash{}, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %d", *blockId.getSlot()))
}
case blockId.getRoot() != nil:
// first check if it exists
root = *blockId.getRoot()
default:
return libcommon.Hash{}, http.StatusInternalServerError, fmt.Errorf("cannot parse block id")
return libcommon.Hash{}, beaconhttp.NewEndpointError(http.StatusInternalServerError, "cannot parse block id")
}
return
}

func (a *ApiHandler) getBlock(r *http.Request) *beaconResponse {

func (a *ApiHandler) getBlock(r *http.Request) (*beaconResponse, error) {
ctx := r.Context()

tx, err := a.indiciesDB.BeginRo(ctx)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}
defer tx.Rollback()

blockId, err := blockIdFromRequest(r)
if err != nil {
return newCriticalErrorResponse(err)

return nil, err
}
root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId)
root, err := a.rootFromBlockId(ctx, tx, blockId)
if err != nil {
return newApiErrorResponse(httpStatus, err.Error())
return nil, err
}

blk, err := a.blockReader.ReadBlockByRoot(ctx, tx, root)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}
if blk == nil {
return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root))
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root))
}
// Check if the block is canonical
var canonicalRoot libcommon.Hash
canonicalRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, blk.Block.Slot)
if err != nil {
return newCriticalErrorResponse(err)
return nil, beaconhttp.WrapEndpointError(err)
}
return newBeaconResponse(blk).
withFinalized(root == canonicalRoot && blk.Block.Slot <= a.forkchoiceStore.FinalizedSlot()).
withVersion(blk.Version())
withVersion(blk.Version()), nil
}

func (a *ApiHandler) getBlockAttestations(r *http.Request) *beaconResponse {
func (a *ApiHandler) getBlockAttestations(r *http.Request) (*beaconResponse, error) {
ctx := r.Context()

tx, err := a.indiciesDB.BeginRo(ctx)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}
defer tx.Rollback()

blockId, err := blockIdFromRequest(r)
if err != nil {
return newApiErrorResponse(http.StatusBadRequest, err.Error())
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error())
}

root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId)
root, err := a.rootFromBlockId(ctx, tx, blockId)
if err != nil {
return newApiErrorResponse(httpStatus, err.Error())
return nil, err
}

blk, err := a.blockReader.ReadBlockByRoot(ctx, tx, root)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}
if blk == nil {
return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root))
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root))
}
// Check if the block is canonical
canonicalRoot, err := beacon_indicies.ReadCanonicalBlockRoot(tx, blk.Block.Slot)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}

return newBeaconResponse(blk.Block.Body.Attestations).withFinalized(root == canonicalRoot && blk.Block.Slot <= a.forkchoiceStore.FinalizedSlot()).
withVersion(blk.Version())
withVersion(blk.Version()), nil
}

func (a *ApiHandler) getBlockRoot(r *http.Request) *beaconResponse {
func (a *ApiHandler) getBlockRoot(r *http.Request) (*beaconResponse, error) {
ctx := r.Context()

tx, err := a.indiciesDB.BeginRo(ctx)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}
defer tx.Rollback()

blockId, err := blockIdFromRequest(r)
if err != nil {
return newApiErrorResponse(http.StatusBadRequest, err.Error())
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error())
}
root, httpStatus, err := a.rootFromBlockId(ctx, tx, blockId)
root, err := a.rootFromBlockId(ctx, tx, blockId)
if err != nil {
return newApiErrorResponse(httpStatus, err.Error())
return nil, err
}

// check if the root exist
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, root)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}
if slot == nil {
return newApiErrorResponse(http.StatusNotFound, fmt.Sprintf("block not found %x", root))
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Sprintf("block not found %x", root))
}
// Check if the block is canonical
var canonicalRoot libcommon.Hash
canonicalRoot, err = beacon_indicies.ReadCanonicalBlockRoot(tx, *slot)
if err != nil {
return newCriticalErrorResponse(err)
return nil, err
}

return newBeaconResponse(struct{ Root libcommon.Hash }{Root: root}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot())
return newBeaconResponse(struct{ Root libcommon.Hash }{Root: root}).withFinalized(canonicalRoot == root && *slot <= a.forkchoiceStore.FinalizedSlot()), nil
}
12 changes: 6 additions & 6 deletions cl/beacon/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import (
"github.com/ledgerwatch/erigon/cl/cltypes"
)

func (a *ApiHandler) getSpec(r *http.Request) *beaconResponse {
return newBeaconResponse(a.beaconChainCfg)
func (a *ApiHandler) getSpec(r *http.Request) (*beaconResponse, error) {
return newBeaconResponse(a.beaconChainCfg), nil
}

func (a *ApiHandler) getDepositContract(r *http.Request) *beaconResponse {
func (a *ApiHandler) getDepositContract(r *http.Request) (*beaconResponse, error) {

return newBeaconResponse(struct {
ChainId uint64 `json:"chain_id"`
DepositContract string `json:"address"`
}{ChainId: a.beaconChainCfg.DepositChainID, DepositContract: a.beaconChainCfg.DepositContractAddress})
}{ChainId: a.beaconChainCfg.DepositChainID, DepositContract: a.beaconChainCfg.DepositContractAddress}), nil

}

func (a *ApiHandler) getForkSchedule(r *http.Request) *beaconResponse {
func (a *ApiHandler) getForkSchedule(r *http.Request) (*beaconResponse, error) {
response := []cltypes.Fork{}
// create first response (unordered and incomplete)
for currentVersion, epoch := range a.beaconChainCfg.ForkVersionSchedule {
Expand All @@ -43,5 +43,5 @@ func (a *ApiHandler) getForkSchedule(r *http.Request) *beaconResponse {
response[i].PreviousVersion = previousVersion
previousVersion = response[i].CurrentVersion
}
return newBeaconResponse(response)
return newBeaconResponse(response), nil
}
11 changes: 6 additions & 5 deletions cl/beacon/handler/duties_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"sync"

"github.com/ledgerwatch/erigon/cl/beacon/beaconhttp"
shuffling2 "github.com/ledgerwatch/erigon/cl/phase1/core/state/shuffling"

libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand All @@ -17,22 +18,22 @@ type proposerDuties struct {
Slot uint64 `json:"slot"`
}

func (a *ApiHandler) getDutiesProposer(r *http.Request) *beaconResponse {
func (a *ApiHandler) getDutiesProposer(r *http.Request) (*beaconResponse, error) {

epoch, err := epochFromRequest(r)
if err != nil {
return newApiErrorResponse(http.StatusBadRequest, err.Error())
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err.Error())
}

if epoch < a.forkchoiceStore.FinalizedCheckpoint().Epoch() {
return newApiErrorResponse(http.StatusBadRequest, "invalid epoch")
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, "invalid epoch")
}

// We need to compute our duties
state, cancel := a.syncedData.HeadState()
defer cancel()
if state == nil {
return newApiErrorResponse(http.StatusInternalServerError, "beacon node is syncing")
return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, "beacon node is syncing")

}

Expand Down Expand Up @@ -88,6 +89,6 @@ func (a *ApiHandler) getDutiesProposer(r *http.Request) *beaconResponse {
}
wg.Wait()

return newBeaconResponse(duties).withFinalized(false).withVersion(a.beaconChainCfg.GetCurrentStateVersion(epoch))
return newBeaconResponse(duties).withFinalized(false).withVersion(a.beaconChainCfg.GetCurrentStateVersion(epoch)), nil

}
Loading

0 comments on commit 47a6ac1

Please sign in to comment.