Skip to content

Commit

Permalink
feat: add new job to find out orphan block (#1265)
Browse files Browse the repository at this point in the history
* Add model for unsynced blocks

* Add the function for detecting the orphan blocks

* Add new flag for modifying the confidence

* Store the orphan blocks
  • Loading branch information
Terryhung authored Oct 6, 2023
1 parent dd8217e commit 09767ad
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 0 deletions.
170 changes: 170 additions & 0 deletions commands/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
lotuscli "github.com/filecoin-project/lotus/cli"
cid "github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"

"github.com/filecoin-project/lily/chain/actors/builtin"
"github.com/filecoin-project/lily/model"
"github.com/filecoin-project/lily/model/blocks"

"github.com/filecoin-project/lily/lens/lily"
)
Expand All @@ -28,6 +32,7 @@ var SyncCmd = &cli.Command{
Subcommands: []*cli.Command{
SyncStatusCmd,
SyncWaitCmd,
SyncIncomingBlockCmd,
},
}

Expand Down Expand Up @@ -239,3 +244,168 @@ func SyncWait(ctx context.Context, lapi lily.LilyAPI, watch bool) error {
i++
}
}

type syncOpts struct {
config string
storage string
confidence int
}

var syncFlags syncOpts

type SyncingState struct {
UnsyncedBlockHeadersByEpoch map[int64][]*blocks.UnsyncedBlockHeader
MapMutex sync.Mutex
Confidence int64
Storage model.Storage
StorageMutex sync.Mutex
}

func (ss *SyncingState) SetBlockHeaderToMap(block *blocks.UnsyncedBlockHeader) {
ss.MapMutex.Lock()
defer ss.MapMutex.Unlock()
ss.UnsyncedBlockHeadersByEpoch[block.Height] = append(ss.UnsyncedBlockHeadersByEpoch[block.Height], block)
}

func (ss *SyncingState) PersistBlocks(ctx context.Context, blocks blocks.UnsyncedBlockHeaders) error {
ss.StorageMutex.Lock()
defer ss.StorageMutex.Unlock()

return ss.Storage.PersistBatch(ctx, blocks)
}

var SyncIncomingBlockCmd = &cli.Command{
Name: "blocks",
Usage: "Start to get incoming block",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "config",
Usage: "Specify path of config file to use.",
EnvVars: []string{"LILY_CONFIG"},
Destination: &syncFlags.config,
},
&cli.StringFlag{
Name: "storage",
Usage: "Specify the storage to use, if persisting the displayed output.",
Destination: &syncFlags.storage,
},
&cli.IntFlag{
Name: "confidence",
Usage: "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database.",
EnvVars: []string{"LILY_CONFIDENCE"},
Value: 2,
Destination: &syncFlags.confidence,
},
},
Action: func(cctx *cli.Context) error {
ctx := lotuscli.ReqContext(cctx)
lapi, closer, err := GetAPI(ctx)
if err != nil {
return err
}
defer closer()

// Set up a context that is canceled when the command is interrupted
ctx, cancel := SetupContextWithCancel(ctx)
defer cancel()

strg, err := SetupStorage(syncFlags.config, syncFlags.storage)
if err != nil {
return err
}

state := &SyncingState{
UnsyncedBlockHeadersByEpoch: make(map[int64][]*blocks.UnsyncedBlockHeader),
Confidence: int64(syncFlags.confidence),
Storage: strg,
MapMutex: sync.Mutex{},
StorageMutex: sync.Mutex{},
}

go detectOrphanBlocks(ctx, lapi, state)
go getIncomingBlocks(ctx, lapi, state)

<-ctx.Done()
return nil
},
}

