Skip to content

Commit

Permalink
Merge 5ac55d3 into fbe9beb
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Jul 14, 2023
2 parents fbe9beb + 5ac55d3 commit 28a055b
Show file tree
Hide file tree
Showing 20 changed files with 402 additions and 112 deletions.
24 changes: 12 additions & 12 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {routes, ServerApi} from "@lodestar/api";
import {Epoch, ssz} from "@lodestar/types";
import {SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {validateGossipAttestation} from "../../../../chain/validation/index.js";
import {validateGossipAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateGossipProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
import {validateGossipVoluntaryExit} from "../../../../chain/validation/voluntaryExit.js";
import {validateBlsToExecutionChange} from "../../../../chain/validation/blsToExecutionChange.js";
import {validateSyncCommitteeSigOnly} from "../../../../chain/validation/syncCommittee.js";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
import {validateApiVoluntaryExit} from "../../../../chain/validation/voluntaryExit.js";
import {validateApiBlsToExecutionChange} from "../../../../chain/validation/blsToExecutionChange.js";
import {validateApiSyncCommittee} from "../../../../chain/validation/syncCommittee.js";
import {ApiModules} from "../../types.js";
import {AttestationError, GossipAction, SyncCommitteeError} from "../../../../chain/errors/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js";
Expand Down Expand Up @@ -53,7 +53,7 @@ export function getBeaconPoolApi({
attestations.map(async (attestation, i) => {
try {
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
const validateFn = () => validateGossipAttestation(chain, {attestation, serializedData: null}, null);
const validateFn = () => validateApiAttestation(chain, {attestation, serializedData: null});
const {slot, beaconBlockRoot} = attestation.data;
// when a validator is configured with multiple beacon node urls, this attestation data may come from another beacon node
// and the block hasn't been in our forkchoice since we haven't seen / processing that block
Expand Down Expand Up @@ -97,19 +97,19 @@ export function getBeaconPoolApi({
},

async submitPoolAttesterSlashings(attesterSlashing) {
await validateGossipAttesterSlashing(chain, attesterSlashing);
await validateApiAttesterSlashing(chain, attesterSlashing);
chain.opPool.insertAttesterSlashing(attesterSlashing);
await network.publishAttesterSlashing(attesterSlashing);
},

async submitPoolProposerSlashings(proposerSlashing) {
await validateGossipProposerSlashing(chain, proposerSlashing);
await validateApiProposerSlashing(chain, proposerSlashing);
chain.opPool.insertProposerSlashing(proposerSlashing);
await network.publishProposerSlashing(proposerSlashing);
},

async submitPoolVoluntaryExit(voluntaryExit) {
await validateGossipVoluntaryExit(chain, voluntaryExit);
await validateApiVoluntaryExit(chain, voluntaryExit);
chain.opPool.insertVoluntaryExit(voluntaryExit);
chain.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
await network.publishVoluntaryExit(voluntaryExit);
Expand All @@ -122,7 +122,7 @@ export function getBeaconPoolApi({
blsToExecutionChanges.map(async (blsToExecutionChange, i) => {
try {
// Ignore even if the change exists and reprocess
await validateBlsToExecutionChange(chain, blsToExecutionChange, true);
await validateApiBlsToExecutionChange(chain, blsToExecutionChange);
const preCapella = chain.clock.currentEpoch < chain.config.CAPELLA_FORK_EPOCH;
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange, preCapella);

Expand Down Expand Up @@ -182,7 +182,7 @@ export function getBeaconPoolApi({

// Verify signature only, all other data is very likely to be correct, since the `signature` object is created by this node.
// Worst case if `signature` is not valid, gossip peers will drop it and slightly downscore us.
await validateSyncCommitteeSigOnly(chain, state, signature);
await validateApiSyncCommittee(chain, state, signature);

// The same validator can appear multiple times in the sync committee. It can appear multiple times per
// subnet even. First compute on which subnet the signature must be broadcasted to.
Expand Down
9 changes: 2 additions & 7 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {Root, Slot, ValidatorIndex, ssz, Epoch, ProducedBlockSource, bellatrix,
import {ExecutionStatus} from "@lodestar/fork-choice";
import {toHex} from "@lodestar/utils";
import {AttestationError, AttestationErrorCode, GossipAction, SyncCommitteeError} from "../../../chain/errors/index.js";
import {validateGossipAggregateAndProof} from "../../../chain/validation/index.js";
import {validateApiAggregateAndProof} from "../../../chain/validation/index.js";
import {ZERO_HASH} from "../../../constants/index.js";
import {SyncState} from "../../../sync/index.js";
import {isOptimisticBlock} from "../../../util/forkChoice.js";
Expand Down Expand Up @@ -554,12 +554,7 @@ export function getValidatorApi({
try {
// TODO: Validate in batch
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
const validateFn = () =>
validateGossipAggregateAndProof(
chain,
signedAggregateAndProof,
true // skip known attesters check
);
const validateFn = () => validateApiAggregateAndProof(chain, signedAggregateAndProof);
const {slot, beaconBlockRoot} = signedAggregateAndProof.message.aggregate.data;
// when a validator is configured with multiple beacon node urls, this attestation may come from another beacon node
// and the block hasn't been in our forkchoice since we haven't seen / processing that block
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/chain/bls/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export type VerifySignatureOpts = {
* Ignore the batchable option if this is true.
*/
verifyOnMainThread?: boolean;
/**
* Some signature sets are more important than others, and should be verified first.
*/
priority?: boolean;
};

export interface IBlsVerifier {
Expand Down
35 changes: 28 additions & 7 deletions packages/beacon-node/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {Metrics} from "../../../metrics/index.js";
import {IBlsVerifier, VerifySignatureOpts} from "../interface.js";
import {getAggregatedPubkey, getAggregatedPubkeysCount} from "../utils.js";
import {verifySignatureSetsMaybeBatch} from "../maybeBatch.js";
import {LinkedList} from "../../../util/array.js";
import {BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode} from "./types.js";
import {chunkifyMaximizeChunkSize} from "./utils.js";
import {defaultPoolSize} from "./poolSize.js";
Expand Down Expand Up @@ -106,9 +107,10 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {

private readonly format: PointFormat;
private readonly workers: WorkerDescriptor[];
private readonly jobs: JobQueueItem[] = [];
private readonly jobs = new LinkedList<JobQueueItem>();
private bufferedJobs: {
jobs: JobQueueItem[];
jobs: LinkedList<JobQueueItem>;
prioritizedJobs: LinkedList<JobQueueItem>;
sigCount: number;
firstPush: number;
timeout: NodeJS.Timeout;
Expand Down Expand Up @@ -151,6 +153,13 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise<boolean> {
// Pubkeys are aggregated in the main thread regardless if verified in workers or in main thread
this.metrics?.bls.aggregatedPubkeys.inc(getAggregatedPubkeysCount(sets));
this.metrics?.blsThreadPool.totalSigSets.inc(sets.length);
if (opts.priority) {
this.metrics?.blsThreadPool.prioritizedSigSets.inc(sets.length);
}
if (opts.batchable) {
this.metrics?.blsThreadPool.batchableSigSets.inc(sets.length);
}

if (opts.verifyOnMainThread && !this.blsVerifyAllMultiThread) {
const timer = this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.startTimer();
Expand Down Expand Up @@ -199,7 +208,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
for (const job of this.jobs) {
job.reject(new QueueError({code: QueueErrorCode.QUEUE_ABORTED}));
}
this.jobs.splice(0, this.jobs.length);
this.jobs.clear();

// Terminate all workers. await to ensure no workers are left hanging
await Promise.all(
Expand Down Expand Up @@ -272,18 +281,21 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
return new Promise<boolean>((resolve, reject) => {
const job = {resolve, reject, addedTimeMs: Date.now(), workReq};

// below is for non-priority jobs
// Append batchable sets to `bufferedJobs`, starting a timeout to push them into `jobs`.
// Do not call `runJob()`, it is called from `runBufferedJobs()`
if (workReq.opts.batchable) {
if (!this.bufferedJobs) {
this.bufferedJobs = {
jobs: [],
jobs: new LinkedList(),
prioritizedJobs: new LinkedList(),
sigCount: 0,
firstPush: Date.now(),
timeout: setTimeout(this.runBufferedJobs, MAX_BUFFER_WAIT_MS),
};
}
this.bufferedJobs.jobs.push(job);
const jobs = workReq.opts.priority ? this.bufferedJobs.prioritizedJobs : this.bufferedJobs.jobs;
jobs.push(job);
this.bufferedJobs.sigCount += job.workReq.sets.length;
if (this.bufferedJobs.sigCount > MAX_BUFFERED_SIGS) {
clearTimeout(this.bufferedJobs.timeout);
Expand All @@ -295,7 +307,11 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// This is useful to allow batching job submitted from a synchronous for loop,
// and to prevent large stacks since runJob may be called recursively.
else {
this.jobs.push(job);
if (workReq.opts.priority) {
this.jobs.unshift(job);
} else {
this.jobs.push(job);
}
setTimeout(this.runJob, 0);
}
});
Expand Down Expand Up @@ -424,7 +440,12 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
*/
private runBufferedJobs = (): void => {
if (this.bufferedJobs) {
this.jobs.push(...this.bufferedJobs.jobs);
for (const job of this.bufferedJobs.jobs) {
this.jobs.push(job);
}
for (const job of this.bufferedJobs.prioritizedJobs) {
this.jobs.unshift(job);
}
this.bufferedJobs = null;
setTimeout(this.runJob, 0);
}
Expand Down
27 changes: 24 additions & 3 deletions packages/beacon-node/src/chain/validation/aggregateAndProof.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,33 @@ export type AggregateAndProofValidationResult = {
attDataRootHex: RootHex;
};

export async function validateApiAggregateAndProof(
chain: IBeaconChain,
signedAggregateAndProof: phase0.SignedAggregateAndProof
): Promise<AggregateAndProofValidationResult> {
const skipValidationKnownAttesters = true;
const prioritizeBls = true;
return validateAggregateAndProof(chain, signedAggregateAndProof, null, {skipValidationKnownAttesters, prioritizeBls});
}

export async function validateGossipAggregateAndProof(
chain: IBeaconChain,
signedAggregateAndProof: phase0.SignedAggregateAndProof,
skipValidationKnownAttesters = false,
serializedData: Uint8Array | null = null
serializedData: Uint8Array
): Promise<AggregateAndProofValidationResult> {
return validateAggregateAndProof(chain, signedAggregateAndProof, serializedData);
}

async function validateAggregateAndProof(
chain: IBeaconChain,
signedAggregateAndProof: phase0.SignedAggregateAndProof,
serializedData: Uint8Array | null = null,
opts: {skipValidationKnownAttesters: boolean; prioritizeBls: boolean} = {
skipValidationKnownAttesters: false,
prioritizeBls: false,
}
): Promise<AggregateAndProofValidationResult> {
const {skipValidationKnownAttesters, prioritizeBls} = opts;
// Do checks in this order:
// - do early checks (w/o indexed attestation)
// - > obtain indexed attestation and committes per slot
Expand Down Expand Up @@ -176,7 +197,7 @@ export async function validateGossipAggregateAndProof(
];
// no need to write to SeenAttestationDatas

if (!(await chain.bls.verifySignatureSets(signatureSets, {batchable: true}))) {
if (!(await chain.bls.verifySignatureSets(signatureSets, {batchable: true, priority: prioritizeBls}))) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SIGNATURE});
}

Expand Down
59 changes: 46 additions & 13 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,65 @@ export type AttestationValidationResult = {
attDataRootHex: RootHex;
};

export type AttestationOrBytes =
// for api
| {attestation: phase0.Attestation; serializedData: null}
// for gossip
| {
attestation: null;
serializedData: Uint8Array;
// available in NetworkProcessor since we check for unknown block root attestations
attSlot: Slot;
};
export type AttestationOrBytes = ApiAttestation | GossipAttestation;

/** attestation from api */
export type ApiAttestation = {attestation: phase0.Attestation; serializedData: null};

/** attestation from gossip */
export type GossipAttestation = {
attestation: null;
serializedData: Uint8Array;
// available in NetworkProcessor since we check for unknown block root attestations
attSlot: Slot;
};

/**
* The beacon chain shufflings are designed to provide 1 epoch lookahead
* At each state, we have previous shuffling, current shuffling and next shuffling
*/
const SHUFFLING_LOOK_AHEAD_EPOCHS = 1;

/**
* Validate attestations from gossip
* - Only deserialize the attestation if needed, use the cached AttestationData instead
* - This is to avoid deserializing similar attestation multiple times which could help the gc
* - subnet is required
* - do not prioritize bls signature set
*/
export async function validateGossipAttestation(
chain: IBeaconChain,
attestationOrBytes: GossipAttestation,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number
): Promise<AttestationValidationResult> {
return validateAttestation(chain, attestationOrBytes, subnet);
}

/**
* Validate attestations from api
* - no need to deserialize attestation
* - no subnet
* - prioritize bls signature set
*/
export async function validateApiAttestation(
chain: IBeaconChain,
attestationOrBytes: ApiAttestation
): Promise<AttestationValidationResult> {
const prioritizeBls = true;
return validateAttestation(chain, attestationOrBytes, null, prioritizeBls);
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
*/
export async function validateGossipAttestation(
async function validateAttestation(
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null
subnet: number | null,
prioritizeBls = false
): Promise<AttestationValidationResult> {
// Do checks in this order:
// - do early checks (w/o indexed attestation)
Expand Down Expand Up @@ -268,7 +301,7 @@ export async function validateGossipAttestation(
}
}

if (!(await chain.bls.verifySignatureSets([signatureSet], {batchable: true}))) {
if (!(await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls}))) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SIGNATURE});
}

Expand Down
18 changes: 17 additions & 1 deletion packages/beacon-node/src/chain/validation/attesterSlashing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,25 @@ import {
import {IBeaconChain} from "..";
import {AttesterSlashingError, AttesterSlashingErrorCode, GossipAction} from "../errors/index.js";

export async function validateApiAttesterSlashing(
chain: IBeaconChain,
attesterSlashing: phase0.AttesterSlashing
): Promise<void> {
const prioritizeBls = true;
return validateAttesterSlashing(chain, attesterSlashing, prioritizeBls);
}

export async function validateGossipAttesterSlashing(
chain: IBeaconChain,
attesterSlashing: phase0.AttesterSlashing
): Promise<void> {
return validateAttesterSlashing(chain, attesterSlashing);
}

export async function validateAttesterSlashing(
chain: IBeaconChain,
attesterSlashing: phase0.AttesterSlashing,
prioritizeBls = false
): Promise<void> {
// [IGNORE] At least one index in the intersection of the attesting indices of each attestation has not yet been seen
// in any prior attester_slashing (i.e.
Expand All @@ -36,7 +52,7 @@ export async function validateGossipAttesterSlashing(
}

const signatureSets = getAttesterSlashingSignatureSets(state, attesterSlashing);
if (!(await chain.bls.verifySignatureSets(signatureSets, {batchable: true}))) {
if (!(await chain.bls.verifySignatureSets(signatureSets, {batchable: true, priority: prioritizeBls}))) {
throw new AttesterSlashingError(GossipAction.REJECT, {
code: AttesterSlashingErrorCode.INVALID,
error: Error("Invalid signature"),
Expand Down
Loading

0 comments on commit 28a055b

Please sign in to comment.