From a39a1dd7bebba70433f9c5e86d2031ebf446cc5b Mon Sep 17 00:00:00 2001 From: harkamal Date: Sat, 21 Jan 2023 23:43:00 +0530 Subject: [PATCH 01/13] Cache and retransmit bls changes if submitted early --- .../src/api/impl/beacon/pool/index.ts | 6 +++++- packages/beacon-node/src/chain/chain.ts | 19 ++++++++++++++++++- packages/beacon-node/src/chain/interface.ts | 15 ++++++++++++++- packages/beacon-node/src/db/beacon.ts | 3 +++ packages/beacon-node/src/db/interface.ts | 2 ++ .../db/repositories/blsToExecutionChange.ts | 13 +++++++++++++ .../beacon-node/src/db/repositories/index.ts | 2 +- .../beacon-node/test/utils/stub/beaconDb.ts | 3 +++ packages/db/src/schema.ts | 1 + 9 files changed, 60 insertions(+), 4 deletions(-) diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 20e8cf56543a..fd90761b6868 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -104,7 +104,11 @@ export function getBeaconPoolApi({ try { await validateBlsToExecutionChange(chain, blsToExecutionChange); chain.opPool.insertBlsToExecutionChange(blsToExecutionChange); - await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); + try { + await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); + } catch (e) { + await chain.cacheBlsToExecutionChanges(blsToExecutionChange); + } } catch (e) { errors.push(e as Error); logger.error( diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index ab86b36b71c7..b7b0526ff522 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -12,7 +12,19 @@ import { PubkeyIndexMap, } from "@lodestar/state-transition"; import {IBeaconConfig} from "@lodestar/config"; -import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types"; +import { + allForks, + UintNum64, + Root, + phase0, + Slot, + RootHex, + Epoch, + ValidatorIndex, + eip4844, + Wei, + capella, +} from "@lodestar/types"; import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {ProcessShutdownCallback} from "@lodestar/validator"; import {ILogger, pruneSetToMax, toHex} from "@lodestar/utils"; @@ -779,4 +791,9 @@ export class BeaconChain implements IBeaconChain { } } } + + /** Must be validated beforehand */ + async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise { + return this.db.blsToExecutionChangeCache.add(blsToExecutionChange); + } } diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 8242bf677ee3..ba362f1ca6d5 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -1,4 +1,16 @@ -import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types"; +import { + allForks, + UintNum64, + Root, + phase0, + Slot, + RootHex, + Epoch, + ValidatorIndex, + eip4844, + Wei, + capella, +} from "@lodestar/types"; import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {IBeaconConfig} from "@lodestar/config"; import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz"; @@ -133,6 +145,7 @@ export interface IBeaconChain { /** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */ persistInvalidSszView(view: TreeView, suffix?: string): void; updateBuilderStatus(clockSlot: Slot): void; + cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise; } export type SSZObjectType = diff --git a/packages/beacon-node/src/db/beacon.ts b/packages/beacon-node/src/db/beacon.ts index 39419bfbd017..0fa328d84012 100644 --- a/packages/beacon-node/src/db/beacon.ts +++ b/packages/beacon-node/src/db/beacon.ts @@ -18,6 +18,7 @@ import { BlobsSidecarRepository, BlobsSidecarArchiveRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, } from "./repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js"; @@ -33,6 +34,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb { attesterSlashing: AttesterSlashingRepository; depositEvent: DepositEventRepository; blsToExecutionChange: BLSToExecutionChangeRepository; + blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository; depositDataRoot: DepositDataRootRepository; eth1Data: Eth1DataRepository; @@ -58,6 +60,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb { this.stateArchive = new StateArchiveRepository(this.config, this.db); this.voluntaryExit = new VoluntaryExitRepository(this.config, this.db); this.blsToExecutionChange = new BLSToExecutionChangeRepository(this.config, this.db); + this.blsToExecutionChangeCache = new BLSToExecutionChangeCacheRepository(this.config, this.db); this.proposerSlashing = new ProposerSlashingRepository(this.config, this.db); this.attesterSlashing = new AttesterSlashingRepository(this.config, this.db); this.depositEvent = new DepositEventRepository(this.config, this.db); diff --git a/packages/beacon-node/src/db/interface.ts b/packages/beacon-node/src/db/interface.ts index 4cba2364d9e4..0d29dae2a167 100644 --- a/packages/beacon-node/src/db/interface.ts +++ b/packages/beacon-node/src/db/interface.ts @@ -17,6 +17,7 @@ import { BlobsSidecarRepository, BlobsSidecarArchiveRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, } from "./repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js"; @@ -43,6 +44,7 @@ export interface IBeaconDb { attesterSlashing: AttesterSlashingRepository; depositEvent: DepositEventRepository; blsToExecutionChange: BLSToExecutionChangeRepository; + blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository; // eth1 processing preGenesisState: PreGenesisState; diff --git a/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts b/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts index 3f41dfa9fc1f..ce53150fa00a 100644 --- a/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts +++ b/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts @@ -11,3 +11,16 @@ export class BLSToExecutionChangeRepository extends Repository { + constructor(config: IChainForkConfig, db: Db) { + super(config, db, Bucket.capella_blsToExecutionChangeCache, ssz.capella.SignedBLSToExecutionChange); + } + + getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex { + return value.message.validatorIndex; + } +} diff --git a/packages/beacon-node/src/db/repositories/index.ts b/packages/beacon-node/src/db/repositories/index.ts index 3de56a845c3e..801b50226500 100644 --- a/packages/beacon-node/src/db/repositories/index.ts +++ b/packages/beacon-node/src/db/repositories/index.ts @@ -17,4 +17,4 @@ export {CheckpointHeaderRepository} from "./lightclientCheckpointHeader.js"; export {SyncCommitteeRepository} from "./lightclientSyncCommittee.js"; export {SyncCommitteeWitnessRepository} from "./lightclientSyncCommitteeWitness.js"; export {BackfilledRanges} from "./backfilledRanges.js"; -export {BLSToExecutionChangeRepository} from "./blsToExecutionChange.js"; +export {BLSToExecutionChangeRepository, BLSToExecutionChangeCacheRepository} from "./blsToExecutionChange.js"; diff --git a/packages/beacon-node/test/utils/stub/beaconDb.ts b/packages/beacon-node/test/utils/stub/beaconDb.ts index 3e775030f4d4..cfb691fa2cdd 100644 --- a/packages/beacon-node/test/utils/stub/beaconDb.ts +++ b/packages/beacon-node/test/utils/stub/beaconDb.ts @@ -14,6 +14,7 @@ import { StateArchiveRepository, VoluntaryExitRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, BlobsSidecarRepository, BlobsSidecarArchiveRepository, } from "../../../src/db/repositories/index.js"; @@ -32,6 +33,8 @@ export class StubbedBeaconDb extends BeaconDb { voluntaryExit: SinonStubbedInstance & VoluntaryExitRepository; blsToExecutionChange: SinonStubbedInstance & BLSToExecutionChangeRepository; + blsToExecutionChangeCache: SinonStubbedInstance & + BLSToExecutionChangeCacheRepository; proposerSlashing: SinonStubbedInstance & ProposerSlashingRepository; attesterSlashing: SinonStubbedInstance & AttesterSlashingRepository; depositEvent: SinonStubbedInstance & DepositEventRepository; diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index 72ba515bbab6..d66a0b70bcbd 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -33,6 +33,7 @@ export enum Bucket { phase0_proposerSlashing = 14, // ValidatorIndex -> ProposerSlashing phase0_attesterSlashing = 15, // Root -> AttesterSlashing capella_blsToExecutionChange = 16, // ValidatorIndex -> SignedBLSToExecutionChange + capella_blsToExecutionChangeCache = 17, // ValidatorIndex -> SignedBLSToExecutionChange // validator // validator = 16, // DEPRECATED on v0.11.0 // lastProposedBlock = 17, // DEPRECATED on v0.11.0 From b438d81e6ec08fa8525a9a725838774261160e7a Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 22 Jan 2023 02:45:13 +0530 Subject: [PATCH 02/13] regossip when synced on/post capella --- packages/beacon-node/src/chain/chain.ts | 2 +- packages/beacon-node/src/chain/interface.ts | 2 ++ packages/beacon-node/src/network/network.ts | 26 +++++++++++++++++++++ packages/beacon-node/test/utils/mocks/db.ts | 2 ++ 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index b7b0526ff522..d10513d4abcd 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -134,9 +134,9 @@ export class BeaconChain implements IBeaconChain { /** Map keyed by executionPayload.blockHash of the block for those blobs */ readonly producedBlobsSidecarCache = new Map(); readonly opts: IChainOptions; + readonly db: IBeaconDb; protected readonly blockProcessor: BlockProcessor; - protected readonly db: IBeaconDb; private readonly archiver: Archiver; private abortController = new AbortController(); private successfulExchangeTransition = false; diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index ba362f1ca6d5..1db4a6b04c20 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -20,6 +20,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {IEth1ForBlockProduction} from "../eth1/index.js"; import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js"; import {IMetrics} from "../metrics/metrics.js"; +import {IBeaconDb} from "../db/index.js"; import {IBeaconClock} from "./clock/interface.js"; import {ChainEventEmitter} from "./emitter.js"; import {IStateRegenerator} from "./regen/index.js"; @@ -102,6 +103,7 @@ export interface IBeaconChain { readonly checkpointBalancesCache: CheckpointBalancesCache; readonly producedBlobsSidecarCache: Map; readonly opts: IChainOptions; + readonly db: IBeaconDb; /** Stop beacon chain processing */ close(): Promise; diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 728564ca2eb2..24ab2710562a 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -63,6 +63,7 @@ export class Network implements INetwork { private readonly signal: AbortSignal; private subscribedForks = new Set(); + private cachedBlsChangesPromise: Promise | null = null; constructor(private readonly opts: INetworkOptions, modules: INetworkModules) { const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules; @@ -411,6 +412,17 @@ export class Network implements INetwork { } } } + + // If we are subscribed and post capella fork epoch, try gossiping the cached bls changes + if ( + this.isSubscribedToGossipCoreTopics() && + epoch >= this.config.CAPELLA_FORK_EPOCH && + !this.cachedBlsChangesPromise + ) { + this.cachedBlsChangesPromise = this.gossipCachedBlsChanges().then(() => { + this.cachedBlsChangesPromise = null; + }); + } } catch (e) { this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error); } @@ -482,6 +494,20 @@ export class Network implements INetwork { return topics; } + private async gossipCachedBlsChanges(): Promise { + const gossipedKeys = []; + try { + for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream()) { + await this.gossip.publishBlsToExecutionChange(value); + gossipedKeys.push(key); + } + } finally { + await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys).catch((e) => { + this.logger.error("Could not clear gossiped blsChanges from blsToExecutionChangeCache", {}, e as Error); + }); + } + } + private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise => { if (this.hasAttachedSyncCommitteeMember()) { try { diff --git a/packages/beacon-node/test/utils/mocks/db.ts b/packages/beacon-node/test/utils/mocks/db.ts index 70e60e2234c2..20e9ca0f2c84 100644 --- a/packages/beacon-node/test/utils/mocks/db.ts +++ b/packages/beacon-node/test/utils/mocks/db.ts @@ -17,6 +17,7 @@ import { BlobsSidecarRepository, BlobsSidecarArchiveRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, } from "../../../src/db/repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "../../../src/db/single/index.js"; import {createStubInstance} from "../types.js"; @@ -45,6 +46,7 @@ export function getStubbedBeaconDb(): IBeaconDb { attesterSlashing: createStubInstance(AttesterSlashingRepository), depositEvent: createStubInstance(DepositEventRepository), blsToExecutionChange: createStubInstance(BLSToExecutionChangeRepository), + blsToExecutionChangeCache: createStubInstance(BLSToExecutionChangeCacheRepository), // eth1 processing preGenesisState: createStubInstance(PreGenesisState), From 97fb72fc21f126a2020e7d006e83e599844cc1be Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 22 Jan 2023 13:14:50 +0530 Subject: [PATCH 03/13] fix tests --- packages/beacon-node/test/utils/mocks/chain/chain.ts | 9 ++++++--- packages/beacon-node/test/utils/stub/beaconDb.ts | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index fb7843195dcb..5c41a8246424 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -42,6 +42,7 @@ import {CheckpointBalancesCache} from "../../../../src/chain/balancesCache.js"; import {IChainOptions} from "../../../../src/chain/options.js"; import {BlockAttributes} from "../../../../src/chain/produceBlock/produceBlockBody.js"; import {ReqRespBlockResponse} from "../../../../src/network/index.js"; +import {IBeaconDb} from "../../../../src/db/index.js"; /* eslint-disable @typescript-eslint/no-empty-function */ @@ -67,6 +68,7 @@ export class MockBeaconChain implements IBeaconChain { safeSlotsToImportOptimistically: 0, suggestedFeeRecipient: "0x0000000000000000000000000000000000000000", }; + readonly db: IBeaconDb; readonly anchorStateLatestBlockSlot: Slot; readonly bls: IBlsVerifier; @@ -128,13 +130,13 @@ export class MockBeaconChain implements IBeaconChain { this.forkChoice = mockForkChoice(); this.stateCache = new StateContextCache({}); this.checkpointStateCache = new CheckpointStateCache({}); - const db = new StubbedBeaconDb(); + this.db = new StubbedBeaconDb(); this.regen = new StateRegenerator({ config: this.config, forkChoice: this.forkChoice, stateCache: this.stateCache, checkpointStateCache: this.checkpointStateCache, - db, + db: this.db, metrics: null, emitter: this.emitter, }); @@ -142,7 +144,7 @@ export class MockBeaconChain implements IBeaconChain { {}, { config: this.config, - db: db, + db: this.db, metrics: null, emitter: this.emitter, logger: this.logger, @@ -224,6 +226,7 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} + async cacheBlsToExecutionChanges(): Promise {} } const root = ssz.Root.defaultValue() as Uint8Array; diff --git a/packages/beacon-node/test/utils/stub/beaconDb.ts b/packages/beacon-node/test/utils/stub/beaconDb.ts index cfb691fa2cdd..6004885759a7 100644 --- a/packages/beacon-node/test/utils/stub/beaconDb.ts +++ b/packages/beacon-node/test/utils/stub/beaconDb.ts @@ -52,6 +52,7 @@ export class StubbedBeaconDb extends BeaconDb { this.voluntaryExit = createStubInstance(VoluntaryExitRepository); this.blsToExecutionChange = createStubInstance(BLSToExecutionChangeRepository); + this.blsToExecutionChangeCache = createStubInstance(BLSToExecutionChangeCacheRepository); this.proposerSlashing = createStubInstance(ProposerSlashingRepository); this.attesterSlashing = createStubInstance(AttesterSlashingRepository); this.depositEvent = createStubInstance(DepositEventRepository); From 6f6595749a9e8385b0901f24d416203eed7c34d9 Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 22 Jan 2023 18:04:00 +0530 Subject: [PATCH 04/13] modify publish vs cache condition --- packages/beacon-node/src/api/impl/beacon/pool/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index fd90761b6868..dd7e190efff9 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -104,9 +104,9 @@ export function getBeaconPoolApi({ try { await validateBlsToExecutionChange(chain, blsToExecutionChange); chain.opPool.insertBlsToExecutionChange(blsToExecutionChange); - try { + if (chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH) { await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); - } catch (e) { + } else { await chain.cacheBlsToExecutionChanges(blsToExecutionChange); } } catch (e) { From 5cc993cd7f165a04d2bff701b36c874b2ce15517 Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 22 Jan 2023 19:06:08 +0530 Subject: [PATCH 05/13] add logging --- packages/beacon-node/src/network/network.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 24ab2710562a..e860b7949ae7 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -497,11 +497,13 @@ export class Network implements INetwork { private async gossipCachedBlsChanges(): Promise { const gossipedKeys = []; try { + this.logger.info("Re-gossiping the cached bls changes"); for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream()) { await this.gossip.publishBlsToExecutionChange(value); gossipedKeys.push(key); } } finally { + this.logger.info("Gossiped cached blsChanges", {size: gossipedKeys.length}); await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys).catch((e) => { this.logger.error("Could not clear gossiped blsChanges from blsToExecutionChangeCache", {}, e as Error); }); From 6e3bf0adf89a7d71d6d3cb2d8aafdb753b3630ee Mon Sep 17 00:00:00 2001 From: harkamal Date: Sun, 22 Jan 2023 19:39:55 +0530 Subject: [PATCH 06/13] add error logging --- packages/beacon-node/src/network/network.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index e860b7949ae7..ea6745a1f02e 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -502,6 +502,8 @@ export class Network implements INetwork { await this.gossip.publishBlsToExecutionChange(value); gossipedKeys.push(key); } + } catch (e) { + this.logger.error("Failed to gossip all cached bls changes", {}, e as Error); } finally { this.logger.info("Gossiped cached blsChanges", {size: gossipedKeys.length}); await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys).catch((e) => { From dd6d8f79a62144ed58e0dcaa18f917ae1cdcd980 Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 23 Jan 2023 16:56:12 +0530 Subject: [PATCH 07/13] batchify the processing --- packages/beacon-node/src/network/network.ts | 36 +++++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index ea6745a1f02e..50880541dab2 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -32,6 +32,9 @@ import {PeersData} from "./peers/peersData.js"; import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js"; import {Discv5Worker} from "./discv5/index.js"; +// How many changes to batch +const CACHED_BLS_BATCH_GOSSIP_LIMIT = 10; + interface INetworkModules { config: IBeaconConfig; libp2p: Libp2p; @@ -495,21 +498,26 @@ export class Network implements INetwork { } private async gossipCachedBlsChanges(): Promise { - const gossipedKeys = []; - try { - this.logger.info("Re-gossiping the cached bls changes"); - for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream()) { - await this.gossip.publishBlsToExecutionChange(value); - gossipedKeys.push(key); + let gossipedKeys: number[]; + do { + gossipedKeys = []; + try { + this.logger.info("Re-gossiping the cached bls changes"); + for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream({ + limit: CACHED_BLS_BATCH_GOSSIP_LIMIT, + })) { + await this.gossip.publishBlsToExecutionChange(value); + gossipedKeys.push(key); + } + } catch (e) { + this.logger.error("Failed to gossip all cached bls changes", {}, e as Error); + } finally { + this.logger.info("Gossiped cached blsChanges", {validatorIndexs: `${gossipedKeys}`, size: gossipedKeys.length}); + // If this fails promise will not be set to null and hence gossipCachedBlsChanges will not be + // triggered till reboot + await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); } - } catch (e) { - this.logger.error("Failed to gossip all cached bls changes", {}, e as Error); - } finally { - this.logger.info("Gossiped cached blsChanges", {size: gossipedKeys.length}); - await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys).catch((e) => { - this.logger.error("Could not clear gossiped blsChanges from blsToExecutionChangeCache", {}, e as Error); - }); - } + } while (gossipedKeys.length === CACHED_BLS_BATCH_GOSSIP_LIMIT); } private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise => { From 31b59b9d5549a9ee4bf3adc34225e8a4575cc389 Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 23 Jan 2023 22:00:09 +0530 Subject: [PATCH 08/13] fix some of the cases --- .../src/api/impl/beacon/pool/index.ts | 2 +- .../beacon-node/src/chain/opPools/opPool.ts | 3 +- .../beacon-node/src/chain/opPools/utils.ts | 4 +-- packages/beacon-node/src/network/network.ts | 29 +++++++++++++++---- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index dd7e190efff9..ee71f6278eac 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -112,7 +112,7 @@ export function getBeaconPoolApi({ } catch (e) { errors.push(e as Error); logger.error( - `Error on submitPoolSyncCommitteeSignatures [${i}]`, + `Error on submitPoolBlsToExecutionChange [${i}]`, {validatorIndex: blsToExecutionChange.message.validatorIndex}, e as Error ); diff --git a/packages/beacon-node/src/chain/opPools/opPool.ts b/packages/beacon-node/src/chain/opPools/opPool.ts index 565243c9b600..ca753b0d6c0e 100644 --- a/packages/beacon-node/src/chain/opPools/opPool.ts +++ b/packages/beacon-node/src/chain/opPools/opPool.ts @@ -1,6 +1,5 @@ import { CachedBeaconStateAllForks, - CachedBeaconStateCapella, computeEpochAtSlot, computeStartSlotAtEpoch, getAttesterSlashableIndices, @@ -233,7 +232,7 @@ export class OpPool { const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = []; for (const blsToExecutionChange of this.blsToExecutionChanges.values()) { - if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) { + if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange)) { blsToExecutionChanges.push(blsToExecutionChange); if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) { break; diff --git a/packages/beacon-node/src/chain/opPools/utils.ts b/packages/beacon-node/src/chain/opPools/utils.ts index 103f7c8b579f..906f51593bde 100644 --- a/packages/beacon-node/src/chain/opPools/utils.ts +++ b/packages/beacon-node/src/chain/opPools/utils.ts @@ -1,7 +1,7 @@ import bls from "@chainsafe/bls"; import {CoordType, Signature} from "@chainsafe/bls/types"; import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params"; -import {CachedBeaconStateCapella} from "@lodestar/state-transition"; +import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {Slot, capella} from "@lodestar/types"; /** @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature { * can become invalid for certain forks. */ export function isValidBlsToExecutionChangeForBlockInclusion( - state: CachedBeaconStateCapella, + state: CachedBeaconStateAllForks, signedBLSToExecutionChange: capella.SignedBLSToExecutionChange ): boolean { // For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 50880541dab2..37b6b8cd956d 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api"; import {IMetrics} from "../metrics/index.js"; import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js"; import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js"; +import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js"; import {INetworkOptions} from "./options.js"; import {INetwork, Libp2p} from "./interface.js"; import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js"; @@ -499,25 +500,41 @@ export class Network implements INetwork { private async gossipCachedBlsChanges(): Promise { let gossipedKeys: number[]; + let includedKeys: number[]; + let processedKeys: number; + let totalProcessed = 0; + this.logger.info("Re-gossiping the cached bls changes"); do { gossipedKeys = []; + includedKeys = []; try { - this.logger.info("Re-gossiping the cached bls changes"); + const headState = this.chain.getHeadState(); for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream({ limit: CACHED_BLS_BATCH_GOSSIP_LIMIT, })) { - await this.gossip.publishBlsToExecutionChange(value); - gossipedKeys.push(key); + if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { + await this.gossip.publishBlsToExecutionChange(value); + gossipedKeys.push(value.message.validatorIndex); + } else { + includedKeys.push(value.message.validatorIndex); + } } } catch (e) { - this.logger.error("Failed to gossip all cached bls changes", {}, e as Error); + this.logger.error("Failed to gossip all cached bls changes", {totalProcessed}, e as Error); } finally { - this.logger.info("Gossiped cached blsChanges", {validatorIndexs: `${gossipedKeys}`, size: gossipedKeys.length}); + processedKeys = gossipedKeys.length + includedKeys.length; + totalProcessed += processedKeys; + this.logger.info("Gossiped cached blsChanges", { + gossipedIndexes: `${gossipedKeys}`, + alreadyIncludedIndexes: `${includedKeys}`, + processedKeys, + }); // If this fails promise will not be set to null and hence gossipCachedBlsChanges will not be // triggered till reboot await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); } - } while (gossipedKeys.length === CACHED_BLS_BATCH_GOSSIP_LIMIT); + } while (processedKeys === CACHED_BLS_BATCH_GOSSIP_LIMIT); + this.logger.info("Processed cached blsChanges", {totalProcessed}); } private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise => { From bc695e3a2c9a3e7ee885c4733f6a779b8f57b845 Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 23 Jan 2023 22:24:09 +0530 Subject: [PATCH 09/13] fix log issues --- packages/beacon-node/src/api/impl/beacon/pool/index.ts | 7 ++++++- packages/beacon-node/src/chain/opPools/opPool.ts | 2 +- packages/beacon-node/src/network/network.ts | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index ee71f6278eac..f0f37a874270 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -104,7 +104,12 @@ export function getBeaconPoolApi({ try { await validateBlsToExecutionChange(chain, blsToExecutionChange); chain.opPool.insertBlsToExecutionChange(blsToExecutionChange); - if (chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH) { + if ( + chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH && + // TODO: Remove below condition + // Only used for testing in devnet-3 of withdrawals + network.isSubscribedToGossipCoreTopics() + ) { await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); } else { await chain.cacheBlsToExecutionChanges(blsToExecutionChange); diff --git a/packages/beacon-node/src/chain/opPools/opPool.ts b/packages/beacon-node/src/chain/opPools/opPool.ts index ca753b0d6c0e..9120a49a28dc 100644 --- a/packages/beacon-node/src/chain/opPools/opPool.ts +++ b/packages/beacon-node/src/chain/opPools/opPool.ts @@ -68,7 +68,7 @@ export class OpPool { this.insertVoluntaryExit(voluntaryExit); } for (const item of blsToExecutionChanges) { - this.blsToExecutionChanges.set(item.message.validatorIndex, item); + this.insertBlsToExecutionChange(item); } } diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 37b6b8cd956d..497f4801222c 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -509,7 +509,7 @@ export class Network implements INetwork { includedKeys = []; try { const headState = this.chain.getHeadState(); - for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream({ + for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream({ limit: CACHED_BLS_BATCH_GOSSIP_LIMIT, })) { if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { From 3c674ff6de1b3bfcdbe623146d69891d7c246fbc Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 24 Jan 2023 00:06:38 +0530 Subject: [PATCH 10/13] also delete included keys --- packages/beacon-node/src/network/network.ts | 1 + packages/beacon-node/test/utils/mocks/chain/chain.ts | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 497f4801222c..fc302182aa22 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -532,6 +532,7 @@ export class Network implements INetwork { // If this fails promise will not be set to null and hence gossipCachedBlsChanges will not be // triggered till reboot await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); + await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys); } } while (processedKeys === CACHED_BLS_BATCH_GOSSIP_LIMIT); this.logger.info("Processed cached blsChanges", {totalProcessed}); diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index 5c41a8246424..272fcaf90c57 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -226,7 +226,10 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} - async cacheBlsToExecutionChanges(): Promise {} + + async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise { + return this.db.blsToExecutionChangeCache.add(blsToExecutionChange); + } } const root = ssz.Root.defaultValue() as Uint8Array; From adb54fd643504edc263497a2bd0fc2f1684c688e Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 24 Jan 2023 00:19:55 +0530 Subject: [PATCH 11/13] fix lint --- .../beacon-node/test/utils/mocks/chain/chain.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index 272fcaf90c57..ea82546fc513 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -1,7 +1,20 @@ import sinon from "sinon"; import {CompositeTypeAny, toHexString, TreeView} from "@chainsafe/ssz"; -import {phase0, allForks, UintNum64, Root, Slot, ssz, Uint16, UintBn64, RootHex, eip4844, Wei} from "@lodestar/types"; +import { + phase0, + allForks, + UintNum64, + Root, + Slot, + ssz, + Uint16, + UintBn64, + RootHex, + eip4844, + Wei, + capella, +} from "@lodestar/types"; import {IBeaconConfig} from "@lodestar/config"; import {BeaconStateAllForks, CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {CheckpointWithHex, IForkChoice, ProtoBlock, ExecutionStatus, AncestorStatus} from "@lodestar/fork-choice"; From 06e3bb38ade6a8a0231aadc58cf8155cd21f84a9 Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 24 Jan 2023 14:17:52 +0530 Subject: [PATCH 12/13] simplify gossip loop --- packages/beacon-node/src/network/network.ts | 72 +++++++++++---------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index fc302182aa22..991b9233edc2 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -33,8 +33,8 @@ import {PeersData} from "./peers/peersData.js"; import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js"; import {Discv5Worker} from "./discv5/index.js"; -// How many changes to batch -const CACHED_BLS_BATCH_GOSSIP_LIMIT = 10; +// How many changes to batch cleanup +const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10; interface INetworkModules { config: IBeaconConfig; @@ -499,42 +499,46 @@ export class Network implements INetwork { } private async gossipCachedBlsChanges(): Promise { - let gossipedKeys: number[]; - let includedKeys: number[]; - let processedKeys: number; + let gossipedKeys: number[] = []; + let includedKeys: number[] = []; let totalProcessed = 0; this.logger.info("Re-gossiping the cached bls changes"); - do { - gossipedKeys = []; - includedKeys = []; - try { - const headState = this.chain.getHeadState(); - for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream({ - limit: CACHED_BLS_BATCH_GOSSIP_LIMIT, - })) { - if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { - await this.gossip.publishBlsToExecutionChange(value); - gossipedKeys.push(value.message.validatorIndex); - } else { - includedKeys.push(value.message.validatorIndex); - } + + try { + const headState = this.chain.getHeadState(); + for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream({ + limit: CACHED_BLS_BATCH_GOSSIP_LIMIT, + })) { + if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { + await this.gossip.publishBlsToExecutionChange(value); + gossipedKeys.push(value.message.validatorIndex); + } else { + // No need to gossip if its already been in the headState + // TODO: Should use final state? + includedKeys.push(value.message.validatorIndex); + } + totalProcessed += 1; + + // Cleanup in small batches + if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) { + await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); + await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys); + includedKeys = []; + gossipedKeys = []; } - } catch (e) { - this.logger.error("Failed to gossip all cached bls changes", {totalProcessed}, e as Error); - } finally { - processedKeys = gossipedKeys.length + includedKeys.length; - totalProcessed += processedKeys; - this.logger.info("Gossiped cached blsChanges", { - gossipedIndexes: `${gossipedKeys}`, - alreadyIncludedIndexes: `${includedKeys}`, - processedKeys, - }); - // If this fails promise will not be set to null and hence gossipCachedBlsChanges will not be - // triggered till reboot - await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); - await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys); } - } while (processedKeys === CACHED_BLS_BATCH_GOSSIP_LIMIT); + } catch (e) { + this.logger.error("Failed to gossip all cached bls changes", {totalProcessed}, e as Error); + } finally { + this.logger.info("Gossiped cached blsChanges", { + gossipedIndexes: `${gossipedKeys}`, + alreadyIncludedIndexes: `${includedKeys}`, + totalProcessed, + }); + // Cleanup whatever was in the last batch + await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); + await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys); + } this.logger.info("Processed cached blsChanges", {totalProcessed}); } From 1ea60b9fe800b6423fa98a8338770f0043d5ef0f Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 24 Jan 2023 14:20:57 +0530 Subject: [PATCH 13/13] fix build --- packages/beacon-node/src/network/network.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 991b9233edc2..48ce1c1db322 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -506,9 +506,7 @@ export class Network implements INetwork { try { const headState = this.chain.getHeadState(); - for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream({ - limit: CACHED_BLS_BATCH_GOSSIP_LIMIT, - })) { + for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream()) { if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { await this.gossip.publishBlsToExecutionChange(value); gossipedKeys.push(value.message.validatorIndex);