Skip to content

Commit

Permalink
feat: fill transaction index gap on enabling automatic backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Aug 7, 2024
1 parent 4b75fa8 commit 0ec35b0
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 4 deletions.
22 changes: 22 additions & 0 deletions chain/ethhashlookup/eth_transaction_hash_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
getCidFromHash = `SELECT cid FROM eth_tx_hashes WHERE hash = ?`
getHashFromCid = `SELECT hash FROM eth_tx_hashes WHERE cid = ?`
deleteOlderThan = `DELETE FROM eth_tx_hashes WHERE insertion_time < datetime('now', ?);`
getLastTxHash = `SELECT hash, cid FROM eth_tx_hashes ORDER BY insertion_time DESC LIMIT 1`
)

type EthTxHashLookup struct {
Expand All @@ -42,6 +43,7 @@ type EthTxHashLookup struct {
stmtGetCidFromHash *sql.Stmt
stmtGetHashFromCid *sql.Stmt
stmtDeleteOlderThan *sql.Stmt
stmtGetLastTxHash *sql.Stmt
}

func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLookup, error) {
Expand Down Expand Up @@ -82,6 +84,10 @@ func (ei *EthTxHashLookup) initStatements() (err error) {
if err != nil {
return xerrors.Errorf("prepare stmtDeleteOlderThan: %w", err)
}
ei.stmtGetLastTxHash, err = ei.db.Prepare(getLastTxHash)
if err != nil {
return xerrors.Errorf("prepare stmtGetMostRecentTxHash: %w", err)
}
return nil
}

Expand Down Expand Up @@ -128,6 +134,22 @@ func (ei *EthTxHashLookup) GetHashFromCid(c cid.Cid) (ethtypes.EthHash, error) {
return ethtypes.ParseEthHash(hashString)
}

// GetLastTransaction returns the most recent transaction hash and cid from the database based on insertion_time
func (ei *EthTxHashLookup) GetLastTransaction() (string, cid.Cid, error) {
if ei.db == nil {
return "", cid.Undef, xerrors.New("db closed")
}

var hashStr, cidStr string
err := ei.stmtGetLastTxHash.QueryRow().Scan(&hashStr, &cidStr)
if err != nil {
return "", cid.Undef, err
}

c, err := cid.Decode(cidStr)
return hashStr, c, err
}

func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) {
if ei.db == nil {
return 0, xerrors.New("db closed")
Expand Down
2 changes: 1 addition & 1 deletion node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func ConfigFullNode(c interface{}) Option {

If(cfg.Fevm.EnableEthRPC,
Override(new(*full.EthEventHandler), modules.EthEventHandler(cfg.Events, cfg.Fevm.EnableEthRPC)),
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)),
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm, cfg.Index.EnableAutomaticBackFill)),
Override(new(full.EthEventAPI), From(new(*full.EthEventHandler))),
),
If(!cfg.Fevm.EnableEthRPC,
Expand Down
5 changes: 4 additions & 1 deletion node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type ApisConfig struct {
}

type JournalConfig struct {
//Events of the form: "system1:event1,system1:event2[,...]"
// Events of the form: "system1:event1,system1:event2[,...]"
DisabledEvents string
}

Expand Down Expand Up @@ -621,6 +621,9 @@ type IndexConfig struct {
// EXPERIMENTAL FEATURE. USE WITH CAUTION
// EnableMsgIndex enables indexing of messages on chain.
EnableMsgIndex bool

// EnableAutomaticBackFill enables automatic index back-filling
EnableAutomaticBackFill bool
}

type HarmonyDB struct {
Expand Down
43 changes: 43 additions & 0 deletions node/impl/full/txhashmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,49 @@ func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) e
return nil
}

// FillIndexGap back-fills the gap between the current head and the last populated transaction index
func (m *EthTxHashManager) FillIndexGap(ctx context.Context) error {
lastTxHash, lastTxCid, err := m.TransactionHashLookup.GetLastTransaction()
if err != nil {
return err
}

log.Infof("Start filling transaction index gap from: %s", lastTxHash)

totalProcessedBlock := 0
ts := m.StateAPI.Chain.GetHeaviestTipSet()
for _, block := range ts.Blocks() {
msgs, err := m.StateAPI.Chain.SecpkMessagesForBlock(ctx, block)
if err != nil {
// If we can't find the messages, we've either imported from snapshot or pruned the store
log.Debug("exiting message mapping population at epoch ", ts.Height())
return nil
}

for _, msg := range msgs {
if msg.Cid() == lastTxCid {
// We've reached the last indexed transaction
return nil
}

// process the message
m.ProcessSignedMessage(ctx, msg)
}

totalProcessedBlock++
}

ts, err = m.StateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents())

Check failure on line 58 in node/impl/full/txhashmanager.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

SA4006: this value of `ts` is never used (staticcheck)
if err != nil {
return err
}

log.Infof("Finished filling transaction index gap. Total processed block: %d", totalProcessedBlock)

return nil
}

// PopulateExistingMappings walk back from the current head to the minimum height and populate the eth transaction hash lookup database
func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error {
if minHeight < buildconstants.UpgradeHyggeHeight {
minHeight = buildconstants.UpgradeHyggeHeight
Expand Down
10 changes: 8 additions & 2 deletions node/modules/ethmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"

Expand All @@ -21,7 +22,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
)

func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) {
func EthModuleAPI(cfg config.FevmConfig, enableAutomaticBackFill bool) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI, ethEventHandler *full.EthEventHandler) (*full.EthModule, error) {
ctx := helpers.LifecycleCtx(mctx, lc)

Expand Down Expand Up @@ -57,6 +58,11 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
if err != nil {
return nil, err
}
} else if enableAutomaticBackFill { // If the db exists and back-fill is enabled, we'll back-fill missing entries
err = ethTxHashManager.FillIndexGap(mctx)
if err != nil {
return nil, xerrors.Errorf("error when back-filling transaction index gap: %w", err)
}
}

// prefill the whole skiplist cache maintained internally by the GetTipsetByHeight
Expand All @@ -78,7 +84,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
}

// Tipset listener
_ = ev.Observe(&ethTxHashManager)
ev.Observe(&ethTxHashManager)

ch, err := mp.Updates(ctx)
if err != nil {
Expand Down

0 comments on commit 0ec35b0

Please sign in to comment.