diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 20e8cf56543a..4d9b9e488446 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -41,7 +41,7 @@ export function getBeaconPoolApi({ }, async getPoolBlsToExecutionChanges() { - return {data: chain.opPool.getAllBlsToExecutionChanges()}; + return {data: chain.opPool.getAllBlsToExecutionChanges().map(({data}) => data)}; }, async submitPoolAttestations(attestations) { @@ -103,12 +103,21 @@ export function getBeaconPoolApi({ blsToExecutionChanges.map(async (blsToExecutionChange, i) => { try { await validateBlsToExecutionChange(chain, blsToExecutionChange); - chain.opPool.insertBlsToExecutionChange(blsToExecutionChange); - await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); + // TODO: Remove below condition + // Only used for testing in devnet-3 of withdrawals + chain.opPool.insertBlsToExecutionChange( + blsToExecutionChange, + // true if pre capella else false + !( + chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH && + // TODO: Remove this condition once testing is done + network.isSubscribedToGossipCoreTopics() + ) + ); } catch (e) { errors.push(e as Error); logger.error( - `Error on submitPoolSyncCommitteeSignatures [${i}]`, + `Error on submitPoolBlsToExecutionChange [${i}]`, {validatorIndex: blsToExecutionChange.message.validatorIndex}, e as Error ); diff --git a/packages/beacon-node/src/chain/opPools/opPool.ts b/packages/beacon-node/src/chain/opPools/opPool.ts index 565243c9b600..67e68fd8bee2 100644 --- a/packages/beacon-node/src/chain/opPools/opPool.ts +++ b/packages/beacon-node/src/chain/opPools/opPool.ts @@ -1,6 +1,5 @@ import { CachedBeaconStateAllForks, - CachedBeaconStateCapella, computeEpochAtSlot, computeStartSlotAtEpoch, getAttesterSlashableIndices, @@ -16,6 +15,7 @@ import { import {Epoch, phase0, capella, ssz, ValidatorIndex} from "@lodestar/types"; import {fromHexString, toHexString} from "@chainsafe/ssz"; import {IBeaconDb} from "../../db/index.js"; +import {SignedBLSToExecutionChangeVersioned} from "../../util/types.js"; import {isValidBlsToExecutionChangeForBlockInclusion} from "./utils.js"; type HexRoot = string; @@ -34,7 +34,7 @@ export class OpPool { /** Set of seen attester slashing indexes. No need to prune */ private readonly attesterSlashingIndexes = new Set(); /** Map of validator index -> SignedBLSToExecutionChange */ - private readonly blsToExecutionChanges = new Map(); + private readonly blsToExecutionChanges = new Map(); // Getters for metrics @@ -69,7 +69,7 @@ export class OpPool { this.insertVoluntaryExit(voluntaryExit); } for (const item of blsToExecutionChanges) { - this.blsToExecutionChanges.set(item.message.validatorIndex, item); + this.insertBlsToExecutionChange(item.data, item.preCapella); } } @@ -150,8 +150,11 @@ export class OpPool { } /** Must be validated beforehand */ - insertBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): void { - this.blsToExecutionChanges.set(blsToExecutionChange.message.validatorIndex, blsToExecutionChange); + insertBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange, preCapella = false): void { + this.blsToExecutionChanges.set(blsToExecutionChange.message.validatorIndex, { + data: blsToExecutionChange, + preCapella, + }); } /** @@ -233,8 +236,8 @@ export class OpPool { const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = []; for (const blsToExecutionChange of this.blsToExecutionChanges.values()) { - if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) { - blsToExecutionChanges.push(blsToExecutionChange); + if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange.data)) { + blsToExecutionChanges.push(blsToExecutionChange.data); if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) { break; } @@ -260,7 +263,7 @@ export class OpPool { } /** For beacon pool API */ - getAllBlsToExecutionChanges(): capella.SignedBLSToExecutionChange[] { + getAllBlsToExecutionChanges(): SignedBLSToExecutionChangeVersioned[] { return Array.from(this.blsToExecutionChanges.values()); } @@ -348,7 +351,7 @@ export class OpPool { // TODO CAPELLA: We need the finalizedState to safely prune BlsToExecutionChanges. Finalized state may not be // available in the cache, so it can be null. Once there's a head only prunning strategy, change if (finalizedState !== null) { - const validator = finalizedState.validators.getReadonly(blsToExecutionChange.message.validatorIndex); + const validator = finalizedState.validators.getReadonly(blsToExecutionChange.data.message.validatorIndex); if (validator.withdrawalCredentials[0] !== BLS_WITHDRAWAL_PREFIX) { this.blsToExecutionChanges.delete(key); } diff --git a/packages/beacon-node/src/chain/opPools/utils.ts b/packages/beacon-node/src/chain/opPools/utils.ts index 103f7c8b579f..906f51593bde 100644 --- a/packages/beacon-node/src/chain/opPools/utils.ts +++ b/packages/beacon-node/src/chain/opPools/utils.ts @@ -1,7 +1,7 @@ import bls from "@chainsafe/bls"; import {CoordType, Signature} from "@chainsafe/bls/types"; import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params"; -import {CachedBeaconStateCapella} from "@lodestar/state-transition"; +import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {Slot, capella} from "@lodestar/types"; /** @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature { * can become invalid for certain forks. */ export function isValidBlsToExecutionChangeForBlockInclusion( - state: CachedBeaconStateCapella, + state: CachedBeaconStateAllForks, signedBLSToExecutionChange: capella.SignedBLSToExecutionChange ): boolean { // For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change diff --git a/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts b/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts index 3f41dfa9fc1f..a62b606613c9 100644 --- a/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts +++ b/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts @@ -1,13 +1,14 @@ -import {ValidatorIndex, capella, ssz} from "@lodestar/types"; +import {ValidatorIndex} from "@lodestar/types"; import {IChainForkConfig} from "@lodestar/config"; import {Db, Bucket, Repository} from "@lodestar/db"; +import {SignedBLSToExecutionChangeVersioned, signedBLSToExecutionChangeVersionedType} from "../../util/types.js"; -export class BLSToExecutionChangeRepository extends Repository { +export class BLSToExecutionChangeRepository extends Repository { constructor(config: IChainForkConfig, db: Db) { - super(config, db, Bucket.capella_blsToExecutionChange, ssz.capella.SignedBLSToExecutionChange); + super(config, db, Bucket.capella_blsToExecutionChange, signedBLSToExecutionChangeVersionedType); } - getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex { - return value.message.validatorIndex; + getId(value: SignedBLSToExecutionChangeVersioned): ValidatorIndex { + return value.data.message.validatorIndex; } } diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 728564ca2eb2..9ebb1317d412 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api"; import {IMetrics} from "../metrics/index.js"; import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js"; import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js"; +import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js"; import {INetworkOptions} from "./options.js"; import {INetwork, Libp2p} from "./interface.js"; import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js"; @@ -32,6 +33,9 @@ import {PeersData} from "./peers/peersData.js"; import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js"; import {Discv5Worker} from "./discv5/index.js"; +// How many changes to batch cleanup +const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10; + interface INetworkModules { config: IBeaconConfig; libp2p: Libp2p; @@ -63,6 +67,7 @@ export class Network implements INetwork { private readonly signal: AbortSignal; private subscribedForks = new Set(); + private regossipBlsChangesPromise: Promise | null = null; constructor(private readonly opts: INetworkOptions, modules: INetworkModules) { const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules; @@ -411,6 +416,20 @@ export class Network implements INetwork { } } } + + // If we are subscribed and post capella fork epoch, try gossiping the cached bls changes + if ( + this.isSubscribedToGossipCoreTopics() && + epoch >= this.config.CAPELLA_FORK_EPOCH && + !this.regossipBlsChangesPromise + ) { + this.regossipBlsChangesPromise = this.regossipCachedBlsChanges() + // If the processing fails for e.g. because of lack of peers set the promise + // to be null again to be retried + .catch((_e) => { + this.regossipBlsChangesPromise = null; + }); + } } catch (e) { this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error); } @@ -482,6 +501,62 @@ export class Network implements INetwork { return topics; } + private async regossipCachedBlsChanges(): Promise { + let gossipedIndexes = []; + let includedIndexes = []; + let totalProcessed = 0; + + this.logger.debug("Re-gossiping unsubmitted cached bls changes"); + try { + const headState = this.chain.getHeadState(); + for (const poolData of this.chain.opPool.getAllBlsToExecutionChanges()) { + const {data: value, preCapella} = poolData; + if (preCapella) { + if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { + await this.gossip.publishBlsToExecutionChange(value); + gossipedIndexes.push(value.message.validatorIndex); + } else { + // No need to gossip if its already been in the headState + // TODO: Should use final state? + includedIndexes.push(value.message.validatorIndex); + } + + this.chain.opPool.insertBlsToExecutionChange(value, false); + totalProcessed += 1; + + // Cleanup in small batches + if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) { + this.logger.debug("Gossiped cached blsChanges", { + gossipedIndexes: `${gossipedIndexes}`, + includedIndexes: `${includedIndexes}`, + totalProcessed, + }); + gossipedIndexes = []; + includedIndexes = []; + } + } + } + + // Log any remaining changes + if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT !== 0) { + this.logger.debug("Gossiped cached blsChanges", { + gossipedIndexes: `${gossipedIndexes}`, + includedIndexes: `${includedIndexes}`, + totalProcessed, + }); + } + } catch (e) { + this.logger.error("Failed to completely gossip unsubmitted cached bls changes", {totalProcessed}, e as Error); + // Throw error so that the promise can be set null to be retied + throw e; + } + if (totalProcessed > 0) { + this.logger.info("Regossiped unsubmitted blsChanges", {totalProcessed}); + } else { + this.logger.debug("No unsubmitted blsChanges to gossip", {totalProcessed}); + } + } + private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise => { if (this.hasAttachedSyncCommitteeMember()) { try { diff --git a/packages/beacon-node/src/util/types.ts b/packages/beacon-node/src/util/types.ts new file mode 100644 index 000000000000..545a706c7511 --- /dev/null +++ b/packages/beacon-node/src/util/types.ts @@ -0,0 +1,14 @@ +import {ContainerType, ValueOf} from "@chainsafe/ssz"; +import {ssz} from "@lodestar/types"; + +// Misc SSZ types used only in the beacon-node package, no need to upstream to types + +export const signedBLSToExecutionChangeVersionedType = new ContainerType( + { + // Assumes less than 256 forks, sounds reasonable in our lifetime + preCapella: ssz.Boolean, + data: ssz.capella.SignedBLSToExecutionChange, + }, + {jsonCase: "eth2", typeName: "SignedBLSToExecutionChangeVersionedType"} +); +export type SignedBLSToExecutionChangeVersioned = ValueOf; diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index fb7843195dcb..2c757e5afb1e 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -142,7 +142,7 @@ export class MockBeaconChain implements IBeaconChain { {}, { config: this.config, - db: db, + db, metrics: null, emitter: this.emitter, logger: this.logger,