From 244215b164d2e619b4d001211ce9a7e222c942a9 Mon Sep 17 00:00:00 2001 From: kasteph Date: Mon, 17 Jul 2023 08:25:10 -0700 Subject: [PATCH] fix: add Transform implementation for PreCommitInfoExtractorV8 (#1242) * fix: add Transform implementation for PreCommitInfoExtractorV8 * fix: skip diff if same root * fix: check earlier * chore: skip when cids are also same in miner * check root of maps --- chain/actors/adt/diff/generic.go | 1 + chain/actors/builtin/init/diff.go | 17 +++++++++++- chain/actors/builtin/market/diff.go | 5 ++-- chain/actors/builtin/miner/diff.go | 14 ++++++++++ chain/actors/builtin/miner/diff_v8.go | 14 ++++++++++ chain/indexer/integrated/processor/state.go | 25 ++++++++++++------ .../processor/state_internal_test.go | 10 +------ .../integrated/processor/state_test.go | 26 ++++++++++++------- chain/indexer/tasktype/table_tasks.go | 5 ++++ chain/indexer/tasktype/tasks.go | 1 + chain/indexer/tasktype/tasks_test.go | 4 +-- storage/sql.go | 10 ++++--- tasks/actorstate/miner/precommit.go | 14 ++++++++++ 13 files changed, 112 insertions(+), 34 deletions(-) diff --git a/chain/actors/adt/diff/generic.go b/chain/actors/adt/diff/generic.go index 4ae4aa054..82cad0235 100644 --- a/chain/actors/adt/diff/generic.go +++ b/chain/actors/adt/diff/generic.go @@ -33,6 +33,7 @@ type ArrayDiffer interface { func CompareArray(preArr, curArr adt.Array, out ArrayDiffer) error { notNew := make(map[int64]struct{}, curArr.Length()) prevVal := new(typegen.Deferred) + if err := preArr.ForEach(prevVal, func(i int64) error { curVal := new(typegen.Deferred) found, err := curArr.Get(uint64(i), curVal) diff --git a/chain/actors/builtin/init/diff.go b/chain/actors/builtin/init/diff.go index 6f75d5aca..50ac357e4 100644 --- a/chain/actors/builtin/init/diff.go +++ b/chain/actors/builtin/init/diff.go @@ -36,6 +36,21 @@ func DiffAddressMap(ctx context.Context, store adt.Store, pre, cur State) (*Addr } mapDiffer := NewAddressMapDiffer(pre, cur) + + premR, err := prem.Root() + if err != nil { + return nil, err + } + + curmR, err := curm.Root() + if err != nil { + return nil, err + } + + if premR.Equals(curmR) { + return mapDiffer.Results, nil + } + if requiresLegacyDiffing(pre, cur, &adt.MapOpts{ Bitwidth: pre.AddressMapBitWidth(), @@ -45,7 +60,7 @@ func DiffAddressMap(ctx context.Context, store adt.Store, pre, cur State) (*Addr Bitwidth: cur.AddressMapBitWidth(), HashFunc: cur.AddressMapHashFunction(), }) { - log.Warnw("actor HAMT opts differ, running slower generic map diff", "preCID", pre.Code(), "curCID", cur.Code()) + log.Warnw("actor HAMT opts differ, running slower generic map diff ", "preCID ", pre.Code(), "curCID ", cur.Code()) if err := diff.CompareMap(prem, curm, mapDiffer); err != nil { return nil, err } diff --git a/chain/actors/builtin/market/diff.go b/chain/actors/builtin/market/diff.go index 283e29d09..64b986585 100644 --- a/chain/actors/builtin/market/diff.go +++ b/chain/actors/builtin/market/diff.go @@ -23,6 +23,7 @@ func DiffDealProposals(ctx context.Context, store adt.Store, pre, cur State) (*D preOpts := pre.DealProposalsAmtBitwidth() curOpts := cur.DealProposalsAmtBitwidth() preP, err := pre.Proposals() + if err != nil { return nil, err } @@ -33,7 +34,7 @@ func DiffDealProposals(ctx context.Context, store adt.Store, pre, cur State) (*D diffContainer := NewMarketProposalsDiffContainer(preP, curP) if requiresLegacyDiffing(pre, cur, preOpts, curOpts) { - log.Warn("actor AMT opts differ, running slower generic array diff", "preCID", pre.Code(), "curCID", cur.Code()) + log.Warn("actor AMT opts differ, running slower generic array diff ", "preCID ", pre.Code(), "curCID ", cur.Code()) if err := diff.CompareArray(preP.array(), curP.array(), diffContainer); err != nil { return nil, fmt.Errorf("diffing deal states: %w", err) } @@ -115,7 +116,7 @@ func DiffDealStates(ctx context.Context, store adt.Store, pre, cur State) (*Deal diffContainer := NewMarketStatesDiffContainer(preS, curS) if requiresLegacyDiffing(pre, cur, preOpts, curOpts) { - log.Warn("actor AMT opts differ, running slower generic array diff", "preCID", pre.Code(), "curCID", cur.Code()) + log.Warn("actor AMT opts differ, running slower generic array diff ", "preCID ", pre.Code(), "curCID ", cur.Code()) if err := diff.CompareArray(preS.array(), curS.array(), diffContainer); err != nil { return nil, fmt.Errorf("diffing deal states: %w", err) } diff --git a/chain/actors/builtin/miner/diff.go b/chain/actors/builtin/miner/diff.go index 990544aa1..ca40175ce 100644 --- a/chain/actors/builtin/miner/diff.go +++ b/chain/actors/builtin/miner/diff.go @@ -153,6 +153,20 @@ func DiffSectors(ctx context.Context, store adt.Store, pre, cur State) (*SectorC preBw := pre.SectorsAmtBitwidth() curBw := cur.SectorsAmtBitwidth() diffContainer := NewSectorDiffContainer(pre, cur) + + presR, err := pres.Root() + if err != nil { + return nil, err + } + cursR, err := curs.Root() + if err != nil { + return nil, err + } + + if presR.Equals(cursR) { + return diffContainer.Results, nil + } + if ArrayRequiresLegacyDiffing(pre, cur, preBw, curBw) { if span.IsRecording() { span.SetAttributes(attribute.String("diff", "slow")) diff --git a/chain/actors/builtin/miner/diff_v8.go b/chain/actors/builtin/miner/diff_v8.go index 24b0d8a7c..7408805dc 100644 --- a/chain/actors/builtin/miner/diff_v8.go +++ b/chain/actors/builtin/miner/diff_v8.go @@ -28,7 +28,21 @@ func DiffPreCommitsV8(ctx context.Context, store adt.Store, pre, cur State) (*Pr return nil, err } + prepR, err := prep.Root() + if err != nil { + return nil, err + } + + curpR, err := curp.Root() + if err != nil { + return nil, err + } + diffContainer := NewPreCommitDiffContainerV8(pre, cur) + if prepR.Equals(curpR) { + return diffContainer.Results, nil + } + if MapRequiresLegacyDiffing(pre, cur, &adt.MapOpts{ Bitwidth: pre.SectorsAmtBitwidth(), diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index e4eff07e3..05f8ddc31 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -449,19 +449,28 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces ), minertask.LockedFundsExtractor{}, ) + case tasktype.MinerPreCommitInfoV1_8: + out.ActorProcessors[t] = actorstate.NewTaskWithTransformer( + api, + actorstate.NewCustomTypedActorExtractorMap( + map[cid.Cid][]actorstate.ActorStateExtractor{ + mineractors.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}}, + mineractors.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}}, + }, + ), + minertask.PreCommitInfoExtractorV8{}, + ) case tasktype.MinerPreCommitInfo: out.ActorProcessors[t] = actorstate.NewTaskWithTransformer( api, actorstate.NewCustomTypedActorExtractorMap( map[cid.Cid][]actorstate.ActorStateExtractor{ - mineractors.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}}, - mineractors.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}}, mineractors.VersionCodes()[actorstypes.Version9]: {minertask.PreCommitInfoExtractorV9{}}, mineractors.VersionCodes()[actorstypes.Version10]: {minertask.PreCommitInfoExtractorV9{}}, mineractors.VersionCodes()[actorstypes.Version11]: {minertask.PreCommitInfoExtractorV9{}}, diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index f5d40415f..873de0d96 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -52,7 +52,7 @@ func TestNewProcessor(t *testing.T) { proc, err := New(nil, t.Name(), tasktype.AllTableTasks) require.NoError(t, err) require.Equal(t, t.Name(), proc.name) - require.Len(t, proc.actorProcessors, 24) + require.Len(t, proc.actorProcessors, 25) require.Len(t, proc.tipsetProcessors, 10) require.Len(t, proc.tipsetsProcessors, 14) require.Len(t, proc.builtinProcessors, 1) @@ -122,14 +122,6 @@ func TestNewProcessor(t *testing.T) { nil, actorstate.NewCustomTypedActorExtractorMap( map[cid.Cid][]actorstate.ActorStateExtractor{ - miner.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}}, miner.VersionCodes()[actorstypes.Version9]: {minertask.PreCommitInfoExtractorV9{}}, miner.VersionCodes()[actorstypes.Version10]: {minertask.PreCommitInfoExtractorV9{}}, miner.VersionCodes()[actorstypes.Version11]: {minertask.PreCommitInfoExtractorV9{}}, diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index fe04834c0..4d57e5d8b 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -116,18 +116,26 @@ func TestMakeProcessorsActors(t *testing.T) { ), transformer: minertask.V7SectorInfoExtractor{}, }, + { + taskName: tasktype.MinerPreCommitInfoV1_8, + extractor: actorstate.NewCustomTypedActorExtractorMap( + map[cid.Cid][]actorstate.ActorStateExtractor{ + miner.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}}, + miner.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}}, + }, + ), + transformer: minertask.PreCommitInfoExtractorV8{}, + }, { taskName: tasktype.MinerPreCommitInfo, extractor: actorstate.NewCustomTypedActorExtractorMap( map[cid.Cid][]actorstate.ActorStateExtractor{ - miner.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}}, - miner.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}}, miner.VersionCodes()[actorstypes.Version9]: {minertask.PreCommitInfoExtractorV9{}}, miner.VersionCodes()[actorstypes.Version10]: {minertask.PreCommitInfoExtractorV9{}}, miner.VersionCodes()[actorstypes.Version11]: {minertask.PreCommitInfoExtractorV9{}}, @@ -425,7 +433,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) { // If this test fails it indicates a new processor and/or task name was added and test should be created for it in one of the above test cases. proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName)) require.NoError(t, err) - require.Len(t, proc.ActorProcessors, 24) + require.Len(t, proc.ActorProcessors, 25) require.Len(t, proc.TipsetProcessors, 10) require.Len(t, proc.TipsetsProcessors, 14) require.Len(t, proc.ReportProcessors, 1) diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index 3c70c2d7e..686b88f7f 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -12,6 +12,7 @@ const ( MinerSectorInfoV1_6 = "miner_sector_infos" MinerSectorPost = "miner_sector_post" MinerPreCommitInfo = "miner_pre_commit_info" + MinerPreCommitInfoV1_8 = "miner_pre_commit_info_v8" MinerSectorEvent = "miner_sector_event" MinerCurrentDeadlineInfo = "miner_current_deadline_info" MinerFeeDebt = "miner_fee_debt" @@ -63,6 +64,7 @@ var AllTableTasks = []string{ MinerSectorInfoV1_6, MinerSectorPost, MinerPreCommitInfo, + MinerPreCommitInfoV1_8, MinerSectorEvent, MinerCurrentDeadlineInfo, MinerFeeDebt, @@ -114,6 +116,7 @@ var TableLookup = map[string]struct{}{ MinerSectorInfoV1_6: {}, MinerSectorPost: {}, MinerPreCommitInfo: {}, + MinerPreCommitInfoV1_8: {}, MinerSectorEvent: {}, MinerCurrentDeadlineInfo: {}, MinerFeeDebt: {}, @@ -165,6 +168,7 @@ var TableComment = map[string]string{ MinerSectorInfoV1_6: `MinerSectorInfoV1_6 is exported from the miner actor iff the actor code is less than v7. The table keeps its original name since that's a requirement to support lily backfills`, MinerSectorPost: ``, MinerPreCommitInfo: ``, + MinerPreCommitInfoV1_8: `MinerPreCommitInfo using actors v1 to v8.`, MinerSectorEvent: ``, MinerCurrentDeadlineInfo: ``, MinerFeeDebt: ``, @@ -221,6 +225,7 @@ var TableFieldComments = map[string]map[string]string{ MinerSectorInfoV1_6: {}, MinerSectorPost: {}, MinerPreCommitInfo: {}, + MinerPreCommitInfoV1_8: {}, MinerSectorEvent: {}, MinerCurrentDeadlineInfo: {}, MinerFeeDebt: {}, diff --git a/chain/indexer/tasktype/tasks.go b/chain/indexer/tasktype/tasks.go index fdc03d8fa..5ab806f9c 100644 --- a/chain/indexer/tasktype/tasks.go +++ b/chain/indexer/tasktype/tasks.go @@ -38,6 +38,7 @@ var TaskLookup = map[string][]string{ MinerSectorInfoV1_6, MinerSectorPost, MinerPreCommitInfo, + MinerPreCommitInfoV1_8, MinerSectorEvent, MinerCurrentDeadlineInfo, MinerFeeDebt, diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index 0a0532f81..ee7ed0b5e 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -29,7 +29,7 @@ func TestMakeTaskNamesAlias(t *testing.T) { { taskAlias: tasktype.ActorStatesMinerTask, tasks: []string{tasktype.MinerSectorDeal, tasktype.MinerSectorInfoV7, tasktype.MinerSectorInfoV1_6, - tasktype.MinerSectorPost, tasktype.MinerPreCommitInfo, tasktype.MinerSectorEvent, + tasktype.MinerSectorPost, tasktype.MinerPreCommitInfo, tasktype.MinerPreCommitInfoV1_8, tasktype.MinerSectorEvent, tasktype.MinerCurrentDeadlineInfo, tasktype.MinerFeeDebt, tasktype.MinerLockedFund, tasktype.MinerInfo, tasktype.MinerBeneficiary}, }, @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) { } func TestMakeAllTaskNames(t *testing.T) { - const TotalTableTasks = 48 + const TotalTableTasks = 49 actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks) require.NoError(t, err) // if this test fails it means a new task name was added, update the above test diff --git a/storage/sql.go b/storage/sql.go index c3575ed02..0996f7bb3 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -49,6 +49,7 @@ var Models = []interface{}{ (*miner.MinerSectorInfoV1_6)(nil), (*miner.MinerSectorPost)(nil), (*miner.MinerPreCommitInfo)(nil), + (*miner.MinerPreCommitInfoV9)(nil), (*miner.MinerSectorEvent)(nil), (*miner.MinerCurrentDeadlineInfo)(nil), (*miner.MinerFeeDebt)(nil), @@ -352,10 +353,13 @@ func verifyModel(ctx context.Context, db *pg.DB, schemaName string, m *orm.Table } // Some common aliases - if datatype == "timestamp with time zone" { + switch datatype { + case "timestamp with time zone": + fallthrough + case "timestamp without time zone": datatype = "timestamptz" - } else if datatype == "timestamp without time zone" { - datatype = "timestamp" + case "ARRAY": + datatype = "bigint[]" } if datatype != fld.SQLType { diff --git a/tasks/actorstate/miner/precommit.go b/tasks/actorstate/miner/precommit.go index 8c7d70dcc..939f59013 100644 --- a/tasks/actorstate/miner/precommit.go +++ b/tasks/actorstate/miner/precommit.go @@ -68,3 +68,17 @@ func (PreCommitInfoExtractorV8) Extract(ctx context.Context, a actorstate.ActorI return preCommitModel, nil } + +func (PreCommitInfoExtractorV8) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) { + persistableList := make(minermodel.MinerPreCommitInfoList, 0, len(data)) + for _, d := range data { + ml, ok := d.(minermodel.MinerPreCommitInfoList) + if !ok { + return nil, fmt.Errorf("expected MinerPreCommitInfoList type but got: %T", d) + } + for _, m := range ml { + persistableList = append(persistableList, m) + } + } + return model.PersistableList{persistableList}, nil +}