func detectOrphanBlocks(ctx context.Context, lapi lily.LilyAPI, state *SyncingState) {
for range time.Tick(30 * time.Second) {
state.MapMutex.Lock()

// Get the latestEpoch in map
latestEpoch := int64(0)
for k := range state.UnsyncedBlockHeadersByEpoch {
if k > latestEpoch {
latestEpoch = k
}
}

// Check old tipset
targetEpoch := latestEpoch - state.Confidence
oldEpoches := []int64{}
for epoch, unsyncedBlocks := range state.UnsyncedBlockHeadersByEpoch {
if epoch <= targetEpoch {
// Store the old tipset, we should clear it after checking
oldEpoches = append(oldEpoches, epoch)

oldTs, err := lapi.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(epoch), types.EmptyTSK)
if err != nil {
log.Errorf("Error at getting the old tipset: %v", err)
continue
}
log.Infof("Get header cids: %v at Height: %v", oldTs.Cids(), oldTs.Height())

// Verify whether the unsynced block exists within the tipset or not.
cidMap := make(map[string]bool)
for _, cid := range oldTs.Cids() {
cidMap[cid.String()] = true
}
orphanBlocks := blocks.UnsyncedBlockHeaders{}
for _, block := range unsyncedBlocks {
if _, exists := cidMap[block.Cid]; !exists {
block.IsOrphan = true
orphanBlocks = append(orphanBlocks, block)
log.Errorf("Detect orphan block cid: %v at height: %v", block.Cid, block.Height)
}
}

// To do set the orphan to Storage
if len(orphanBlocks) > 0 && state.Storage != nil {
err := state.PersistBlocks(ctx, orphanBlocks)
if err != nil {
log.Errorf("Error at persisting the orphan blocks: %v", err)
}
}
}
}

// Clean the map
for _, epoch := range oldEpoches {
delete(state.UnsyncedBlockHeadersByEpoch, epoch)
}
state.MapMutex.Unlock()
}
}

func getIncomingBlocks(ctx context.Context, lapi lily.LilyAPI, state *SyncingState) {
incomingBlocks, err := lapi.SyncIncomingBlocks(ctx)
if err != nil {
log.Error(err)
return
}

for bh := range incomingBlocks {
block := blocks.NewUnsyncedBlockHeader(bh)
state.SetBlockHeaderToMap(block)
if state.Storage == nil {
log.Infof("Block Height: %v, Miner: %v, Cid: %v", block.Height, block.Miner, block.Cid)
} else {
err = state.PersistBlocks(ctx, blocks.UnsyncedBlockHeaders{block})
if err != nil {
log.Errorf("Error at persisting the unsynced block headers: %v", err)
}
}
}
}
50 changes: 50 additions & 0 deletions commands/util.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package commands

import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/signal"
"syscall"

"github.com/urfave/cli/v2"

"github.com/filecoin-project/lily/config"
"github.com/filecoin-project/lily/model"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
)

func FlagSet(fs ...[]cli.Flag) []cli.Flag {
Expand All @@ -30,3 +37,46 @@ func PrintNewJob(w io.Writer, res *schedule.JobSubmitResult) error {
}
return nil
}

func SetupContextWithCancel(ctx context.Context) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)

go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
select {
case <-interrupt:
cancel()
case <-ctx.Done():
}
}()

return ctx, cancel
}

func SetupStorage(configPath string, storageStr string) (strg model.Storage, err error) {
if storageStr != "" {
cfg, err := config.FromFile(configPath)
if err != nil {
return nil, err
}

md := storage.Metadata{
JobName: storageStr,
}

// context for db connection
ctxDB := context.Background()

sc, err := storage.NewCatalog(cfg.Storage)
if err != nil {
return nil, err
}
strg, err = sc.Connect(ctxDB, syncFlags.storage, md)
if err != nil {
return nil, err
}
}

return strg, nil
}
4 changes: 4 additions & 0 deletions lens/lily/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type LilyAPI interface {
EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) //perm:read
StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) //perm:read

// SyncIncomingBlocks returns a channel streaming incoming, potentially not
// yet synced block headers.
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) //perm:read

// trigger graceful shutdown
Shutdown(context.Context) error

