Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-gossip bls changes if submitted earlier than capella #5049

Merged
merged 18 commits into from
Jan 27, 2023
17 changes: 13 additions & 4 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function getBeaconPoolApi({
},

async getPoolBlsToExecutionChanges() {
return {data: chain.opPool.getAllBlsToExecutionChanges()};
return {data: chain.opPool.getAllBlsToExecutionChanges().map(({data}) => data)};
},

async submitPoolAttestations(attestations) {
Expand Down Expand Up @@ -103,12 +103,21 @@ export function getBeaconPoolApi({
blsToExecutionChanges.map(async (blsToExecutionChange, i) => {
try {
await validateBlsToExecutionChange(chain, blsToExecutionChange);
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange);
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
// TODO: Remove below condition
// Only used for testing in devnet-3 of withdrawals
chain.opPool.insertBlsToExecutionChange(
blsToExecutionChange,
// true if pre capella else false
!(
chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH &&
// TODO: Remove this condition once testing is done
network.isSubscribedToGossipCoreTopics()
)
);
} catch (e) {
errors.push(e as Error);
logger.error(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
`Error on submitPoolBlsToExecutionChange [${i}]`,
{validatorIndex: blsToExecutionChange.message.validatorIndex},
e as Error
);
Expand Down
21 changes: 12 additions & 9 deletions packages/beacon-node/src/chain/opPools/opPool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
CachedBeaconStateAllForks,
CachedBeaconStateCapella,
computeEpochAtSlot,
computeStartSlotAtEpoch,
getAttesterSlashableIndices,
Expand All @@ -16,6 +15,7 @@ import {
import {Epoch, phase0, capella, ssz, ValidatorIndex} from "@lodestar/types";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {IBeaconDb} from "../../db/index.js";
import {SignedBLSToExecutionChangeVersioned} from "../../util/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "./utils.js";

type HexRoot = string;
Expand All @@ -34,7 +34,7 @@ export class OpPool {
/** Set of seen attester slashing indexes. No need to prune */
private readonly attesterSlashingIndexes = new Set<ValidatorIndex>();
/** Map of validator index -> SignedBLSToExecutionChange */
private readonly blsToExecutionChanges = new Map<ValidatorIndex, capella.SignedBLSToExecutionChange>();
private readonly blsToExecutionChanges = new Map<ValidatorIndex, SignedBLSToExecutionChangeVersioned>();

// Getters for metrics

Expand Down Expand Up @@ -69,7 +69,7 @@ export class OpPool {
this.insertVoluntaryExit(voluntaryExit);
}
for (const item of blsToExecutionChanges) {
this.blsToExecutionChanges.set(item.message.validatorIndex, item);
this.insertBlsToExecutionChange(item.data, item.preCapella);
}
}

Expand Down Expand Up @@ -150,8 +150,11 @@ export class OpPool {
}

/** Must be validated beforehand */
insertBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): void {
this.blsToExecutionChanges.set(blsToExecutionChange.message.validatorIndex, blsToExecutionChange);
insertBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange, preCapella = false): void {
this.blsToExecutionChanges.set(blsToExecutionChange.message.validatorIndex, {
data: blsToExecutionChange,
preCapella,
});
}

/**
Expand Down Expand Up @@ -233,8 +236,8 @@ export class OpPool {

const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = [];
for (const blsToExecutionChange of this.blsToExecutionChanges.values()) {
if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) {
blsToExecutionChanges.push(blsToExecutionChange);
if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange.data)) {
blsToExecutionChanges.push(blsToExecutionChange.data);
if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) {
break;
}
Expand All @@ -260,7 +263,7 @@ export class OpPool {
}

/** For beacon pool API */
getAllBlsToExecutionChanges(): capella.SignedBLSToExecutionChange[] {
getAllBlsToExecutionChanges(): SignedBLSToExecutionChangeVersioned[] {
return Array.from(this.blsToExecutionChanges.values());
}

Expand Down Expand Up @@ -348,7 +351,7 @@ export class OpPool {
// TODO CAPELLA: We need the finalizedState to safely prune BlsToExecutionChanges. Finalized state may not be
// available in the cache, so it can be null. Once there's a head only prunning strategy, change
if (finalizedState !== null) {
const validator = finalizedState.validators.getReadonly(blsToExecutionChange.message.validatorIndex);
const validator = finalizedState.validators.getReadonly(blsToExecutionChange.data.message.validatorIndex);
if (validator.withdrawalCredentials[0] !== BLS_WITHDRAWAL_PREFIX) {
this.blsToExecutionChanges.delete(key);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import bls from "@chainsafe/bls";
import {CoordType, Signature} from "@chainsafe/bls/types";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateCapella} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";

/**
Expand Down Expand Up @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature {
* can become invalid for certain forks.
*/
export function isValidBlsToExecutionChangeForBlockInclusion(
state: CachedBeaconStateCapella,
state: CachedBeaconStateAllForks,
signedBLSToExecutionChange: capella.SignedBLSToExecutionChange
): boolean {
// For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {ValidatorIndex, capella, ssz} from "@lodestar/types";
import {ValidatorIndex} from "@lodestar/types";
import {IChainForkConfig} from "@lodestar/config";
import {Db, Bucket, Repository} from "@lodestar/db";
import {SignedBLSToExecutionChangeVersioned, signedBLSToExecutionChangeVersionedType} from "../../util/types.js";

export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, capella.SignedBLSToExecutionChange> {
export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, SignedBLSToExecutionChangeVersioned> {
constructor(config: IChainForkConfig, db: Db) {
super(config, db, Bucket.capella_blsToExecutionChange, ssz.capella.SignedBLSToExecutionChange);
super(config, db, Bucket.capella_blsToExecutionChange, signedBLSToExecutionChangeVersionedType);
}

getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex {
return value.message.validatorIndex;
getId(value: SignedBLSToExecutionChangeVersioned): ValidatorIndex {
return value.data.message.validatorIndex;
}
}
75 changes: 75 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api";
import {IMetrics} from "../metrics/index.js";
import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js";
import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js";
import {INetworkOptions} from "./options.js";
import {INetwork, Libp2p} from "./interface.js";
import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js";
Expand All @@ -32,6 +33,9 @@ import {PeersData} from "./peers/peersData.js";
import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js";
import {Discv5Worker} from "./discv5/index.js";

// How many changes to batch cleanup
const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10;

interface INetworkModules {
config: IBeaconConfig;
libp2p: Libp2p;
Expand Down Expand Up @@ -63,6 +67,7 @@ export class Network implements INetwork {
private readonly signal: AbortSignal;

private subscribedForks = new Set<ForkName>();
private regossipBlsChangesPromise: Promise<void> | null = null;

constructor(private readonly opts: INetworkOptions, modules: INetworkModules) {
const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules;
Expand Down Expand Up @@ -411,6 +416,20 @@ export class Network implements INetwork {
}
}
}

// If we are subscribed and post capella fork epoch, try gossiping the cached bls changes
if (
this.isSubscribedToGossipCoreTopics() &&
epoch >= this.config.CAPELLA_FORK_EPOCH &&
!this.regossipBlsChangesPromise
) {
this.regossipBlsChangesPromise = this.regossipCachedBlsChanges()
// If the processing fails for e.g. because of lack of peers set the promise
// to be null again to be retried
.catch((_e) => {
this.regossipBlsChangesPromise = null;
});
}
} catch (e) {
this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -482,6 +501,62 @@ export class Network implements INetwork {
return topics;
}

private async regossipCachedBlsChanges(): Promise<void> {
let gossipedIndexes = [];
let includedIndexes = [];
let totalProcessed = 0;

this.logger.debug("Re-gossiping unsubmitted cached bls changes");
try {
const headState = this.chain.getHeadState();
for (const poolData of this.chain.opPool.getAllBlsToExecutionChanges()) {
const {data: value, preCapella} = poolData;
if (preCapella) {
if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) {
await this.gossip.publishBlsToExecutionChange(value);
gossipedIndexes.push(value.message.validatorIndex);
} else {
// No need to gossip if its already been in the headState
// TODO: Should use final state?
includedIndexes.push(value.message.validatorIndex);
}

this.chain.opPool.insertBlsToExecutionChange(value, false);
totalProcessed += 1;

// Cleanup in small batches
if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) {
this.logger.debug("Gossiped cached blsChanges", {
gossipedIndexes: `${gossipedIndexes}`,
includedIndexes: `${includedIndexes}`,
totalProcessed,
});
gossipedIndexes = [];
includedIndexes = [];
}
}
}

// Log any remaining changes
if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT !== 0) {
this.logger.debug("Gossiped cached blsChanges", {
gossipedIndexes: `${gossipedIndexes}`,
includedIndexes: `${includedIndexes}`,
totalProcessed,
});
}
} catch (e) {
this.logger.error("Failed to completely gossip unsubmitted cached bls changes", {totalProcessed}, e as Error);
// Throw error so that the promise can be set null to be retied
throw e;
}
if (totalProcessed > 0) {
this.logger.info("Regossiped unsubmitted blsChanges", {totalProcessed});
} else {
this.logger.debug("No unsubmitted blsChanges to gossip", {totalProcessed});
}
}

private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise<void> => {
if (this.hasAttachedSyncCommitteeMember()) {
try {
Expand Down
14 changes: 14 additions & 0 deletions packages/beacon-node/src/util/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {ContainerType, ValueOf} from "@chainsafe/ssz";
import {ssz} from "@lodestar/types";

// Misc SSZ types used only in the beacon-node package, no need to upstream to types

export const signedBLSToExecutionChangeVersionedType = new ContainerType(
{
// Assumes less than 256 forks, sounds reasonable in our lifetime
preCapella: ssz.Boolean,
data: ssz.capella.SignedBLSToExecutionChange,
},
{jsonCase: "eth2", typeName: "SignedBLSToExecutionChangeVersionedType"}
);
export type SignedBLSToExecutionChangeVersioned = ValueOf<typeof signedBLSToExecutionChangeVersionedType>;
2 changes: 1 addition & 1 deletion packages/beacon-node/test/utils/mocks/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export class MockBeaconChain implements IBeaconChain {
{},
{
config: this.config,
db: db,
db,
metrics: null,
emitter: this.emitter,
logger: this.logger,
Expand Down