Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use engine api get-blobs for block subscriber #14513

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Updating to this release is recommended at your convenience.
- fastssz version bump (better error messages).
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
- Added GetPoolAttesterSlashingsV2 endpoint.
- Use engine api get-blobs for block subscriber

### Changed

Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/execution/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand Down Expand Up @@ -105,8 +106,11 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/execution/testing:go_default_library",
"//beacon-chain/execution/types:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/forkchoice/doubly-linked-tree:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand Down
127 changes: 123 additions & 4 deletions beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"slices"
"strings"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,6 +82,8 @@ const (
GetPayloadBodiesByRangeV1 = "engine_getPayloadBodiesByRangeV1"
// ExchangeCapabilities request string for JSON-RPC.
ExchangeCapabilities = "engine_exchangeCapabilities"
// GetBlobsV1 request string for JSON-RPC.
GetBlobsV1 = "engine_getBlobsV1"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
Expand All @@ -93,16 +98,15 @@ type ForkchoiceUpdatedResponse struct {
ValidationError string `json:"validationError"`
}

// PayloadReconstructor defines a service that can reconstruct a full beacon
// block with an execution payload from a signed beacon block and a connection
// to an execution client's engine API.
type PayloadReconstructor interface {
// Reconstructor defines a service responsible for reconstructing full beacon chain objects by utilizing the execution API and making requests through the execution client.
type Reconstructor interface {
ReconstructFullBlock(
ctx context.Context, blindedBlock interfaces.ReadOnlySignedBeaconBlock,
) (interfaces.SignedBeaconBlock, error)
ReconstructFullBellatrixBlockBatch(
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices [6]bool) ([]blocks.VerifiedROBlob, error)
}

// EngineCaller defines a client that can interact with an Ethereum
Expand Down Expand Up @@ -494,6 +498,23 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H
return hdr, err
}

// GetBlobs returns the blob and proof from the execution engine for the given versioned hashes.
func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs")
defer span.End()
// If the execution engine does not support `GetBlobsV1`, return early to prevent encountering an error later.
s.capabilitiesLock.RLock()
if !slices.Contains(s.capabilities, GetBlobsV1) {
s.capabilitiesLock.RUnlock()
return nil, nil
}
s.capabilitiesLock.RUnlock()

result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the json methodset defined on this type, you probably need to define unmarshaling for the encoding to come through corrrectly.

return result, handleRPCError(err)
}

// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
Expand Down Expand Up @@ -522,6 +543,104 @@ func (s *Service) ReconstructFullBellatrixBlockBatch(
return unb, nil
}

// ReconstructBlobSidecars reconstructs the verified blob sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs,
// and constructs the corresponding verified read-only blob sidecars.
//
// The 'exists' argument is a boolean array of length 6, where each element corresponds to whether a
// particular blob sidecar already exists. If exists[i] is true, the blob for the i-th KZG commitment
// has already been retrieved and does not need to be fetched again from the execution layer (EL).
//
// For example:
// - If exists = [true, false, true, false, true, false], the function will fetch the blobs
// associated with indices 1, 3, and 5 (since those are marked as non-existent).
// - If exists = [false ... x 6], the function will attempt to fetch all blobs.
//
// Only the blobs that do not already exist (where exists[i] is false) are fetched using the KZG commitments from block body.
func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, exists [6]bool) ([]blocks.VerifiedROBlob, error) {
blockBody := block.Block().Body()
kzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "could not get blob KZG commitments")
}

// Collect KZG hashes for non-existing blobs
var kzgHashes []common.Hash
for i, commitment := range kzgCommitments {
if !exists[i] {
kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment))
}
}
if len(kzgHashes) == 0 {
return nil, nil
}

// Fetch blobs from EL
blobs, err := s.GetBlobs(ctx, kzgHashes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check if the EL supports this API (cached exchange capabilities result?) before attempting to call it? Otherwise the logs could be noisy for anyone who upgrades to this version with an EL that does not support getBlobs.

if err != nil {
return nil, errors.Wrap(err, "could not get blobs")
}
if blobs == nil {
return nil, nil
}

header, err := block.Header()
if err != nil {
return nil, errors.Wrap(err, "could not get header")
}

// Reconstruct verify blob sidecars
var verifiedBlobs []blocks.VerifiedROBlob
for i, blobIndex := 0, 0; i < len(kzgCommitments); i++ {
if exists[i] {
continue
}

blob := blobs[blobIndex]
blobIndex++
if blob == nil {
continue
}

proof, err := blocks.MerkleProofKZGCommitment(blockBody, i)
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to get Merkle proof for KZG commitment")
continue
}
sidecar := &ethpb.BlobSidecar{
Index: uint64(i),
Blob: blob.Blob,
KzgCommitment: kzgCommitments[i],
KzgProof: blob.KzgProof,
SignedBlockHeader: header,
CommitmentInclusionProof: proof,
}

roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot)
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to create RO blob with root")
continue
}

