Skip to content

Commit

Permalink
Merge 1ea60b9 into b84c234
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech authored Jan 24, 2023
2 parents b84c234 + 1ea60b9 commit 6166395
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 15 deletions.
13 changes: 11 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,20 @@ export function getBeaconPoolApi({
try {
await validateBlsToExecutionChange(chain, blsToExecutionChange);
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange);
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
if (
chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH &&
// TODO: Remove below condition
// Only used for testing in devnet-3 of withdrawals
network.isSubscribedToGossipCoreTopics()
) {
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
} else {
await chain.cacheBlsToExecutionChanges(blsToExecutionChange);
}
} catch (e) {
errors.push(e as Error);
logger.error(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
`Error on submitPoolBlsToExecutionChange [${i}]`,
{validatorIndex: blsToExecutionChange.message.validatorIndex},
e as Error
);
Expand Down
21 changes: 19 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@ import {
PubkeyIndexMap,
} from "@lodestar/state-transition";
import {IBeaconConfig} from "@lodestar/config";
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {ProcessShutdownCallback} from "@lodestar/validator";
import {ILogger, pruneSetToMax, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -122,9 +134,9 @@ export class BeaconChain implements IBeaconChain {
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedBlobsSidecarCache = new Map<RootHex, eip4844.BlobsSidecar>();
readonly opts: IChainOptions;
readonly db: IBeaconDb;

protected readonly blockProcessor: BlockProcessor;
protected readonly db: IBeaconDb;
private readonly archiver: Archiver;
private abortController = new AbortController();
private successfulExchangeTransition = false;
Expand Down Expand Up @@ -779,4 +791,9 @@ export class BeaconChain implements IBeaconChain {
}
}
}

/** Must be validated beforehand */
async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void> {
return this.db.blsToExecutionChangeCache.add(blsToExecutionChange);
}
}
17 changes: 16 additions & 1 deletion packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {IBeaconConfig} from "@lodestar/config";
import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz";
Expand All @@ -8,6 +20,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {IMetrics} from "../metrics/metrics.js";
import {IBeaconDb} from "../db/index.js";
import {IBeaconClock} from "./clock/interface.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator} from "./regen/index.js";
Expand Down Expand Up @@ -90,6 +103,7 @@ export interface IBeaconChain {
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly producedBlobsSidecarCache: Map<RootHex, eip4844.BlobsSidecar>;
readonly opts: IChainOptions;
readonly db: IBeaconDb;

/** Stop beacon chain processing */
close(): Promise<void>;
Expand Down Expand Up @@ -133,6 +147,7 @@ export interface IBeaconChain {
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
updateBuilderStatus(clockSlot: Slot): void;
cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void>;
}

export type SSZObjectType =
Expand Down
5 changes: 2 additions & 3 deletions packages/beacon-node/src/chain/opPools/opPool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
CachedBeaconStateAllForks,
CachedBeaconStateCapella,
computeEpochAtSlot,
computeStartSlotAtEpoch,
getAttesterSlashableIndices,
Expand Down Expand Up @@ -69,7 +68,7 @@ export class OpPool {
this.insertVoluntaryExit(voluntaryExit);
}
for (const item of blsToExecutionChanges) {
this.blsToExecutionChanges.set(item.message.validatorIndex, item);
this.insertBlsToExecutionChange(item);
}
}

Expand Down Expand Up @@ -233,7 +232,7 @@ export class OpPool {

const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = [];
for (const blsToExecutionChange of this.blsToExecutionChanges.values()) {
if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) {
if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange)) {
blsToExecutionChanges.push(blsToExecutionChange);
if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) {
break;
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import bls from "@chainsafe/bls";
import {CoordType, Signature} from "@chainsafe/bls/types";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateCapella} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";

/**
Expand Down Expand Up @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature {
* can become invalid for certain forks.
*/
export function isValidBlsToExecutionChangeForBlockInclusion(
state: CachedBeaconStateCapella,
state: CachedBeaconStateAllForks,
signedBLSToExecutionChange: capella.SignedBLSToExecutionChange
): boolean {
// For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/db/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "./repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js";

Expand All @@ -33,6 +34,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
attesterSlashing: AttesterSlashingRepository;
depositEvent: DepositEventRepository;
blsToExecutionChange: BLSToExecutionChangeRepository;
blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository;

depositDataRoot: DepositDataRootRepository;
eth1Data: Eth1DataRepository;
Expand All @@ -58,6 +60,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
this.stateArchive = new StateArchiveRepository(this.config, this.db);
this.voluntaryExit = new VoluntaryExitRepository(this.config, this.db);
this.blsToExecutionChange = new BLSToExecutionChangeRepository(this.config, this.db);
this.blsToExecutionChangeCache = new BLSToExecutionChangeCacheRepository(this.config, this.db);
this.proposerSlashing = new ProposerSlashingRepository(this.config, this.db);
this.attesterSlashing = new AttesterSlashingRepository(this.config, this.db);
this.depositEvent = new DepositEventRepository(this.config, this.db);
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/db/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "./repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js";

Expand All @@ -43,6 +44,7 @@ export interface IBeaconDb {
attesterSlashing: AttesterSlashingRepository;
depositEvent: DepositEventRepository;
blsToExecutionChange: BLSToExecutionChangeRepository;
blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository;

// eth1 processing
preGenesisState: PreGenesisState;
Expand Down
13 changes: 13 additions & 0 deletions packages/beacon-node/src/db/repositories/blsToExecutionChange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@ export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, c
return value.message.validatorIndex;
}
}

export class BLSToExecutionChangeCacheRepository extends Repository<
ValidatorIndex,
capella.SignedBLSToExecutionChange
> {
constructor(config: IChainForkConfig, db: Db) {
super(config, db, Bucket.capella_blsToExecutionChangeCache, ssz.capella.SignedBLSToExecutionChange);
}

getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex {
return value.message.validatorIndex;
}
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/db/repositories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ export {CheckpointHeaderRepository} from "./lightclientCheckpointHeader.js";
export {SyncCommitteeRepository} from "./lightclientSyncCommittee.js";
export {SyncCommitteeWitnessRepository} from "./lightclientSyncCommitteeWitness.js";
export {BackfilledRanges} from "./backfilledRanges.js";
export {BLSToExecutionChangeRepository} from "./blsToExecutionChange.js";
export {BLSToExecutionChangeRepository, BLSToExecutionChangeCacheRepository} from "./blsToExecutionChange.js";
58 changes: 58 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api";
import {IMetrics} from "../metrics/index.js";
import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js";
import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js";
import {INetworkOptions} from "./options.js";
import {INetwork, Libp2p} from "./interface.js";
import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js";
Expand All @@ -32,6 +33,9 @@ import {PeersData} from "./peers/peersData.js";
import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js";
import {Discv5Worker} from "./discv5/index.js";

// How many changes to batch cleanup
const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10;

interface INetworkModules {
config: IBeaconConfig;
libp2p: Libp2p;
Expand Down Expand Up @@ -63,6 +67,7 @@ export class Network implements INetwork {
private readonly signal: AbortSignal;

private subscribedForks = new Set<ForkName>();
private cachedBlsChangesPromise: Promise<void> | null = null;

constructor(private readonly opts: INetworkOptions, modules: INetworkModules) {
const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules;
Expand Down Expand Up @@ -411,6 +416,17 @@ export class Network implements INetwork {
}
}
}

// If we are subscribed and post capella fork epoch, try gossiping the cached bls changes
if (
this.isSubscribedToGossipCoreTopics() &&
epoch >= this.config.CAPELLA_FORK_EPOCH &&
!this.cachedBlsChangesPromise
) {
this.cachedBlsChangesPromise = this.gossipCachedBlsChanges().then(() => {
this.cachedBlsChangesPromise = null;
});
}
} catch (e) {
this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -482,6 +498,48 @@ export class Network implements INetwork {
return topics;
}

private async gossipCachedBlsChanges(): Promise<void> {
let gossipedKeys: number[] = [];
let includedKeys: number[] = [];
let totalProcessed = 0;
this.logger.info("Re-gossiping the cached bls changes");

try {
const headState = this.chain.getHeadState();
for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream()) {
if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) {
await this.gossip.publishBlsToExecutionChange(value);
gossipedKeys.push(value.message.validatorIndex);
} else {
// No need to gossip if its already been in the headState
// TODO: Should use final state?
includedKeys.push(value.message.validatorIndex);
}
totalProcessed += 1;

// Cleanup in small batches
if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) {
await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys);
await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys);
includedKeys = [];
gossipedKeys = [];
}
}
} catch (e) {
this.logger.error("Failed to gossip all cached bls changes", {totalProcessed}, e as Error);
} finally {
this.logger.info("Gossiped cached blsChanges", {
gossipedIndexes: `${gossipedKeys}`,
alreadyIncludedIndexes: `${includedKeys}`,
totalProcessed,
});
// Cleanup whatever was in the last batch
await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys);
await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys);
}
this.logger.info("Processed cached blsChanges", {totalProcessed});
}

