diff --git a/chain/datasource/datasource.go b/chain/datasource/datasource.go index 937fb915..61b50c1e 100644 --- a/chain/datasource/datasource.go +++ b/chain/datasource/datasource.go @@ -147,6 +147,10 @@ func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet) return t.node.MessagesForTipSetBlocks(ctx, ts) } +func (t *DataSource) MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) { + return t.node.MessagesWithDeduplicationForTipSet(ctx, ts) +} + func (t *DataSource) StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) { return t.node.StateListActors(ctx, tsk) } diff --git a/lens/interface.go b/lens/interface.go index fe361852..a0e5b38d 100644 --- a/lens/interface.go +++ b/lens/interface.go @@ -51,6 +51,7 @@ type ChainAPI interface { MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*BlockMessages, error) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*BlockMessageReceipts, error) + MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) // added during hyperspace ChainGetEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) diff --git a/lens/lily/impl.go b/lens/lily/impl.go index f7039ffa..230047bf 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -592,6 +592,24 @@ func (m *LilyNodeAPI) MessagesForTipSetBlocks(ctx context.Context, ts *types.Tip return out, nil } +func (m *LilyNodeAPI) MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) { + blkMsgs, err := m.ChainAPI.Chain.BlockMsgsForTipset(ctx, ts) + msgMap := make(map[cid.Cid]types.ChainMsg) + if err != nil { + return msgMap, err + } + for _, blk := range blkMsgs { + for _, msg := range blk.BlsMessages { + msgMap[msg.Cid()] = msg + } + for _, msg := range blk.SecpkMessages { + msgMap[msg.Cid()] = msg + } + } + + return msgMap, nil +} + // TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`). func (m *LilyNodeAPI) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) { // sanity check args diff --git a/tasks/api.go b/tasks/api.go index 8be5a549..a2f6e694 100644 --- a/tasks/api.go +++ b/tasks/api.go @@ -70,6 +70,7 @@ type DataSource interface { TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) MessageReceiptEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) + MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) DiffSectors(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.SectorChanges, error) DiffPreCommits(ctx context.Context, addr address.Address, ts, pts *types.TipSet, pre, cur miner.State) (*miner.PreCommitChanges, error) diff --git a/tasks/messages/gaseconomy/task.go b/tasks/messages/gaseconomy/task.go index 487afd82..49a65652 100644 --- a/tasks/messages/gaseconomy/task.go +++ b/tasks/messages/gaseconomy/task.go @@ -16,6 +16,8 @@ import ( messagemodel "github.com/filecoin-project/lily/model/messages" visormodel "github.com/filecoin-project/lily/model/visor" "github.com/filecoin-project/lily/tasks" + + logging "github.com/ipfs/go-log/v2" ) type Task struct { @@ -28,6 +30,8 @@ func NewTask(node tasks.DataSource) *Task { } } +var log = logging.Logger("lily/tasks/gaseconomy") + func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSet") if span.IsRecording() { @@ -44,6 +48,13 @@ func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model. StateRoot: current.ParentState().String(), } + validMsgCid, err := t.node.MessagesWithDeduplicationForTipSet(ctx, current) + if err != nil { + log.Errorf("Error at getting messages with deduplication: %v", err) + } + + log.Infof("Get the count of valid messages: %v", len(validMsgCid)) + msgrec, err := t.node.TipSetBlockMessages(ctx, current) if err != nil { report.ErrorsDetected = fmt.Errorf("getting tipset messages receipts: %w", err) @@ -65,6 +76,10 @@ func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model. } for _, msg := range mr.BlsMessages { + if _, exists := validMsgCid[msg.Cid()]; !exists { + log.Errorf("Get invalid message cid: %v", msg.Cid()) + continue + } // calculate total gas limit of executed messages regardless of duplicates. totalGasLimit += msg.GasLimit if exeMsgSeen[msg.Cid()] { @@ -76,6 +91,10 @@ func (t *Task) ProcessTipSet(ctx context.Context, current *types.TipSet) (model. } for _, msg := range mr.SecpMessages { + if _, exists := validMsgCid[msg.Cid()]; !exists { + log.Errorf("Get invalid message cid: %v", msg.Cid()) + continue + } // calculate total gas limit of executed messages regardless of duplicates. totalGasLimit += msg.VMMessage().GasLimit if exeMsgSeen[msg.Cid()] { diff --git a/testutil/lens.go b/testutil/lens.go index 1c99f9e5..a631512a 100644 --- a/testutil/lens.go +++ b/testutil/lens.go @@ -51,6 +51,11 @@ func (aw *APIWrapper) TipSetMessageReceipts(ctx context.Context, ts, pts *types. panic("implement me") } +func (aw *APIWrapper) MessagesWithDeduplicationForTipSet(ctx context.Context, ts *types.TipSet) (map[cid.Cid]types.ChainMsg, error) { + //TODO implement me + panic("implement me") +} + func (aw *APIWrapper) CirculatingSupply(ctx context.Context, key types.TipSetKey) (api.CirculatingSupply, error) { return aw.StateVMCirculatingSupplyInternal(ctx, key) }