Expand Down
8 changes: 8 additions & 0 deletions lens/lily/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type LilyAPIStruct struct {
EthGetTransactionByHash func(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) `perm:"read"`
StateListActors func(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) `perm:"read"`

// SyncIncomingBlocks returns a channel streaming incoming, potentially not
// yet synced block headers.
SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`

LogList func(context.Context) ([]string, error) `perm:"read"`
LogSetLevel func(context.Context, string, string) error `perm:"read"`
LogSetLevelRegex func(context.Context, string, string) error `perm:"read"`
Expand Down Expand Up @@ -295,3 +299,7 @@ func (s *LilyAPIStruct) StateListActors(ctx context.Context, tsk types.TipSetKey
func (s *LilyAPIStruct) EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) {
return s.Internal.EthGetTransactionByHash(ctx, txHash)
}

func (s *LilyAPIStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
return s.Internal.SyncIncomingBlocks(ctx)
}
64 changes: 64 additions & 0 deletions model/blocks/unsynced_header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package blocks

import (
"context"

"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
"github.com/filecoin-project/lotus/chain/types"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)

type UnsyncedBlockHeader struct {
Height int64 `pg:",pk,use_zero,notnull"`
Cid string `pg:",pk,notnull"`
Miner string `pg:",notnull"`
ParentWeight string `pg:",notnull"`
ParentBaseFee string `pg:",notnull"`
ParentStateRoot string `pg:",notnull"`

WinCount int64 `pg:",use_zero"`
Timestamp uint64 `pg:",use_zero"`
ForkSignaling uint64 `pg:",use_zero"`
IsOrphan bool `pg:",notnull"`
}

func NewUnsyncedBlockHeader(bh *types.BlockHeader) *UnsyncedBlockHeader {
return &UnsyncedBlockHeader{
Cid: bh.Cid().String(),
Miner: bh.Miner.String(),
ParentWeight: bh.ParentWeight.String(),
ParentBaseFee: bh.ParentBaseFee.String(),
ParentStateRoot: bh.ParentStateRoot.String(),
Height: int64(bh.Height),
WinCount: bh.ElectionProof.WinCount,
Timestamp: bh.Timestamp,
ForkSignaling: bh.ForkSignaling,
IsOrphan: false,
}
}

func (bh *UnsyncedBlockHeader) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "unsynced_block_headers"))
metrics.RecordCount(ctx, metrics.PersistModel, 1)
return s.PersistModel(ctx, bh)
}

type UnsyncedBlockHeaders []*UnsyncedBlockHeader

func (bhl UnsyncedBlockHeaders) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
if len(bhl) == 0 {
return nil
}
ctx, span := otel.Tracer("").Start(ctx, "UnsyncedBlockHeaders.Persist")
if span.IsRecording() {
span.SetAttributes(attribute.Int("count", len(bhl)))
}
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "block_headers"))
metrics.RecordCount(ctx, metrics.PersistModel, len(bhl))
return s.PersistModel(ctx, bhl)
}
25 changes: 25 additions & 0 deletions schemas/v1/34_unsynced_block_headers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package v1

func init() {
patches.Register(
34,
`
CREATE TABLE IF NOT EXISTS {{ .SchemaName | default "public"}}.unsynced_block_headers (
height BIGINT NOT NULL,
cid TEXT NOT NULL,
miner TEXT,
parent_weight TEXT,
parent_base_fee TEXT,
parent_state_root TEXT,
win_count BIGINT,
"timestamp" BIGINT,
fork_signaling BIGINT,
is_orphan BOOLEAN,
PRIMARY KEY(height, cid)
);
CREATE INDEX IF NOT EXISTS unsynced_block_headers_height_idx ON {{ .SchemaName | default "public"}}.unsynced_block_headers USING btree (height DESC);
CREATE INDEX IF NOT EXISTS unsynced_block_headers_timestamp_idx ON {{ .SchemaName | default "public"}}.unsynced_block_headers USING btree ("timestamp");
CREATE INDEX IF NOT EXISTS unsynced_block_headers_miner_idx ON {{ .SchemaName | default "public"}}.unsynced_block_headers USING hash (miner);
`,
)
}

0 comments on commit 09767ad

Please sign in to comment.