From f72f53bc54600a7b6360312e930bee04465e7a9d Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 16 Jun 2024 06:32:04 +0200 Subject: [PATCH] started implementation of mev block indexer --- db/mev_blocks.go | 18 ++- db/slots.go | 19 +++ services/chainservice.go | 9 +- services/mevindexer.go | 308 +++++++++++++++++++++++++++++++++++++++ test-config.yaml | 8 + types/config.go | 11 ++ 6 files changed, 370 insertions(+), 3 deletions(-) create mode 100644 services/mevindexer.go diff --git a/db/mev_blocks.go b/db/mev_blocks.go index 9acccee0..286ace02 100644 --- a/db/mev_blocks.go +++ b/db/mev_blocks.go @@ -51,7 +51,7 @@ func InsertMevBlocks(mevBlocks []*dbtypes.MevBlock, tx *sqlx.Tx) error { argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (block_hash) DO UPDATE SET proposed = excluded.proposed", + dbtypes.DBEnginePgsql: " ON CONFLICT (block_hash) DO UPDATE SET proposed = excluded.proposed, seenby_relays = excluded.seenby_relays", dbtypes.DBEngineSqlite: "", })) @@ -62,7 +62,21 @@ func InsertMevBlocks(mevBlocks []*dbtypes.MevBlock, tx *sqlx.Tx) error { return nil } -func GetMevBlocksFiltered(offset uint64, limit uint32, finalizedBlock uint64, filter *dbtypes.MevBlockFilter) ([]*dbtypes.MevBlock, uint64, error) { +func GetHighestMevBlockSlotByRelay(relayId uint8) (uint64, error) { + highestSlot := uint64(0) + err := ReaderDb.Get(&highestSlot, ` + SELECT + MAX(slot_number) + FROM mev_blocks + WHERE (seenby_relays & $1) != 0 + `, uint64(1)<= totalCount || len(mevBlocks) == 0 { + break + } + } + + for _, relay := range utils.Config.MevIndexer.Relays { + lastSlot, err := db.GetHighestMevBlockSlotByRelay(relay.Index) + if err != nil { + continue + } + mev.lastLoadedSlot[relay.Index] = lastSlot + } + + mev.mevBlockCacheLoaded = true + } + + // load data from all relays + wg := &sync.WaitGroup{} + for idx := range utils.Config.MevIndexer.Relays { + wg.Add(1) + + go func(idx int, relay *types.MevRelayConfig) { + defer func() { + wg.Done() + }() + + err := mev.loadMevBlocksFromRelay(indexer, relay) + if err != nil { + logger_mev.Errorf("error loading mev blocks from relay %v (%v): %v", idx, relay.Name, err) + } + }(idx, &utils.Config.MevIndexer.Relays[idx]) + } + wg.Wait() + + mev.lastRefresh = time.Now() + return nil +} + +type mevIndexerRelayBlockResponse struct { + Slot string `json:"slot"` + ParentHash string `json:"parent_hash"` + BlockHash string `json:"block_hash"` + BuilderPubkey string `json:"builder_pubkey"` + ProposerPubkey string `json:"proposer_pubkey"` + ProposerFeeRecipient string `json:"proposer_fee_recipient"` + GasLimit string `json:"gas_limit"` + GasUsed string `json:"gas_used"` + Value string `json:"value"` + NumTx string `json:"num_tx"` + BlockNumber string `json:"block_number"` +} + +func (mev *MevIndexer) loadMevBlocksFromRelay(indexer *indexer.Indexer, relay *types.MevRelayConfig) error { + relayUrl, err := url.Parse(relay.Url) + if err != nil { + return fmt.Errorf("invalid relay url: %v", err) + } + + relayUrl.Path = path.Join(relayUrl.Path, "/relay/v1/data/bidtraces/proposer_payload_delivered?limit=1000") + apiUrl := relayUrl.String() + + logger_mev.Debugf("Loading mev blocks from relay %v: %v", relay.Name, utils.GetRedactedUrl(apiUrl)) + + client := &http.Client{Timeout: time.Second * 120} + resp, err := client.Get(apiUrl) + if err != nil { + return fmt.Errorf("could not fetch mev blocks (%v): %v", utils.GetRedactedUrl(apiUrl), err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("could not fetch mev blocks (%v): not found", utils.GetRedactedUrl(apiUrl)) + } + data, _ := io.ReadAll(resp.Body) + return fmt.Errorf("url: %v, error-response: %s", utils.GetRedactedUrl(apiUrl), data) + } + blocksResponse := []*mevIndexerRelayBlockResponse{} + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&blocksResponse) + if err != nil { + return fmt.Errorf("error parsing mev blocks response: %v", err) + } + + // parse blocks + mev.mevBlockCacheMutex.Lock() + defer mev.mevBlockCacheMutex.Unlock() + validatorSet := indexer.GetCachedValidatorPubkeyMap() + highestSlot, finalizedEpoch, _, processedEpoch := indexer.GetCacheState() + finalizedSlot := uint64(0) + if processedEpoch >= 0 { + finalizedSlot = (uint64(processedEpoch+1) * utils.Config.Chain.Config.SlotsPerEpoch) - 1 + } else if finalizedEpoch >= 0 { + finalizedSlot = (uint64(finalizedEpoch+1) * utils.Config.Chain.Config.SlotsPerEpoch) - 1 + } + relayFlag := uint64(1) << relay.Index + + for idx, blockData := range blocksResponse { + blockHash := common.HexToHash(blockData.BlockHash) + + if mev.mevBlockCache[blockHash] == nil { + slot, err := strconv.ParseUint(blockData.Slot, 10, 64) + if err != nil { + logger_mev.Warnf("failed parsing mev block %v.Slot: %v", idx, err) + continue + } + + if int64(slot) > highestSlot { + // skip for now, process in next refresh + continue + } + + if slot <= mev.lastLoadedSlot[relay.Index] { + continue + } + + blockNumber, err := strconv.ParseUint(blockData.BlockNumber, 10, 64) + if err != nil { + logger_mev.Warnf("failed parsing mev block %v.BlockNumber: %v", idx, err) + continue + } + + txCount, err := strconv.ParseUint(blockData.NumTx, 10, 64) + if err != nil { + logger_mev.Warnf("failed parsing mev block %v.NumTx: %v", idx, err) + continue + } + + gasUsed, err := strconv.ParseUint(blockData.GasUsed, 10, 64) + if err != nil { + logger_mev.Warnf("failed parsing mev block %v.GasUsed: %v", idx, err) + continue + } + + blockValue := big.NewInt(0) + blockValue, ok := blockValue.SetString(blockData.Value, 10) + if !ok { + logger_mev.Warnf("failed parsing mev block %v.Value: big.Int.SetString failed", idx) + continue + } + blockValueBytes := blockValue.Bytes() + blockValueGwei := big.NewInt(0).Div(blockValue, utils.GWEI) + + validatorPubkey := phase0.BLSPubKey(common.FromHex(blockData.ProposerPubkey)) + validator := validatorSet[validatorPubkey] + if validator == nil { + logger_mev.Warnf("failed parsing mev block %v: ProposerPubkey (%v) not found in validator set", idx, validatorPubkey.String()) + continue + } + + mevBlock := &dbtypes.MevBlock{ + SlotNumber: slot, + BlockHash: blockHash[:], + BlockNumber: blockNumber, + BuilderPubkey: common.FromHex(blockData.BuilderPubkey), + ProposerIndex: uint64(validator.Index), + SeenbyRelays: relayFlag, + FeeRecipient: common.FromHex(blockData.ProposerFeeRecipient), + TxCount: txCount, + GasUsed: gasUsed, + BlockValue: blockValueBytes, + BlockValueGwei: blockValueGwei.Uint64(), + } + mevBlock.Proposed = mev.getMevBlockProposedStatus(indexer, mevBlock, finalizedSlot) + + mev.mevBlockCache[blockHash] = &mevIndexerBlockCache{ + updated: true, + block: mevBlock, + } + } else { + cachedBlock := mev.mevBlockCache[blockHash] + if cachedBlock.block.SeenbyRelays&relayFlag > 0 { + continue + } + + cachedBlock.block.SeenbyRelays |= relayFlag + cachedBlock.updated = true + } + } + + return nil +} + +func (mev *MevIndexer) getMevBlockProposedStatus(indexer *indexer.Indexer, mevBlock *dbtypes.MevBlock, finalizedSlot uint64) uint8 { + proposed := uint8(0) + if mevBlock.SlotNumber > finalizedSlot { + for _, block := range indexer.GetCachedBlocksByExecutionBlockHash(mevBlock.BlockHash) { + if proposed != 1 && block.IsCanonical(indexer, nil) { + proposed = 1 + } else { + proposed = 2 + } + } + } else { + for _, block := range db.GetSlotsByBlockHash(mevBlock.BlockHash) { + if proposed != 1 && block.Status == dbtypes.Canonical { + proposed = 1 + } else { + proposed = 2 + } + } + } + + return proposed +} diff --git a/test-config.yaml b/test-config.yaml index 87b02b7c..d891609f 100644 --- a/test-config.yaml +++ b/test-config.yaml @@ -66,6 +66,14 @@ indexer: disableIndexWriter: false syncEpochCooldown: 1 +mevIndexer: + # list of mev relays to crawl mev blocks from + relays: + - index: 0 # identifier for this relay in db (0-63) + name: Flashbots + url: https://boost-relay.flashbots.net/ + refreshInterval: 10m + database: engine: "sqlite" sqlite: diff --git a/types/config.go b/types/config.go index 1cf21522..16b7b802 100644 --- a/types/config.go +++ b/types/config.go @@ -114,6 +114,11 @@ type Config struct { RecheckTimeout time.Duration `yaml:"recheckTimeout" envconfig:"TXSIG_RECHECK_TIMEOUT"` } `yaml:"txsig"` + MevIndexer struct { + Relays []MevRelayConfig `yaml:"relays"` + RefreshInterval time.Duration `yaml:"refreshInterval" envconfig:"MEVINDEXER_REFRESH_INTERVAL"` + } `yaml:"mevIndexer"` + Database struct { Engine string `yaml:"engine" envconfig:"DATABASE_ENGINE"` Sqlite struct { @@ -165,6 +170,12 @@ type EndpointSshConfig struct { Keyfile string `yaml:"keyfile"` } +type MevRelayConfig struct { + Index uint8 `yaml:"index"` + Name string `yaml:"name"` + Url string `yaml:"url"` +} + type SqliteDatabaseConfig struct { File string MaxOpenConns int