Skip to content

Commit

Permalink
feat: Add new function for filtering invalid message (#1269)
Browse files Browse the repository at this point in the history
* Add new function for filtering invalid message
  • Loading branch information
Terryhung authored Oct 12, 2023
1 parent 09767ad commit 4e07a9d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 0 deletions.
4 changes: 4 additions & 0 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet)
return t.node.MessagesForTipSetBlocks(ctx, ts)
}

func (t *DataSource) MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) {
return t.node.MessagesWithDeduplicationForTipSet(ctx, ts)
}

func (t *DataSource) StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) {
return t.node.StateListActors(ctx, tsk)
}
Expand Down
1 change: 1 addition & 0 deletions lens/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ChainAPI interface {

MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*BlockMessages, error)
TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*BlockMessageReceipts, error)
MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error)

// added during hyperspace
ChainGetEvents(ctx context.Context, root cid.Cid) ([]types.Event, error)
Expand Down
18 changes: 18 additions & 0 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,24 @@ func (m *LilyNodeAPI) MessagesForTipSetBlocks(ctx context.Context, ts *types.Tip
return out, nil
}

func (m *LilyNodeAPI) MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) {
blkMsgs, err := m.ChainAPI.Chain.BlockMsgsForTipset(ctx, ts)
msgMap := make(map[cid.Cid]types.ChainMsg)
if err != nil {
return msgMap, err
}
for _, blk := range blkMsgs {
for _, msg := range blk.BlsMessages {
msgMap[msg.Cid()] = msg
}
for _, msg := range blk.SecpkMessages {
msgMap[msg.Cid()] = msg
}
}

return msgMap, nil
}

// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`).
func (m *LilyNodeAPI) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) {
// sanity check args
Expand Down
1 change: 1 addition & 0 deletions tasks/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type DataSource interface {

TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error)
MessageReceiptEvents(ctx context.Context, root cid.Cid) ([]types.Event, error)
MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error)

DiffSectors(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.SectorChanges, error)
DiffPreCommits(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChanges, error)
Expand Down
19 changes: 19 additions & 0 deletions tasks/messages/gaseconomy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
messagemodel "github.com/filecoin-project/lily/model/messages"
visormodel "github.com/filecoin-project/lily/model/visor"
"github.com/filecoin-project/lily/tasks"

logging "github.com/ipfs/go-log/v2"
)

type Task struct {
Expand All @@ -28,6 +30,8 @@ func NewTask(node tasks.DataSource) *Task {
}
}

var log = logging.Logger("lily/tasks/gaseconomy")

func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSet")
if span.IsRecording() {
Expand All @@ -44,6 +48,13 @@ func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.
StateRoot: current.ParentState().String(),
}

validMsgCid, err := t.node.MessagesWithDeduplicationForTipSet(ctx, current)
if err != nil {
log.Errorf("Error at getting messages with deduplication: %v", err)
}

log.Infof("Get the count of valid messages: %v", len(validMsgCid))

msgrec, err := t.node.TipSetBlockMessages(ctx, current)
if err != nil {
report.ErrorsDetected = fmt.Errorf("getting tipset messages receipts: %w", err)
Expand All @@ -65,6 +76,10 @@ func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.
}

for _, msg := range mr.BlsMessages {
if _, exists := validMsgCid[msg.Cid()]; !exists {
log.Errorf("Get invalid message cid: %v", msg.Cid())
continue
}
// calculate total gas limit of executed messages regardless of duplicates.
totalGasLimit += msg.GasLimit
if exeMsgSeen[msg.Cid()] {
Expand All @@ -76,6 +91,10 @@ func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.

}
for _, msg := range mr.SecpMessages {
if _, exists := validMsgCid[msg.Cid()]; !exists {
log.Errorf("Get invalid message cid: %v", msg.Cid())
continue
}
// calculate total gas limit of executed messages regardless of duplicates.
totalGasLimit += msg.VMMessage().GasLimit
if exeMsgSeen[msg.Cid()] {
Expand Down
5 changes: 5 additions & 0 deletions testutil/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (aw *APIWrapper) TipSetMessageReceipts(ctx context.Context, ts, pts *types.
panic("implement me")
}

func (aw *APIWrapper) MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) {
//TODO implement me
panic("implement me")
}

func (aw *APIWrapper) CirculatingSupply(ctx context.Context, key types.TipSetKey) (api.CirculatingSupply, error) {
return aw.StateVMCirculatingSupplyInternal(ctx, key)
}
Expand Down

0 comments on commit 4e07a9d

Please sign in to comment.