Skip to content

Commit

Permalink
feat: emit eventstream events with incoming gossip (#5596)
Browse files Browse the repository at this point in the history
Emit eventstream events with incoming gossip
  • Loading branch information
dapplion authored Jun 26, 2023
1 parent ad971c6 commit a208afb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
7 changes: 7 additions & 0 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export function getBeaconPoolApi({
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);

const sentPeers = await network.publishBeaconAttestation(attestation, subnet);
metrics?.onPoolSubmitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
} catch (e) {
Expand Down Expand Up @@ -108,6 +111,7 @@ export function getBeaconPoolApi({
async submitPoolVoluntaryExit(voluntaryExit) {
await validateGossipVoluntaryExit(chain, voluntaryExit);
chain.opPool.insertVoluntaryExit(voluntaryExit);
chain.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
await network.publishVoluntaryExit(voluntaryExit);
},

Expand All @@ -121,6 +125,9 @@ export function getBeaconPoolApi({
await validateBlsToExecutionChange(chain, blsToExecutionChange, true);
const preCapella = chain.clock.currentEpoch < chain.config.CAPELLA_FORK_EPOCH;
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange, preCapella);

chain.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);

if (!preCapella) {
await network.publishBlsToExecutionChange(blsToExecutionChange);
}
Expand Down
34 changes: 22 additions & 12 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import {writeBlockInputToDb} from "./writeBlockInputToDb.js";
* Fork-choice allows to import attestations from current (0) or past (1) epoch.
*/
const FORK_CHOICE_ATT_EPOCH_LIMIT = 1;
/**
* Emit eventstream events for block contents events only for blocks that are recent enough to clock
*/
const EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS = 64;

/**
* Imports a fully verified block into the chain state. Produces multiple permanent side-effects.
Expand Down Expand Up @@ -149,11 +153,6 @@ export async function importBlock(
blockRootHex,
block.message.slot
);

// don't want to log the processed attestations here as there are so many attestations and it takes too much disc space,
// users may want to keep more log files instead of unnecessary processed attestations log
// see https://github.com/ChainSafe/lodestar/pull/4032
this.emitter.emit(routes.events.EventType.attestation, attestation);
} catch (e) {
// a block has a lot of attestations and it may has same error, we don't want to log all of them
if (e instanceof ForkChoiceError && e.type.code === ForkChoiceErrorCode.INVALID_ATTESTATION) {
Expand Down Expand Up @@ -370,14 +369,25 @@ export async function importBlock(
}
}

// Send block events
// Send block events, only for recent enough blocks

for (const voluntaryExit of block.message.body.voluntaryExits) {
this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
}

for (const blsToExecutionChange of (block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []) {
this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);
if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
// NOTE: Skip looping if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
}
}
if (this.emitter.listenerCount(routes.events.EventType.blsToExecutionChange)) {
for (const blsToExecutionChange of (block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []) {
this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);
}
}
if (this.emitter.listenerCount(routes.events.EventType.attestation)) {
for (const attestation of block.message.body.attestations) {
this.emitter.emit(routes.events.EventType.attestation, attestation);
}
}
}

// Register stat metrics about the block after importing it
Expand Down
13 changes: 13 additions & 0 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {BeaconConfig} from "@lodestar/config";
import {Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
import {IBeaconChain} from "../../chain/index.js";
Expand Down Expand Up @@ -177,6 +178,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
const blockInput = getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, serializedData);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);

// Do not emit block on eventstream API, it will be emitted after successful import
},

[GossipType.blob_sidecar]: async (_data, _topic, _peerIdStr, _seenTimestampSec) => {
Expand Down Expand Up @@ -235,6 +238,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
);
}
}

chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate);
},

[GossipType.beacon_attestation]: async ({serializedData, msgSlot}, {subnet}, _peer, seenTimestampSec) => {
Expand Down Expand Up @@ -278,6 +283,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error);
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
},

[GossipType.attester_slashing]: async ({serializedData}, topic) => {
Expand Down Expand Up @@ -318,6 +325,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
} catch (e) {
logger.error("Error adding voluntaryExit to pool", {}, e as Error);
}

chain.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
},

[GossipType.sync_committee_contribution_and_proof]: async ({serializedData}, topic) => {
Expand All @@ -340,6 +349,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
} catch (e) {
logger.error("Error adding to contributionAndProof pool", {}, e as Error);
}

chain.emitter.emit(routes.events.EventType.contributionAndProof, contributionAndProof);
},

[GossipType.sync_committee]: async ({serializedData}, topic) => {
Expand Down Expand Up @@ -386,6 +397,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
} catch (e) {
logger.error("Error adding blsToExecutionChange to pool", {}, e as Error);
}

chain.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);
},
};
}
Expand Down

0 comments on commit a208afb

Please sign in to comment.