Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache and retransmit bls changes if submitted early #5031

Closed
wants to merge 13 commits into from
6 changes: 5 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ export function getBeaconPoolApi({
try {
await validateBlsToExecutionChange(chain, blsToExecutionChange);
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange);
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
if (chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH) {
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
} else {
await chain.cacheBlsToExecutionChanges(blsToExecutionChange);
g11tech marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to explicitly call this? chain.opPool.insertBlsToExecutionChange should eventually write to DB on shutdown

}
} catch (e) {
errors.push(e as Error);
logger.error(
Expand Down
21 changes: 19 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@ import {
PubkeyIndexMap,
} from "@lodestar/state-transition";
import {IBeaconConfig} from "@lodestar/config";
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {ProcessShutdownCallback} from "@lodestar/validator";
import {ILogger, pruneSetToMax, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -122,9 +134,9 @@ export class BeaconChain implements IBeaconChain {
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedBlobsSidecarCache = new Map<RootHex, eip4844.BlobsSidecar>();
readonly opts: IChainOptions;
readonly db: IBeaconDb;

protected readonly blockProcessor: BlockProcessor;
protected readonly db: IBeaconDb;
private readonly archiver: Archiver;
private abortController = new AbortController();
private successfulExchangeTransition = false;
Expand Down Expand Up @@ -779,4 +791,9 @@ export class BeaconChain implements IBeaconChain {
}
}
}

/** Must be validated beforehand */
async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void> {
return this.db.blsToExecutionChangeCache.add(blsToExecutionChange);
}
}
17 changes: 16 additions & 1 deletion packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {IBeaconConfig} from "@lodestar/config";
import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz";
Expand All @@ -8,6 +20,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {IMetrics} from "../metrics/metrics.js";
import {IBeaconDb} from "../db/index.js";
import {IBeaconClock} from "./clock/interface.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator} from "./regen/index.js";
Expand Down Expand Up @@ -90,6 +103,7 @@ export interface IBeaconChain {
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly producedBlobsSidecarCache: Map<RootHex, eip4844.BlobsSidecar>;
readonly opts: IChainOptions;
readonly db: IBeaconDb;

/** Stop beacon chain processing */
close(): Promise<void>;
Expand Down Expand Up @@ -133,6 +147,7 @@ export interface IBeaconChain {
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
updateBuilderStatus(clockSlot: Slot): void;
cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void>;
}

export type SSZObjectType =
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/db/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "./repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js";

Expand All @@ -33,6 +34,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
attesterSlashing: AttesterSlashingRepository;
depositEvent: DepositEventRepository;
blsToExecutionChange: BLSToExecutionChangeRepository;
blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository;

depositDataRoot: DepositDataRootRepository;
eth1Data: Eth1DataRepository;
Expand All @@ -58,6 +60,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
this.stateArchive = new StateArchiveRepository(this.config, this.db);
this.voluntaryExit = new VoluntaryExitRepository(this.config, this.db);
this.blsToExecutionChange = new BLSToExecutionChangeRepository(this.config, this.db);
this.blsToExecutionChangeCache = new BLSToExecutionChangeCacheRepository(this.config, this.db);
this.proposerSlashing = new ProposerSlashingRepository(this.config, this.db);
this.attesterSlashing = new AttesterSlashingRepository(this.config, this.db);
this.depositEvent = new DepositEventRepository(this.config, this.db);
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/db/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "./repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js";

Expand All @@ -43,6 +44,7 @@ export interface IBeaconDb {
attesterSlashing: AttesterSlashingRepository;
depositEvent: DepositEventRepository;
blsToExecutionChange: BLSToExecutionChangeRepository;
blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository;

// eth1 processing
preGenesisState: PreGenesisState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@ export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, c
return value.message.validatorIndex;
}
}

export class BLSToExecutionChangeCacheRepository extends Repository<
ValidatorIndex,
capella.SignedBLSToExecutionChange
> {
constructor(config: IChainForkConfig, db: Db) {
super(config, db, Bucket.capella_blsToExecutionChangeCache, ssz.capella.SignedBLSToExecutionChange);
}

getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex {
return value.message.validatorIndex;
}
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/db/repositories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ export {CheckpointHeaderRepository} from "./lightclientCheckpointHeader.js";
export {SyncCommitteeRepository} from "./lightclientSyncCommittee.js";
export {SyncCommitteeWitnessRepository} from "./lightclientSyncCommitteeWitness.js";
export {BackfilledRanges} from "./backfilledRanges.js";
export {BLSToExecutionChangeRepository} from "./blsToExecutionChange.js";
export {BLSToExecutionChangeRepository, BLSToExecutionChangeCacheRepository} from "./blsToExecutionChange.js";
30 changes: 30 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export class Network implements INetwork {
private readonly signal: AbortSignal;

private subscribedForks = new Set<ForkName>();
private cachedBlsChangesPromise: Promise<void> | null = null;

constructor(private readonly opts: INetworkOptions, modules: INetworkModules) {
const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules;
Expand Down Expand Up @@ -411,6 +412,17 @@ export class Network implements INetwork {
}
}
}

// If we are subscribed and post capella fork epoch, try gossiping the cached bls changes
if (
this.isSubscribedToGossipCoreTopics() &&
epoch >= this.config.CAPELLA_FORK_EPOCH &&
!this.cachedBlsChangesPromise
) {
this.cachedBlsChangesPromise = this.gossipCachedBlsChanges().then(() => {
this.cachedBlsChangesPromise = null;
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just move this code above under the // On fork transition comment, with some retry in case there are no peers

} catch (e) {
this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -482,6 +494,24 @@ export class Network implements INetwork {
return topics;
}

private async gossipCachedBlsChanges(): Promise<void> {
const gossipedKeys = [];
try {
this.logger.info("Re-gossiping the cached bls changes");
for await (const {key, value} of this.chain.db.blsToExecutionChangeCache.entriesStream()) {
await this.gossip.publishBlsToExecutionChange(value);
gossipedKeys.push(key);
}
} catch (e) {
this.logger.error("Failed to gossip all cached bls changes", {}, e as Error);
} finally {
this.logger.info("Gossiped cached blsChanges", {size: gossipedKeys.length});
await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys).catch((e) => {
this.logger.error("Could not clear gossiped blsChanges from blsToExecutionChangeCache", {}, e as Error);
});
}
}

private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise<void> => {
if (this.hasAttachedSyncCommitteeMember()) {
try {
Expand Down
9 changes: 6 additions & 3 deletions packages/beacon-node/test/utils/mocks/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {CheckpointBalancesCache} from "../../../../src/chain/balancesCache.js";
import {IChainOptions} from "../../../../src/chain/options.js";
import {BlockAttributes} from "../../../../src/chain/produceBlock/produceBlockBody.js";
import {ReqRespBlockResponse} from "../../../../src/network/index.js";
import {IBeaconDb} from "../../../../src/db/index.js";

/* eslint-disable @typescript-eslint/no-empty-function */

Expand All @@ -67,6 +68,7 @@ export class MockBeaconChain implements IBeaconChain {
safeSlotsToImportOptimistically: 0,
suggestedFeeRecipient: "0x0000000000000000000000000000000000000000",
};
readonly db: IBeaconDb;
readonly anchorStateLatestBlockSlot: Slot;

readonly bls: IBlsVerifier;
Expand Down Expand Up @@ -128,21 +130,21 @@ export class MockBeaconChain implements IBeaconChain {
this.forkChoice = mockForkChoice();
this.stateCache = new StateContextCache({});
this.checkpointStateCache = new CheckpointStateCache({});
const db = new StubbedBeaconDb();
this.db = new StubbedBeaconDb();
this.regen = new StateRegenerator({
config: this.config,
forkChoice: this.forkChoice,
stateCache: this.stateCache,
checkpointStateCache: this.checkpointStateCache,
db,
db: this.db,
metrics: null,
emitter: this.emitter,
});
this.lightClientServer = new LightClientServer(
{},
{
config: this.config,
db: db,
db: this.db,
metrics: null,
emitter: this.emitter,
logger: this.logger,
Expand Down Expand Up @@ -224,6 +226,7 @@ export class MockBeaconChain implements IBeaconChain {

async updateBeaconProposerData(): Promise<void> {}
updateBuilderStatus(): void {}
async cacheBlsToExecutionChanges(): Promise<void> {}
}

const root = ssz.Root.defaultValue() as Uint8Array;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/test/utils/mocks/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "../../../src/db/repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "../../../src/db/single/index.js";
import {createStubInstance} from "../types.js";
Expand Down Expand Up @@ -45,6 +46,7 @@ export function getStubbedBeaconDb(): IBeaconDb {
attesterSlashing: createStubInstance(AttesterSlashingRepository),
depositEvent: createStubInstance(DepositEventRepository),
blsToExecutionChange: createStubInstance(BLSToExecutionChangeRepository),
blsToExecutionChangeCache: createStubInstance(BLSToExecutionChangeCacheRepository),

// eth1 processing
preGenesisState: createStubInstance(PreGenesisState),
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/test/utils/stub/beaconDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
StateArchiveRepository,
VoluntaryExitRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
} from "../../../src/db/repositories/index.js";
Expand All @@ -32,6 +33,8 @@ export class StubbedBeaconDb extends BeaconDb {

voluntaryExit: SinonStubbedInstance<VoluntaryExitRepository> & VoluntaryExitRepository;
blsToExecutionChange: SinonStubbedInstance<BLSToExecutionChangeRepository> & BLSToExecutionChangeRepository;
blsToExecutionChangeCache: SinonStubbedInstance<BLSToExecutionChangeCacheRepository> &
BLSToExecutionChangeCacheRepository;
proposerSlashing: SinonStubbedInstance<ProposerSlashingRepository> & ProposerSlashingRepository;
attesterSlashing: SinonStubbedInstance<AttesterSlashingRepository> & AttesterSlashingRepository;
depositEvent: SinonStubbedInstance<DepositEventRepository> & DepositEventRepository;
Expand All @@ -49,6 +52,7 @@ export class StubbedBeaconDb extends BeaconDb {

this.voluntaryExit = createStubInstance(VoluntaryExitRepository);
this.blsToExecutionChange = createStubInstance(BLSToExecutionChangeRepository);
this.blsToExecutionChangeCache = createStubInstance(BLSToExecutionChangeCacheRepository);
this.proposerSlashing = createStubInstance(ProposerSlashingRepository);
this.attesterSlashing = createStubInstance(AttesterSlashingRepository);
this.depositEvent = createStubInstance(DepositEventRepository);
Expand Down
1 change: 1 addition & 0 deletions packages/db/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export enum Bucket {
phase0_proposerSlashing = 14, // ValidatorIndex -> ProposerSlashing
phase0_attesterSlashing = 15, // Root -> AttesterSlashing
capella_blsToExecutionChange = 16, // ValidatorIndex -> SignedBLSToExecutionChange
capella_blsToExecutionChangeCache = 17, // ValidatorIndex -> SignedBLSToExecutionChange
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to recycle the existing in-memory + DB pool. Why is this extra bucket required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm i guess the current db pool before capella can be used as to recycle, i was going for caching any generic publishing errors but i guess its over kill

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem could be, once capella is hit, it would become unclear which ones are received from gossip and which ones have been submitted, as there is always a possibility of restarts while the gossip job is not complete, so would like to retain this as a separate repo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can clean it post capella

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not matter where an object is received. You persist to db changes that are still valid to be included in a block

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem could be, once capella is hit, it would become unclear which ones are received from gossip and which ones have been submitted

That's a fair point. However, it's not a consensus issue to re-broadcast other validator messages. After thinking about it I'm not opposed to mark those objects in some way as "submitted pre-capella"

// validator
// validator = 16, // DEPRECATED on v0.11.0
// lastProposedBlock = 17, // DEPRECATED on v0.11.0
Expand Down