Skip to content

Commit

Permalink
fix: align model's primary key with database schema (#1271)
Browse files Browse the repository at this point in the history
* Fix the primary key mismatch problem

* Fix the upsert action in postgresql function

* Add more field to promise data unique for several models
  • Loading branch information
Terryhung authored Oct 18, 2023
1 parent 4e07a9d commit 95c4053
Show file tree
Hide file tree
Showing 18 changed files with 67 additions and 26 deletions.
4 changes: 2 additions & 2 deletions model/actordumps/fevm_actor_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type FEVMActorDump struct {
// Height message was executed at.
Height int64 `pg:",pk,notnull,use_zero"`
// Actor address.
ActorID string `pg:",notnull"`
ActorID string `pg:",pk,notnull"`
// Actor Address in ETH.
EthAddress string `pg:",notnull"`
// Contract Bytecode.
Expand All @@ -25,7 +25,7 @@ type FEVMActorDump struct {
// Balance of EVM actor in attoFIL.
Balance string `pg:"type:numeric,notnull"`
// The next actor nonce that is expected to appear on chain.
Nonce uint64 `pg:",use_zero"`
Nonce uint64 `pg:",pk,use_zero"`
// Actor Name
ActorName string `pg:",notnull"`
}
Expand Down
2 changes: 2 additions & 0 deletions model/actors/common/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type ActorState struct {
Code string `pg:",pk,notnull"`
// Top level of state data as json.
State string `pg:",type:jsonb,notnull"`
// Address of actor.
Address string `pg:",pk,notnull"`
}

func (as *ActorState) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
8 changes: 4 additions & 4 deletions model/actors/market/dealstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
type MarketDealState struct {
Height int64 `pg:",pk,notnull,use_zero"`
DealID uint64 `pg:",pk,use_zero"`
SectorStartEpoch int64 `pg:",pk,use_zero"`
LastUpdateEpoch int64 `pg:",pk,use_zero"`
SlashEpoch int64 `pg:",pk,use_zero"`
SectorStartEpoch int64 `pg:",use_zero"`
LastUpdateEpoch int64 `pg:",use_zero"`
SlashEpoch int64 `pg:",use_zero"`

StateRoot string `pg:",notnull"`
StateRoot string `pg:",pk,notnull"`
}

func (ds *MarketDealState) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
2 changes: 1 addition & 1 deletion model/actors/miner/sectorevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type MinerSectorEvent struct {
SectorID uint64 `pg:",pk,use_zero"`
StateRoot string `pg:",pk,notnull"`

Event string `pg:"type:miner_sector_event_type" pg:",pk,notnull"` // nolint: staticcheck
Event string `pg:",pk,type:miner_sector_event_type,notnull"` // nolint: staticcheck
}

func (mse *MinerSectorEvent) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
2 changes: 1 addition & 1 deletion model/blocks/drand.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type DrandBlockEntrie struct {
// Round is the round number of randomness used.
Round uint64 `pg:",pk,use_zero"`
// Block is the CID of the block.
Block string `pg:",notnull"`
Block string `pg:",pk,notnull"`
}

func (dbe *DrandBlockEntrie) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
2 changes: 1 addition & 1 deletion model/blocks/parent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type BlockParent struct {
Height int64 `pg:",pk,notnull,use_zero"`
Block string `pg:",pk,notnull"`
Parent string `pg:",notnull"`
Parent string `pg:",pk,notnull"`
}

func (bp *BlockParent) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
2 changes: 1 addition & 1 deletion model/chain/economics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type ChainEconomics struct {
tableName struct{} `pg:"chain_economics"` // nolint: structcheck
Height int64 `pg:",pk,notnull,use_zero"`
ParentStateRoot string `pg:",notnull"`
ParentStateRoot string `pg:",pk,notnull"`
CirculatingFil string `pg:"type:numeric,notnull"`
VestedFil string `pg:"type:numeric,notnull"`
MinedFil string `pg:"type:numeric,notnull"`
Expand Down
4 changes: 2 additions & 2 deletions model/fevm/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type FEVMContract struct {
// Height message was executed at.
Height int64 `pg:",pk,notnull,use_zero"`
// Actor address.
ActorID string `pg:",notnull"`
ActorID string `pg:",pk,notnull"`
// Actor Address in ETH.
EthAddress string `pg:",notnull"`
// Contract Bytecode.
Expand All @@ -25,7 +25,7 @@ type FEVMContract struct {
// Balance of EVM actor in attoFIL.
Balance string `pg:"type:numeric,notnull"`
// The next actor nonce that is expected to appear on chain.
Nonce uint64 `pg:",use_zero"`
Nonce uint64 `pg:",pk,use_zero"`
}

func (f *FEVMContract) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
2 changes: 1 addition & 1 deletion model/fevm/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type FEVMReceipt struct {
// Message CID
Message string `pg:",use_zero"`
// Hash of transaction.
TransactionHash string `pg:",notnull"`
TransactionHash string `pg:",pk,notnull"`
// Integer of the transactions index position in the block.
TransactionIndex uint64 `pg:",use_zero"`
// Hash of the block where this transaction was in.
Expand Down
11 changes: 6 additions & 5 deletions model/messages/actorevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ type ActorEvent struct {
MessageCid string `pg:",pk,notnull"`
EventIndex int64 `pg:",pk,notnull,use_zero"`

Emitter string `pg:",notnull"`
Flags []byte `pg:",notnull"`
Codec uint64 `pg:",notnull,use_zero"`
Key string `pg:",notnull"`
Value []byte `pg:",notnull"`
Emitter string `pg:",notnull"`
Flags []byte `pg:",notnull"`
Codec uint64 `pg:",notnull,use_zero"`
Key string `pg:",notnull"`
Value []byte `pg:",notnull"`
EntryIndex int64 `pg:",pk,notnull,use_zero"`
}

func (a *ActorEvent) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
2 changes: 1 addition & 1 deletion model/messages/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type VMMessage struct {
// Returns value of message receipt.
Returns string `pg:",type:jsonb"`
// Index indicating the order of the messages execution.
Index uint64 `pg:",notnull,use_zero"`
Index uint64 `pg:",pk,notnull,use_zero"`
}

func (v *VMMessage) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error {
Expand Down
15 changes: 15 additions & 0 deletions schemas/v1/35_add_pkey_for_tables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1

func init() {
patches.Register(
35,
`
ALTER TABLE {{ .SchemaName | default "public"}}.drand_block_entries DROP CONSTRAINT IF EXISTS drand_block_entries_pkey CASCADE, ADD PRIMARY KEY (round, block);
ALTER TABLE {{ .SchemaName | default "public"}}.vm_messages DROP CONSTRAINT IF EXISTS vm_messages_pkey CASCADE, ADD PRIMARY KEY(height, state_root, cid, source, index);
ALTER TABLE {{ .SchemaName | default "public"}}.actor_events ADD COLUMN IF NOT EXISTS entry_index bigint;
ALTER TABLE {{ .SchemaName | default "public"}}.actor_events DROP CONSTRAINT IF EXISTS actor_events_pkey CASCADE, ADD PRIMARY KEY(height, state_root, message_cid, event_index, entry_index);
ALTER TABLE {{ .SchemaName | default "public"}}.actor_states ADD COLUMN IF NOT EXISTS address text DEFAULT '';
ALTER TABLE {{ .SchemaName | default "public"}}.actor_states DROP CONSTRAINT IF EXISTS actor_states_pkey CASCADE, ADD PRIMARY KEY(height, head, code, address);
`,
)
}
11 changes: 10 additions & 1 deletion storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,17 @@ func (s *TxStorage) PersistModel(ctx context.Context, m interface{}) error {
}

}

// Prepare the conflict and upsert sql string
conflict := ""
upsert := ""
if s.upsert {
conflict, upsert := GenerateUpsertStrings(m)
conflict, upsert = GenerateUpsertStrings(m)
}

// If the upsert string is left blank, indicating that all fields serve as primary keys.
// In such a case, proceed with the standard insert process.
if s.upsert && len(upsert) > 0 {
if _, err := s.tx.ModelContext(ctx, m).
OnConflict(conflict).
Set(upsert).
Expand Down
2 changes: 2 additions & 0 deletions tasks/actorstate/actorstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type ActorStateAPI interface {
DiffPreCommitsV8(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChangesV8, error)

MinerLoad(store adt.Store, act *types.Actor) (miner.State, error)

LookupRobustAddress(ctx context.Context, idAddr address.Address, tsk types.TipSetKey) (address.Address, error)
}

// An ActorStateExtractor extracts actor state into a persistable format
Expand Down
15 changes: 11 additions & 4 deletions tasks/actorstate/raw/actor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@ func (RawActorStateExtractor) Extract(ctx context.Context, a actorstate.ActorInf
return nil, err
}

// get the roubust address address from api
address, err := node.LookupRobustAddress(ctx, a.Address, a.Current.Key())
if err != nil {
address = a.Address
}

return &commonmodel.ActorState{
Height: int64(a.Current.Height()),
Head: a.Actor.Head.String(),
Code: a.Actor.Code.String(),
State: string(state),
Height: int64(a.Current.Height()),
Head: a.Actor.Head.String(),
Code: a.Actor.Code.String(),
Address: address.String(),
State: string(state),
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion tasks/actorstate/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ func (t *Task) startActorStateExtraction(ctx context.Context, current, executed
for addr, ac := range actors {
addr := addr
ac := ac

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
3 changes: 2 additions & 1 deletion tasks/messages/actorevent/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut
}

for evtIdx, event := range events {
for _, e := range event.Entries {
for entryIdx, e := range event.Entries {
emitter, err := address.NewIDAddress(uint64(event.Emitter))
if err != nil {
errorsDetected = append(errorsDetected, &messages.MessageError{
Expand All @@ -109,6 +109,7 @@ func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, execut
Codec: e.Codec,
Key: e.Key,
Value: e.Value,
EntryIndex: int64(entryIdx),
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions tasks/test/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (m *MockActorStateAPI) Store() adt.Store {
return nil
}

func (m *MockActorStateAPI) LookupRobustAddress(_ context.Context, _ address.Address, _ types.TipSetKey) (address.Address, error) {
m.Called()
return address.TestAddress, nil
}

func (m *MockActorStateAPI) DiffPreCommits(ctx context.Context, _ address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChanges, error) {
args := m.Called(ctx, ts, pts, pre, cur)
tsmsgs := args.Get(0)
Expand Down

0 comments on commit 95c4053

Please sign in to comment.