Skip to content

Commit

Permalink
feat: add new actor dump: miner (#1264)
Browse files Browse the repository at this point in the history
* Add new actor dump: miner

* Use the lookup function for finding the robust address
  • Loading branch information
Terryhung authored Oct 2, 2023
1 parent ba0aa2d commit 7326abd
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 4 deletions.
80 changes: 78 additions & 2 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/go-state-types/builtin"
adt2 "github.com/filecoin-project/go-state-types/builtin/v10/util/adt"
"github.com/filecoin-project/lotus/api"
initactor "github.com/filecoin-project/lotus/chain/actors/builtin/init"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
Expand All @@ -42,12 +43,14 @@ var (
diffPreCommitCacheSize int
diffSectorCacheSize int
actorCacheSize int
addressCacheSize int

tipsetMessageReceiptSizeEnv = "LILY_TIPSET_MSG_RECEIPT_CACHE_SIZE"
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
actorCacheSizeEnv = "LILY_ACTOR_CACHE_SIZE"
addressCacheSizeEnv = "LILY_ADDRESS_CACHE_SIZE"
)

func getCacheSizeFromEnv(env string, defaultValue int) int {
Expand All @@ -67,6 +70,7 @@ func init() {
diffPreCommitCacheSize = getCacheSizeFromEnv(diffPreCommitCacheSizeEnv, 500)
diffSectorCacheSize = getCacheSizeFromEnv(diffSectorCacheSizeEnv, 500)
actorCacheSize = getCacheSizeFromEnv(actorCacheSizeEnv, 5000)
addressCacheSize = getCacheSizeFromEnv(addressCacheSizeEnv, 4)
}

var _ tasks.DataSource = (*DataSource)(nil)
Expand Down Expand Up @@ -104,6 +108,11 @@ func NewDataSource(node lens.API) (*DataSource, error) {
return nil, err
}

t.addressCache, err = lru.New(addressCacheSize)
if err != nil {
return nil, err
}

return t, nil
}

Expand All @@ -122,7 +131,8 @@ type DataSource struct {
diffPreCommitCache *lru.Cache
diffPreCommitGroup singleflight.Group

actorCache *lru.Cache
actorCache *lru.Cache
addressCache *lru.Cache
}

func (t *DataSource) MessageReceiptEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) {
Expand Down Expand Up @@ -253,7 +263,12 @@ func (t *DataSource) ActorInfo(ctx context.Context, addr address.Address, tsk ty
actorInfo := tasks.ActorInfo{}
if err == nil {
if act.Address == nil {
act.Address = &addr
robustAddress, err := t.LookupRobustAddress(ctx, addr, tsk)
if err == nil {
act.Address = &robustAddress
} else {
act.Address = &addr
}
}
actorInfo.Actor = act
actorName, actorFamily, err := util.ActorNameAndFamilyFromCode(act.Code)
Expand All @@ -271,6 +286,67 @@ func (t *DataSource) ActorInfo(ctx context.Context, addr address.Address, tsk ty
return &actorInfo, err
}

func genIdAddressCacheKey(tsk types.TipSetKey) string {
key, keyErr := asKey(KeyPrefix{"IdRobustAddress"}, tsk)
if keyErr != nil {
return "IdRobustAddressDefaultkey"
}
return key
}

func (t *DataSource) SetIdRobustAddressMap(ctx context.Context, tsk types.TipSetKey) error {
ctx, span := otel.Tracer("").Start(ctx, "DataSource.SetIdAddressMap")
if span.IsRecording() {
span.SetAttributes(attribute.String("tipset", tsk.String()))
}
defer span.End()

key := genIdAddressCacheKey(tsk)

initActor, err := t.Actor(ctx, initactor.Address, tsk)
if err != nil {
return err
}

initState, err := initactor.Load(t.Store(), initActor)
if err != nil {
return err
}

idRobustAddress := make(map[uint64]address.Address)

_ = initState.ForEachActor(func(id abi.ActorID, addr address.Address) error {
idRobustAddress[uint64(id)] = addr
return nil
})

t.addressCache.Add(key, idRobustAddress)

return nil
}

func (t *DataSource) LookupRobustAddress(ctx context.Context, idAddr address.Address, tsk types.TipSetKey) (address.Address, error) {
robustAddress := address.Undef

key := genIdAddressCacheKey(tsk)
value, found := t.addressCache.Get(key)
if found {
idRobustAddress := value.(map[uint64]address.Address)
idAddrDecoded, err := address.IDFromAddress(idAddr)
if err != nil {
return robustAddress, err
}

address, exists := idRobustAddress[idAddrDecoded]
if exists {
return address, nil
}
}

// Use the default way: StateLookup
return t.node.StateLookupRobustAddress(ctx, idAddr, tsk)
}

func (t *DataSource) MinerPower(ctx context.Context, addr address.Address, ts *types.TipSet) (*api.MinerPower, error) {
ctx, span := otel.Tracer("").Start(ctx, "DataSource.MinerPower")
if span.IsRecording() {
Expand Down
22 changes: 22 additions & 0 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/manifest"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -70,6 +71,7 @@ import (

// actor dump
fevmactordumptask "github.com/filecoin-project/lily/tasks/periodic_actor_dump/fevm_actor"
mineractordumptask "github.com/filecoin-project/lily/tasks/periodic_actor_dump/miner_actor"

"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/metrics"
Expand Down Expand Up @@ -415,6 +417,15 @@ func (sp *StateProcessor) startActor(ctx context.Context, current, executed *typ
return taskNames
}

// isStoragePowerActor will check if the actor is storage power or not
func isStoragePowerActor(c cid.Cid) bool {
name, _, ok := actors.GetActorMetaByCode(c)
if ok {
return name == manifest.PowerKey
}
return false
}

// startPeriodicActorDump starts all TipSetsProcessor's in parallel, their results are emitted on the `results` channel.
// A list containing all executed task names is returned.
func (sp *StateProcessor) startPeriodicActorDump(ctx context.Context, current *types.TipSet, interval int, results chan *Result) []string {
Expand Down Expand Up @@ -447,9 +458,18 @@ func (sp *StateProcessor) startPeriodicActorDump(ctx context.Context, current *t
actors[manifest.EthAccountKey] = append(actors[manifest.EthAccountKey], actor)
} else if builtin.IsPlaceholderActor(actor.Code) {
actors[manifest.PlaceholderKey] = append(actors[manifest.PlaceholderKey], actor)
} else if isStoragePowerActor(actor.Code) {
// Power Actor
actors[manifest.PowerKey] = append(actors[manifest.PowerKey], actor)
}
}

// Set the Map to Cache
err := sp.api.SetIdRobustAddressMap(ctx, current.Key())
if err != nil {
log.Errorf("Error at setting IdRobustAddressMap: %v", err)
}

for taskName, proc := range sp.periodicActorDumpProcessors {
name := taskName
p := proc
Expand Down Expand Up @@ -789,6 +809,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
//
case tasktype.FEVMActorDump:
out.PeriodicActorDumpProcessors[t] = fevmactordumptask.NewTask(api)
case tasktype.MinerActorDump:
out.PeriodicActorDumpProcessors[t] = mineractordumptask.NewTask(api)

case BuiltinTaskName:
out.ReportProcessors[t] = indexertask.NewTask(api)
Expand Down
7 changes: 6 additions & 1 deletion chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ const (
FEVMContract = "fevm_contract"
FEVMTrace = "fevm_trace"

// New task types
// New task types: full dump
FEVMActorDump = "fevm_actor_dump"
MinerActorDump = "miner_actor_dump"
)

var AllTableTasks = []string{
Expand Down Expand Up @@ -107,6 +108,7 @@ var AllTableTasks = []string{
FEVMContract,
FEVMTrace,
FEVMActorDump,
MinerActorDump,
}

var TableLookup = map[string]struct{}{
Expand Down Expand Up @@ -160,6 +162,7 @@ var TableLookup = map[string]struct{}{
FEVMContract: {},
FEVMTrace: {},
FEVMActorDump: {},
MinerActorDump: {},
}

var TableComment = map[string]string{
Expand Down Expand Up @@ -213,6 +216,7 @@ var TableComment = map[string]string{
FEVMContract: ``,
FEVMTrace: ``,
FEVMActorDump: ``,
MinerActorDump: ``,
}

var TableFieldComments = map[string]map[string]string{
Expand Down Expand Up @@ -323,4 +327,5 @@ var TableFieldComments = map[string]map[string]string{
FEVMContract: {},
FEVMTrace: {},
FEVMActorDump: {},
MinerActorDump: {},
}
1 change: 1 addition & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ var TaskLookup = map[string][]string{
},
ActorDump: {
FEVMActorDump,
MinerActorDump,
},
}

Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
}

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 50
const TotalTableTasks = 51
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
1 change: 1 addition & 0 deletions lens/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type ChainAPI interface {

type StateAPI interface {
StateGetActor(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error)
StateLookupRobustAddress(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateListActors(context.Context, types.TipSetKey) ([]address.Address, error)
StateChangedActors(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error)

Expand Down
138 changes: 138 additions & 0 deletions model/actordumps/miner_actor_dump.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package actordumps

import (
"context"
"encoding/json"

"go.opencensus.io/tag"
"go.opentelemetry.io/otel"

"github.com/filecoin-project/lily/chain/actors/builtin/miner"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
"github.com/filecoin-project/lotus/chain/types"
)

type MinerActorDump struct {
tableName struct{} `pg:"miner_actor_dumps"` // nolint: structcheck

Height int64 `pg:",pk,notnull,use_zero"`
MinerID string `pg:",pk,notnull"`
MinerAddress string `pg:",pk,notnull"`
StateRoot string `pg:",notnull"`

// Miner Info
OwnerID string `pg:",notnull"`
OwnerAddress string `pg:",notnull"`
WorkerID string `pg:",notnull"`
WorkerAddress string `pg:",notnull"`

ConsensusFaultedElapsed int64 `pg:",notnull,use_zero"`

PeerID string `pg:",notnull"`
ControlAddresses string `pg:",type:jsonb"`
Beneficiary string `pg:",notnull"`
BeneficiaryAddress string `pg:",notnull"`

SectorSize uint64 `pg:",use_zero"`
NumLiveSectors uint64 `pg:",use_zero"`

// Claims
RawBytePower string `pg:"type:numeric,notnull"`
QualityAdjPower string `pg:"type:numeric,notnull"`

// Fil Related Fields
// Locked Funds
TotalLockedFunds string `pg:"type:numeric,notnull"`
VestingFunds string `pg:"type:numeric,notnull"`
InitialPledge string `pg:"type:numeric,notnull"`
PreCommitDeposits string `pg:"type:numeric,notnull"`

// Balance
AvailableBalance string `pg:"type:numeric,notnull"`
Balance string `pg:"type:numeric,notnull"`

FeeDebt string `pg:"type:numeric,notnull"`
}

func (m *MinerActorDump) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
ctx, span := otel.Tracer("").Start(ctx, "MinerActorDump.Persist")
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_actor_dumps"))
metrics.RecordCount(ctx, metrics.PersistModel, 1)
return s.PersistModel(ctx, m)
}

func (m *MinerActorDump) UpdateMinerInfo(minerState miner.State) error {
minerInfo, err := minerState.Info()
if err != nil {
return err
}

m.PeerID = string(minerInfo.PeerId)
m.WorkerID = minerInfo.Worker.String()
m.OwnerID = minerInfo.Owner.String()
m.ConsensusFaultedElapsed = int64(minerInfo.ConsensusFaultElapsed)
m.SectorSize = uint64(minerInfo.SectorSize)
m.Beneficiary = minerInfo.Beneficiary.String()

var ctrlAddresses []string
for _, addr := range minerInfo.ControlAddresses {
ctrlAddresses = append(ctrlAddresses, addr.String())
}

b, err := json.Marshal(ctrlAddresses)
if err == nil {
m.ControlAddresses = string(b)
}

num, err := minerState.NumLiveSectors()
if err == nil {
m.NumLiveSectors = num
}

return err
}

func (m *MinerActorDump) UpdateBalanceInfo(actor *types.ActorV5, minerState miner.State) error {
m.Balance = actor.Balance.String()

availableBalance, err := minerState.AvailableBalance(actor.Balance)
if err != nil {
return err
}
m.AvailableBalance = availableBalance.String()

feeDebt, err := minerState.FeeDebt()
if err != nil {
return err
}
m.FeeDebt = feeDebt.String()

lockedFunds, err := minerState.LockedFunds()
if err != nil {
return err
}
m.InitialPledge = lockedFunds.InitialPledgeRequirement.String()
m.VestingFunds = lockedFunds.VestingFunds.String()
m.PreCommitDeposits = lockedFunds.PreCommitDeposits.String()
m.TotalLockedFunds = lockedFunds.TotalLockedFunds().String()

return nil
}

type MinerActorDumpList []*MinerActorDump

func (ml MinerActorDumpList) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
ctx, span := otel.Tracer("").Start(ctx, "MinerActorDumpList.Persist")
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_actor_dumps"))

if len(ml) == 0 {
return nil
}
metrics.RecordCount(ctx, metrics.PersistModel, len(ml))
return s.PersistModel(ctx, ml)
}
Loading

0 comments on commit 7326abd

Please sign in to comment.