Skip to content

Commit

Permalink
Fetch and skip sync events (#10051)
Browse files Browse the repository at this point in the history
For period where there are not many sync events (mostly testnets) sync
event fecthing can be slow becuase sync events are fetched at the end of
every sprint.

Fetching the next and looking at its block number optimizes this because
fetches can be skipped until the next known block with sync events.
  • Loading branch information
mh0lt authored Apr 27, 2024
1 parent ab0f633 commit 5d92302
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 20 deletions.
4 changes: 4 additions & 0 deletions cmd/devnet/services/polygon/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (h *Heimdall) FetchStateSyncEvents(ctx context.Context, fromID uint64, to t
return nil, fmt.Errorf("TODO")
}

func (h *Heimdall) FetchStateSyncEvent(ctx context.Context, id uint64) (*heimdall.EventRecordWithTime, error) {
return nil, fmt.Errorf("TODO")
}

func (h *Heimdall) Close() {
h.unsubscribe()
}
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/bor_heimdall_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func fetchAndWriteHeimdallStateSyncEvents(

from = lastStateSyncEventID + 1

logger.Debug(
logger.Trace(
fmt.Sprintf("[%s] Fetching state updates from Heimdall", logPrefix),
"fromID", from,
"to", to.Format(time.RFC3339),
Expand Down
90 changes: 74 additions & 16 deletions eth/stagedsync/stage_bor_heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
Expand All @@ -14,10 +15,11 @@ import (
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/chain/networkname"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/accounts/abi"
"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/dataflow"
Expand Down Expand Up @@ -90,6 +92,8 @@ func StageBorHeimdallCfg(
}
}

var lastMumbaiEventRecord *heimdall.EventRecordWithTime

func BorHeimdallForward(
s *StageState,
u Unwinder,
Expand Down Expand Up @@ -168,6 +172,8 @@ func BorHeimdallForward(
var fetchTime time.Duration
var snapTime time.Duration
var snapInitTime time.Duration
var syncEventTime time.Duration

var eventRecords int

lastSpanID, err := fetchRequiredHeimdallSpansIfNeeded(ctx, headNumber, tx, cfg, s.LogPrefix(), logger)
Expand All @@ -185,6 +191,9 @@ func BorHeimdallForward(
defer logTimer.Stop()

logger.Info(fmt.Sprintf("[%s] Processing sync events...", s.LogPrefix()), "from", lastBlockNum+1, "to", headNumber)

var nextEventRecord *heimdall.EventRecordWithTime

for blockNum = lastBlockNum + 1; blockNum <= headNumber; blockNum++ {
select {
default:
Expand All @@ -195,7 +204,8 @@ func BorHeimdallForward(
"lastSpanID", lastSpanID,
"lastStateSyncEventID", lastStateSyncEventID,
"total records", eventRecords,
"fetch time", fetchTime,
"sync-events", syncEventTime,
"sync-event-fetch", fetchTime,
"snaps", snapTime,
"snap-init", snapInitTime,
"process time", time.Since(processStart),
Expand Down Expand Up @@ -291,23 +301,70 @@ func BorHeimdallForward(
return err
}

syncEventStart := time.Now()

var callTime time.Duration
var records int
lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
cfg,
s.LogPrefix(),
logger,
lastStateSyncEventID,
)
if err != nil {
return err

var endStateSyncEventId uint64

// mumbai event records have stopped being produced as of march 2024
// as part of the goerli decom - so there is no point trying to
// fetch them
if cfg.chainConfig.ChainName == networkname.MumbaiChainName {
if nextEventRecord == nil {
nextEventRecord = lastMumbaiEventRecord
}
}

if nextEventRecord == nil || header.Time > uint64(nextEventRecord.Time.Unix()) {
var records int

if lastStateSyncEventID == 0 || lastStateSyncEventID != endStateSyncEventId {
lastStateSyncEventID, records, callTime, err = fetchRequiredHeimdallStateSyncEventsIfNeeded(
ctx,
header,
tx,
cfg,
s.LogPrefix(),
logger,
lastStateSyncEventID,
)

if err != nil {
return err
}
}

if records != 0 {
nextEventRecord = nil
eventRecords += records
} else {
if nextEventRecord == nil || nextEventRecord.ID <= lastStateSyncEventID {
if eventRecord, err := cfg.heimdallClient.FetchStateSyncEvent(ctx, lastStateSyncEventID+1); err == nil {
nextEventRecord = eventRecord
endStateSyncEventId = 0
} else {
if !errors.Is(err, heimdall.ErrEventRecordNotFound) {
return err
}

if cfg.chainConfig.ChainName == networkname.MumbaiChainName && lastStateSyncEventID == 276850 {
lastMumbaiEventRecord = &heimdall.EventRecordWithTime{
EventRecord: heimdall.EventRecord{
ID: 276851,
},
Time: time.Unix(math.MaxInt64, 0),
}
}

endStateSyncEventId = lastStateSyncEventID
}
}
}
}

eventRecords += records
fetchTime += callTime
syncEventTime = syncEventTime + time.Since(syncEventStart)

if cfg.loopBreakCheck != nil && cfg.loopBreakCheck(int(blockNum-lastBlockNum)) {
headNumber = blockNum
Expand All @@ -331,6 +388,7 @@ func BorHeimdallForward(
"lastSpanID", lastSpanID,
"lastStateSyncEventID", lastStateSyncEventID,
"total records", eventRecords,
"sync event time", syncEventTime,
"fetch time", fetchTime,
"snap time", snapTime,
"process time", time.Since(processStart),
Expand Down Expand Up @@ -455,7 +513,7 @@ func persistValidatorSets(
var err error
if snap, err = snap.Apply(parent, headers, logger); err != nil {
if snap != nil {
var badHash common.Hash
var badHash libcommon.Hash
for _, header := range headers {
if header.Number.Uint64() == snap.Number+1 {
badHash = header.Hash()
Expand Down
7 changes: 7 additions & 0 deletions eth/stagedsync/stagedsynctest/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,13 @@ func (h *Harness) mockHeimdallClient() {
return []*heimdall.EventRecordWithTime{&newEvent}, nil
}).
AnyTimes()
h.heimdallClient.
EXPECT().
FetchStateSyncEvent(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, _ uint64) (*heimdall.EventRecordWithTime, error) {
return nil, heimdall.ErrEventRecordNotFound
}).
AnyTimes()
}

func (h *Harness) runSyncStageForwardWithErrorIs(
Expand Down
4 changes: 4 additions & 0 deletions polygon/bor/bor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (h test_heimdall) FetchStateSyncEvents(ctx context.Context, fromID uint64,
return nil, nil
}

func (h *test_heimdall) FetchStateSyncEvent(ctx context.Context, id uint64) (*heimdall.EventRecordWithTime, error) {
return nil, nil
}

func (h *test_heimdall) FetchSpan(ctx context.Context, spanID uint64) (*heimdall.Span, error) {

if span, ok := h.spans[heimdall.SpanId(spanID)]; ok {
Expand Down
38 changes: 35 additions & 3 deletions polygon/heimdall/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
//go:generate mockgen -destination=./client_mock.go -package=heimdall . HeimdallClient
type HeimdallClient interface {
FetchStateSyncEvents(ctx context.Context, fromId uint64, to time.Time, limit int) ([]*EventRecordWithTime, error)
FetchStateSyncEvent(ctx context.Context, id uint64) (*EventRecordWithTime, error)

FetchLatestSpan(ctx context.Context) (*Span, error)
FetchSpan(ctx context.Context, spanID uint64) (*Span, error)
Expand Down Expand Up @@ -106,6 +107,7 @@ func newHeimdallClient(urlString string, httpClient HttpClient, retryBackOff tim
const (
fetchStateSyncEventsFormat = "from-id=%d&to-time=%d&limit=%d"
fetchStateSyncEventsPath = "clerk/event-record/list"
fetchStateSyncEvent = "clerk/event-record/%s"

fetchCheckpoint = "/checkpoints/%s"
fetchCheckpointCount = "/checkpoints/count"
Expand All @@ -130,12 +132,12 @@ func (c *Client) FetchStateSyncEvents(ctx context.Context, fromID uint64, to tim
eventRecords := make([]*EventRecordWithTime, 0)

for {
url, err := stateSyncURL(c.urlString, fromID, to.Unix())
url, err := stateSyncListURL(c.urlString, fromID, to.Unix())
if err != nil {
return nil, err
}

c.logger.Debug(heimdallLogPrefix("Fetching state sync events"), "queryParams", url.RawQuery)
c.logger.Trace(heimdallLogPrefix("Fetching state sync events"), "queryParams", url.RawQuery)

ctx = withRequestType(ctx, stateSyncRequest)

Expand Down Expand Up @@ -173,6 +175,32 @@ func (c *Client) FetchStateSyncEvents(ctx context.Context, fromID uint64, to tim
return eventRecords, nil
}

func (c *Client) FetchStateSyncEvent(ctx context.Context, id uint64) (*EventRecordWithTime, error) {
url, err := stateSyncURL(c.urlString, id)

if err != nil {
return nil, err
}

ctx = withRequestType(ctx, stateSyncRequest)

isRecoverableError := func(err error) bool {
return !strings.Contains(err.Error(), "could not get state record; No record found")
}

response, err := FetchWithRetryEx[StateSyncEventResponse](ctx, c, url, isRecoverableError, c.logger)

if err != nil {
if strings.Contains(err.Error(), "could not get state record; No record found") {
return nil, ErrEventRecordNotFound
}

return nil, err
}

return &response.Result, nil
}

func (c *Client) FetchLatestSpan(ctx context.Context) (*Span, error) {
url, err := latestSpanURL(c.urlString)
if err != nil {
Expand Down Expand Up @@ -457,12 +485,16 @@ func latestSpanURL(urlString string) (*url.URL, error) {
return makeURL(urlString, fetchSpanLatest, "")
}

func stateSyncURL(urlString string, fromID uint64, to int64) (*url.URL, error) {
func stateSyncListURL(urlString string, fromID uint64, to int64) (*url.URL, error) {
queryParams := fmt.Sprintf(fetchStateSyncEventsFormat, fromID, to, stateFetchLimit)

return makeURL(urlString, fetchStateSyncEventsPath, queryParams)
}

func stateSyncURL(urlString string, id uint64) (*url.URL, error) {
return makeURL(urlString, fmt.Sprintf(fetchStateSyncEvent, fmt.Sprint(id)), "")
}

func checkpointURL(urlString string, number int64) (*url.URL, error) {
url := ""
if number == -1 {
Expand Down
15 changes: 15 additions & 0 deletions polygon/heimdall/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions polygon/heimdall/event_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type EventRecordWithTime struct {
Time time.Time `json:"record_time" yaml:"record_time"`
}

var ErrEventRecordNotFound = fmt.Errorf("event record not found")

// String returns the string representatin of a state record
func (e *EventRecordWithTime) String() string {
return fmt.Sprintf(
Expand Down Expand Up @@ -78,3 +80,8 @@ type StateSyncEventsResponse struct {
Height string `json:"height"`
Result []*EventRecordWithTime `json:"result"`
}

type StateSyncEventResponse struct {
Height string `json:"height"`
Result EventRecordWithTime `json:"result"`
}

0 comments on commit 5d92302

Please sign in to comment.