Skip to content

Commit

Permalink
Merge branch 'main' into cleanups-3
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelneuder committed Aug 2, 2023
2 parents 65e1417 + 2944f4f commit ad6ba49
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 19 deletions.
2 changes: 1 addition & 1 deletion beaconclient/mock_beacon_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *MockBeaconInstance) addDelay() {
}
}

func (c *MockBeaconInstance) PublishBlock(block *common.SignedBeaconBlock) (code int, err error) {
func (c *MockBeaconInstance) PublishBlock(block *common.SignedBeaconBlock, broadcastValidation BroadcastValidation) (code int, err error) {
return 0, nil
}

Expand Down
30 changes: 28 additions & 2 deletions beaconclient/multi_beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ var (
ErrBeaconBlock202 = errors.New("beacon block failed validation but was still broadcast (202)")
)

type BroadcastValidation int

const (
Gossip BroadcastValidation = iota // lightweight gossip checks only
Consensus // full consensus checks, including validation of all signatures and blocks fields
ConsensusAndEquivocation // the same as `consensus`, with an extra equivocation check
)

func (b BroadcastValidation) String() string {
return [...]string{"gossip", "consensus", "consensus_and_equivocation"}[b]
}

// IMultiBeaconClient is the interface for the MultiBeaconClient, which can manage several beacon client instances under the hood
type IMultiBeaconClient interface {
BestSyncStatus() (*SyncStatusPayloadData, error)
Expand Down Expand Up @@ -48,7 +60,7 @@ type IBeaconInstance interface {
GetStateValidators(stateID string) (*GetStateValidatorsResponse, error)
GetProposerDuties(epoch uint64) (*ProposerDutiesResponse, error)
GetURI() string
PublishBlock(block *common.SignedBeaconBlock) (code int, err error)
PublishBlock(block *common.SignedBeaconBlock, broadcastValidation BroadcastValidation) (code int, err error)
GetGenesis() (*GetGenesisResponse, error)
GetSpec() (spec *GetSpecResponse, err error)
GetForkSchedule() (spec *GetForkScheduleResponse, err error)
Expand All @@ -64,6 +76,7 @@ type MultiBeaconClient struct {

// feature flags
ffAllowSyncingBeaconNode bool
ffBroadcastValidation BroadcastValidation
}

func NewMultiBeaconClient(log *logrus.Entry, beaconInstances []IBeaconInstance) *MultiBeaconClient {
Expand All @@ -72,6 +85,7 @@ func NewMultiBeaconClient(log *logrus.Entry, beaconInstances []IBeaconInstance)
beaconInstances: beaconInstances,
bestBeaconIndex: *uberatomic.NewInt64(0),
ffAllowSyncingBeaconNode: false,
ffBroadcastValidation: ConsensusAndEquivocation,
}

// feature flags
Expand All @@ -80,6 +94,18 @@ func NewMultiBeaconClient(log *logrus.Entry, beaconInstances []IBeaconInstance)
client.ffAllowSyncingBeaconNode = true
}

if os.Getenv("BROADCAST_VALIDATION") != "" {
broadcastValidationStr := os.Getenv("BROADCAST_VALIDATION")
broadcastValidation, ok := parseBroadcastValidationString(broadcastValidationStr)
if !ok {
msg := fmt.Sprintf("env: BROADCAST_VALIDATION: invalid value %s, leaving to default value %s", broadcastValidationStr, client.ffBroadcastValidation.String())
client.log.Warn(msg)
} else {
client.log.Info(fmt.Sprintf("env: BROADCAST_VALIDATION: setting validation to %s", broadcastValidation.String()))
client.ffBroadcastValidation = broadcastValidation
}
}

return client
}

Expand Down Expand Up @@ -243,7 +269,7 @@ func (c *MultiBeaconClient) PublishBlock(block *common.SignedBeaconBlock) (code
log := log.WithField("uri", client.GetURI())
log.Debug("publishing block")
go func(index int, client IBeaconInstance) {
code, err := client.PublishBlock(block)
code, err := client.PublishBlock(block, c.ffBroadcastValidation)
resChans <- publishResp{
index: index,
code: code,
Expand Down
32 changes: 17 additions & 15 deletions beaconclient/prod_beacon_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type ValidatorResponseValidatorData struct {
func (c *ProdBeaconInstance) GetStateValidators(stateID string) (*GetStateValidatorsResponse, error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/states/%s/validators?status=active,pending", c.beaconURI, stateID)
vd := new(GetStateValidatorsResponse)
_, err := fetchBeacon(http.MethodGet, uri, nil, vd, nil)
_, err := fetchBeacon(http.MethodGet, uri, nil, vd, nil, http.Header{})
return vd, err
}

Expand All @@ -159,7 +159,7 @@ func (c *ProdBeaconInstance) SyncStatus() (*SyncStatusPayloadData, error) {
uri := c.beaconURI + "/eth/v1/node/syncing"
timeout := 5 * time.Second
resp := new(SyncStatusPayload)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, &timeout)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, &timeout, http.Header{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ type ProposerDutiesResponseData struct {
func (c *ProdBeaconInstance) GetProposerDuties(epoch uint64) (*ProposerDutiesResponse, error) {
uri := fmt.Sprintf("%s/eth/v1/validator/duties/proposer/%d", c.beaconURI, epoch)
resp := new(ProposerDutiesResponse)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

Expand All @@ -212,15 +212,15 @@ type GetHeaderResponseMessage struct {
func (c *ProdBeaconInstance) GetHeader() (*GetHeaderResponse, error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/headers/head", c.beaconURI)
resp := new(GetHeaderResponse)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

// GetHeaderForSlot returns the header for a given slot - https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeader
func (c *ProdBeaconInstance) GetHeaderForSlot(slot uint64) (*GetHeaderResponse, error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/headers/%d", c.beaconURI, slot)
resp := new(GetHeaderResponse)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

Expand All @@ -240,25 +240,27 @@ type GetBlockResponse struct {
func (c *ProdBeaconInstance) GetBlock(blockID string) (block *GetBlockResponse, err error) {
uri := fmt.Sprintf("%s/eth/v2/beacon/blocks/%s", c.beaconURI, blockID)
resp := new(GetBlockResponse)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

// GetBlockForSlot returns the block for a given slot - https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockV2
func (c *ProdBeaconInstance) GetBlockForSlot(slot uint64) (*GetBlockResponse, error) {
uri := fmt.Sprintf("%s/eth/v2/beacon/blocks/%d", c.beaconURI, slot)
resp := new(GetBlockResponse)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

func (c *ProdBeaconInstance) GetURI() string {
return c.beaconURI
}

func (c *ProdBeaconInstance) PublishBlock(block *common.SignedBeaconBlock) (code int, err error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/blocks", c.beaconURI)
return fetchBeacon(http.MethodPost, uri, block, nil, nil)
func (c *ProdBeaconInstance) PublishBlock(block *common.SignedBeaconBlock, broadcastValidation BroadcastValidation) (code int, err error) {
uri := fmt.Sprintf("%s/eth/v2/beacon/blocks?broadcast_validation=%s", c.beaconURI, broadcastValidation.String())
headers := http.Header{}
headers.Add("Eth-Consensus-Version", common.ForkVersionStringCapella)
return fetchBeacon(http.MethodPost, uri, block, nil, nil, headers)
}

type GetGenesisResponse struct {
Expand All @@ -275,7 +277,7 @@ type GetGenesisResponseData struct {
func (c *ProdBeaconInstance) GetGenesis() (*GetGenesisResponse, error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/genesis", c.beaconURI)
resp := new(GetGenesisResponse)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err := fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

Expand All @@ -292,7 +294,7 @@ type GetSpecResponse struct {
func (c *ProdBeaconInstance) GetSpec() (spec *GetSpecResponse, err error) {
uri := fmt.Sprintf("%s/eth/v1/config/spec", c.beaconURI)
resp := new(GetSpecResponse)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

Expand All @@ -308,7 +310,7 @@ type GetForkScheduleResponse struct {
func (c *ProdBeaconInstance) GetForkSchedule() (spec *GetForkScheduleResponse, err error) {
uri := fmt.Sprintf("%s/eth/v1/config/fork_schedule", c.beaconURI)
resp := new(GetForkScheduleResponse)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

Expand All @@ -322,7 +324,7 @@ type GetRandaoResponse struct {
func (c *ProdBeaconInstance) GetRandao(slot uint64) (randaoResp *GetRandaoResponse, err error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/states/%d/randao", c.beaconURI, slot)
resp := new(GetRandaoResponse)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}

Expand All @@ -336,6 +338,6 @@ type GetWithdrawalsResponse struct {
func (c *ProdBeaconInstance) GetWithdrawals(slot uint64) (withdrawalsResp *GetWithdrawalsResponse, err error) {
uri := fmt.Sprintf("%s/eth/v1/beacon/states/%d/withdrawals", c.beaconURI, slot)
resp := new(GetWithdrawalsResponse)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil)
_, err = fetchBeacon(http.MethodGet, uri, nil, resp, nil, http.Header{})
return resp, err
}
16 changes: 15 additions & 1 deletion beaconclient/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"time"
)

Expand All @@ -19,7 +20,17 @@ var (
StateIDJustified = "justified"
)

func fetchBeacon(method, url string, payload, dst any, timeout *time.Duration) (code int, err error) {
func parseBroadcastValidationString(s string) (BroadcastValidation, bool) {
broadcastValidationMap := map[string]BroadcastValidation{
"gossip": Gossip,
"consensus": Consensus,
"consensus_and_equivocation": ConsensusAndEquivocation,
}
b, ok := broadcastValidationMap[strings.ToLower(s)]
return b, ok
}

func fetchBeacon(method, url string, payload, dst any, timeout *time.Duration, headers http.Header) (code int, err error) {
var req *http.Request

if payload == nil {
Expand All @@ -33,6 +44,9 @@ func fetchBeacon(method, url string, payload, dst any, timeout *time.Duration) (

// Set content-type
req.Header.Add("Content-Type", "application/json")
for k, v := range headers {
req.Header.Add(k, v[0])
}
}

if err != nil {
Expand Down

0 comments on commit ad6ba49

Please sign in to comment.