Skip to content

Commit

Permalink
Merge 901c142 into c7abf8e
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech authored Jan 26, 2023
2 parents c7abf8e + 901c142 commit 9d0bded
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 21 deletions.
17 changes: 13 additions & 4 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function getBeaconPoolApi({
},

async getPoolBlsToExecutionChanges() {
return {data: chain.opPool.getAllBlsToExecutionChanges()};
return {data: chain.opPool.getAllBlsToExecutionChanges().map(({data}) => data)};
},

async submitPoolAttestations(attestations) {
Expand Down Expand Up @@ -103,12 +103,21 @@ export function getBeaconPoolApi({
blsToExecutionChanges.map(async (blsToExecutionChange, i) => {
try {
await validateBlsToExecutionChange(chain, blsToExecutionChange);
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange);
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
// TODO: Remove below condition
// Only used for testing in devnet-3 of withdrawals
chain.opPool.insertBlsToExecutionChange(
blsToExecutionChange,
// true if pre capella else false
!(
chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH &&
// TODO: Remove this condition once testing is done
network.isSubscribedToGossipCoreTopics()
)
);
} catch (e) {
errors.push(e as Error);
logger.error(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
`Error on submitPoolBlsToExecutionChange [${i}]`,
{validatorIndex: blsToExecutionChange.message.validatorIndex},
e as Error
);
Expand Down
21 changes: 12 additions & 9 deletions packages/beacon-node/src/chain/opPools/opPool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
CachedBeaconStateAllForks,
CachedBeaconStateCapella,
computeEpochAtSlot,
computeStartSlotAtEpoch,
getAttesterSlashableIndices,
Expand All @@ -16,6 +15,7 @@ import {
import {Epoch, phase0, capella, ssz, ValidatorIndex} from "@lodestar/types";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {IBeaconDb} from "../../db/index.js";
import {SignedBLSToExecutionChangeVersioned} from "../../util/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "./utils.js";

type HexRoot = string;
Expand All @@ -34,7 +34,7 @@ export class OpPool {
/** Set of seen attester slashing indexes. No need to prune */
private readonly attesterSlashingIndexes = new Set<ValidatorIndex>();
/** Map of validator index -> SignedBLSToExecutionChange */
private readonly blsToExecutionChanges = new Map<ValidatorIndex, capella.SignedBLSToExecutionChange>();
private readonly blsToExecutionChanges = new Map<ValidatorIndex, SignedBLSToExecutionChangeVersioned>();

// Getters for metrics

Expand Down Expand Up @@ -69,7 +69,7 @@ export class OpPool {
this.insertVoluntaryExit(voluntaryExit);
}
for (const item of blsToExecutionChanges) {
this.blsToExecutionChanges.set(item.message.validatorIndex, item);
this.insertBlsToExecutionChange(item.data, item.preCapella);
}
}

Expand Down Expand Up @@ -150,8 +150,11 @@ export class OpPool {
}

/** Must be validated beforehand */
insertBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): void {
this.blsToExecutionChanges.set(blsToExecutionChange.message.validatorIndex, blsToExecutionChange);
insertBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange, preCapella = false): void {
this.blsToExecutionChanges.set(blsToExecutionChange.message.validatorIndex, {
data: blsToExecutionChange,
preCapella,
});
}

/**
Expand Down Expand Up @@ -233,8 +236,8 @@ export class OpPool {

const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = [];
for (const blsToExecutionChange of this.blsToExecutionChanges.values()) {
if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) {
blsToExecutionChanges.push(blsToExecutionChange);
if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange.data)) {
blsToExecutionChanges.push(blsToExecutionChange.data);
if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) {
break;
}
Expand All @@ -260,7 +263,7 @@ export class OpPool {
}

/** For beacon pool API */
getAllBlsToExecutionChanges(): capella.SignedBLSToExecutionChange[] {
getAllBlsToExecutionChanges(): SignedBLSToExecutionChangeVersioned[] {
return Array.from(this.blsToExecutionChanges.values());
}

Expand Down Expand Up @@ -348,7 +351,7 @@ export class OpPool {
// TODO CAPELLA: We need the finalizedState to safely prune BlsToExecutionChanges. Finalized state may not be
// available in the cache, so it can be null. Once there's a head only prunning strategy, change
if (finalizedState !== null) {
const validator = finalizedState.validators.getReadonly(blsToExecutionChange.message.validatorIndex);
const validator = finalizedState.validators.getReadonly(blsToExecutionChange.data.message.validatorIndex);
if (validator.withdrawalCredentials[0] !== BLS_WITHDRAWAL_PREFIX) {
this.blsToExecutionChanges.delete(key);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import bls from "@chainsafe/bls";
import {CoordType, Signature} from "@chainsafe/bls/types";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateCapella} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";

/**
Expand Down Expand Up @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature {
* can become invalid for certain forks.
*/
export function isValidBlsToExecutionChangeForBlockInclusion(
state: CachedBeaconStateCapella,
state: CachedBeaconStateAllForks,
signedBLSToExecutionChange: capella.SignedBLSToExecutionChange
): boolean {
// For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change
Expand Down
11 changes: 6 additions & 5 deletions packages/beacon-node/src/db/repositories/blsToExecutionChange.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {ValidatorIndex, capella, ssz} from "@lodestar/types";
import {ValidatorIndex} from "@lodestar/types";
import {IChainForkConfig} from "@lodestar/config";
import {Db, Bucket, Repository} from "@lodestar/db";
import {SignedBLSToExecutionChangeVersioned, signedBLSToExecutionChangeVersionedType} from "../../util/types.js";

export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, capella.SignedBLSToExecutionChange> {
export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, SignedBLSToExecutionChangeVersioned> {
constructor(config: IChainForkConfig, db: Db) {
super(config, db, Bucket.capella_blsToExecutionChange, ssz.capella.SignedBLSToExecutionChange);
super(config, db, Bucket.capella_blsToExecutionChange, signedBLSToExecutionChangeVersionedType);
}

getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex {
return value.message.validatorIndex;
getId(value: SignedBLSToExecutionChangeVersioned): ValidatorIndex {
return value.data.message.validatorIndex;
}
}
75 changes: 75 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api";
import {IMetrics} from "../metrics/index.js";
import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js";
import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js";
import {INetworkOptions} from "./options.js";
import {INetwork, Libp2p} from "./interface.js";
import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js";
Expand All @@ -32,6 +33,9 @@ import {PeersData} from "./peers/peersData.js";
import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js";
import {Discv5Worker} from "./discv5/index.js";

// How many changes to batch cleanup
const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10;

interface INetworkModules {
config: IBeaconConfig;
libp2p: Libp2p;
Expand Down Expand Up @@ -63,6 +67,7 @@ export class Network implements INetwork {
private readonly signal: AbortSignal;

private subscribedForks = new Set<ForkName>();
private regossipBlsChangesPromise: Promise<void> | null = null;

constructor(private readonly opts: INetworkOptions, modules: INetworkModules) {
const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules;
Expand Down Expand Up @@ -411,6 +416,20 @@ export class Network implements INetwork {
}
}
}

// If we are subscribed and post capella fork epoch, try gossiping the cached bls changes
if (
this.isSubscribedToGossipCoreTopics() &&
epoch >= this.config.CAPELLA_FORK_EPOCH &&
!this.regossipBlsChangesPromise
) {
this.regossipBlsChangesPromise = this.regossipCachedBlsChanges()
// If the processing fails for e.g. because of lack of peers set the promise
// to be null again to be retried
.catch((_e) => {
this.regossipBlsChangesPromise = null;
});
}
} catch (e) {
this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -482,6 +501,62 @@ export class Network implements INetwork {
return topics;
}

private async regossipCachedBlsChanges(): Promise<void> {
let gossipedIndexes = [];
let includedIndexes = [];
let totalProcessed = 0;

this.logger.debug("Re-gossiping unsubmitted cached bls changes");
try {
const headState = this.chain.getHeadState();
for (const poolData of this.chain.opPool.getAllBlsToExecutionChanges()) {
const {data: value, preCapella} = poolData;
if (preCapella) {
if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) {
await this.gossip.publishBlsToExecutionChange(value);
gossipedIndexes.push(value.message.validatorIndex);
} else {
// No need to gossip if its already been in the headState
// TODO: Should use final state?
includedIndexes.push(value.message.validatorIndex);
}

this.chain.opPool.insertBlsToExecutionChange(value, false);
totalProcessed += 1;

// Cleanup in small batches
if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) {
this.logger.debug("Gossiped cached blsChanges", {
gossipedIndexes: `${gossipedIndexes}`,
includedIndexes: `${includedIndexes}`,
totalProcessed,
});
gossipedIndexes = [];
includedIndexes = [];
}
}
}

// Log any remaining changes
if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT !== 0) {
this.logger.debug("Gossiped cached blsChanges", {
gossipedIndexes: `${gossipedIndexes}`,
includedIndexes: `${includedIndexes}`,
totalProcessed,
});
}
} catch (e) {
this.logger.error("Failed to completely gossip unsubmitted cached bls changes", {totalProcessed}, e as Error);
// Throw error so that the promise can be set null to be retied
throw e;
}
if (totalProcessed > 0) {
this.logger.info("Regossiped unsubmitted blsChanges", {totalProcessed});
} else {
this.logger.debug("No unsubmitted blsChanges to gossip", {totalProcessed});
}
}

private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise<void> => {
if (this.hasAttachedSyncCommitteeMember()) {
try {
Expand Down
14 changes: 14 additions & 0 deletions packages/beacon-node/src/util/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {ContainerType, ValueOf} from "@chainsafe/ssz";
import {ssz} from "@lodestar/types";

// Misc SSZ types used only in the beacon-node package, no need to upstream to types

export const signedBLSToExecutionChangeVersionedType = new ContainerType(
{
// Assumes less than 256 forks, sounds reasonable in our lifetime
preCapella: ssz.Boolean,
data: ssz.capella.SignedBLSToExecutionChange,
},
{jsonCase: "eth2", typeName: "SignedBLSToExecutionChangeVersionedType"}
);
export type SignedBLSToExecutionChangeVersioned = ValueOf<typeof signedBLSToExecutionChangeVersionedType>;
2 changes: 1 addition & 1 deletion packages/beacon-node/test/utils/mocks/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export class MockBeaconChain implements IBeaconChain {
{},
{
config: this.config,
db: db,
db,
metrics: null,
emitter: this.emitter,
logger: this.logger,
Expand Down

0 comments on commit 9d0bded

Please sign in to comment.