Skip to content

Commit

Permalink
feat: populate index from snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Aug 11, 2024
1 parent c4e0f06 commit 4b56a36
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 3 deletions.
2 changes: 1 addition & 1 deletion chain/ethhashlookup/eth_transaction_hash_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (ei *EthTxHashLookup) UpsertUniqueHash(txHash ethtypes.EthHash, c cid.Cid)
return xerrors.New("db closed")
}

result, err := ei.stmtInsertTxHash.Exec(txHash.String(), c.String())
result, err := ei.stmtInsertUniqueTxHash.Exec(txHash.String(), c.String())
if err != nil {
return err
}
Expand Down
117 changes: 117 additions & 0 deletions chain/ethhashlookup/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package ethhashlookup

import (
"context"
"os"

"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/lib/sqlite"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)

var log = logging.Logger("txhashindex")

// ChainStore interface; we could use store.ChainStore directly,
// but this simplifies unit testing.
type ChainStore interface {
SubscribeHeadChanges(f store.ReorgNotifee)
GetSecpkMessagesForTipset(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error)
GetHeaviestTipSet() *types.TipSet
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
}

var _ ChainStore = (*store.ChainStore)(nil)

// PopulateAfterSnapshot populates the tx hash index after a snapshot is restored.
func PopulateAfterSnapshot(ctx context.Context, path string, cs ChainStore) error {
// if a database already exists, we try to delete it and create a new one
if _, err := os.Stat(path); err == nil {
if err = os.Remove(path); err != nil {
return xerrors.Errorf("tx hash index already exists at %s and can't be deleted", path)
}
}

db, _, err := sqlite.Open(path)
if err != nil {
return xerrors.Errorf("failed to setup tx hash index db: %w", err)
}
defer func() {
if err := db.Close(); err != nil {
log.Errorf("error closing tx hash database: %s", err)
}
}()

if err = sqlite.InitDb(ctx, "message index", db, ddls, []sqlite.MigrationFunc{}); err != nil {
_ = db.Close()
return xerrors.Errorf("error creating tx hash index database: %w", err)
}

tx, err := db.Begin()
if err != nil {
return xerrors.Errorf("error when starting transaction: %w", err)
}

rollback := func() {
if err := tx.Rollback(); err != nil {
log.Errorf("error in rollback: %s", err)
}
}

insertStmt, err := tx.Prepare(insertTxHash)
if err != nil {
rollback()
return xerrors.Errorf("error preparing insertStmt: %w", err)
}

defer insertStmt.Close()

Check failure on line 69 in chain/ethhashlookup/index.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

Error return value of `insertStmt.Close` is not checked (errcheck)

var (
curTs = cs.GetHeaviestTipSet()
startHeight = curTs.Height()
msgs []*types.SignedMessage
)

for curTs != nil {
msgs, err = cs.GetSecpkMessagesForTipset(ctx, curTs)
if err != nil {
log.Infof("stopping import after %d tipsets", startHeight-curTs.Height())
break
}

for _, m := range msgs {
ethTx, err := ethtypes.EthTransactionFromSignedFilecoinMessage(m)
if err != nil {
rollback()
return err
}

txHash, err := ethTx.TxHash()
if err != nil {
rollback()
return err
}

_, err = insertStmt.Exec(txHash.String(), m.Cid().String())
if err != nil {
rollback()
return err
}
}

curTs, err = cs.GetTipSetFromKey(ctx, curTs.Parents())
if err != nil {
rollback()
return xerrors.Errorf("error walking chain: %w", err)
}
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("error committing transaction: %w", err)
}

return nil
}
15 changes: 15 additions & 0 deletions chain/store/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,18 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(ctx context.Context, cids []cid

return msgs, nil
}

// GetSecpkMessagesForTipset returns all the secpk messages for a tipset
func (cs *ChainStore) GetSecpkMessagesForTipset(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, 0)
for _, b := range ts.Blocks() {
secpkmsgs, err := cs.SecpkMessagesForBlock(ctx, b)
if err != nil {
return nil, xerrors.Errorf("failed to get secpk messages for block: %w", err)
}

msgs = append(msgs, secpkmsgs...)
}

return msgs, nil
}
14 changes: 14 additions & 0 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/chain/beacon/drand"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/ethhashlookup"
"github.com/filecoin-project/lotus/chain/index"
proofsffi "github.com/filecoin-project/lotus/chain/proofs/ffi"
"github.com/filecoin-project/lotus/chain/stmgr"
Expand Down Expand Up @@ -650,6 +651,19 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
log.Info("populating message index done")
}

if cfg.Index.EnableAutomaticBackFillTxIndex {
log.Info("back-filling tx index...")
basePath, err := lr.SqlitePath()
if err != nil {
return err
}

if err = ethhashlookup.PopulateAfterSnapshot(ctx, filepath.Join(basePath, ethhashlookup.DefaultDbFilename), cst); err != nil {
return err
}

log.Info("populating tx index done")
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion extern/filecoin-ffi
2 changes: 1 addition & 1 deletion node/modules/ethmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.uber.org/fx"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/chain/ethhashlookup"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/messagepool"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"golang.org/x/xerrors"
)

func EthModuleAPI(cfg config.FevmConfig, enableAutomaticBackFill bool, maxAutomaticBackFillBlocks uint64) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) {
Expand Down

0 comments on commit 4b56a36

Please sign in to comment.