private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise<void> => {
if (this.hasAttachedSyncCommitteeMember()) {
try {
Expand Down
27 changes: 23 additions & 4 deletions packages/beacon-node/test/utils/mocks/chain/chain.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
import sinon from "sinon";

import {CompositeTypeAny, toHexString, TreeView} from "@chainsafe/ssz";
import {phase0, allForks, UintNum64, Root, Slot, ssz, Uint16, UintBn64, RootHex, eip4844, Wei} from "@lodestar/types";
import {
phase0,
allForks,
UintNum64,
Root,
Slot,
ssz,
Uint16,
UintBn64,
RootHex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {IBeaconConfig} from "@lodestar/config";
import {BeaconStateAllForks, CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {CheckpointWithHex, IForkChoice, ProtoBlock, ExecutionStatus, AncestorStatus} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -42,6 +55,7 @@ import {CheckpointBalancesCache} from "../../../../src/chain/balancesCache.js";
import {IChainOptions} from "../../../../src/chain/options.js";
import {BlockAttributes} from "../../../../src/chain/produceBlock/produceBlockBody.js";
import {ReqRespBlockResponse} from "../../../../src/network/index.js";
import {IBeaconDb} from "../../../../src/db/index.js";

/* eslint-disable @typescript-eslint/no-empty-function */

Expand All @@ -67,6 +81,7 @@ export class MockBeaconChain implements IBeaconChain {
safeSlotsToImportOptimistically: 0,
suggestedFeeRecipient: "0x0000000000000000000000000000000000000000",
};
readonly db: IBeaconDb;
readonly anchorStateLatestBlockSlot: Slot;

readonly bls: IBlsVerifier;
Expand Down Expand Up @@ -128,21 +143,21 @@ export class MockBeaconChain implements IBeaconChain {
this.forkChoice = mockForkChoice();
this.stateCache = new StateContextCache({});
this.checkpointStateCache = new CheckpointStateCache({});
const db = new StubbedBeaconDb();
this.db = new StubbedBeaconDb();
this.regen = new StateRegenerator({
config: this.config,
forkChoice: this.forkChoice,
stateCache: this.stateCache,
checkpointStateCache: this.checkpointStateCache,
db,
db: this.db,
metrics: null,
emitter: this.emitter,
});
this.lightClientServer = new LightClientServer(
{},
{
config: this.config,
db: db,
db: this.db,
metrics: null,
emitter: this.emitter,
logger: this.logger,
Expand Down Expand Up @@ -224,6 +239,10 @@ export class MockBeaconChain implements IBeaconChain {

async updateBeaconProposerData(): Promise<void> {}
updateBuilderStatus(): void {}

async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void> {
return this.db.blsToExecutionChangeCache.add(blsToExecutionChange);
}
}

const root = ssz.Root.defaultValue() as Uint8Array;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/test/utils/mocks/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "../../../src/db/repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "../../../src/db/single/index.js";
import {createStubInstance} from "../types.js";
Expand Down Expand Up @@ -45,6 +46,7 @@ export function getStubbedBeaconDb(): IBeaconDb {
attesterSlashing: createStubInstance(AttesterSlashingRepository),
depositEvent: createStubInstance(DepositEventRepository),
blsToExecutionChange: createStubInstance(BLSToExecutionChangeRepository),
blsToExecutionChangeCache: createStubInstance(BLSToExecutionChangeCacheRepository),

// eth1 processing
preGenesisState: createStubInstance(PreGenesisState),
Expand Down
Loading

0 comments on commit 6166395

Please sign in to comment.