Skip to content

Commit

Permalink
Added fill-up with pre-existing API (#9042)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Dec 21, 2023
1 parent 393dd18 commit 0e18866
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 2 deletions.
51 changes: 51 additions & 0 deletions cl/phase1/core/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package core

import (
"context"
"encoding/binary"
"fmt"
"io"
"net/http"

"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/log/v3"
Expand Down Expand Up @@ -48,3 +51,51 @@ func RetrieveBeaconState(ctx context.Context, beaconConfig *clparams.BeaconChain
}
return beaconState, nil
}

func RetrieveBlock(ctx context.Context, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, uri string, expectedBlockRoot *libcommon.Hash) (*cltypes.SignedBeaconBlock, error) {
log.Debug("[Checkpoint Sync] Requesting beacon block", "uri", uri)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil)
if err != nil {
return nil, err
}

req.Header.Set("Accept", "application/octet-stream")
if err != nil {
return nil, fmt.Errorf("checkpoint sync request failed %s", err)
}
r, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer func() {
err = r.Body.Close()
}()
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf("checkpoint sync failed, bad status code %d", r.StatusCode)
}
marshaled, err := io.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("checkpoint sync read failed %s", err)
}
if len(marshaled) < 108 {
return nil, fmt.Errorf("checkpoint sync read failed, too short")
}
currentSlot := binary.LittleEndian.Uint64(marshaled[100:108])
v := beaconConfig.GetCurrentStateVersion(currentSlot / beaconConfig.SlotsPerEpoch)

block := cltypes.NewSignedBeaconBlock(beaconConfig)
err = block.DecodeSSZ(marshaled, int(v))
if err != nil {
return nil, fmt.Errorf("checkpoint sync decode failed %s", err)
}
if expectedBlockRoot != nil {
has, err := block.Block.HashSSZ()
if err != nil {
return nil, fmt.Errorf("checkpoint sync decode failed %s", err)
}
if has != *expectedBlockRoot {
return nil, fmt.Errorf("checkpoint sync decode failed, unexpected block root %s", has)
}
}
return block, nil
}
126 changes: 124 additions & 2 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"context"
"fmt"
"math"
"net/url"
"os"
"strings"
"time"

"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/turbo/debug"

lg "github.com/anacrolix/log"
libcommon "github.com/ledgerwatch/erigon-lib/common"

lg "github.com/anacrolix/log"
"github.com/ledgerwatch/erigon-lib/direct"
downloader3 "github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/metrics"
Expand All @@ -30,6 +32,7 @@ import (
persistence2 "github.com/ledgerwatch/erigon/cl/persistence"
"github.com/ledgerwatch/erigon/cmd/caplin/caplin1"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks"
Expand Down Expand Up @@ -76,6 +79,7 @@ var CLI struct {
DownloadSnapshots DownloadSnapshots `cmd:"" help:"download snapshots from webseed"`
LoopSnapshots LoopSnapshots `cmd:"" help:"loop over snapshots"`
RetrieveHistoricalState RetrieveHistoricalState `cmd:"" help:"retrieve historical state from db"`
ChainEndpoint ChainEndpoint `cmd:"" help:"chain endpoint"`
}

type chainCfg struct {
Expand Down Expand Up @@ -437,6 +441,124 @@ func (c *Chain) Run(ctx *Context) error {
return stages.SpawnStageHistoryDownload(cfg, ctx, log.Root())
}

type ChainEndpoint struct {
Endpoint string `help:"endpoint" default:""`
chainCfg
outputFolder
}

func (c *ChainEndpoint) Run(ctx *Context) error {
genesisConfig, _, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain)
if err != nil {
return err
}
log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))

dirs := datadir.New(c.Datadir)
rawDB, _ := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory)
beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false)
if err != nil {
return err
}
defer db.Close()

baseUri, err := url.JoinPath(c.Endpoint, "eth/v2/beacon/blocks")
if err != nil {
return err
}
log.Info("Hooked", "uri", baseUri)
// Let's fetch the head first
currentBlock, err := core.RetrieveBlock(ctx, beaconConfig, genesisConfig, fmt.Sprintf("%s/head", baseUri), nil)
if err != nil {
return err
}
currentRoot, err := currentBlock.Block.HashSSZ()
if err != nil {
return err
}
tx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()

log.Info("Starting with", "root", libcommon.Hash(currentRoot), "slot", currentBlock.Block.Slot)
currentRoot = currentBlock.Block.ParentRoot
if err := beaconDB.WriteBlock(ctx, tx, currentBlock, true); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
previousLogBlock := currentBlock.Block.Slot

logInterval := time.NewTicker(30 * time.Second)
defer logInterval.Stop()

loopStep := func() (bool, error) {
tx, err := db.BeginRw(ctx)
if err != nil {
return false, err
}
defer tx.Rollback()

stringifiedRoot := common.Bytes2Hex(currentRoot[:])
// Let's fetch the head first
currentBlock, err := core.RetrieveBlock(ctx, beaconConfig, genesisConfig, fmt.Sprintf("%s/0x%s", baseUri, stringifiedRoot), (*libcommon.Hash)(&currentRoot))
if err != nil {
return false, err
}
currentRoot, err = currentBlock.Block.HashSSZ()
if err != nil {
return false, err
}
if err := beaconDB.WriteBlock(ctx, tx, currentBlock, true); err != nil {
return false, err
}
currentRoot = currentBlock.Block.ParentRoot
currentSlot := currentBlock.Block.Slot
// it will stop if we end finding a gap or if we reach the maxIterations
for {
// check if the expected root is in db
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, currentRoot)
if err != nil {
return false, err
}
if slot == nil || *slot == 0 {
break
}
if err := beacon_indicies.MarkRootCanonical(ctx, tx, *slot, currentRoot); err != nil {
return false, err
}
currentRoot, err = beacon_indicies.ReadParentBlockRoot(ctx, tx, currentRoot)
if err != nil {
return false, err
}
}
if err := tx.Commit(); err != nil {
return false, err
}
select {
case <-logInterval.C:
// up to 2 decimal places
rate := float64(previousLogBlock-currentSlot) / 30
log.Info("Successfully processed", "slot", currentSlot, "blk/sec", fmt.Sprintf("%.2f", rate))
previousLogBlock = currentBlock.Block.Slot
case <-ctx.Done():
default:
}
return currentSlot != 0, nil
}
var keepGoing bool
for keepGoing, err = loopStep(); keepGoing && err == nil; keepGoing, err = loopStep() {
if !keepGoing {
break
}
}

return err
}

type DumpSnapshots struct {
chainCfg
outputFolder
Expand Down

0 comments on commit 0e18866

Please sign in to comment.