// Verify the sidecar KZG proof
v := s.blobVerifier(roBlob, verification.ELMemPoolRequirements)
if err := v.SidecarKzgProofVerified(); err != nil {
log.WithError(err).WithField("index", i).Error("failed to verify KZG proof for sidecar")
continue
}

verifiedBlob, err := v.VerifiedROBlob()
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to verify RO blob")
continue
}

verifiedBlobs = append(verifiedBlobs, verifiedBlob)
}

return verifiedBlobs, nil
}

func fullPayloadFromPayloadBody(
header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int,
) (interfaces.ExecutionData, error) {
Expand Down
111 changes: 109 additions & 2 deletions beacon-chain/execution/engine_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package execution

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
mocks "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand All @@ -37,9 +39,9 @@ import (
)

var (
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&Service{})
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&mocks.EngineClient{})
)

Expand Down Expand Up @@ -2390,3 +2392,108 @@ func Test_ExchangeCapabilities(t *testing.T) {
}
})
}

func TestReconstructBlobSidecars(t *testing.T) {
client := &Service{}
b := util.NewBeaconBlockDeneb()
kzgCommitments := createRandomKzgCommitments(t, 6)

b.Block.Body.BlobKzgCommitments = kzgCommitments
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)

ctx := context.Background()
t.Run("all seen", func(t *testing.T) {
exists := [6]bool{true, true, true, true, true, true}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 0, len(verifiedBlobs))
})

t.Run("get-blobs end point is not supported", func(t *testing.T) {
exists := [6]bool{true, true, true, true, true, false}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 0, len(verifiedBlobs))
})

t.Run("recovered 6 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 6)
defer srv.Close()

rpcClient, client := setupRpcClient(t, srv.URL, client)
defer rpcClient.Close()

exists := [6]bool{}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 6, len(verifiedBlobs))
})

t.Run("recovered 3 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 3)
defer srv.Close()

rpcClient, client := setupRpcClient(t, srv.URL, client)
defer rpcClient.Close()

exists := [6]bool{true, false, true, false, true, false}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 3, len(verifiedBlobs))
})
}

func createRandomKzgCommitments(t *testing.T, num int) [][]byte {
kzgCommitments := make([][]byte, num)
for i := range kzgCommitments {
kzgCommitments[i] = make([]byte, 48)
_, err := rand.Read(kzgCommitments[i])
require.NoError(t, err)
}
return kzgCommitments
}

func createBlobServer(t *testing.T, numBlobs int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
defer func() {
require.NoError(t, r.Body.Close())
}()

blobs := make([]pb.BlobAndProofJson, numBlobs)
for i := range blobs {
blobs[i] = pb.BlobAndProofJson{Blob: []byte(fmt.Sprintf("blob%d", i+1)), KzgProof: []byte(fmt.Sprintf("proof%d", i+1))}
}

respJSON := map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"result": blobs,
}
require.NoError(t, json.NewEncoder(w).Encode(respJSON))
}))
}

func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Service) {
rpcClient, err := rpc.DialHTTP(url)
require.NoError(t, err)

client.rpcClient = rpcClient
client.capabilities = []string{GetBlobsV1}
client.blobVerifier = testNewBlobVerifier()

return rpcClient, client
}

func testNewBlobVerifier() verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return &verification.MockBlobVerifier{
CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
return blocks.VerifiedROBlob{}, nil
},
}
}
}
9 changes: 9 additions & 0 deletions beacon-chain/execution/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/network"
"github.com/prysmaticlabs/prysm/v5/network/authorization"
)
Expand Down Expand Up @@ -115,3 +116,11 @@ func WithJwtId(jwtId string) Option {
return nil
}
}

// WithVerifierWaiter gives the sync package direct access to the verifier waiter.
func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return func(s *Service) error {
s.verifierWaiter = v
return nil
}
}
9 changes: 9 additions & 0 deletions beacon-chain/execution/rpc_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
currClient.Close()
}
log.WithField("endpoint", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url)).Info("Connected to new endpoint")

c, err := s.ExchangeCapabilities(ctx)
if err != nil {
errorLogger(err, "Could not exchange capabilities with execution client")
}
s.capabilitiesLock.Lock()
s.capabilities = c
s.capabilitiesLock.Unlock()

return
case <-s.ctx.Done():
log.Debug("Received cancelled context,closing existing powchain service")
Expand Down
Loading
Loading